From 37e68912ebe213ab80a8d9d0bd9770e36c593e64 Mon Sep 17 00:00:00 2001
From: griffi-gh
Date: Mon, 2 Sep 2024 02:01:02 +0200
Subject: [PATCH] make it work pretty well i guess

---
 kubi-shared/src/data.rs           | 46 +++++++++++++++++++------------
 kubi-shared/src/data/io_thread.rs | 17 ++++++++++--
 kubi/src/lib.rs                   | 20 ++++++++++----
 kubi/src/networking.rs            | 18 +++++-------
 kubi/src/world/loading.rs         | 45 ++++++++++++++++++++++--------
 5 files changed, 98 insertions(+), 48 deletions(-)

diff --git a/kubi-shared/src/data.rs b/kubi-shared/src/data.rs
index 3b66f8d..8c0ae41 100644
--- a/kubi-shared/src/data.rs
+++ b/kubi-shared/src/data.rs
@@ -49,19 +49,19 @@ impl Default for WorldSaveDataHeader {
   }
 }
 
+pub type SharedHeader = Arc<RwLock<WorldSaveDataHeader>>;
+
 #[derive(Unique)]
 pub struct WorldSaveFile {
   pub file: File,
-  pub header: WorldSaveDataHeader,
+  pub header: SharedHeader,
 }
 
-pub type SharedSaveFile = Arc<RwLock<WorldSaveFile>>;
-
 impl WorldSaveFile {
   pub fn new(file: File) -> Self {
     WorldSaveFile {
       file,
-      header: WorldSaveDataHeader::default()
+      header: Arc::new(RwLock::new(WorldSaveDataHeader::default())),
     }
   }
 
@@ -78,7 +78,7 @@ impl WorldSaveFile {
     }
 
     let limit = (RESERVED_SIZE - SUBHEADER_SIZE) as u64;
-    self.header = bincode::deserialize_from((&self.file).take(limit))?;
+    *self.header.write().unwrap() = bincode::deserialize_from((&self.file).take(limit))?;
 
     Ok(())
   }
@@ -90,7 +90,7 @@
     //XXX: this can cause the header to destroy chunk data (if it's WAY too long)
     //     read has checks against this, but write doesn't
     //     1mb is pretty generous tho, so it's not a *big* deal
-    bincode::serialize_into(&self.file, &self.header)?;
+    bincode::serialize_into(&self.file, &*self.header.read().unwrap())?;
 
     Ok(())
   }
@@ -104,21 +104,27 @@
     Ok(())
   }
 
