diff --git a/kubi-shared/src/data/io_thread.rs b/kubi-shared/src/data/io_thread.rs
index 2386d40..931e531 100644
--- a/kubi-shared/src/data/io_thread.rs
+++ b/kubi-shared/src/data/io_thread.rs
@@ -21,7 +21,7 @@ pub enum IOCommand {
   Kys,
 }
 
-
+#[derive(Debug)]
 pub enum IOResponse {
   /// A chunk has been loaded from the disk
   /// Or not, in which case the data will be None
@@ -69,7 +69,10 @@ impl IOThreadContext {
         }
         IOCommand::Kys => {
           // Process all pending write commands
-          while let IOCommand::SaveChunk { position, data } = self.rx.recv().unwrap() {
+          for cmd in self.rx.try_iter() {
+            let IOCommand::SaveChunk { position, data } = cmd else {
+              continue;
+            };
             self.save.save_chunk(position, &data).unwrap();
           }
           self.tx.send(IOResponse::Terminated).unwrap();
@@ -119,22 +122,28 @@ impl IOSingleThread {
 
   /// Signal the IO thread to process the remaining requests and wait for it to terminate
   pub fn stop_sync(&self) {
+    log::debug!("Stopping IO thread (sync)");
+
     // Tell the thread to terminate and wait for it to finish
     self.tx.send(IOCommand::Kys).unwrap();
     while !matches!(self.rx.recv().unwrap(), IOResponse::Terminated) {}
 
     // HACK "we have .join at home"
     while !self.handle.is_finished() {}
+
+    log::debug!("IO thread stopped"); //almost lol
   }
 
   /// Same as stop_sync but doesn't wait for the IO thread to terminate
   pub fn stop_async(&self) {
+    log::debug!("Stopping IO thread (async)");
    self.tx.send(IOCommand::Kys).unwrap();
   }
 }
 
 impl Drop for IOSingleThread {
   fn drop(&mut self) {
+    log::trace!("IOSingleThread dropped, about to sync unsaved data...");
     self.stop_sync();
   }
 }
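Note on the `Kys` branch above: `flume`'s `try_iter()` yields only the commands already queued and never blocks, which is safe here because `Kys` is by convention the last command ever sent, so every pending `SaveChunk` is already in the channel. The busy-wait in `stop_sync` ("we have .join at home") exists because `JoinHandle::join` consumes the handle and `&self` can't give it up; a common alternative is wrapping the handle in an `Option` and `take()`-ing it in `Drop`. A minimal sketch under that assumption (`IoWorker` and its fields are illustrative, not the crate's actual API):

```rust
use std::thread::JoinHandle;

// Hypothetical mirror of IOSingleThread with the handle wrapped in an
// Option, so Drop can move it out and perform a real join().
struct IoWorker {
  tx: flume::Sender<()>, // stand-in for the IOCommand channel
  handle: Option<JoinHandle<()>>,
}

impl IoWorker {
  fn spawn() -> Self {
    let (tx, rx) = flume::unbounded();
    let handle = std::thread::spawn(move || {
      // Block until the shutdown signal (the Kys analogue) arrives.
      let _ = rx.recv();
    });
    Self { tx, handle: Some(handle) }
  }
}

impl Drop for IoWorker {
  fn drop(&mut self) {
    // Signal shutdown...
    let _ = self.tx.send(());
    // ...then block on a real join instead of spinning on is_finished().
    if let Some(handle) = self.handle.take() {
      let _ = handle.join();
    }
  }
}
```

The tradeoff is that a real join is only possible from a `&mut self` or `Drop` context; as long as `stop_sync` takes `&self`, the spin on `is_finished()` is the available workaround.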
diff --git a/kubi/src/world/loading.rs b/kubi/src/world/loading.rs
index 9880730..4326e26 100644
--- a/kubi/src/world/loading.rs
+++ b/kubi/src/world/loading.rs
@@ -1,7 +1,11 @@
 use std::sync::Arc;
 use atomic::{Atomic, Ordering};
 use glam::{IVec3, ivec3};
-use kubi_shared::{networking::{channels::Channel, messages::ClientToServerMessage}, worldgen::AbortState};
+use kubi_shared::{
+  data::io_thread::{IOCommand, IOResponse, IOThreadManager},
+  networking::{channels::Channel, messages::ClientToServerMessage},
+  worldgen::AbortState,
+};
 use shipyard::{View, UniqueView, UniqueViewMut, IntoIter, Workload, IntoWorkload, NonSendSync, track};
 use uflow::SendMode;
 use wgpu::util::DeviceExt;
@@ -20,6 +24,8 @@ use super::{
   queue::BlockUpdateQueue,
 };
 
+const WORLD_SEED: u64 = 0xbeef_face_dead_cafe;
+
 const MAX_CHUNK_OPS_INGAME: usize = 8;
 const MAX_CHUNK_OPS: usize = 32;
 
@@ -92,6 +98,7 @@ pub fn update_chunks_if_player_moved(
 
 fn process_state_changes(
   task_manager: UniqueView<ChunkTaskManager>,
+  io: Option<UniqueView<IOThreadManager>>,
   mut udp_client: Option<UniqueViewMut<UdpClient>>,
   mut world: UniqueViewMut<ChunkStorage>,
   mut vm_meshes: NonSendSync<UniqueViewMut<ChunkMeshStorage>>,
@@ -135,7 +142,7 @@ fn process_state_changes(
         chunk.current_state,
         CurrentChunkState::Loaded | CurrentChunkState::CalculatingMesh,
       ) => {
-        chunk.block_data = None;
+        // chunk.block_data = None; //HACK when downgrading, keep the data so we can save it
         chunk.current_state = CurrentChunkState::Nothing;
       },
 
@@ -184,18 +191,33 @@ fn process_state_changes(
             SendMode::Reliable
           );
         } else {
-          let atomic = Arc::new(Atomic::new(AbortState::Continue));
-          task_manager.spawn_task(ChunkTask::ChunkWorldgen {
-            seed: 0xbeef_face_dead_cafe,
-            position,
-            abortion: Some(Arc::clone(&atomic)),
-          });
-          abortion = Some(atomic);
+
+          // ============================================================
+          // TODO IMPORTANT: DO NOT WAIT FOR THE IO THREAD TO RESPOND, THIS WILL CAUSE A TERRIBLE BOTTLENECK
+          // find a way to check the world save header from the main thread instead!
+          // ============================================================
+
+          if let Some(io) = &io {
+            // Try to load the chunk from the save file
+            // In case that fails, we will run worldgen once the IO thread responds
+            io.send(IOCommand::LoadChunk { position });
+          } else {
+            // If there's no IO thread, we'll just run worldgen right away
+            let atomic = Arc::new(Atomic::new(AbortState::Continue));
+            task_manager.spawn_task(ChunkTask::ChunkWorldgen {
+              seed: WORLD_SEED,
+              position,
+              abortion: Some(Arc::clone(&atomic)),
+            });
+            abortion = Some(atomic);
+          }
         }
+
+        //Update chunk state
         let chunk = world.chunks.get_mut(&position).unwrap();
         chunk.current_state = CurrentChunkState::Loading;
         chunk.abortion = abortion;
+        // ===========
+
         //log::trace!("Started loading chunk {position}");
       },
 
@@ -254,7 +276,22 @@ fn process_state_changes(
         return false
       }
 
-      //HACK, since save files are not implemented, just unload immediately
+      // If in singleplayer and have an open save file, we need to save the chunk to the disk
+
+      // ==========================================================
+      //TODO IMPORTANT: WAIT FOR CHUNK TO FINISH SAVING FIRST BEFORE TRANSITIONING TO UNLOADED
+      // OTHERWISE WE WILL LOSE THE SAVE DATA IF THE USER COMES BACK TO THE CHUNK TOO QUICKLY
+      // ==========================================================
+      if let Some(io) = &io {
+        if let Some(block_data) = &chunk.block_data {
+          // log::debug!("issue save command");
+          io.send(IOCommand::SaveChunk {
+            position,
+            data: block_data.blocks.clone(),
+          });
+        }
+      }
+
       return false
     }
     true
@@ -264,6 +301,7 @@ fn process_state_changes(
 
 fn process_completed_tasks(
   task_manager: UniqueView<ChunkTaskManager>,
+  io: Option<UniqueView<IOThreadManager>>,
   mut world: UniqueViewMut<ChunkStorage>,
   mut meshes: NonSendSync<UniqueViewMut<ChunkMeshStorage>>,
   renderer: UniqueView<Renderer>,
@@ -271,9 +309,68 @@ fn process_completed_tasks(
   mut queue: UniqueViewMut<BlockUpdateQueue>,
 ) {
   let mut ops: usize = 0;
-  while let Some(res) = task_manager.receive() {
+
+  //TODO reduce code duplication between loaded/generated chunks
+
+  // Process IO first
+  if let Some(io) = &io {
+    for response in io.poll() {
+      let IOResponse::ChunkLoaded { position, data } = response else {
+        //TODO this is bad
+        panic!("Unexpected IO response: {:?}", response);
+      };
+
+      //check if chunk exists
+      let Some(chunk) = world.chunks.get_mut(&position) else {
+        log::warn!("LOADED blocks data discarded: chunk doesn't exist");
+        continue
+      };
+
+      //we cannot have abortion here but just in case, reset it
+      chunk.abortion = None;
+
+      //check if chunk still wants it
+      if !matches!(chunk.desired_state, DesiredChunkState::Loaded | DesiredChunkState::Rendered) {
+        log::warn!("LOADED block data discarded: state undesirable: {:?}", chunk.desired_state);
+        continue
+      }
+
+      // check if we actually got the data
+      if let Some(data) = data {
+        // If we did get the data, yay :3
+        chunk.block_data = Some(ChunkData {
+          blocks: data
+        });
+        chunk.current_state = CurrentChunkState::Loaded;
+        ops += 1;
+      } else {
+        // If we didn't get the data, we need to run worldgen
+        //TODO: this is a terrible bottleneck
+        let atomic = Arc::new(Atomic::new(AbortState::Continue));
+        task_manager.spawn_task(ChunkTask::ChunkWorldgen {
+          seed: WORLD_SEED,
+          position,
+          abortion: Some(Arc::clone(&atomic)),
+        });
+        chunk.abortion = Some(atomic);
+      }
+    }
+
+    //return early if we've reached the limit
+    if ops >= match *state {
+      GameState::InGame => MAX_CHUNK_OPS_INGAME,
+      _ => MAX_CHUNK_OPS,
+    } { return }
+
+    // XXX: this will completely skip polling the task manager if we've reached the limit
+    // this is probably fine, but it might be worth revisiting later
+  }
+
+  for res in task_manager.poll() {
     match res {
       ChunkTaskResponse::ChunkWorldgenDone { position, chunk_data, mut queued } => {
+        //TODO this can fuck shit up really badly if io op gets overwritten by worldgen chunk
+        //TODO only accept if loading stage, not loaded
+
         //If unwanted chunk is already loaded
         //It would be ~~...unethical~~ impossible to abort the operation at this point
         //Instead, we'll just throw it away
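One possible shape for the "check the world save header from the main thread" TODO above: keep a cheap shared index of the chunk positions known to exist on disk, so the load-vs-worldgen decision never waits on the IO thread. `SavedChunkIndex` and its methods are assumptions for illustration, not existing `kubi_shared` API:

```rust
use std::{collections::HashSet, sync::{Arc, RwLock}};
use glam::IVec3;

// Hypothetical shared header index: populated once when the save file is
// opened, updated after every successful SaveChunk.
#[derive(Clone, Default)]
pub struct SavedChunkIndex(Arc<RwLock<HashSet<IVec3>>>);

impl SavedChunkIndex {
  /// Cheap, non-blocking check, callable from the main thread.
  pub fn has_chunk(&self, position: IVec3) -> bool {
    self.0.read().unwrap().contains(&position)
  }

  /// Record that a chunk now exists on disk.
  pub fn mark_saved(&self, position: IVec3) {
    self.0.write().unwrap().insert(position);
  }
}
```

With something like this in place, the `else` branch could spawn `ChunkTask::ChunkWorldgen` immediately whenever `has_chunk` returns `false`, and only known-saved chunks would round-trip through `IOCommand::LoadChunk`.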
diff --git a/kubi/src/world/tasks.rs b/kubi/src/world/tasks.rs
index 90e1de8..eebc663 100644
--- a/kubi/src/world/tasks.rs
+++ b/kubi/src/world/tasks.rs
@@ -1,6 +1,6 @@
 use std::sync::Arc;
 use atomic::Atomic;
-use flume::{Sender, Receiver};
+use flume::{Receiver, Sender, TryIter};
 use glam::IVec3;
 use kubi_shared::{queue::QueuedBlock, worldgen::AbortState};
 use shipyard::Unique;
@@ -89,7 +89,12 @@ impl ChunkTaskManager {
     });
   }
 
+  #[deprecated(note = "use poll instead")]
   pub fn receive(&self) -> Option<ChunkTaskResponse> {
     self.channel.1.try_recv().ok()
   }
+
+  pub fn poll(&self) -> TryIter<ChunkTaskResponse> {
+    self.channel.1.try_iter()
+  }
 }
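For reference, the new `poll()` is backed by `flume::Receiver::try_iter`, which drains only the messages already queued and never blocks, so a per-frame consumer can enforce an ops budget without stalling the frame. A self-contained sketch of that consumption pattern (`TaskChannel` is a stand-in, not the real `ChunkTaskManager`):

```rust
use flume::{unbounded, Receiver, Sender, TryIter};

// Stand-in for ChunkTaskManager's internal response channel.
struct TaskChannel<T> {
  tx: Sender<T>,
  rx: Receiver<T>,
}

impl<T> TaskChannel<T> {
  fn new() -> Self {
    let (tx, rx) = unbounded();
    Self { tx, rx }
  }

  // Same shape as the new ChunkTaskManager::poll.
  fn poll(&self) -> TryIter<T> {
    self.rx.try_iter()
  }
}

fn main() {
  let ch = TaskChannel::new();
  ch.tx.send("worldgen done: (0, 0, 0)").unwrap();
  ch.tx.send("worldgen done: (0, 0, 1)").unwrap();

  const BUDGET: usize = 8; // cf. MAX_CHUNK_OPS_INGAME
  for (ops, res) in ch.poll().enumerate() {
    if ops >= BUDGET { break }
    println!("processing response: {res}");
  }
}
```

Unlike the deprecated `receive()`, which returned one message per call, `poll()` hands back an iterator, so the caller decides how many responses to take per frame.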