mirror of
https://github.com/griffi-gh/kubi.git
synced 2024-11-21 14:28:43 -06:00
commit
6dccc97933
5
.gitignore
vendored
5
.gitignore
vendored
|
@ -1,3 +1,5 @@
|
|||
.direnv
|
||||
|
||||
# Generated by Cargo
|
||||
# will have compiled files and executables
|
||||
debug/
|
||||
|
@ -15,6 +17,7 @@ _src
|
|||
_visualizer.json
|
||||
|
||||
*.kubi
|
||||
*.kbi
|
||||
|
||||
/*_log*.txt
|
||||
/*.log
|
||||
|
@ -37,4 +40,4 @@ _visualizer.json
|
|||
|
||||
/mods
|
||||
|
||||
.direnv
|
||||
|
||||
|
|
2
Cargo.lock
generated
2
Cargo.lock
generated
|
@ -1307,8 +1307,10 @@ dependencies = [
|
|||
"bincode",
|
||||
"bytemuck",
|
||||
"fastnoise-lite",
|
||||
"flume",
|
||||
"glam",
|
||||
"hashbrown 0.14.5",
|
||||
"log",
|
||||
"nohash-hasher",
|
||||
"num_enum",
|
||||
"nz",
|
||||
|
|
|
@ -4,6 +4,7 @@ max_clients = 32
|
|||
timeout_ms = 10000
|
||||
|
||||
[world]
|
||||
file = "world.kubi"
|
||||
seed = 0xfeb_face_dead_cafe
|
||||
preheat_radius = 8
|
||||
|
||||
|
|
|
@ -50,7 +50,7 @@ pub fn sync_client_positions(
|
|||
};
|
||||
|
||||
//log movement (annoying duh)
|
||||
log::debug!("dbg: player moved id: {} coords: {} quat: {}", message.client_id, position, direction);
|
||||
// log::debug!("dbg: player moved id: {} coords: {} quat: {}", message.client_id, position, direction);
|
||||
|
||||
//Apply position to server-side client
|
||||
let mut trans = (&mut transforms).get(message.entity_id).unwrap();
|
||||
|
|
|
@ -1,6 +1,6 @@
|
|||
use shipyard::{AllStoragesView, Unique};
|
||||
use serde::{Serialize, Deserialize};
|
||||
use std::{fs, net::SocketAddr};
|
||||
use std::{fs, net::SocketAddr, path::PathBuf};
|
||||
|
||||
#[derive(Serialize, Deserialize)]
|
||||
pub struct ConfigTableServer {
|
||||
|
@ -12,6 +12,7 @@ pub struct ConfigTableServer {
|
|||
|
||||
#[derive(Serialize, Deserialize)]
|
||||
pub struct ConfigTableWorld {
|
||||
pub file: Option<PathBuf>,
|
||||
pub seed: u64,
|
||||
pub preheat_radius: u32,
|
||||
}
|
||||
|
|
|
@ -1,5 +1,6 @@
|
|||
use shipyard::{IntoWorkload, Workload, WorkloadModificator, World};
|
||||
use shipyard::{IntoWorkload, SystemModificator, Workload, WorkloadModificator, World};
|
||||
use std::{thread, time::Duration};
|
||||
use kubi_shared::fixed_timestamp::{FixedTimestamp, init_fixed_timestamp_storage};
|
||||
|
||||
mod util;
|
||||
mod config;
|
||||
|
@ -12,10 +13,11 @@ use config::read_config;
|
|||
use server::{bind_server, update_server, log_server_errors};
|
||||
use client::{init_client_maps, on_client_disconnect, sync_client_positions};
|
||||
use auth::authenticate_players;
|
||||
use world::{update_world, init_world};
|
||||
use world::{init_world, save::save_modified, update_world};
|
||||
|
||||
fn initialize() -> Workload {
|
||||
(
|
||||
init_fixed_timestamp_storage,
|
||||
read_config,
|
||||
bind_server,
|
||||
init_client_maps,
|
||||
|
@ -32,7 +34,10 @@ fn update() -> Workload {
|
|||
update_world,
|
||||
sync_client_positions,
|
||||
on_client_disconnect,
|
||||
).into_workload()
|
||||
).into_workload(),
|
||||
save_modified
|
||||
.into_workload()
|
||||
.make_fixed(10000, 0),
|
||||
).into_sequential_workload()
|
||||
}
|
||||
|
||||
|
|
|
@ -24,12 +24,13 @@ use crate::{
|
|||
|
||||
pub mod chunk;
|
||||
pub mod tasks;
|
||||
pub mod save;
|
||||
|
||||
use chunk::Chunk;
|
||||
|
||||
use self::{
|
||||
tasks::{ChunkTaskManager, ChunkTask, ChunkTaskResponse, init_chunk_task_manager},
|
||||
chunk::ChunkState
|
||||
chunk::ChunkState,
|
||||
};
|
||||
|
||||
#[derive(Unique, Default)]
|
||||
|
@ -106,7 +107,7 @@ fn process_chunk_requests(
|
|||
chunk.state = ChunkState::Loading;
|
||||
chunk.subscriptions.insert(message.client_id);
|
||||
chunk_manager.chunks.insert(chunk_position, chunk);
|
||||
task_manager.spawn_task(ChunkTask::LoadChunk {
|
||||
task_manager.run(ChunkTask::LoadChunk {
|
||||
position: chunk_position,
|
||||
seed: config.world.seed,
|
||||
});
|
||||
|
@ -249,7 +250,11 @@ fn process_block_queue(
|
|||
let Some(blocks) = &mut chunk.blocks else {
|
||||
return true
|
||||
};
|
||||
blocks[block_position.x as usize][block_position.y as usize][block_position.z as usize] = item.block_type;
|
||||
let block = &mut blocks[block_position.x as usize][block_position.y as usize][block_position.z as usize];
|
||||
if item.block_type != *block {
|
||||
*block = item.block_type;
|
||||
chunk.data_modified = true;
|
||||
}
|
||||
false
|
||||
});
|
||||
if initial_len != queue.queue.len() {
|
||||
|
@ -278,7 +283,7 @@ pub fn preheat_world(
|
|||
let mut chunk = Chunk::new();
|
||||
chunk.state = ChunkState::Loading;
|
||||
chunk_manager.chunks.insert(chunk_position, chunk);
|
||||
task_manager.spawn_task(ChunkTask::LoadChunk {
|
||||
task_manager.run(ChunkTask::LoadChunk {
|
||||
position: chunk_position,
|
||||
seed: config.world.seed,
|
||||
});
|
||||
|
@ -292,7 +297,7 @@ pub fn init_world() -> Workload {
|
|||
init_chunk_manager_and_block_queue.before_all(preheat_world),
|
||||
init_chunk_task_manager.before_all(preheat_world),
|
||||
preheat_world,
|
||||
).into_workload()
|
||||
).into_sequential_workload()
|
||||
}
|
||||
|
||||
pub fn update_world() -> Workload {
|
||||
|
|
|
@ -16,13 +16,16 @@ pub struct Chunk {
|
|||
pub state: ChunkState,
|
||||
pub blocks: Option<BlockData>,
|
||||
pub subscriptions: HashSet<ClientId, BuildNoHashHasher<ClientId>>,
|
||||
pub data_modified: bool,
|
||||
}
|
||||
|
||||
impl Chunk {
|
||||
pub fn new() -> Self {
|
||||
Self {
|
||||
state: ChunkState::Nothing,
|
||||
blocks: None,
|
||||
subscriptions: HashSet::with_capacity_and_hasher(4, BuildNoHashHasher::default()),
|
||||
data_modified: false,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
43
kubi-server/src/world/save.rs
Normal file
43
kubi-server/src/world/save.rs
Normal file
|
@ -0,0 +1,43 @@
|
|||
use kubi_shared::data::{io_thread::IOThreadManager, open_local_save_file};
|
||||
use shipyard::{AllStoragesView, UniqueView, UniqueViewMut};
|
||||
use crate::config::ConfigTable;
|
||||
use super::{
|
||||
tasks::{ChunkTask, ChunkTaskManager},
|
||||
ChunkManager,
|
||||
};
|
||||
|
||||
pub fn init_save_file(storages: &AllStoragesView) -> Option<IOThreadManager> {
|
||||
let config = storages.borrow::<UniqueView<ConfigTable>>().unwrap();
|
||||
if let Some(file_path) = &config.world.file {
|
||||
log::info!("Initializing save file from {:?}", file_path);
|
||||
let save = open_local_save_file(&file_path).unwrap();
|
||||
Some(IOThreadManager::new(save))
|
||||
} else {
|
||||
log::warn!("No save file specified, world will not be saved");
|
||||
None
|
||||
}
|
||||
}
|
||||
|
||||
pub fn save_modified(
|
||||
mut chunks: UniqueViewMut<ChunkManager>,
|
||||
ctm: UniqueView<ChunkTaskManager>,
|
||||
) {
|
||||
log::info!("Saving...");
|
||||
let mut amount_saved = 0;
|
||||
for (position, chunk) in chunks.chunks.iter_mut() {
|
||||
if chunk.data_modified {
|
||||
let Some(data) = chunk.blocks.clone() else {
|
||||
continue
|
||||
};
|
||||
ctm.run(ChunkTask::SaveChunk {
|
||||
position: *position,
|
||||
data: data,
|
||||
});
|
||||
chunk.data_modified = false;
|
||||
amount_saved += 1;
|
||||
}
|
||||
}
|
||||
if amount_saved > 0 {
|
||||
log::info!("Queued {} chunks for saving", amount_saved);
|
||||
}
|
||||
}
|
|
@ -4,16 +4,19 @@ use glam::IVec3;
|
|||
use rayon::{ThreadPool, ThreadPoolBuilder};
|
||||
use anyhow::Result;
|
||||
use kubi_shared::{
|
||||
chunk::BlockData,
|
||||
worldgen::generate_world,
|
||||
queue::QueuedBlock,
|
||||
chunk::BlockData, data::io_thread::{IOCommand, IOResponse, IOThreadManager}, queue::QueuedBlock, worldgen::generate_world
|
||||
};
|
||||
use super::save::init_save_file;
|
||||
|
||||
pub enum ChunkTask {
|
||||
LoadChunk {
|
||||
position: IVec3,
|
||||
seed: u64,
|
||||
}
|
||||
},
|
||||
SaveChunk {
|
||||
position: IVec3,
|
||||
data: BlockData,
|
||||
},
|
||||
}
|
||||
|
||||
pub enum ChunkTaskResponse {
|
||||
|
@ -28,33 +31,74 @@ pub enum ChunkTaskResponse {
|
|||
pub struct ChunkTaskManager {
|
||||
channel: (Sender<ChunkTaskResponse>, Receiver<ChunkTaskResponse>),
|
||||
pool: ThreadPool,
|
||||
iota: Option<IOThreadManager>,
|
||||
}
|
||||
|
||||
impl ChunkTaskManager {
|
||||
pub fn new() -> Result<Self> {
|
||||
pub fn new(iota: Option<IOThreadManager>) -> Result<Self> {
|
||||
Ok(Self {
|
||||
channel: unbounded(),
|
||||
pool: ThreadPoolBuilder::new().build()?
|
||||
pool: ThreadPoolBuilder::new().build()?,
|
||||
iota,
|
||||
})
|
||||
}
|
||||
pub fn spawn_task(&self, task: ChunkTask) {
|
||||
let sender = self.channel.0.clone();
|
||||
self.pool.spawn(move || {
|
||||
sender.send(match task {
|
||||
ChunkTask::LoadChunk { position: chunk_position, seed } => {
|
||||
//unwrap is fine because abort is not possible
|
||||
let (blocks, queue) = generate_world(chunk_position, seed, None).unwrap();
|
||||
ChunkTaskResponse::ChunkLoaded { chunk_position, blocks, queue }
|
||||
|
||||
pub fn run(&self, task: ChunkTask) {
|
||||
match task {
|
||||
ChunkTask::LoadChunk { position: chunk_position, seed } => {
|
||||
// 1. Check if the chunk exists in the save file
|
||||
if let ChunkTask::LoadChunk { position, .. } = &task {
|
||||
if let Some(iota) = &self.iota {
|
||||
if iota.chunk_exists(*position) {
|
||||
iota.send(IOCommand::LoadChunk { position: *position });
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
}).unwrap()
|
||||
})
|
||||
|
||||
// 2. Generate the chunk if it doesn't exist
|
||||
let sender = self.channel.0.clone();
|
||||
self.pool.spawn(move || {
|
||||
sender.send({
|
||||
//unwrap is fine because abort is not possible
|
||||
let (blocks, queue) = generate_world(chunk_position, seed, None).unwrap();
|
||||
ChunkTaskResponse::ChunkLoaded { chunk_position, blocks, queue }
|
||||
}).unwrap()
|
||||
});
|
||||
},
|
||||
ChunkTask::SaveChunk { position, data } => {
|
||||
// Save the chunk to the save file
|
||||
if let Some(iota) = &self.iota {
|
||||
iota.send(IOCommand::SaveChunk { position, data });
|
||||
}
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
pub fn receive(&self) -> Option<ChunkTaskResponse> {
|
||||
self.channel.1.try_recv().ok()
|
||||
// Try to receive IO results first
|
||||
// If there are none, try to receive worldgen results
|
||||
self.iota.as_ref().map(|iota| {
|
||||
iota.poll_single().map(|response| match response {
|
||||
IOResponse::ChunkLoaded { position, data } => ChunkTaskResponse::ChunkLoaded {
|
||||
chunk_position: position,
|
||||
blocks: data.expect("chunk data exists in the header, but was not loaded"),
|
||||
queue: Vec::with_capacity(0)
|
||||
},
|
||||
_ => panic!("Unexpected response from IO thread"),
|
||||
})
|
||||
}).flatten().or_else(|| {
|
||||
self.channel.1.try_recv().ok()
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
pub fn init_chunk_task_manager(
|
||||
storages: AllStoragesView
|
||||
) {
|
||||
storages.add_unique(ChunkTaskManager::new().expect("ChunkTaskManager Init failed"));
|
||||
let iota = init_save_file(&storages);
|
||||
storages.add_unique(
|
||||
ChunkTaskManager::new(iota)
|
||||
.expect("ChunkTaskManager Init failed")
|
||||
);
|
||||
}
|
||||
|
|
|
@ -14,6 +14,7 @@ serde = { version = "1.0", default-features = false, features = ["alloc", "deriv
|
|||
serde_with = "3.4"
|
||||
bincode = "1.3"
|
||||
anyhow = "1.0"
|
||||
flume = "0.11"
|
||||
fastnoise-lite = { version = "1.1", features = ["std", "f64"] }
|
||||
rand = { version = "0.8", default_features = false, features = ["std", "min_const_gen"] }
|
||||
rand_xoshiro = "0.6"
|
||||
|
@ -23,6 +24,7 @@ bytemuck = { version = "1.14", features = ["derive"] }
|
|||
static_assertions = "1.1"
|
||||
nz = "0.4"
|
||||
atomic = "0.6"
|
||||
log = "0.4"
|
||||
|
||||
[features]
|
||||
default = []
|
||||
|
|
|
@ -1,7 +1,8 @@
|
|||
use std::{
|
||||
fs::File,
|
||||
mem::size_of,
|
||||
fs::{File, OpenOptions},
|
||||
io::{Read, Seek, SeekFrom, Write},
|
||||
path::Path,
|
||||
borrow::Cow,
|
||||
sync::{Arc, RwLock}
|
||||
};
|
||||
|
@ -17,6 +18,8 @@ use crate::{
|
|||
chunk::{CHUNK_SIZE, BlockDataRef, BlockData}
|
||||
};
|
||||
|
||||
pub mod io_thread;
|
||||
|
||||
const SECTOR_SIZE: usize = CHUNK_SIZE * CHUNK_SIZE * CHUNK_SIZE * size_of::<Block>();
|
||||
const RESERVED_SIZE: usize = 1048576; //~1mb (16 sectors assuming 32x32x32 world of 1byte blocks)
|
||||
const RESERVED_SECTOR_COUNT: usize = RESERVED_SIZE / SECTOR_SIZE;
|
||||
|
@ -47,19 +50,19 @@ impl Default for WorldSaveDataHeader {
|
|||
}
|
||||
}
|
||||
|
||||
pub type SharedHeader = Arc<RwLock<WorldSaveDataHeader>>;
|
||||
|
||||
#[derive(Unique)]
|
||||
pub struct WorldSaveFile {
|
||||
pub file: File,
|
||||
pub header: WorldSaveDataHeader,
|
||||
pub header: SharedHeader,
|
||||
}
|
||||
|
||||
pub type SharedSaveFile = Arc<RwLock<WorldSaveFile>>;
|
||||
|
||||
impl WorldSaveFile {
|
||||
pub fn new(file: File) -> Self {
|
||||
WorldSaveFile {
|
||||
file,
|
||||
header: WorldSaveDataHeader::default()
|
||||
header: Arc::new(RwLock::new(WorldSaveDataHeader::default())),
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -76,7 +79,7 @@ impl WorldSaveFile {
|
|||
}
|
||||
|
||||
let limit = (RESERVED_SIZE - SUBHEADER_SIZE) as u64;
|
||||
self.header = bincode::deserialize_from((&self.file).take(limit))?;
|
||||
*self.header.write().unwrap() = bincode::deserialize_from((&self.file).take(limit))?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
@ -88,7 +91,7 @@ impl WorldSaveFile {
|
|||
//XXX: this can cause the header to destroy chunk data (if it's WAY too long)
|
||||
// read has checks against this, but write doesn't
|
||||
// 1mb is pretty generous tho, so it's not a *big* deal
|
||||
bincode::serialize_into(&self.file, &self.header)?;
|
||||
bincode::serialize_into(&self.file, &*self.header.read().unwrap())?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
|
@ -102,19 +105,28 @@ impl WorldSaveFile {
|
|||
Ok(())
|
||||
}
|
||||
|
||||
fn allocate_sector(&mut self) -> u32 {
|
||||
let value = self.header.sector_count + 1;
|
||||
self.header.sector_count += 1;
|
||||
value
|
||||
}
|
||||
// fn allocate_sector(&mut self) -> u32 {
|
||||
// let mut lock = self.header.write().unwrap();
|
||||
// let value = lock.sector_count + 1;
|
||||
// lock.sector_count += 1;
|
||||
// value
|
||||
// }
|
||||
|
||||
pub fn save_chunk(&mut self, position: IVec3, data: &BlockDataRef) -> Result<()> {
|
||||
let mut header_lock = self.header.write().unwrap();
|
||||
|
||||
let mut header_modified = false;
|
||||
let sector = self.header.chunk_map.get(&position).copied().unwrap_or_else(|| {
|
||||
let sector = header_lock.chunk_map.get(&position).copied().unwrap_or_else(|| {
|
||||
header_modified = true;
|
||||
self.allocate_sector()
|
||||
let sector = header_lock.sector_count;
|
||||
header_lock.sector_count += 1;
|
||||
header_lock.chunk_map.insert(position, sector);
|
||||
sector
|
||||
// self.allocate_sector()
|
||||
});
|
||||
|
||||
drop(header_lock);
|
||||
|
||||
let offset = sector as u64 * SECTOR_SIZE as u64;
|
||||
|
||||
const_assert_eq!(size_of::<Block>(), 1);
|
||||
|
@ -136,11 +148,11 @@ impl WorldSaveFile {
|
|||
}
|
||||
|
||||
pub fn chunk_exists(&self, position: IVec3) -> bool {
|
||||
self.header.chunk_map.contains_key(&position)
|
||||
self.header.read().unwrap().chunk_map.contains_key(&position)
|
||||
}
|
||||
|
||||
pub fn load_chunk(&mut self, position: IVec3) -> Result<Option<BlockData>> {
|
||||
let Some(§or) = self.header.chunk_map.get(&position) else {
|
||||
let Some(§or) = self.header.read().unwrap().chunk_map.get(&position) else {
|
||||
return Ok(None);
|
||||
};
|
||||
|
||||
|
@ -166,4 +178,26 @@ impl WorldSaveFile {
|
|||
|
||||
Ok(Some(data))
|
||||
}
|
||||
|
||||
pub fn get_shared_header(&self) -> SharedHeader {
|
||||
Arc::clone(&self.header)
|
||||
}
|
||||
}
|
||||
|
||||
/// Utility function to open a local save file, creating it if it doesn't exist
|
||||
pub fn open_local_save_file(path: &Path) -> Result<WorldSaveFile> {
|
||||
let mut save_file = WorldSaveFile::new({
|
||||
OpenOptions::new()
|
||||
.read(true)
|
||||
.write(true)
|
||||
.create(true)
|
||||
.open(path)?
|
||||
});
|
||||
if save_file.file.metadata().unwrap().len() == 0 {
|
||||
save_file.initialize()?;
|
||||
} else {
|
||||
save_file.load_data()?;
|
||||
}
|
||||
Ok(save_file)
|
||||
}
|
||||
|
||||
|
|
249
kubi-shared/src/data/io_thread.rs
Normal file
249
kubi-shared/src/data/io_thread.rs
Normal file
|
@ -0,0 +1,249 @@
|
|||
use glam::IVec3;
|
||||
use flume::{Receiver, Sender, TryIter};
|
||||
use shipyard::Unique;
|
||||
use crate::chunk::BlockData;
|
||||
use super::{SharedHeader, WorldSaveFile};
|
||||
|
||||
// Maximum amount of chunks to save in a single batch before checking if there are any pending read requests
|
||||
// may be broken, so currently disabled
|
||||
const MAX_SAVE_BATCH_SIZE: usize = usize::MAX;
|
||||
|
||||
pub enum IOCommand {
|
||||
SaveChunk {
|
||||
position: IVec3,
|
||||
data: BlockData,
|
||||
},
|
||||
|
||||
/// Load a chunk from the disk and send it to the main thread
|
||||
LoadChunk {
|
||||
position: IVec3,
|
||||
},
|
||||
|
||||
/// Process all pending write commands and make the thread end itself
|
||||
/// LoadChunk commands will be ignored after this command is received
|
||||
Kys,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub enum IOResponse {
|
||||
/// A chunk has been loaded from the disk
|
||||
/// Or not, in which case the data will be None
|
||||
/// and chunk should be generated
|
||||
ChunkLoaded {
|
||||
position: IVec3,
|
||||
data: Option<BlockData>,
|
||||
},
|
||||
|
||||
/// The IO thread has been terminated
|
||||
Terminated,
|
||||
}
|
||||
|
||||
struct IOThreadContext {
|
||||
tx: Sender<IOResponse>,
|
||||
rx: Receiver<IOCommand>,
|
||||
save: WorldSaveFile,
|
||||
save_queue: Vec<(IVec3, BlockData)>,
|
||||
}
|
||||
|
||||
//TODO: Implement proper error handling (I/O errors are rlly common)
|
||||
|
||||
impl IOThreadContext {
|
||||
/// Should be called ON the IO thread
|
||||
///
|
||||
/// Initializes the IO thread context, opening the file at the given path
|
||||
/// If there's an error opening the file, the thread will panic (TODO: handle this more gracefully)
|
||||
pub fn initialize(
|
||||
tx: Sender<IOResponse>,
|
||||
rx: Receiver<IOCommand>,
|
||||
save: WorldSaveFile,
|
||||
) -> Self {
|
||||
// save.load_data().unwrap();
|
||||
let save_queue = Vec::new();
|
||||
Self { tx, rx, save, save_queue }
|
||||
}
|
||||
|
||||
pub fn run(mut self) {
|
||||
loop {
|
||||
// because were waiting for the next command, we can't process the save_queue
|
||||
// which breaks batching, so we need to check if there are any pending save requests
|
||||
// and if there are, use non-blocking recv to give them a chance to be processed
|
||||
'rx: while let Some(command) = {
|
||||
if self.save_queue.len() > 0 {
|
||||
self.rx.try_recv().ok()
|
||||
} else {
|
||||
self.rx.recv().ok()
|
||||
}
|
||||
} {
|
||||
match command {
|
||||
IOCommand::SaveChunk { position, data } => {
|
||||
// if chunk already has a save request, overwrite it
|
||||
for (pos, old_data) in self.save_queue.iter_mut() {
|
||||
if *pos == position {
|
||||
*old_data = data;
|
||||
continue 'rx;
|
||||
}
|
||||
}
|
||||
// if not, save to the queue
|
||||
self.save_queue.push((position, data));
|
||||
//log::trace!("amt of unsaved chunks: {}", self.save_queue.len());
|
||||
}
|
||||
IOCommand::LoadChunk { position } => {
|
||||
// HOLD ON
|
||||
// first check if the chunk is already in the save queue
|
||||
// if it is, send it and continue
|
||||
// (NOT doing this WILL result in data loss if the user returns to the chunk too quickly)
|
||||
for (pos, data) in self.save_queue.iter() {
|
||||
if *pos == position {
|
||||
self.tx.send(IOResponse::ChunkLoaded { position, data: Some(data.clone()) }).unwrap();
|
||||
continue 'rx;
|
||||
}
|
||||
}
|
||||
let data = self.save.load_chunk(position).unwrap();
|
||||
self.tx.send(IOResponse::ChunkLoaded { position, data }).unwrap();
|
||||
}
|
||||
IOCommand::Kys => {
|
||||
// Process all pending write commands
|
||||
log::info!("info: queue has {} chunks", self.save_queue.len());
|
||||
let mut saved_amount = 0;
|
||||
for (pos, data) in self.save_queue.drain(..) {
|
||||
self.save.save_chunk(pos, &data).unwrap();
|
||||
saved_amount += 1;
|
||||
}
|
||||
log::debug!("now, moving on to the rx queue...");
|
||||
for cmd in self.rx.try_iter() {
|
||||
let IOCommand::SaveChunk { position, data } = cmd else {
|
||||
continue;
|
||||
};
|
||||
self.save.save_chunk(position, &data).unwrap();
|
||||
saved_amount += 1;
|
||||
}
|
||||
log::info!("saved {} chunks on exit", saved_amount);
|
||||
self.tx.send(IOResponse::Terminated).unwrap();
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
// between every betch of requests, check if there are any pending save requests
|
||||
if self.save_queue.len() > 0 {
|
||||
let will_drain = MAX_SAVE_BATCH_SIZE.min(self.save_queue.len());
|
||||
log::info!("saving {}/{} chunks with batch size {}...", will_drain, self.save_queue.len(), MAX_SAVE_BATCH_SIZE);
|
||||
for (pos, data) in self.save_queue.drain(..will_drain) {
|
||||
self.save.save_chunk(pos, &data).unwrap();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub struct IOSingleThread {
|
||||
tx: Sender<IOCommand>,
|
||||
rx: Receiver<IOResponse>,
|
||||
handle: std::thread::JoinHandle<()>,
|
||||
header: SharedHeader,
|
||||
}
|
||||
|
||||
impl IOSingleThread {
|
||||
pub fn spawn(save: WorldSaveFile) -> Self {
|
||||
// Create channels
|
||||
let (command_tx, command_rx) = flume::unbounded();
|
||||
let (response_tx, response_rx) = flume::unbounded();
|
||||
|
||||
// Grab a handle to the header
|
||||
let header = save.get_shared_header();
|
||||
|
||||
// Spawn the thread
|
||||
let builder = std::thread::Builder::new()
|
||||
.name("io-thread".into());
|
||||
let handle = builder.spawn(move || {
|
||||
let context = IOThreadContext::initialize(response_tx, command_rx, save);
|
||||
context.run();
|
||||
}).unwrap();
|
||||
|
||||
IOSingleThread {
|
||||
tx: command_tx,
|
||||
rx: response_rx,
|
||||
handle,
|
||||
header,
|
||||
}
|
||||
}
|
||||
|
||||
/// Send a command to the IO thread
|
||||
pub fn send(&self, cmd: IOCommand) {
|
||||
self.tx.send(cmd).unwrap();
|
||||
}
|
||||
|
||||
/// Poll the IO thread for a single response (non-blocking)
|
||||
pub fn poll_single(&self) -> Option<IOResponse> {
|
||||
self.rx.try_recv().ok()
|
||||
}
|
||||
|
||||
/// Poll the IO thread for responses (non-blocking)
|
||||
pub fn poll(&self) -> TryIter<IOResponse> {
|
||||
self.rx.try_iter()
|
||||
}
|
||||
|
||||
/// Signal the IO thread to process the remaining requests and wait for it to terminate
|
||||
pub fn stop_sync(&self) {
|
||||
log::debug!("Stopping IO thread (sync)");
|
||||
|
||||
// Tell the thread to terminate and wait for it to finish
|
||||
self.tx.send(IOCommand::Kys).unwrap();
|
||||
while !matches!(self.rx.recv().unwrap(), IOResponse::Terminated) {}
|
||||
|
||||
// HACK "we have .join at home"
|
||||
while !self.handle.is_finished() {}
|
||||
|
||||
log::debug!("IO thread stopped"); //almost lol
|
||||
}
|
||||
|
||||
/// Same as stop_sync but doesn't wait for the IO thread to terminate
|
||||
pub fn stop_async(&self) {
|
||||
log::debug!("Stopping IO thread (async)");
|
||||
self.tx.send(IOCommand::Kys).unwrap();
|
||||
}
|
||||
|
||||
pub fn chunk_exists(&self, position: IVec3) -> bool {
|
||||
self.header.read().unwrap().chunk_map.contains_key(&position)
|
||||
}
|
||||
}
|
||||
|
||||
impl Drop for IOSingleThread {
|
||||
fn drop(&mut self) {
|
||||
log::trace!("IOSingleThread dropped, about to sync unsaved data...");
|
||||
self.stop_sync();
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/// This is a stub for future implemntation that may use multiple IO threads
|
||||
#[derive(Unique)]
|
||||
pub struct IOThreadManager {
|
||||
thread: IOSingleThread,
|
||||
}
|
||||
|
||||
impl IOThreadManager {
|
||||
pub fn new(save: WorldSaveFile) -> Self {
|
||||
Self {
|
||||
thread: IOSingleThread::spawn(save)
|
||||
}
|
||||
}
|
||||
|
||||
pub fn send(&self, cmd: IOCommand) {
|
||||
self.thread.send(cmd);
|
||||
}
|
||||
|
||||
pub fn poll_single(&self) -> Option<IOResponse> {
|
||||
self.thread.poll_single()
|
||||
}
|
||||
|
||||
pub fn poll(&self) -> TryIter<IOResponse> {
|
||||
self.thread.poll()
|
||||
}
|
||||
|
||||
pub fn chunk_exists(&self, position: IVec3) -> bool {
|
||||
self.thread.chunk_exists(position)
|
||||
}
|
||||
}
|
||||
|
||||
// i think im a girl :3 (noone will ever read this right? :p)
|
||||
|
|
@ -8,3 +8,4 @@ pub mod entity;
|
|||
pub mod player;
|
||||
pub mod queue;
|
||||
pub mod data;
|
||||
pub mod fixed_timestamp;
|
||||
|
|
|
@ -5,33 +5,25 @@ use crate::{
|
|||
networking::{GameType, ServerAddress},
|
||||
state::{GameState, NextState}
|
||||
};
|
||||
use kubi_shared::data::WorldSaveFile;
|
||||
|
||||
fn open_local_save_file(path: &Path) -> Result<WorldSaveFile> {
|
||||
let mut save_file = WorldSaveFile::new({
|
||||
OpenOptions::new()
|
||||
.read(true)
|
||||
.write(true)
|
||||
.open("world.kbi")?
|
||||
});
|
||||
if save_file.file.metadata().unwrap().len() == 0 {
|
||||
save_file.initialize()?;
|
||||
} else {
|
||||
save_file.load_data()?;
|
||||
}
|
||||
Ok(save_file)
|
||||
}
|
||||
use kubi_shared::data::{io_thread::IOThreadManager, WorldSaveFile, open_local_save_file};
|
||||
|
||||
pub fn initialize_from_args(
|
||||
all_storages: AllStoragesView,
|
||||
) {
|
||||
// If an address is provided, we're in multiplayer mode (the first argument is the address)
|
||||
// Otherwise, we're in singleplayer mode and working with local stuff
|
||||
let args: Vec<String> = env::args().collect();
|
||||
if args.len() > 1 {
|
||||
// Parse the address and switch the state to connecting
|
||||
let address = args[1].parse::<SocketAddr>().expect("invalid address");
|
||||
all_storages.add_unique(GameType::Muliplayer);
|
||||
all_storages.add_unique(ServerAddress(address));
|
||||
all_storages.borrow::<UniqueViewMut<NextState>>().unwrap().0 = Some(GameState::Connecting);
|
||||
} else {
|
||||
// Open the local save file
|
||||
let save_file = open_local_save_file(Path::new("./world.kubi")).expect("failed to open save file");
|
||||
all_storages.add_unique(IOThreadManager::new(save_file));
|
||||
// Switch the state and kick off the world loading
|
||||
all_storages.add_unique(GameType::Singleplayer);
|
||||
all_storages.borrow::<UniqueViewMut<NextState>>().unwrap().0 = Some(GameState::LoadingWorld);
|
||||
}
|
||||
|
|
|
@ -23,7 +23,9 @@ use winit::{
|
|||
use glam::vec3;
|
||||
use std::time::Instant;
|
||||
|
||||
//TODO remove these re-exports
|
||||
pub(crate) use kubi_shared::transform;
|
||||
pub(crate) use kubi_shared::fixed_timestamp;
|
||||
|
||||
mod ui;
|
||||
pub(crate) use ui::{
|
||||
|
@ -51,17 +53,12 @@ pub(crate) mod hui_integration;
|
|||
pub(crate) mod networking;
|
||||
pub(crate) mod init;
|
||||
pub(crate) mod color;
|
||||
pub(crate) mod fixed_timestamp;
|
||||
pub(crate) mod filesystem;
|
||||
pub(crate) mod client_physics;
|
||||
pub(crate) mod chat;
|
||||
|
||||
use world::{
|
||||
init_game_world,
|
||||
loading::update_loaded_world_around_player,
|
||||
raycast::update_raycasts,
|
||||
queue::apply_queued_blocks,
|
||||
tasks::ChunkTaskManager,
|
||||
init_game_world, loading::{save_on_exit, update_loaded_world_around_player}, queue::apply_queued_blocks, raycast::update_raycasts, tasks::ChunkTaskManager
|
||||
};
|
||||
use player::{spawn_player, MainPlayer};
|
||||
use prefabs::load_prefabs;
|
||||
|
@ -157,7 +154,6 @@ fn update() -> Workload {
|
|||
kubi_ui_end,
|
||||
update_state,
|
||||
exit_on_esc,
|
||||
disconnect_on_exit.run_if(is_multiplayer),
|
||||
update_rendering_late,
|
||||
).into_sequential_workload()
|
||||
}
|
||||
|
@ -183,6 +179,13 @@ fn after_render() -> Workload {
|
|||
).into_sequential_workload()
|
||||
}
|
||||
|
||||
fn on_exit() -> Workload{
|
||||
(
|
||||
disconnect_on_exit.run_if(is_multiplayer),
|
||||
save_on_exit.run_if(is_singleplayer),
|
||||
).into_sequential_workload().run_if(is_ingame_or_loading)
|
||||
}
|
||||
|
||||
#[cfg(all(windows, not(debug_assertions)))]
|
||||
fn attach_console() {
|
||||
use winapi::um::wincon::{AttachConsole, ATTACH_PARENT_PROCESS};
|
||||
|
@ -243,6 +246,7 @@ pub fn kubi_main(
|
|||
world.add_workload(update);
|
||||
//world.add_workload(render);
|
||||
world.add_workload(after_render);
|
||||
world.add_workload(on_exit);
|
||||
|
||||
//Save _visualizer.json
|
||||
#[cfg(feature = "generate_visualizer_data")] {
|
||||
|
@ -350,6 +354,11 @@ pub fn kubi_main(
|
|||
window_target.exit();
|
||||
}
|
||||
},
|
||||
|
||||
Event::LoopExiting => {
|
||||
world.run_workload(on_exit).unwrap();
|
||||
},
|
||||
|
||||
_ => (),
|
||||
};
|
||||
}).unwrap();
|
||||
|
|
|
@ -159,19 +159,15 @@ pub fn update_networking_late() -> Workload {
|
|||
}
|
||||
|
||||
pub fn disconnect_on_exit(
|
||||
exit: UniqueView<RequestExit>,
|
||||
mut client: UniqueViewMut<UdpClient>,
|
||||
) {
|
||||
//TODO check if this works
|
||||
if exit.0 {
|
||||
if client.0.is_active() {
|
||||
client.0.flush();
|
||||
client.0.disconnect();
|
||||
while client.0.is_active() { client.0.step().for_each(|_|()); }
|
||||
log::info!("Client disconnected");
|
||||
} else {
|
||||
log::info!("Client inactive")
|
||||
}
|
||||
if client.0.is_active() {
|
||||
client.0.flush();
|
||||
client.0.disconnect();
|
||||
while client.0.is_active() { client.0.step().for_each(|_|()); }
|
||||
log::info!("Client disconnected");
|
||||
} else {
|
||||
log::info!("Client inactive")
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -37,7 +37,7 @@ pub fn inject_network_responses_into_manager_queue(
|
|||
let ServerToClientMessage::ChunkResponse {
|
||||
chunk, data, queued
|
||||
} = packet else { unreachable!() };
|
||||
manager.add_sussy_response(ChunkTaskResponse::LoadedChunk {
|
||||
manager.add_sussy_response(ChunkTaskResponse::ChunkWorldgenDone {
|
||||
position: chunk,
|
||||
chunk_data: data,
|
||||
queued
|
||||
|
|
|
@ -62,10 +62,10 @@ impl ChunkStorage {
|
|||
}
|
||||
}
|
||||
|
||||
#[derive(Unique)]
|
||||
pub struct WorldInfo {
|
||||
pub seed: u32,
|
||||
}
|
||||
// #[derive(Unique)]
|
||||
// pub struct WorldInfo {
|
||||
// pub seed: u32,
|
||||
// }
|
||||
|
||||
#[derive(Default, Unique)]
|
||||
pub struct ChunkMeshStorage {
|
||||
|
|
|
@ -57,6 +57,7 @@ pub struct Chunk {
|
|||
pub desired_state: DesiredChunkState,
|
||||
pub abortion: Option<Arc<Atomic<AbortState>>>,
|
||||
pub mesh_dirty: bool,
|
||||
pub data_modified: bool,
|
||||
}
|
||||
|
||||
impl Chunk {
|
||||
|
@ -69,6 +70,7 @@ impl Chunk {
|
|||
desired_state: Default::default(),
|
||||
abortion: None,
|
||||
mesh_dirty: false,
|
||||
data_modified: false,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,14 +1,18 @@
|
|||
use std::sync::Arc;
|
||||
use atomic::{Atomic, Ordering};
|
||||
use glam::{IVec3, ivec3};
|
||||
use kubi_shared::{networking::{channels::Channel, messages::ClientToServerMessage}, worldgen::AbortState};
|
||||
use kubi_shared::{
|
||||
data::io_thread::{IOCommand, IOResponse, IOThreadManager},
|
||||
networking::{channels::Channel, messages::ClientToServerMessage},
|
||||
worldgen::AbortState,
|
||||
};
|
||||
use shipyard::{View, UniqueView, UniqueViewMut, IntoIter, Workload, IntoWorkload, NonSendSync, track};
|
||||
use uflow::SendMode;
|
||||
use wgpu::util::DeviceExt;
|
||||
use crate::{
|
||||
networking::UdpClient,
|
||||
player::MainPlayer,
|
||||
rendering::{world::ChunkVertex, BufferPair, Renderer},
|
||||
rendering::{BufferPair, Renderer},
|
||||
settings::GameSettings,
|
||||
state::GameState,
|
||||
transform::Transform,
|
||||
|
@ -16,10 +20,12 @@ use crate::{
|
|||
use super::{
|
||||
ChunkStorage, ChunkMeshStorage,
|
||||
chunk::{Chunk, DesiredChunkState, CHUNK_SIZE, ChunkMesh, CurrentChunkState, ChunkData},
|
||||
tasks::{ChunkTaskManager, ChunkTaskResponse, ChunkTask},
|
||||
tasks::{ChunkTaskManager, ChunkTaskResponse, ChunkTask},
|
||||
queue::BlockUpdateQueue,
|
||||
};
|
||||
|
||||
const WORLD_SEED: u64 = 0xfeb_face_dead_cafe;
|
||||
|
||||
const MAX_CHUNK_OPS_INGAME: usize = 8;
|
||||
const MAX_CHUNK_OPS: usize = 32;
|
||||
|
||||
|
@ -92,6 +98,7 @@ pub fn update_chunks_if_player_moved(
|
|||
|
||||
fn process_state_changes(
|
||||
task_manager: UniqueView<ChunkTaskManager>,
|
||||
io: Option<UniqueView<IOThreadManager>>,
|
||||
mut udp_client: Option<UniqueViewMut<UdpClient>>,
|
||||
mut world: UniqueViewMut<ChunkStorage>,
|
||||
mut vm_meshes: NonSendSync<UniqueViewMut<ChunkMeshStorage>>,
|
||||
|
@ -135,7 +142,7 @@ fn process_state_changes(
|
|||
chunk.current_state,
|
||||
CurrentChunkState::Loaded | CurrentChunkState::CalculatingMesh,
|
||||
) => {
|
||||
chunk.block_data = None;
|
||||
// chunk.block_data = None; //HACK when downgrading, keep the data so we can save it
|
||||
chunk.current_state = CurrentChunkState::Nothing;
|
||||
},
|
||||
|
||||
|
@ -184,18 +191,38 @@ fn process_state_changes(
|
|||
SendMode::Reliable
|
||||
);
|
||||
} else {
|
||||
let atomic = Arc::new(Atomic::new(AbortState::Continue));
|
||||
task_manager.spawn_task(ChunkTask::LoadChunk {
|
||||
seed: 0xbeef_face_dead_cafe,
|
||||
position,
|
||||
abortion: Some(Arc::clone(&atomic)),
|
||||
});
|
||||
abortion = Some(atomic);
|
||||
|
||||
// If the chunk exists in the save file (and save file is there in the first place),
|
||||
// ... we'll try to load it
|
||||
// Otherwise, we'll run worldgen
|
||||
|
||||
let mut should_run_worldgen = true;
|
||||
|
||||
if let Some(io) = &io {
|
||||
if io.chunk_exists(position) {
|
||||
// Try to load the chunk from the save file
|
||||
// In case that fails, we will run worldgen once the IO thread responds
|
||||
io.send(IOCommand::LoadChunk { position });
|
||||
should_run_worldgen = false;
|
||||
}
|
||||
}
|
||||
|
||||
if should_run_worldgen {
|
||||
let atomic = Arc::new(Atomic::new(AbortState::Continue));
|
||||
task_manager.spawn_task(ChunkTask::ChunkWorldgen {
|
||||
seed: WORLD_SEED,
|
||||
position,
|
||||
abortion: Some(Arc::clone(&atomic)),
|
||||
});
|
||||
abortion = Some(atomic);
|
||||
}
|
||||
}
|
||||
|
||||
//Update chunk state
|
||||
let chunk = world.chunks.get_mut(&position).unwrap();
|
||||
chunk.current_state = CurrentChunkState::Loading;
|
||||
chunk.abortion = abortion;
|
||||
|
||||
// ===========
|
||||
//log::trace!("Started loading chunk {position}");
|
||||
},
|
||||
|
@ -254,7 +281,29 @@ fn process_state_changes(
|
|||
return false
|
||||
}
|
||||
|
||||
//HACK, since save files are not implemented, just unload immediately
|
||||
// If in singleplayer and have an open save file, we need to save the chunk to the disk
|
||||
|
||||
// ==========================================================
|
||||
//TODO IMPORTANT: WAIT FOR CHUNK TO FINISH SAVING FIRST BEFORE TRANSITIONING TO UNLOADED
|
||||
// OTHERWISE WE WILL LOSE THE SAVE DATA IF THE USER COMES BACK TO THE CHUNK TOO QUICKLY
|
||||
// ==========================================================
|
||||
//XXX: CHECK IF WE REALLY NEED THIS OR IF WE CAN JUST KILL THE CHUNK RIGHT AWAY
|
||||
//CHANGES TO CHUNK SAVING LOGIC SHOULD HAVE MADE THE ABOVE COMMENT OBSOLETE
|
||||
|
||||
if let Some(io) = &io {
|
||||
if let Some(block_data) = &chunk.block_data {
|
||||
// Only save the chunk if it has been modified
|
||||
if chunk.data_modified {
|
||||
// log::debug!("issue save command");
|
||||
chunk.data_modified = false;
|
||||
io.send(IOCommand::SaveChunk {
|
||||
position,
|
||||
data: block_data.blocks.clone(),
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return false
|
||||
}
|
||||
true
|
||||
|
@ -264,6 +313,7 @@ fn process_state_changes(
|
|||
|
||||
fn process_completed_tasks(
|
||||
task_manager: UniqueView<ChunkTaskManager>,
|
||||
io: Option<UniqueView<IOThreadManager>>,
|
||||
mut world: UniqueViewMut<ChunkStorage>,
|
||||
mut meshes: NonSendSync<UniqueViewMut<ChunkMeshStorage>>,
|
||||
renderer: UniqueView<Renderer>,
|
||||
|
@ -271,9 +321,69 @@ fn process_completed_tasks(
|
|||
mut queue: UniqueViewMut<BlockUpdateQueue>,
|
||||
) {
|
||||
let mut ops: usize = 0;
|
||||
while let Some(res) = task_manager.receive() {
|
||||
|
||||
//TODO reduce code duplication between loaded/generated chunks
|
||||
|
||||
// Process IO first
|
||||
if let Some(io) = &io {
|
||||
for response in io.poll() {
|
||||
let IOResponse::ChunkLoaded { position, data } = response else {
|
||||
//TODO this is bad
|
||||
panic!("Unexpected IO response: {:?}", response);
|
||||
};
|
||||
|
||||
//check if chunk exists
|
||||
let Some(chunk) = world.chunks.get_mut(&position) else {
|
||||
log::warn!("LOADED blocks data discarded: chunk doesn't exist");
|
||||
continue
|
||||
};
|
||||
|
||||
//we cannot have abortion here but just in case, reset it
|
||||
chunk.abortion = None;
|
||||
|
||||
//check if chunk still wants it
|
||||
if !matches!(chunk.desired_state, DesiredChunkState::Loaded | DesiredChunkState::Rendered) {
|
||||
log::warn!("LOADED block data discarded: state undesirable: {:?}", chunk.desired_state);
|
||||
continue
|
||||
}
|
||||
|
||||
// check if we actually got the data
|
||||
if let Some(data) = data {
|
||||
// If we did get the data, yay :3
|
||||
chunk.block_data = Some(ChunkData {
|
||||
blocks: data
|
||||
});
|
||||
chunk.current_state = CurrentChunkState::Loaded;
|
||||
} else {
|
||||
// If we didn't get the data, we need to run worldgen
|
||||
// XXX: will this ever happen? we should always have the data in the save file
|
||||
let atomic = Arc::new(Atomic::new(AbortState::Continue));
|
||||
task_manager.spawn_task(ChunkTask::ChunkWorldgen {
|
||||
seed: WORLD_SEED,
|
||||
position,
|
||||
abortion: Some(Arc::clone(&atomic)),
|
||||
});
|
||||
chunk.abortion = Some(atomic);
|
||||
}
|
||||
|
||||
ops += 1;
|
||||
}
|
||||
|
||||
//return early if we've reached the limit
|
||||
if ops >= match *state {
|
||||
GameState::InGame => MAX_CHUNK_OPS_INGAME,
|
||||
_ => MAX_CHUNK_OPS,
|
||||
} { return }
|
||||
// XXX: this will completely skip polling the task manager if we've reached the limit
|
||||
// this is probably fine, but it might be worth revisiting later
|
||||
}
|
||||
|
||||
for res in task_manager.poll() {
|
||||
match res {
|
||||
ChunkTaskResponse::LoadedChunk { position, chunk_data, mut queued } => {
|
||||
ChunkTaskResponse::ChunkWorldgenDone { position, chunk_data, mut queued } => {
|
||||
//TODO this can fuck shit up really badly if io op gets overwritten by worldgen chunk
|
||||
//TODO only accept if loading stage, not loaded
|
||||
|
||||
//If unwanted chunk is already loaded
|
||||
//It would be ~~...unethical~~ impossible to abort the operation at this point
|
||||
//Instead, we'll just throw it away
|
||||
|
@ -308,7 +418,7 @@ fn process_completed_tasks(
|
|||
//increase ops counter
|
||||
ops += 1;
|
||||
},
|
||||
ChunkTaskResponse::GeneratedMesh {
|
||||
ChunkTaskResponse::GenerateMeshDone {
|
||||
position,
|
||||
vertices, indices,
|
||||
trans_vertices, trans_indices,
|
||||
|
@ -392,3 +502,20 @@ fn process_completed_tasks(
|
|||
} { break }
|
||||
}
|
||||
}
|
||||
|
||||
/// Save all modified chunks to the disk
|
||||
pub fn save_on_exit(
|
||||
io: UniqueView<IOThreadManager>,
|
||||
world: UniqueView<ChunkStorage>,
|
||||
) {
|
||||
for (&position, chunk) in &world.chunks {
|
||||
if let Some(block_data) = &chunk.block_data {
|
||||
if chunk.data_modified {
|
||||
io.send(IOCommand::SaveChunk {
|
||||
position,
|
||||
data: block_data.blocks.clone(),
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -22,11 +22,15 @@ pub fn apply_queued_blocks(
|
|||
if event.soft && *block != Block::Air {
|
||||
return false
|
||||
}
|
||||
if event.block_type == *block {
|
||||
return false
|
||||
}
|
||||
*block = event.block_type;
|
||||
//mark chunk as dirty
|
||||
let (chunk_pos, block_pos) = ChunkStorage::to_chunk_coords(event.position);
|
||||
let chunk = world.chunks.get_mut(&chunk_pos).expect("This error should never happen, if it does then something is super fucked up and the whole project needs to be burnt down.");
|
||||
chunk.mesh_dirty = true;
|
||||
chunk.data_modified = true;
|
||||
//If block pos is close to the border, some neighbors may be dirty!
|
||||
const DIRECTIONS: [IVec3; 6] = [
|
||||
ivec3(1, 0, 0),
|
||||
|
|
|
@ -1,6 +1,6 @@
|
|||
use std::sync::Arc;
|
||||
use atomic::Atomic;
|
||||
use flume::{Sender, Receiver};
|
||||
use flume::{Receiver, Sender, TryIter};
|
||||
use glam::IVec3;
|
||||
use kubi_shared::{queue::QueuedBlock, worldgen::AbortState};
|
||||
use shipyard::Unique;
|
||||
|
@ -13,7 +13,7 @@ use super::{
|
|||
use crate::rendering::world::ChunkVertex;
|
||||
|
||||
pub enum ChunkTask {
|
||||
LoadChunk {
|
||||
ChunkWorldgen {
|
||||
seed: u64,
|
||||
position: IVec3,
|
||||
abortion: Option<Arc<Atomic<AbortState>>>,
|
||||
|
@ -23,13 +23,14 @@ pub enum ChunkTask {
|
|||
data: MeshGenData
|
||||
}
|
||||
}
|
||||
|
||||
pub enum ChunkTaskResponse {
|
||||
LoadedChunk {
|
||||
ChunkWorldgenDone {
|
||||
position: IVec3,
|
||||
chunk_data: BlockData,
|
||||
queued: Vec<QueuedBlock>
|
||||
},
|
||||
GeneratedMesh {
|
||||
GenerateMeshDone {
|
||||
position: IVec3,
|
||||
vertices: Vec<ChunkVertex>,
|
||||
indices: Vec<u32>,
|
||||
|
@ -43,6 +44,7 @@ pub struct ChunkTaskManager {
|
|||
channel: (Sender<ChunkTaskResponse>, Receiver<ChunkTaskResponse>),
|
||||
pool: ThreadPool,
|
||||
}
|
||||
|
||||
impl ChunkTaskManager {
|
||||
pub fn new() -> Self {
|
||||
Self {
|
||||
|
@ -50,11 +52,17 @@ impl ChunkTaskManager {
|
|||
pool: ThreadPoolBuilder::new().num_threads(4).build().unwrap()
|
||||
}
|
||||
}
|
||||
|
||||
//TODO get rid of add_sussy_response
|
||||
|
||||
/// Add a response to the queue, to be picked up by the main thread
|
||||
/// Used by the multiplayer netcode, a huge hack
|
||||
pub fn add_sussy_response(&self, response: ChunkTaskResponse) {
|
||||
// this WILL get stuck if the channel is bounded
|
||||
// don't make the channel bounded ever
|
||||
self.channel.0.send(response).unwrap()
|
||||
}
|
||||
|
||||
pub fn spawn_task(&self, task: ChunkTask) {
|
||||
let sender = self.channel.0.clone();
|
||||
self.pool.spawn(move || {
|
||||
|
@ -64,23 +72,29 @@ impl ChunkTaskManager {
|
|||
(vertices, indices),
|
||||
(trans_vertices, trans_indices),
|
||||
) = generate_mesh(position, data);
|
||||
ChunkTaskResponse::GeneratedMesh {
|
||||
ChunkTaskResponse::GenerateMeshDone {
|
||||
position,
|
||||
vertices, indices,
|
||||
trans_vertices, trans_indices,
|
||||
}
|
||||
},
|
||||
ChunkTask::LoadChunk { position, seed, abortion } => {
|
||||
ChunkTask::ChunkWorldgen { position, seed, abortion } => {
|
||||
let Some((chunk_data, queued)) = generate_world(position, seed, abortion) else {
|
||||
log::warn!("aborted operation");
|
||||
return
|
||||
};
|
||||
ChunkTaskResponse::LoadedChunk { position, chunk_data, queued }
|
||||
ChunkTaskResponse::ChunkWorldgenDone { position, chunk_data, queued }
|
||||
}
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
#[deprecated(note = "use poll instead")]
|
||||
pub fn receive(&self) -> Option<ChunkTaskResponse> {
|
||||
self.channel.1.try_recv().ok()
|
||||
}
|
||||
|
||||
pub fn poll(&self) -> TryIter<ChunkTaskResponse> {
|
||||
self.channel.1.try_iter()
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue