From 2466c0293751316886f2a8c6b9a911319294a8f9 Mon Sep 17 00:00:00 2001 From: griffi-gh Date: Sun, 1 Sep 2024 23:37:49 +0200 Subject: [PATCH 1/8] initial impl --- .gitignore | 5 +- Cargo.lock | 2 + kubi-shared/Cargo.toml | 2 + kubi-shared/src/data.rs | 2 + kubi-shared/src/data/io_thread.rs | 166 ++++++++++++++++++++++++++++++ kubi/src/init.rs | 12 ++- kubi/src/networking/world.rs | 2 +- kubi/src/world/loading.rs | 10 +- kubi/src/world/tasks.rs | 21 ++-- 9 files changed, 207 insertions(+), 15 deletions(-) create mode 100644 kubi-shared/src/data/io_thread.rs diff --git a/.gitignore b/.gitignore index ef8c4d1..42f57f8 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,5 @@ +.direnv + # Generated by Cargo # will have compiled files and executables debug/ @@ -15,6 +17,7 @@ _src _visualizer.json *.kubi +*.kbi /*_log*.txt /*.log @@ -37,4 +40,4 @@ _visualizer.json /mods -.direnv + diff --git a/Cargo.lock b/Cargo.lock index 7c6d0dd..af7ada8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1307,8 +1307,10 @@ dependencies = [ "bincode", "bytemuck", "fastnoise-lite", + "flume", "glam", "hashbrown 0.14.5", + "log", "nohash-hasher", "num_enum", "nz", diff --git a/kubi-shared/Cargo.toml b/kubi-shared/Cargo.toml index d55cbf6..1105d48 100644 --- a/kubi-shared/Cargo.toml +++ b/kubi-shared/Cargo.toml @@ -14,6 +14,7 @@ serde = { version = "1.0", default-features = false, features = ["alloc", "deriv serde_with = "3.4" bincode = "1.3" anyhow = "1.0" +flume = "0.11" fastnoise-lite = { version = "1.1", features = ["std", "f64"] } rand = { version = "0.8", default_features = false, features = ["std", "min_const_gen"] } rand_xoshiro = "0.6" @@ -23,6 +24,7 @@ bytemuck = { version = "1.14", features = ["derive"] } static_assertions = "1.1" nz = "0.4" atomic = "0.6" +log = "0.4" [features] default = [] diff --git a/kubi-shared/src/data.rs b/kubi-shared/src/data.rs index 5330bf0..da113fe 100644 --- a/kubi-shared/src/data.rs +++ b/kubi-shared/src/data.rs @@ -17,6 +17,8 @@ use crate::{ chunk::{CHUNK_SIZE, BlockDataRef, BlockData} }; +pub mod io_thread; + const SECTOR_SIZE: usize = CHUNK_SIZE * CHUNK_SIZE * CHUNK_SIZE * size_of::(); const RESERVED_SIZE: usize = 1048576; //~1mb (16 sectors assuming 32x32x32 world of 1byte blocks) const RESERVED_SECTOR_COUNT: usize = RESERVED_SIZE / SECTOR_SIZE; diff --git a/kubi-shared/src/data/io_thread.rs b/kubi-shared/src/data/io_thread.rs new file mode 100644 index 0000000..2386d40 --- /dev/null +++ b/kubi-shared/src/data/io_thread.rs @@ -0,0 +1,166 @@ +use glam::IVec3; +use flume::{Receiver, Sender, TryIter}; +use shipyard::Unique; +use crate::chunk::BlockData; + +use super::WorldSaveFile; + +pub enum IOCommand { + SaveChunk { + position: IVec3, + data: BlockData, + }, + + /// Load a chunk from the disk and send it to the main thread + LoadChunk { + position: IVec3, + }, + + /// Process all pending write commands and make the thread end itself + /// LoadChunk commands will be ignored after this command is received + Kys, +} + + +pub enum IOResponse { + /// A chunk has been loaded from the disk + /// Or not, in which case the data will be None + /// and chunk should be generated + ChunkLoaded { + position: IVec3, + data: Option, + }, + + /// The IO thread has been terminated + Terminated, +} + +struct IOThreadContext { + tx: Sender, + rx: Receiver, + save: WorldSaveFile, +} + +//TODO: Implement proper error handling (I/O errors are rlly common) + +impl IOThreadContext { + /// Should be called ON the IO thread + /// + /// Initializes the IO thread context, opening the file at the given 
path + /// If there's an error opening the file, the thread will panic (TODO: handle this more gracefully) + pub fn initialize( + tx: Sender, + rx: Receiver, + save: WorldSaveFile, + ) -> Self { + // save.load_data().unwrap(); + Self { tx, rx, save } + } + + pub fn run(mut self) { + loop { + match self.rx.recv().unwrap() { + IOCommand::SaveChunk { position, data } => { + self.save.save_chunk(position, &data).unwrap(); + } + IOCommand::LoadChunk { position } => { + let data = self.save.load_chunk(position).unwrap(); + self.tx.send(IOResponse::ChunkLoaded { position, data }).unwrap(); + } + IOCommand::Kys => { + // Process all pending write commands + while let IOCommand::SaveChunk { position, data } = self.rx.recv().unwrap() { + self.save.save_chunk(position, &data).unwrap(); + } + self.tx.send(IOResponse::Terminated).unwrap(); + return; + } + } + } + } +} + +pub struct IOSingleThread { + tx: Sender, + rx: Receiver, + handle: std::thread::JoinHandle<()>, +} + +impl IOSingleThread { + pub fn spawn(save: WorldSaveFile) -> Self { + // Create channels + let (command_tx, command_rx) = flume::unbounded(); + let (response_tx, response_rx) = flume::unbounded(); + + // Spawn the thread + let builder = std::thread::Builder::new() + .name("io-thread".into()); + let handle = builder.spawn(move || { + let context = IOThreadContext::initialize(response_tx, command_rx, save); + context.run(); + }).unwrap(); + + IOSingleThread { + tx: command_tx, + rx: response_rx, + handle + } + } + + /// Send a command to the IO thread + pub fn send(&self, cmd: IOCommand) { + self.tx.send(cmd).unwrap(); + } + + /// Poll the IO thread for responses (non-blocking) + pub fn poll(&self) -> TryIter { + self.rx.try_iter() + } + + /// Signal the IO thread to process the remaining requests and wait for it to terminate + pub fn stop_sync(&self) { + // Tell the thread to terminate and wait for it to finish + self.tx.send(IOCommand::Kys).unwrap(); + while !matches!(self.rx.recv().unwrap(), IOResponse::Terminated) {} + + // HACK "we have .join at home" + while !self.handle.is_finished() {} + } + + /// Same as stop_sync but doesn't wait for the IO thread to terminate + pub fn stop_async(&self) { + self.tx.send(IOCommand::Kys).unwrap(); + } +} + +impl Drop for IOSingleThread { + fn drop(&mut self) { + self.stop_sync(); + } +} + + +/// This is a stub for future implemntation that may use multiple IO threads +#[derive(Unique)] +pub struct IOThreadManager { + thread: IOSingleThread, +} + +impl IOThreadManager { + pub fn new(save: WorldSaveFile) -> Self { + Self { + thread: IOSingleThread::spawn(save) + } + } + + pub fn send(&self, cmd: IOCommand) { + self.thread.send(cmd); + } + + pub fn poll(&self) -> TryIter { + self.thread.poll() + } +} + +// i think im a girl :3 (noone will ever read this right? :p) + diff --git a/kubi/src/init.rs b/kubi/src/init.rs index 0dfb513..009e9c4 100644 --- a/kubi/src/init.rs +++ b/kubi/src/init.rs @@ -5,14 +5,15 @@ use crate::{ networking::{GameType, ServerAddress}, state::{GameState, NextState} }; -use kubi_shared::data::WorldSaveFile; +use kubi_shared::data::{io_thread::IOThreadManager, WorldSaveFile}; fn open_local_save_file(path: &Path) -> Result { let mut save_file = WorldSaveFile::new({ OpenOptions::new() .read(true) .write(true) - .open("world.kbi")? + .create(true) + .open(path)? 
}); if save_file.file.metadata().unwrap().len() == 0 { save_file.initialize()?; @@ -25,13 +26,20 @@ fn open_local_save_file(path: &Path) -> Result { pub fn initialize_from_args( all_storages: AllStoragesView, ) { + // If an address is provided, we're in multiplayer mode (the first argument is the address) + // Otherwise, we're in singleplayer mode and working with local stuff let args: Vec = env::args().collect(); if args.len() > 1 { + // Parse the address and switch the state to connecting let address = args[1].parse::().expect("invalid address"); all_storages.add_unique(GameType::Muliplayer); all_storages.add_unique(ServerAddress(address)); all_storages.borrow::>().unwrap().0 = Some(GameState::Connecting); } else { + // Open the local save file + let save_file = open_local_save_file(Path::new("./world.kubi")).expect("failed to open save file"); + all_storages.add_unique(IOThreadManager::new(save_file)); + // Switch the state and kick off the world loading all_storages.add_unique(GameType::Singleplayer); all_storages.borrow::>().unwrap().0 = Some(GameState::LoadingWorld); } diff --git a/kubi/src/networking/world.rs b/kubi/src/networking/world.rs index efb2cde..b7feb2a 100644 --- a/kubi/src/networking/world.rs +++ b/kubi/src/networking/world.rs @@ -37,7 +37,7 @@ pub fn inject_network_responses_into_manager_queue( let ServerToClientMessage::ChunkResponse { chunk, data, queued } = packet else { unreachable!() }; - manager.add_sussy_response(ChunkTaskResponse::LoadedChunk { + manager.add_sussy_response(ChunkTaskResponse::ChunkWorldgenDone { position: chunk, chunk_data: data, queued diff --git a/kubi/src/world/loading.rs b/kubi/src/world/loading.rs index 7ee4313..9880730 100644 --- a/kubi/src/world/loading.rs +++ b/kubi/src/world/loading.rs @@ -8,7 +8,7 @@ use wgpu::util::DeviceExt; use crate::{ networking::UdpClient, player::MainPlayer, - rendering::{world::ChunkVertex, BufferPair, Renderer}, + rendering::{BufferPair, Renderer}, settings::GameSettings, state::GameState, transform::Transform, @@ -16,7 +16,7 @@ use crate::{ use super::{ ChunkStorage, ChunkMeshStorage, chunk::{Chunk, DesiredChunkState, CHUNK_SIZE, ChunkMesh, CurrentChunkState, ChunkData}, - tasks::{ChunkTaskManager, ChunkTaskResponse, ChunkTask}, + tasks::{ChunkTaskManager, ChunkTaskResponse, ChunkTask}, queue::BlockUpdateQueue, }; @@ -185,7 +185,7 @@ fn process_state_changes( ); } else { let atomic = Arc::new(Atomic::new(AbortState::Continue)); - task_manager.spawn_task(ChunkTask::LoadChunk { + task_manager.spawn_task(ChunkTask::ChunkWorldgen { seed: 0xbeef_face_dead_cafe, position, abortion: Some(Arc::clone(&atomic)), @@ -273,7 +273,7 @@ fn process_completed_tasks( let mut ops: usize = 0; while let Some(res) = task_manager.receive() { match res { - ChunkTaskResponse::LoadedChunk { position, chunk_data, mut queued } => { + ChunkTaskResponse::ChunkWorldgenDone { position, chunk_data, mut queued } => { //If unwanted chunk is already loaded //It would be ~~...unethical~~ impossible to abort the operation at this point //Instead, we'll just throw it away @@ -308,7 +308,7 @@ fn process_completed_tasks( //increase ops counter ops += 1; }, - ChunkTaskResponse::GeneratedMesh { + ChunkTaskResponse::GenerateMeshDone { position, vertices, indices, trans_vertices, trans_indices, diff --git a/kubi/src/world/tasks.rs b/kubi/src/world/tasks.rs index a481f5f..90e1de8 100644 --- a/kubi/src/world/tasks.rs +++ b/kubi/src/world/tasks.rs @@ -13,7 +13,7 @@ use super::{ use crate::rendering::world::ChunkVertex; pub enum ChunkTask { - LoadChunk { + 
ChunkWorldgen { seed: u64, position: IVec3, abortion: Option>>, @@ -23,13 +23,14 @@ pub enum ChunkTask { data: MeshGenData } } + pub enum ChunkTaskResponse { - LoadedChunk { + ChunkWorldgenDone { position: IVec3, chunk_data: BlockData, queued: Vec }, - GeneratedMesh { + GenerateMeshDone { position: IVec3, vertices: Vec, indices: Vec, @@ -43,6 +44,7 @@ pub struct ChunkTaskManager { channel: (Sender, Receiver), pool: ThreadPool, } + impl ChunkTaskManager { pub fn new() -> Self { Self { @@ -50,11 +52,17 @@ impl ChunkTaskManager { pool: ThreadPoolBuilder::new().num_threads(4).build().unwrap() } } + + //TODO get rid of add_sussy_response + + /// Add a response to the queue, to be picked up by the main thread + /// Used by the multiplayer netcode, a huge hack pub fn add_sussy_response(&self, response: ChunkTaskResponse) { // this WILL get stuck if the channel is bounded // don't make the channel bounded ever self.channel.0.send(response).unwrap() } + pub fn spawn_task(&self, task: ChunkTask) { let sender = self.channel.0.clone(); self.pool.spawn(move || { @@ -64,22 +72,23 @@ impl ChunkTaskManager { (vertices, indices), (trans_vertices, trans_indices), ) = generate_mesh(position, data); - ChunkTaskResponse::GeneratedMesh { + ChunkTaskResponse::GenerateMeshDone { position, vertices, indices, trans_vertices, trans_indices, } }, - ChunkTask::LoadChunk { position, seed, abortion } => { + ChunkTask::ChunkWorldgen { position, seed, abortion } => { let Some((chunk_data, queued)) = generate_world(position, seed, abortion) else { log::warn!("aborted operation"); return }; - ChunkTaskResponse::LoadedChunk { position, chunk_data, queued } + ChunkTaskResponse::ChunkWorldgenDone { position, chunk_data, queued } } }); }); } + pub fn receive(&self) -> Option { self.channel.1.try_recv().ok() } From 570382520cdc6eb0eef7ed1a5a4c60ced8cc5f59 Mon Sep 17 00:00:00 2001 From: griffi-gh Date: Mon, 2 Sep 2024 00:22:54 +0200 Subject: [PATCH 2/8] do partial impl --- kubi-shared/src/data/io_thread.rs | 13 +++- kubi/src/world/loading.rs | 119 +++++++++++++++++++++++++++--- kubi/src/world/tasks.rs | 7 +- 3 files changed, 125 insertions(+), 14 deletions(-) diff --git a/kubi-shared/src/data/io_thread.rs b/kubi-shared/src/data/io_thread.rs index 2386d40..931e531 100644 --- a/kubi-shared/src/data/io_thread.rs +++ b/kubi-shared/src/data/io_thread.rs @@ -21,7 +21,7 @@ pub enum IOCommand { Kys, } - +#[derive(Debug)] pub enum IOResponse { /// A chunk has been loaded from the disk /// Or not, in which case the data will be None @@ -69,7 +69,10 @@ impl IOThreadContext { } IOCommand::Kys => { // Process all pending write commands - while let IOCommand::SaveChunk { position, data } = self.rx.recv().unwrap() { + for cmd in self.rx.try_iter() { + let IOCommand::SaveChunk { position, data } = cmd else { + continue; + }; self.save.save_chunk(position, &data).unwrap(); } self.tx.send(IOResponse::Terminated).unwrap(); @@ -119,22 +122,28 @@ impl IOSingleThread { /// Signal the IO thread to process the remaining requests and wait for it to terminate pub fn stop_sync(&self) { + log::debug!("Stopping IO thread (sync)"); + // Tell the thread to terminate and wait for it to finish self.tx.send(IOCommand::Kys).unwrap(); while !matches!(self.rx.recv().unwrap(), IOResponse::Terminated) {} // HACK "we have .join at home" while !self.handle.is_finished() {} + + log::debug!("IO thread stopped"); //almost lol } /// Same as stop_sync but doesn't wait for the IO thread to terminate pub fn stop_async(&self) { + log::debug!("Stopping IO thread (async)"); 
self.tx.send(IOCommand::Kys).unwrap(); } } impl Drop for IOSingleThread { fn drop(&mut self) { + log::trace!("IOSingleThread dropped, about to sync unsaved data..."); self.stop_sync(); } } diff --git a/kubi/src/world/loading.rs b/kubi/src/world/loading.rs index 9880730..4326e26 100644 --- a/kubi/src/world/loading.rs +++ b/kubi/src/world/loading.rs @@ -1,7 +1,11 @@ use std::sync::Arc; use atomic::{Atomic, Ordering}; use glam::{IVec3, ivec3}; -use kubi_shared::{networking::{channels::Channel, messages::ClientToServerMessage}, worldgen::AbortState}; +use kubi_shared::{ + data::io_thread::{IOCommand, IOResponse, IOThreadManager}, + networking::{channels::Channel, messages::ClientToServerMessage}, + worldgen::AbortState, +}; use shipyard::{View, UniqueView, UniqueViewMut, IntoIter, Workload, IntoWorkload, NonSendSync, track}; use uflow::SendMode; use wgpu::util::DeviceExt; @@ -20,6 +24,8 @@ use super::{ queue::BlockUpdateQueue, }; +const WORLD_SEED: u64 = 0xbeef_face_dead_cafe; + const MAX_CHUNK_OPS_INGAME: usize = 8; const MAX_CHUNK_OPS: usize = 32; @@ -92,6 +98,7 @@ pub fn update_chunks_if_player_moved( fn process_state_changes( task_manager: UniqueView, + io: Option>, mut udp_client: Option>, mut world: UniqueViewMut, mut vm_meshes: NonSendSync>, @@ -135,7 +142,7 @@ fn process_state_changes( chunk.current_state, CurrentChunkState::Loaded | CurrentChunkState::CalculatingMesh, ) => { - chunk.block_data = None; + // chunk.block_data = None; //HACK when downgrading, keep the data so we can save it chunk.current_state = CurrentChunkState::Nothing; }, @@ -184,18 +191,33 @@ fn process_state_changes( SendMode::Reliable ); } else { - let atomic = Arc::new(Atomic::new(AbortState::Continue)); - task_manager.spawn_task(ChunkTask::ChunkWorldgen { - seed: 0xbeef_face_dead_cafe, - position, - abortion: Some(Arc::clone(&atomic)), - }); - abortion = Some(atomic); + + // ============================================================ + // TODO IMPORTANT: DO NOT WAIT FOR THE IO THREAD TO RESPOND, THIS WILL CAUSE A TERRIBLE BOTTLENECK + // find a way to check the world save header from the main thread instead! 
+ // ============================================================ + + if let Some(io) = &io { + // Try to load the chunk from the save file + // In case that fails, we will run worldgen once the IO thread responds + io.send(IOCommand::LoadChunk { position }); + } else { + // If there's no IO thread, we'll just run worldgen right away + let atomic = Arc::new(Atomic::new(AbortState::Continue)); + task_manager.spawn_task(ChunkTask::ChunkWorldgen { + seed: WORLD_SEED, + position, + abortion: Some(Arc::clone(&atomic)), + }); + abortion = Some(atomic); + } } + //Update chunk state let chunk = world.chunks.get_mut(&position).unwrap(); chunk.current_state = CurrentChunkState::Loading; chunk.abortion = abortion; + // =========== //log::trace!("Started loading chunk {position}"); }, @@ -254,7 +276,22 @@ fn process_state_changes( return false } - //HACK, since save files are not implemented, just unload immediately + // If in singleplayer and have an open save file, we need to save the chunk to the disk + + // ========================================================== + //TODO IMPORTANT: WAIT FOR CHUNK TO FINISH SAVING FIRST BEFORE TRANSITIONING TO UNLOADED + // OTHERWISE WE WILL LOSE THE SAVE DATA IF THE USER COMES BACK TO THE CHUNK TOO QUICKLY + // ========================================================== + if let Some(io) = &io { + if let Some(block_data) = &chunk.block_data { + // log::debug!("issue save command"); + io.send(IOCommand::SaveChunk { + position, + data: block_data.blocks.clone(), + }); + } + } + return false } true @@ -264,6 +301,7 @@ fn process_state_changes( fn process_completed_tasks( task_manager: UniqueView, + io: Option>, mut world: UniqueViewMut, mut meshes: NonSendSync>, renderer: UniqueView, @@ -271,9 +309,68 @@ fn process_completed_tasks( mut queue: UniqueViewMut, ) { let mut ops: usize = 0; - while let Some(res) = task_manager.receive() { + + //TODO reduce code duplication between loaded/generated chunks + + // Process IO first + if let Some(io) = &io { + for response in io.poll() { + let IOResponse::ChunkLoaded { position, data } = response else { + //TODO this is bad + panic!("Unexpected IO response: {:?}", response); + }; + + //check if chunk exists + let Some(chunk) = world.chunks.get_mut(&position) else { + log::warn!("LOADED blocks data discarded: chunk doesn't exist"); + continue + }; + + //we cannot have abortion here but just in case, reset it + chunk.abortion = None; + + //check if chunk still wants it + if !matches!(chunk.desired_state, DesiredChunkState::Loaded | DesiredChunkState::Rendered) { + log::warn!("LOADED block data discarded: state undesirable: {:?}", chunk.desired_state); + continue + } + + // check if we actually got the data + if let Some(data) = data { + // If we did get the data, yay :3 + chunk.block_data = Some(ChunkData { + blocks: data + }); + chunk.current_state = CurrentChunkState::Loaded; + ops += 1; + } else { + // If we didn't get the data, we need to run worldgen + //TODO: this is a terrible bottleneck + let atomic = Arc::new(Atomic::new(AbortState::Continue)); + task_manager.spawn_task(ChunkTask::ChunkWorldgen { + seed: WORLD_SEED, + position, + abortion: Some(Arc::clone(&atomic)), + }); + chunk.abortion = Some(atomic); + } + } + + //return early if we've reached the limit + if ops >= match *state { + GameState::InGame => MAX_CHUNK_OPS_INGAME, + _ => MAX_CHUNK_OPS, + } { return } + // XXX: this will completely skip polling the task manager if we've reached the limit + // this is probably fine, but it might be worth revisiting later + 
} + + for res in task_manager.poll() { match res { ChunkTaskResponse::ChunkWorldgenDone { position, chunk_data, mut queued } => { + //TODO this can fuck shit up really badly if io op gets overwritten by worldgen chunk + //TODO only accept if loading stage, not loaded + //If unwanted chunk is already loaded //It would be ~~...unethical~~ impossible to abort the operation at this point //Instead, we'll just throw it away diff --git a/kubi/src/world/tasks.rs b/kubi/src/world/tasks.rs index 90e1de8..eebc663 100644 --- a/kubi/src/world/tasks.rs +++ b/kubi/src/world/tasks.rs @@ -1,6 +1,6 @@ use std::sync::Arc; use atomic::Atomic; -use flume::{Sender, Receiver}; +use flume::{Receiver, Sender, TryIter}; use glam::IVec3; use kubi_shared::{queue::QueuedBlock, worldgen::AbortState}; use shipyard::Unique; @@ -89,7 +89,12 @@ impl ChunkTaskManager { }); } + #[deprecated(note = "use poll instead")] pub fn receive(&self) -> Option { self.channel.1.try_recv().ok() } + + pub fn poll(&self) -> TryIter { + self.channel.1.try_iter() + } } From 61b99409cef63ecccb831bcae02629a719080ee8 Mon Sep 17 00:00:00 2001 From: griffi-gh Date: Mon, 2 Sep 2024 01:31:29 +0200 Subject: [PATCH 3/8] this sucks --- kubi-shared/src/data.rs | 3 ++ kubi-shared/src/data/io_thread.rs | 88 ++++++++++++++++++++++++------- kubi/src/world.rs | 8 +-- kubi/src/world/chunk.rs | 2 + kubi/src/world/loading.rs | 3 ++ kubi/src/world/queue.rs | 1 + 6 files changed, 83 insertions(+), 22 deletions(-) diff --git a/kubi-shared/src/data.rs b/kubi-shared/src/data.rs index da113fe..3b66f8d 100644 --- a/kubi-shared/src/data.rs +++ b/kubi-shared/src/data.rs @@ -116,6 +116,9 @@ impl WorldSaveFile { header_modified = true; self.allocate_sector() }); + if header_modified { + self.header.chunk_map.insert(position, sector); + } let offset = sector as u64 * SECTOR_SIZE as u64; diff --git a/kubi-shared/src/data/io_thread.rs b/kubi-shared/src/data/io_thread.rs index 931e531..72b3da1 100644 --- a/kubi-shared/src/data/io_thread.rs +++ b/kubi-shared/src/data/io_thread.rs @@ -2,9 +2,12 @@ use glam::IVec3; use flume::{Receiver, Sender, TryIter}; use shipyard::Unique; use crate::chunk::BlockData; - use super::WorldSaveFile; +// Maximum amount of chunks to save in a single batch before checking if there are any pending read requests +// may be broken, so currently disabled +const MAX_SAVE_BATCH_SIZE: usize = usize::MAX; + pub enum IOCommand { SaveChunk { position: IVec3, @@ -39,6 +42,7 @@ struct IOThreadContext { tx: Sender, rx: Receiver, save: WorldSaveFile, + save_queue: Vec<(IVec3, BlockData)>, } //TODO: Implement proper error handling (I/O errors are rlly common) @@ -54,29 +58,77 @@ impl IOThreadContext { save: WorldSaveFile, ) -> Self { // save.load_data().unwrap(); - Self { tx, rx, save } + let save_queue = Vec::new(); + Self { tx, rx, save, save_queue } } pub fn run(mut self) { loop { - match self.rx.recv().unwrap() { - IOCommand::SaveChunk { position, data } => { - self.save.save_chunk(position, &data).unwrap(); + // because were waiting for the next command, we can't process the save_queue + // which breaks batching, so we need to check if there are any pending save requests + // and if there are, use non-blocking recv to give them a chance to be processed + 'rx: while let Some(command) = { + if self.save_queue.len() > 0 { + self.rx.try_recv().ok() + } else { + self.rx.recv().ok() } - IOCommand::LoadChunk { position } => { - let data = self.save.load_chunk(position).unwrap(); - self.tx.send(IOResponse::ChunkLoaded { position, data }).unwrap(); - } - 
IOCommand::Kys => { - // Process all pending write commands - for cmd in self.rx.try_iter() { - let IOCommand::SaveChunk { position, data } = cmd else { - continue; - }; - self.save.save_chunk(position, &data).unwrap(); + } { + match command { + IOCommand::SaveChunk { position, data } => { + // if chunk already has a save request, overwrite it + for (pos, old_data) in self.save_queue.iter_mut() { + if *pos == position { + *old_data = data; + continue 'rx; + } + } + // if not, save to the queue + self.save_queue.push((position, data)); + //log::trace!("amt of unsaved chunks: {}", self.save_queue.len()); } - self.tx.send(IOResponse::Terminated).unwrap(); - return; + IOCommand::LoadChunk { position } => { + // HOLD ON + // first check if the chunk is already in the save queue + // if it is, send it and continue + // (NOT doing this WILL result in data loss if the user returns to the chunk too quickly) + for (pos, data) in self.save_queue.iter() { + if *pos == position { + self.tx.send(IOResponse::ChunkLoaded { position, data: Some(data.clone()) }).unwrap(); + continue 'rx; + } + } + let data = self.save.load_chunk(position).unwrap(); + self.tx.send(IOResponse::ChunkLoaded { position, data }).unwrap(); + } + IOCommand::Kys => { + // Process all pending write commands + log::info!("info: queue has {} chunks", self.save_queue.len()); + let mut saved_amount = 0; + for (pos, data) in self.save_queue.drain(..) { + self.save.save_chunk(pos, &data).unwrap(); + saved_amount += 1; + } + log::debug!("now, moving on to the rx queue..."); + for cmd in self.rx.try_iter() { + let IOCommand::SaveChunk { position, data } = cmd else { + continue; + }; + self.save.save_chunk(position, &data).unwrap(); + saved_amount += 1; + } + log::info!("saved {} chunks on exit", saved_amount); + self.tx.send(IOResponse::Terminated).unwrap(); + return; + } + } + } + // between every betch of requests, check if there are any pending save requests + if self.save_queue.len() > 0 { + let will_drain = MAX_SAVE_BATCH_SIZE.min(self.save_queue.len()); + log::info!("saving {}/{} chunks with batch size {}...", will_drain, self.save_queue.len(), MAX_SAVE_BATCH_SIZE); + for (pos, data) in self.save_queue.drain(..will_drain) { + self.save.save_chunk(pos, &data).unwrap(); } } } diff --git a/kubi/src/world.rs b/kubi/src/world.rs index 1626d17..fe1f8ff 100644 --- a/kubi/src/world.rs +++ b/kubi/src/world.rs @@ -62,10 +62,10 @@ impl ChunkStorage { } } -#[derive(Unique)] -pub struct WorldInfo { - pub seed: u32, -} +// #[derive(Unique)] +// pub struct WorldInfo { +// pub seed: u32, +// } #[derive(Default, Unique)] pub struct ChunkMeshStorage { diff --git a/kubi/src/world/chunk.rs b/kubi/src/world/chunk.rs index 27c52a9..f728f28 100644 --- a/kubi/src/world/chunk.rs +++ b/kubi/src/world/chunk.rs @@ -57,6 +57,7 @@ pub struct Chunk { pub desired_state: DesiredChunkState, pub abortion: Option>>, pub mesh_dirty: bool, + pub data_modified: bool, } impl Chunk { @@ -69,6 +70,7 @@ impl Chunk { desired_state: Default::default(), abortion: None, mesh_dirty: false, + data_modified: false, } } } diff --git a/kubi/src/world/loading.rs b/kubi/src/world/loading.rs index 4326e26..2ed2612 100644 --- a/kubi/src/world/loading.rs +++ b/kubi/src/world/loading.rs @@ -282,6 +282,9 @@ fn process_state_changes( //TODO IMPORTANT: WAIT FOR CHUNK TO FINISH SAVING FIRST BEFORE TRANSITIONING TO UNLOADED // OTHERWISE WE WILL LOSE THE SAVE DATA IF THE USER COMES BACK TO THE CHUNK TOO QUICKLY // ========================================================== + //XXX: CHECK IF WE 
REALLY NEED THIS OR IF WE CAN JUST KILL THE CHUNK RIGHT AWAY + //CHANGES TO CHUNK SAVING LOGIC SHOULD HAVE MADE THE ABOVE COMMENT OBSOLETE + if let Some(io) = &io { if let Some(block_data) = &chunk.block_data { // log::debug!("issue save command"); diff --git a/kubi/src/world/queue.rs b/kubi/src/world/queue.rs index 76d6b02..8f46335 100644 --- a/kubi/src/world/queue.rs +++ b/kubi/src/world/queue.rs @@ -27,6 +27,7 @@ pub fn apply_queued_blocks( let (chunk_pos, block_pos) = ChunkStorage::to_chunk_coords(event.position); let chunk = world.chunks.get_mut(&chunk_pos).expect("This error should never happen, if it does then something is super fucked up and the whole project needs to be burnt down."); chunk.mesh_dirty = true; + chunk.data_modified = true; //If block pos is close to the border, some neighbors may be dirty! const DIRECTIONS: [IVec3; 6] = [ ivec3(1, 0, 0), From 64a67d0ffe4a3fbd93f6a86e36b74a9ad08db7fe Mon Sep 17 00:00:00 2001 From: griffi-gh Date: Mon, 2 Sep 2024 01:32:25 +0200 Subject: [PATCH 4/8] only save modified chunks --- kubi/src/world/loading.rs | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/kubi/src/world/loading.rs b/kubi/src/world/loading.rs index 2ed2612..d1cbea4 100644 --- a/kubi/src/world/loading.rs +++ b/kubi/src/world/loading.rs @@ -287,11 +287,14 @@ fn process_state_changes( if let Some(io) = &io { if let Some(block_data) = &chunk.block_data { - // log::debug!("issue save command"); - io.send(IOCommand::SaveChunk { - position, - data: block_data.blocks.clone(), - }); + // Only save the chunk if it has been modified + if chunk.data_modified { + // log::debug!("issue save command"); + io.send(IOCommand::SaveChunk { + position, + data: block_data.blocks.clone(), + }); + } } } From 37e68912ebe213ab80a8d9d0bd9770e36c593e64 Mon Sep 17 00:00:00 2001 From: griffi-gh Date: Mon, 2 Sep 2024 02:01:02 +0200 Subject: [PATCH 5/8] make it work pretty well i guess --- kubi-shared/src/data.rs | 46 +++++++++++++++++++------------ kubi-shared/src/data/io_thread.rs | 17 ++++++++++-- kubi/src/lib.rs | 20 ++++++++++---- kubi/src/networking.rs | 18 +++++------- kubi/src/world/loading.rs | 45 ++++++++++++++++++++++-------- 5 files changed, 98 insertions(+), 48 deletions(-) diff --git a/kubi-shared/src/data.rs b/kubi-shared/src/data.rs index 3b66f8d..8c0ae41 100644 --- a/kubi-shared/src/data.rs +++ b/kubi-shared/src/data.rs @@ -49,19 +49,19 @@ impl Default for WorldSaveDataHeader { } } +pub type SharedHeader = Arc>; + #[derive(Unique)] pub struct WorldSaveFile { pub file: File, - pub header: WorldSaveDataHeader, + pub header: SharedHeader, } -pub type SharedSaveFile = Arc>; - impl WorldSaveFile { pub fn new(file: File) -> Self { WorldSaveFile { file, - header: WorldSaveDataHeader::default() + header: Arc::new(RwLock::new(WorldSaveDataHeader::default())), } } @@ -78,7 +78,7 @@ impl WorldSaveFile { } let limit = (RESERVED_SIZE - SUBHEADER_SIZE) as u64; - self.header = bincode::deserialize_from((&self.file).take(limit))?; + *self.header.write().unwrap() = bincode::deserialize_from((&self.file).take(limit))?; Ok(()) } @@ -90,7 +90,7 @@ impl WorldSaveFile { //XXX: this can cause the header to destroy chunk data (if it's WAY too long) // read has checks against this, but write doesn't // 1mb is pretty generous tho, so it's not a *big* deal - bincode::serialize_into(&self.file, &self.header)?; + bincode::serialize_into(&self.file, &*self.header.read().unwrap())?; Ok(()) } @@ -104,21 +104,27 @@ impl WorldSaveFile { Ok(()) } - fn allocate_sector(&mut self) -> u32 { - 
let value = self.header.sector_count + 1; - self.header.sector_count += 1; - value - } + // fn allocate_sector(&mut self) -> u32 { + // let mut lock = self.header.write().unwrap(); + // let value = lock.sector_count + 1; + // lock.sector_count += 1; + // value + // } pub fn save_chunk(&mut self, position: IVec3, data: &BlockDataRef) -> Result<()> { + let mut header_lock = self.header.write().unwrap(); + let mut header_modified = false; - let sector = self.header.chunk_map.get(&position).copied().unwrap_or_else(|| { + let sector = header_lock.chunk_map.get(&position).copied().unwrap_or_else(|| { header_modified = true; - self.allocate_sector() + let sector = header_lock.sector_count; + header_lock.sector_count += 1; + header_lock.chunk_map.insert(position, sector); + sector + // self.allocate_sector() }); - if header_modified { - self.header.chunk_map.insert(position, sector); - } + + drop(header_lock); let offset = sector as u64 * SECTOR_SIZE as u64; @@ -141,11 +147,11 @@ impl WorldSaveFile { } pub fn chunk_exists(&self, position: IVec3) -> bool { - self.header.chunk_map.contains_key(&position) + self.header.read().unwrap().chunk_map.contains_key(&position) } pub fn load_chunk(&mut self, position: IVec3) -> Result> { - let Some(§or) = self.header.chunk_map.get(&position) else { + let Some(§or) = self.header.read().unwrap().chunk_map.get(&position) else { return Ok(None); }; @@ -171,4 +177,8 @@ impl WorldSaveFile { Ok(Some(data)) } + + pub fn get_shared_header(&self) -> SharedHeader { + Arc::clone(&self.header) + } } diff --git a/kubi-shared/src/data/io_thread.rs b/kubi-shared/src/data/io_thread.rs index 72b3da1..5991d40 100644 --- a/kubi-shared/src/data/io_thread.rs +++ b/kubi-shared/src/data/io_thread.rs @@ -2,7 +2,7 @@ use glam::IVec3; use flume::{Receiver, Sender, TryIter}; use shipyard::Unique; use crate::chunk::BlockData; -use super::WorldSaveFile; +use super::{SharedHeader, WorldSaveFile}; // Maximum amount of chunks to save in a single batch before checking if there are any pending read requests // may be broken, so currently disabled @@ -139,6 +139,7 @@ pub struct IOSingleThread { tx: Sender, rx: Receiver, handle: std::thread::JoinHandle<()>, + header: SharedHeader, } impl IOSingleThread { @@ -147,6 +148,9 @@ impl IOSingleThread { let (command_tx, command_rx) = flume::unbounded(); let (response_tx, response_rx) = flume::unbounded(); + // Grab a handle to the header + let header = save.get_shared_header(); + // Spawn the thread let builder = std::thread::Builder::new() .name("io-thread".into()); @@ -158,7 +162,8 @@ impl IOSingleThread { IOSingleThread { tx: command_tx, rx: response_rx, - handle + handle, + header, } } @@ -191,6 +196,10 @@ impl IOSingleThread { log::debug!("Stopping IO thread (async)"); self.tx.send(IOCommand::Kys).unwrap(); } + + pub fn chunk_exists(&self, position: IVec3) -> bool { + self.header.read().unwrap().chunk_map.contains_key(&position) + } } impl Drop for IOSingleThread { @@ -221,6 +230,10 @@ impl IOThreadManager { pub fn poll(&self) -> TryIter { self.thread.poll() } + + pub fn chunk_exists(&self, position: IVec3) -> bool { + self.thread.chunk_exists(position) + } } // i think im a girl :3 (noone will ever read this right? 
:p) diff --git a/kubi/src/lib.rs b/kubi/src/lib.rs index 1bcd7cc..2a9ab45 100644 --- a/kubi/src/lib.rs +++ b/kubi/src/lib.rs @@ -57,11 +57,7 @@ pub(crate) mod client_physics; pub(crate) mod chat; use world::{ - init_game_world, - loading::update_loaded_world_around_player, - raycast::update_raycasts, - queue::apply_queued_blocks, - tasks::ChunkTaskManager, + init_game_world, loading::{save_on_exit, update_loaded_world_around_player}, queue::apply_queued_blocks, raycast::update_raycasts, tasks::ChunkTaskManager }; use player::{spawn_player, MainPlayer}; use prefabs::load_prefabs; @@ -157,7 +153,6 @@ fn update() -> Workload { kubi_ui_end, update_state, exit_on_esc, - disconnect_on_exit.run_if(is_multiplayer), update_rendering_late, ).into_sequential_workload() } @@ -183,6 +178,13 @@ fn after_render() -> Workload { ).into_sequential_workload() } +fn on_exit() -> Workload{ + ( + disconnect_on_exit.run_if(is_multiplayer), + save_on_exit.run_if(is_singleplayer), + ).into_sequential_workload().run_if(is_ingame_or_loading) +} + #[cfg(all(windows, not(debug_assertions)))] fn attach_console() { use winapi::um::wincon::{AttachConsole, ATTACH_PARENT_PROCESS}; @@ -243,6 +245,7 @@ pub fn kubi_main( world.add_workload(update); //world.add_workload(render); world.add_workload(after_render); + world.add_workload(on_exit); //Save _visualizer.json #[cfg(feature = "generate_visualizer_data")] { @@ -350,6 +353,11 @@ pub fn kubi_main( window_target.exit(); } }, + + Event::LoopExiting => { + world.run_workload(on_exit).unwrap(); + }, + _ => (), }; }).unwrap(); diff --git a/kubi/src/networking.rs b/kubi/src/networking.rs index 56f5fde..e9e5b5e 100644 --- a/kubi/src/networking.rs +++ b/kubi/src/networking.rs @@ -159,19 +159,15 @@ pub fn update_networking_late() -> Workload { } pub fn disconnect_on_exit( - exit: UniqueView, mut client: UniqueViewMut, ) { - //TODO check if this works - if exit.0 { - if client.0.is_active() { - client.0.flush(); - client.0.disconnect(); - while client.0.is_active() { client.0.step().for_each(|_|()); } - log::info!("Client disconnected"); - } else { - log::info!("Client inactive") - } + if client.0.is_active() { + client.0.flush(); + client.0.disconnect(); + while client.0.is_active() { client.0.step().for_each(|_|()); } + log::info!("Client disconnected"); + } else { + log::info!("Client inactive") } } diff --git a/kubi/src/world/loading.rs b/kubi/src/world/loading.rs index d1cbea4..c06afbf 100644 --- a/kubi/src/world/loading.rs +++ b/kubi/src/world/loading.rs @@ -192,17 +192,22 @@ fn process_state_changes( ); } else { - // ============================================================ - // TODO IMPORTANT: DO NOT WAIT FOR THE IO THREAD TO RESPOND, THIS WILL CAUSE A TERRIBLE BOTTLENECK - // find a way to check the world save header from the main thread instead! - // ============================================================ + // If the chunk exists in the save file (and save file is there in the first place), + // ... 
we'll try to load it + // Otherwise, we'll run worldgen + + let mut should_run_worldgen = true; if let Some(io) = &io { - // Try to load the chunk from the save file - // In case that fails, we will run worldgen once the IO thread responds - io.send(IOCommand::LoadChunk { position }); - } else { - // If there's no IO thread, we'll just run worldgen right away + if io.chunk_exists(position) { + // Try to load the chunk from the save file + // In case that fails, we will run worldgen once the IO thread responds + io.send(IOCommand::LoadChunk { position }); + should_run_worldgen = false; + } + } + + if should_run_worldgen { let atomic = Arc::new(Atomic::new(AbortState::Continue)); task_manager.spawn_task(ChunkTask::ChunkWorldgen { seed: WORLD_SEED, @@ -348,10 +353,9 @@ fn process_completed_tasks( blocks: data }); chunk.current_state = CurrentChunkState::Loaded; - ops += 1; } else { // If we didn't get the data, we need to run worldgen - //TODO: this is a terrible bottleneck + // XXX: will this ever happen? we should always have the data in the save file let atomic = Arc::new(Atomic::new(AbortState::Continue)); task_manager.spawn_task(ChunkTask::ChunkWorldgen { seed: WORLD_SEED, @@ -360,6 +364,8 @@ fn process_completed_tasks( }); chunk.abortion = Some(atomic); } + + ops += 1; } //return early if we've reached the limit @@ -495,3 +501,20 @@ fn process_completed_tasks( } { break } } } + +/// Save all modified chunks to the disk +pub fn save_on_exit( + io: UniqueView, + world: UniqueView, +) { + for (&position, chunk) in &world.chunks { + if let Some(block_data) = &chunk.block_data { + if chunk.data_modified { + io.send(IOCommand::SaveChunk { + position, + data: block_data.blocks.clone(), + }); + } + } + } +} From 884551089cd2b64c12c9f8db81e5ca48468d17f5 Mon Sep 17 00:00:00 2001 From: griffi-gh Date: Tue, 3 Sep 2024 15:47:04 +0200 Subject: [PATCH 6/8] Initial server save file integration (NO SAVING YET, ONLY LOADING) --- Server.toml | 1 + kubi-server/src/config.rs | 3 +- kubi-server/src/main.rs | 2 + kubi-server/src/world.rs | 9 ++-- kubi-server/src/world/save.rs | 16 +++++++ kubi-server/src/world/tasks.rs | 48 ++++++++++++++++---- kubi-shared/src/data.rs | 21 ++++++++- kubi-shared/src/data/io_thread.rs | 9 ++++ {kubi => kubi-shared}/src/fixed_timestamp.rs | 0 kubi-shared/src/lib.rs | 1 + kubi/src/init.rs | 18 +------- kubi/src/lib.rs | 3 +- kubi/src/world/loading.rs | 2 +- 13 files changed, 100 insertions(+), 33 deletions(-) create mode 100644 kubi-server/src/world/save.rs rename {kubi => kubi-shared}/src/fixed_timestamp.rs (100%) diff --git a/Server.toml b/Server.toml index 48065e9..a1dc9c7 100644 --- a/Server.toml +++ b/Server.toml @@ -4,6 +4,7 @@ max_clients = 32 timeout_ms = 10000 [world] +file = "world.kubi" seed = 0xfeb_face_dead_cafe preheat_radius = 8 diff --git a/kubi-server/src/config.rs b/kubi-server/src/config.rs index 561403d..079e577 100644 --- a/kubi-server/src/config.rs +++ b/kubi-server/src/config.rs @@ -1,6 +1,6 @@ use shipyard::{AllStoragesView, Unique}; use serde::{Serialize, Deserialize}; -use std::{fs, net::SocketAddr}; +use std::{fs, net::SocketAddr, path::PathBuf}; #[derive(Serialize, Deserialize)] pub struct ConfigTableServer { @@ -12,6 +12,7 @@ pub struct ConfigTableServer { #[derive(Serialize, Deserialize)] pub struct ConfigTableWorld { + pub file: Option, pub seed: u64, pub preheat_radius: u32, } diff --git a/kubi-server/src/main.rs b/kubi-server/src/main.rs index 790388a..3f7f243 100644 --- a/kubi-server/src/main.rs +++ b/kubi-server/src/main.rs @@ -1,5 +1,6 @@ use 
shipyard::{IntoWorkload, Workload, WorkloadModificator, World}; use std::{thread, time::Duration}; +use kubi_shared::fixed_timestamp::init_fixed_timestamp_storage; mod util; mod config; @@ -16,6 +17,7 @@ use world::{update_world, init_world}; fn initialize() -> Workload { ( + init_fixed_timestamp_storage, read_config, bind_server, init_client_maps, diff --git a/kubi-server/src/world.rs b/kubi-server/src/world.rs index 841b11a..ecc7de8 100644 --- a/kubi-server/src/world.rs +++ b/kubi-server/src/world.rs @@ -24,12 +24,13 @@ use crate::{ pub mod chunk; pub mod tasks; +pub mod save; use chunk::Chunk; use self::{ tasks::{ChunkTaskManager, ChunkTask, ChunkTaskResponse, init_chunk_task_manager}, - chunk::ChunkState + chunk::ChunkState, }; #[derive(Unique, Default)] @@ -106,7 +107,7 @@ fn process_chunk_requests( chunk.state = ChunkState::Loading; chunk.subscriptions.insert(message.client_id); chunk_manager.chunks.insert(chunk_position, chunk); - task_manager.spawn_task(ChunkTask::LoadChunk { + task_manager.run(ChunkTask::LoadChunk { position: chunk_position, seed: config.world.seed, }); @@ -278,7 +279,7 @@ pub fn preheat_world( let mut chunk = Chunk::new(); chunk.state = ChunkState::Loading; chunk_manager.chunks.insert(chunk_position, chunk); - task_manager.spawn_task(ChunkTask::LoadChunk { + task_manager.run(ChunkTask::LoadChunk { position: chunk_position, seed: config.world.seed, }); @@ -292,7 +293,7 @@ pub fn init_world() -> Workload { init_chunk_manager_and_block_queue.before_all(preheat_world), init_chunk_task_manager.before_all(preheat_world), preheat_world, - ).into_workload() + ).into_sequential_workload() } pub fn update_world() -> Workload { diff --git a/kubi-server/src/world/save.rs b/kubi-server/src/world/save.rs new file mode 100644 index 0000000..834203e --- /dev/null +++ b/kubi-server/src/world/save.rs @@ -0,0 +1,16 @@ +use kubi_shared::data::{io_thread::IOThreadManager, open_local_save_file}; +use shipyard::{AllStoragesView, UniqueView}; + +use crate::config::ConfigTable; + +pub fn init_save_file(storages: &AllStoragesView) -> Option { + let config = storages.borrow::>().unwrap(); + if let Some(file_path) = &config.world.file { + log::info!("Initializing save file from {:?}", file_path); + let save = open_local_save_file(&file_path).unwrap(); + Some(IOThreadManager::new(save)) + } else { + log::warn!("No save file specified, world will not be saved"); + None + } +} diff --git a/kubi-server/src/world/tasks.rs b/kubi-server/src/world/tasks.rs index 40aa615..3ea5347 100644 --- a/kubi-server/src/world/tasks.rs +++ b/kubi-server/src/world/tasks.rs @@ -4,10 +4,9 @@ use glam::IVec3; use rayon::{ThreadPool, ThreadPoolBuilder}; use anyhow::Result; use kubi_shared::{ - chunk::BlockData, - worldgen::generate_world, - queue::QueuedBlock, + chunk::BlockData, data::io_thread::{IOCommand, IOResponse, IOThreadManager}, queue::QueuedBlock, worldgen::generate_world }; +use super::save::init_save_file; pub enum ChunkTask { LoadChunk { @@ -28,15 +27,30 @@ pub enum ChunkTaskResponse { pub struct ChunkTaskManager { channel: (Sender, Receiver), pool: ThreadPool, + iota: Option, } + impl ChunkTaskManager { - pub fn new() -> Result { + pub fn new(iota: Option) -> Result { Ok(Self { channel: unbounded(), - pool: ThreadPoolBuilder::new().build()? + pool: ThreadPoolBuilder::new().build()?, + iota, }) } - pub fn spawn_task(&self, task: ChunkTask) { + + pub fn run(&self, task: ChunkTask) { + // 1. 
Check if the chunk exists in the save file + #[allow(irrefutable_let_patterns)] + if let ChunkTask::LoadChunk { position, .. } = &task { + if let Some(iota) = &self.iota { + if iota.chunk_exists(*position) { + iota.send(IOCommand::LoadChunk { position: *position }); + } + } + } + + // 2. Generate the chunk if it doesn't exist let sender = self.channel.0.clone(); self.pool.spawn(move || { sender.send(match task { @@ -48,13 +62,31 @@ impl ChunkTaskManager { }).unwrap() }) } + pub fn receive(&self) -> Option { - self.channel.1.try_recv().ok() + // Try to receive IO results first + // If there are none, try to receive worldgen results + self.iota.as_ref().map(|iota| { + iota.poll_single().map(|response| match response { + IOResponse::ChunkLoaded { position, data } => ChunkTaskResponse::ChunkLoaded { + chunk_position: position, + blocks: data.expect("chunk data exists in the header, but was not loaded"), + queue: Vec::with_capacity(0) + }, + _ => panic!("Unexpected response from IO thread"), + }) + }).flatten().or_else(|| { + self.channel.1.try_recv().ok() + }) } } pub fn init_chunk_task_manager( storages: AllStoragesView ) { - storages.add_unique(ChunkTaskManager::new().expect("ChunkTaskManager Init failed")); + let iota = init_save_file(&storages); + storages.add_unique( + ChunkTaskManager::new(iota) + .expect("ChunkTaskManager Init failed") + ); } diff --git a/kubi-shared/src/data.rs b/kubi-shared/src/data.rs index 8c0ae41..1274130 100644 --- a/kubi-shared/src/data.rs +++ b/kubi-shared/src/data.rs @@ -1,7 +1,8 @@ use std::{ - fs::File, mem::size_of, + fs::{File, OpenOptions}, io::{Read, Seek, SeekFrom, Write}, + path::Path, borrow::Cow, sync::{Arc, RwLock} }; @@ -182,3 +183,21 @@ impl WorldSaveFile { Arc::clone(&self.header) } } + +/// Utility function to open a local save file, creating it if it doesn't exist +pub fn open_local_save_file(path: &Path) -> Result { + let mut save_file = WorldSaveFile::new({ + OpenOptions::new() + .read(true) + .write(true) + .create(true) + .open(path)? 
+ }); + if save_file.file.metadata().unwrap().len() == 0 { + save_file.initialize()?; + } else { + save_file.load_data()?; + } + Ok(save_file) +} + diff --git a/kubi-shared/src/data/io_thread.rs b/kubi-shared/src/data/io_thread.rs index 5991d40..eac20fd 100644 --- a/kubi-shared/src/data/io_thread.rs +++ b/kubi-shared/src/data/io_thread.rs @@ -172,6 +172,11 @@ impl IOSingleThread { self.tx.send(cmd).unwrap(); } + /// Poll the IO thread for a single response (non-blocking) + pub fn poll_single(&self) -> Option { + self.rx.try_recv().ok() + } + /// Poll the IO thread for responses (non-blocking) pub fn poll(&self) -> TryIter { self.rx.try_iter() @@ -227,6 +232,10 @@ impl IOThreadManager { self.thread.send(cmd); } + pub fn poll_single(&self) -> Option { + self.thread.poll_single() + } + pub fn poll(&self) -> TryIter { self.thread.poll() } diff --git a/kubi/src/fixed_timestamp.rs b/kubi-shared/src/fixed_timestamp.rs similarity index 100% rename from kubi/src/fixed_timestamp.rs rename to kubi-shared/src/fixed_timestamp.rs diff --git a/kubi-shared/src/lib.rs b/kubi-shared/src/lib.rs index f262514..0c8b4c3 100644 --- a/kubi-shared/src/lib.rs +++ b/kubi-shared/src/lib.rs @@ -8,3 +8,4 @@ pub mod entity; pub mod player; pub mod queue; pub mod data; +pub mod fixed_timestamp; diff --git a/kubi/src/init.rs b/kubi/src/init.rs index 009e9c4..fba8a51 100644 --- a/kubi/src/init.rs +++ b/kubi/src/init.rs @@ -5,23 +5,7 @@ use crate::{ networking::{GameType, ServerAddress}, state::{GameState, NextState} }; -use kubi_shared::data::{io_thread::IOThreadManager, WorldSaveFile}; - -fn open_local_save_file(path: &Path) -> Result { - let mut save_file = WorldSaveFile::new({ - OpenOptions::new() - .read(true) - .write(true) - .create(true) - .open(path)? - }); - if save_file.file.metadata().unwrap().len() == 0 { - save_file.initialize()?; - } else { - save_file.load_data()?; - } - Ok(save_file) -} +use kubi_shared::data::{io_thread::IOThreadManager, WorldSaveFile, open_local_save_file}; pub fn initialize_from_args( all_storages: AllStoragesView, diff --git a/kubi/src/lib.rs b/kubi/src/lib.rs index 2a9ab45..27c7074 100644 --- a/kubi/src/lib.rs +++ b/kubi/src/lib.rs @@ -23,7 +23,9 @@ use winit::{ use glam::vec3; use std::time::Instant; +//TODO remove these re-exports pub(crate) use kubi_shared::transform; +pub(crate) use kubi_shared::fixed_timestamp; mod ui; pub(crate) use ui::{ @@ -51,7 +53,6 @@ pub(crate) mod hui_integration; pub(crate) mod networking; pub(crate) mod init; pub(crate) mod color; -pub(crate) mod fixed_timestamp; pub(crate) mod filesystem; pub(crate) mod client_physics; pub(crate) mod chat; diff --git a/kubi/src/world/loading.rs b/kubi/src/world/loading.rs index c06afbf..9a42f75 100644 --- a/kubi/src/world/loading.rs +++ b/kubi/src/world/loading.rs @@ -24,7 +24,7 @@ use super::{ queue::BlockUpdateQueue, }; -const WORLD_SEED: u64 = 0xbeef_face_dead_cafe; +const WORLD_SEED: u64 = 0xfeb_face_dead_cafe; const MAX_CHUNK_OPS_INGAME: usize = 8; const MAX_CHUNK_OPS: usize = 32; From 63e26e3a5b141c56169d026621685f98e1132485 Mon Sep 17 00:00:00 2001 From: griffi-gh Date: Tue, 3 Sep 2024 15:48:10 +0200 Subject: [PATCH 7/8] unset data_modified once save command is issued --- kubi/src/world/loading.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/kubi/src/world/loading.rs b/kubi/src/world/loading.rs index 9a42f75..de3898d 100644 --- a/kubi/src/world/loading.rs +++ b/kubi/src/world/loading.rs @@ -295,6 +295,7 @@ fn process_state_changes( // Only save the chunk if it has been modified if chunk.data_modified { // 
log::debug!("issue save command"); + chunk.data_modified = false; io.send(IOCommand::SaveChunk { position, data: block_data.blocks.clone(), From 1b89756648ab9c70e4dcd350780e1b9e15ba46c5 Mon Sep 17 00:00:00 2001 From: griffi-gh Date: Wed, 11 Sep 2024 11:28:37 +0200 Subject: [PATCH 8/8] Implement saving on the server-side --- kubi-server/src/client.rs | 2 +- kubi-server/src/main.rs | 11 +++++--- kubi-server/src/world.rs | 6 +++- kubi-server/src/world/chunk.rs | 3 ++ kubi-server/src/world/save.rs | 31 +++++++++++++++++++-- kubi-server/src/world/tasks.rs | 50 +++++++++++++++++++++------------- kubi/src/world/queue.rs | 3 ++ 7 files changed, 79 insertions(+), 27 deletions(-) diff --git a/kubi-server/src/client.rs b/kubi-server/src/client.rs index 2b60bf6..d4d2121 100644 --- a/kubi-server/src/client.rs +++ b/kubi-server/src/client.rs @@ -50,7 +50,7 @@ pub fn sync_client_positions( }; //log movement (annoying duh) - log::debug!("dbg: player moved id: {} coords: {} quat: {}", message.client_id, position, direction); + // log::debug!("dbg: player moved id: {} coords: {} quat: {}", message.client_id, position, direction); //Apply position to server-side client let mut trans = (&mut transforms).get(message.entity_id).unwrap(); diff --git a/kubi-server/src/main.rs b/kubi-server/src/main.rs index 3f7f243..eca8753 100644 --- a/kubi-server/src/main.rs +++ b/kubi-server/src/main.rs @@ -1,6 +1,6 @@ -use shipyard::{IntoWorkload, Workload, WorkloadModificator, World}; +use shipyard::{IntoWorkload, SystemModificator, Workload, WorkloadModificator, World}; use std::{thread, time::Duration}; -use kubi_shared::fixed_timestamp::init_fixed_timestamp_storage; +use kubi_shared::fixed_timestamp::{FixedTimestamp, init_fixed_timestamp_storage}; mod util; mod config; @@ -13,7 +13,7 @@ use config::read_config; use server::{bind_server, update_server, log_server_errors}; use client::{init_client_maps, on_client_disconnect, sync_client_positions}; use auth::authenticate_players; -use world::{update_world, init_world}; +use world::{init_world, save::save_modified, update_world}; fn initialize() -> Workload { ( @@ -34,7 +34,10 @@ fn update() -> Workload { update_world, sync_client_positions, on_client_disconnect, - ).into_workload() + ).into_workload(), + save_modified + .into_workload() + .make_fixed(10000, 0), ).into_sequential_workload() } diff --git a/kubi-server/src/world.rs b/kubi-server/src/world.rs index ecc7de8..0e05b27 100644 --- a/kubi-server/src/world.rs +++ b/kubi-server/src/world.rs @@ -250,7 +250,11 @@ fn process_block_queue( let Some(blocks) = &mut chunk.blocks else { return true }; - blocks[block_position.x as usize][block_position.y as usize][block_position.z as usize] = item.block_type; + let block = &mut blocks[block_position.x as usize][block_position.y as usize][block_position.z as usize]; + if item.block_type != *block { + *block = item.block_type; + chunk.data_modified = true; + } false }); if initial_len != queue.queue.len() { diff --git a/kubi-server/src/world/chunk.rs b/kubi-server/src/world/chunk.rs index 7287d19..3fdce80 100644 --- a/kubi-server/src/world/chunk.rs +++ b/kubi-server/src/world/chunk.rs @@ -16,13 +16,16 @@ pub struct Chunk { pub state: ChunkState, pub blocks: Option, pub subscriptions: HashSet>, + pub data_modified: bool, } + impl Chunk { pub fn new() -> Self { Self { state: ChunkState::Nothing, blocks: None, subscriptions: HashSet::with_capacity_and_hasher(4, BuildNoHashHasher::default()), + data_modified: false, } } } diff --git a/kubi-server/src/world/save.rs 
b/kubi-server/src/world/save.rs index 834203e..9fda708 100644 --- a/kubi-server/src/world/save.rs +++ b/kubi-server/src/world/save.rs @@ -1,7 +1,10 @@ use kubi_shared::data::{io_thread::IOThreadManager, open_local_save_file}; -use shipyard::{AllStoragesView, UniqueView}; - +use shipyard::{AllStoragesView, UniqueView, UniqueViewMut}; use crate::config::ConfigTable; +use super::{ + tasks::{ChunkTask, ChunkTaskManager}, + ChunkManager, +}; pub fn init_save_file(storages: &AllStoragesView) -> Option { let config = storages.borrow::>().unwrap(); @@ -14,3 +17,27 @@ pub fn init_save_file(storages: &AllStoragesView) -> Option { None } } + +pub fn save_modified( + mut chunks: UniqueViewMut, + ctm: UniqueView, +) { + log::info!("Saving..."); + let mut amount_saved = 0; + for (position, chunk) in chunks.chunks.iter_mut() { + if chunk.data_modified { + let Some(data) = chunk.blocks.clone() else { + continue + }; + ctm.run(ChunkTask::SaveChunk { + position: *position, + data: data, + }); + chunk.data_modified = false; + amount_saved += 1; + } + } + if amount_saved > 0 { + log::info!("Queued {} chunks for saving", amount_saved); + } +} \ No newline at end of file diff --git a/kubi-server/src/world/tasks.rs b/kubi-server/src/world/tasks.rs index 3ea5347..cae2d56 100644 --- a/kubi-server/src/world/tasks.rs +++ b/kubi-server/src/world/tasks.rs @@ -12,7 +12,11 @@ pub enum ChunkTask { LoadChunk { position: IVec3, seed: u64, - } + }, + SaveChunk { + position: IVec3, + data: BlockData, + }, } pub enum ChunkTaskResponse { @@ -40,27 +44,35 @@ impl ChunkTaskManager { } pub fn run(&self, task: ChunkTask) { - // 1. Check if the chunk exists in the save file - #[allow(irrefutable_let_patterns)] - if let ChunkTask::LoadChunk { position, .. } = &task { - if let Some(iota) = &self.iota { - if iota.chunk_exists(*position) { - iota.send(IOCommand::LoadChunk { position: *position }); + match task { + ChunkTask::LoadChunk { position: chunk_position, seed } => { + // 1. Check if the chunk exists in the save file + if let ChunkTask::LoadChunk { position, .. } = &task { + if let Some(iota) = &self.iota { + if iota.chunk_exists(*position) { + iota.send(IOCommand::LoadChunk { position: *position }); + return + } + } } - } - } - // 2. Generate the chunk if it doesn't exist - let sender = self.channel.0.clone(); - self.pool.spawn(move || { - sender.send(match task { - ChunkTask::LoadChunk { position: chunk_position, seed } => { - //unwrap is fine because abort is not possible - let (blocks, queue) = generate_world(chunk_position, seed, None).unwrap(); - ChunkTaskResponse::ChunkLoaded { chunk_position, blocks, queue } + // 2. 
Generate the chunk if it doesn't exist + let sender = self.channel.0.clone(); + self.pool.spawn(move || { + sender.send({ + //unwrap is fine because abort is not possible + let (blocks, queue) = generate_world(chunk_position, seed, None).unwrap(); + ChunkTaskResponse::ChunkLoaded { chunk_position, blocks, queue } + }).unwrap() + }); + }, + ChunkTask::SaveChunk { position, data } => { + // Save the chunk to the save file + if let Some(iota) = &self.iota { + iota.send(IOCommand::SaveChunk { position, data }); } - }).unwrap() - }) + }, + } } pub fn receive(&self) -> Option { diff --git a/kubi/src/world/queue.rs b/kubi/src/world/queue.rs index 8f46335..100d380 100644 --- a/kubi/src/world/queue.rs +++ b/kubi/src/world/queue.rs @@ -22,6 +22,9 @@ pub fn apply_queued_blocks( if event.soft && *block != Block::Air { return false } + if event.block_type == *block { + return false + } *block = event.block_type; //mark chunk as dirty let (chunk_pos, block_pos) = ChunkStorage::to_chunk_coords(event.position);
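
Usage sketch: taken together, the series adds a dedicated IO thread that owns the WorldSaveFile and talks to the game over flume channels (IOCommand requests in, IOResponse results out), plus a shared save-file header so chunk_exists can be answered from the main thread without a round trip to the IO thread. Below is a minimal, hypothetical driver built only from the APIs introduced in these patches; the function itself, its `modified` parameter, and the placeholder comments are illustrative and are not taken from the diffs.

// Hypothetical driver; only IOThreadManager / IOCommand / IOResponse /
// open_local_save_file and their signatures come from the patches above.
use std::path::Path;
use glam::IVec3;
use kubi_shared::{
  chunk::BlockData,
  data::{
    io_thread::{IOCommand, IOResponse, IOThreadManager},
    open_local_save_file,
  },
};

fn drive_io(modified: Vec<(IVec3, BlockData)>) -> anyhow::Result<()> {
  // Open (or create) the save file and hand it off to the IO thread.
  let save = open_local_save_file(Path::new("world.kubi"))?;
  let io = IOThreadManager::new(save);

  // The save-file header is shared behind an Arc<RwLock<..>>, so this check
  // never blocks on the IO thread. Only chunks that are actually on disk get
  // a LoadChunk request; everything else goes straight to worldgen, as
  // kubi/src/world/loading.rs does.
  let position = IVec3::new(0, 0, 0);
  if io.chunk_exists(position) {
    io.send(IOCommand::LoadChunk { position });
  }

  // Non-blocking poll of completed loads, intended to run once per tick.
  for response in io.poll() {
    if let IOResponse::ChunkLoaded { position: _, data } = response {
      match data {
        Some(_blocks) => { /* insert into chunk storage */ }
        None => { /* header listed the chunk but no data came back: fall back to worldgen */ }
      }
    }
  }

  // Queue writes for chunks whose data_modified flag was set.
  for (position, data) in modified {
    io.send(IOCommand::SaveChunk { position, data });
  }

  // Dropping the manager drops the inner IOSingleThread, whose Drop impl runs
  // stop_sync(): it sends IOCommand::Kys, the IO thread flushes its pending
  // save queue, and the caller waits for IOResponse::Terminated.
  drop(io);
  Ok(())
}

The same pattern is what loading.rs uses on the client and what kubi-server/src/world/tasks.rs uses on the server; the flush-on-drop behaviour is what lets save_on_exit and save_modified issue SaveChunk commands without waiting for confirmation.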