-  fn allocate_sector(&mut self) -> u32 {
-    let value = self.header.sector_count + 1;
-    self.header.sector_count += 1;
-    value
-  }
+  // fn allocate_sector(&mut self) -> u32 {
+  //   let mut lock = self.header.write().unwrap();
+  //   let value = lock.sector_count + 1;
+  //   lock.sector_count += 1;
+  //   value
+  // }
 
   pub fn save_chunk(&mut self, position: IVec3, data: &BlockDataRef) -> Result<()> {
+    let mut header_lock = self.header.write().unwrap();
+
     let mut header_modified = false;
-    let sector = self.header.chunk_map.get(&position).copied().unwrap_or_else(|| {
+    let sector = header_lock.chunk_map.get(&position).copied().unwrap_or_else(|| {
       header_modified = true;
-      self.allocate_sector()
+      let sector = header_lock.sector_count;
+      header_lock.sector_count += 1;
+      header_lock.chunk_map.insert(position, sector);
+      sector
+      // self.allocate_sector()
     });
-    if header_modified {
-      self.header.chunk_map.insert(position, sector);
-    }
+
+    drop(header_lock);
 
     let offset = sector as u64 * SECTOR_SIZE as u64;
 
@@ -141,11 +147,11 @@
   }
 
   pub fn chunk_exists(&self, position: IVec3) -> bool {
-    self.header.chunk_map.contains_key(&position)
+    self.header.read().unwrap().chunk_map.contains_key(&position)
   }
 
   pub fn load_chunk(&mut self, position: IVec3) -> Result<Option<BlockData>> {
-    let Some(&sector) = self.header.chunk_map.get(&position) else {
+    let Some(&sector) = self.header.read().unwrap().chunk_map.get(&position) else {
       return Ok(None);
     };
 
@@ -171,4 +177,8 @@
 
     Ok(Some(data))
   }
+
+  pub fn get_shared_header(&self) -> SharedHeader {
+    Arc::clone(&self.header)
+  }
 }
diff --git a/kubi-shared/src/data/io_thread.rs b/kubi-shared/src/data/io_thread.rs
index 72b3da1..5991d40 100644
--- a/kubi-shared/src/data/io_thread.rs
+++ b/kubi-shared/src/data/io_thread.rs
@@ -2,7 +2,7 @@
 use glam::IVec3;
 use flume::{Receiver, Sender, TryIter};
 use shipyard::Unique;
 use crate::chunk::BlockData;
-use super::WorldSaveFile;
+use super::{SharedHeader, WorldSaveFile};
 
 // Maximum amount of chunks to save in a single batch before checking if there are any pending read requests
 // may be broken, so currently disabled
@@ -139,6 +139,7 @@ pub struct IOSingleThread {
   tx: Sender<IOCommand>,
   rx: Receiver<IOResponse>,
   handle: std::thread::JoinHandle<()>,
+  header: SharedHeader,
 }
 
 impl IOSingleThread {
@@ -147,6 +148,9 @@
     let (command_tx, command_rx) = flume::unbounded();
     let (response_tx, response_rx) = flume::unbounded();
 
+    // Grab a handle to the header
+    let header = save.get_shared_header();
+
     // Spawn the thread
     let builder = std::thread::Builder::new()
       .name("io-thread".into());
@@ -158,7 +162,8 @@
     IOSingleThread {
       tx: command_tx,
       rx: response_rx,
-      handle
+      handle,
+      header,
     }
   }
 
@@ -191,6 +196,10 @@
     log::debug!("Stopping IO thread (async)");
     self.tx.send(IOCommand::Kys).unwrap();
   }
+
+  pub fn chunk_exists(&self, position: IVec3) -> bool {
+    self.header.read().unwrap().chunk_map.contains_key(&position)
+  }
 }
 
 impl Drop for IOSingleThread {
@@ -221,6 +230,10 @@
   pub fn poll(&self) -> TryIter<IOResponse> {
     self.thread.poll()
   }
+
+  pub fn chunk_exists(&self, position: IVec3) -> bool {
+    self.thread.chunk_exists(position)
+  }
 }
 
 // i think im a girl :3 (noone will ever read this right? :p)
diff --git a/kubi/src/lib.rs b/kubi/src/lib.rs
index 1bcd7cc..2a9ab45 100644
--- a/kubi/src/lib.rs
+++ b/kubi/src/lib.rs
@@ -57,11 +57,7 @@ pub(crate) mod client_physics;
 pub(crate) mod chat;
 
 use world::{
-  init_game_world,
-  loading::update_loaded_world_around_player,
-  raycast::update_raycasts,
-  queue::apply_queued_blocks,
-  tasks::ChunkTaskManager,
+  init_game_world, loading::{save_on_exit, update_loaded_world_around_player}, queue::apply_queued_blocks, raycast::update_raycasts, tasks::ChunkTaskManager
 };
 use player::{spawn_player, MainPlayer};
 use prefabs::load_prefabs;
@@ -157,7 +153,6 @@
     kubi_ui_end,
     update_state,
     exit_on_esc,
-    disconnect_on_exit.run_if(is_multiplayer),
     update_rendering_late,
   ).into_sequential_workload()
 }
@@ -183,6 +178,13 @@
   ).into_sequential_workload()
 }
 
+fn on_exit() -> Workload {
+  (
+    disconnect_on_exit.run_if(is_multiplayer),
+    save_on_exit.run_if(is_singleplayer),
+  ).into_sequential_workload().run_if(is_ingame_or_loading)
+}
+
 #[cfg(all(windows, not(debug_assertions)))]
 fn attach_console() {
   use winapi::um::wincon::{AttachConsole, ATTACH_PARENT_PROCESS};
@@ -243,6 +245,7 @@ pub fn kubi_main(
   world.add_workload(update);
   //world.add_workload(render);
   world.add_workload(after_render);
+  world.add_workload(on_exit);
 
   //Save _visualizer.json
   #[cfg(feature = "generate_visualizer_data")] {
@@ -350,6 +353,11 @@
           window_target.exit();
         }
       },
+
+      Event::LoopExiting => {
+        world.run_workload(on_exit).unwrap();
+      },
+
       _ => (),
     };
   }).unwrap();
diff --git a/kubi/src/networking.rs b/kubi/src/networking.rs
index 56f5fde..e9e5b5e 100644
--- a/kubi/src/networking.rs
+++ b/kubi/src/networking.rs
@@ -159,19 +159,15 @@ pub fn update_networking_late() -> Workload {
 }
 
 pub fn disconnect_on_exit(
-  exit: UniqueView<RequestExit>,
   mut client: UniqueViewMut<UdpClient>,
 ) {
-  //TODO check if this works
-  if exit.0 {
-    if client.0.is_active() {
-      client.0.flush();
-      client.0.disconnect();
-      while client.0.is_active() { client.0.step().for_each(|_|()); }
-      log::info!("Client disconnected");
-    } else {
log::info!("Client inactive") - } + if client.0.is_active() { + client.0.flush(); + client.0.disconnect(); + while client.0.is_active() { client.0.step().for_each(|_|()); } + log::info!("Client disconnected"); + } else { + log::info!("Client inactive") } } diff --git a/kubi/src/world/loading.rs b/kubi/src/world/loading.rs index d1cbea4..c06afbf 100644 --- a/kubi/src/world/loading.rs +++ b/kubi/src/world/loading.rs @@ -192,17 +192,22 @@ fn process_state_changes( ); } else { - // ============================================================ - // TODO IMPORTANT: DO NOT WAIT FOR THE IO THREAD TO RESPOND, THIS WILL CAUSE A TERRIBLE BOTTLENECK - // find a way to check the world save header from the main thread instead! - // ============================================================ + // If the chunk exists in the save file (and save file is there in the first place), + // ... we'll try to load it + // Otherwise, we'll run worldgen + + let mut should_run_worldgen = true; if let Some(io) = &io { - // Try to load the chunk from the save file - // In case that fails, we will run worldgen once the IO thread responds - io.send(IOCommand::LoadChunk { position }); - } else { - // If there's no IO thread, we'll just run worldgen right away + if io.chunk_exists(position) { + // Try to load the chunk from the save file + // In case that fails, we will run worldgen once the IO thread responds + io.send(IOCommand::LoadChunk { position }); + should_run_worldgen = false; + } + } + + if should_run_worldgen { let atomic = Arc::new(Atomic::new(AbortState::Continue)); task_manager.spawn_task(ChunkTask::ChunkWorldgen { seed: WORLD_SEED, @@ -348,10 +353,9 @@ fn process_completed_tasks( blocks: data }); chunk.current_state = CurrentChunkState::Loaded; - ops += 1; } else { // If we didn't get the data, we need to run worldgen - //TODO: this is a terrible bottleneck + // XXX: will this ever happen? we should always have the data in the save file let atomic = Arc::new(Atomic::new(AbortState::Continue)); task_manager.spawn_task(ChunkTask::ChunkWorldgen { seed: WORLD_SEED, @@ -360,6 +364,8 @@ fn process_completed_tasks( }); chunk.abortion = Some(atomic); } + + ops += 1; } //return early if we've reached the limit @@ -495,3 +501,20 @@ fn process_completed_tasks( } { break } } } + +/// Save all modified chunks to the disk +pub fn save_on_exit( + io: UniqueView, + world: UniqueView, +) { + for (&position, chunk) in &world.chunks { + if let Some(block_data) = &chunk.block_data { + if chunk.data_modified { + io.send(IOCommand::SaveChunk { + position, + data: block_data.blocks.clone(), + }); + } + } + } +}