diff --git a/.gitignore b/.gitignore index ef8c4d1..42f57f8 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,5 @@ +.direnv + # Generated by Cargo # will have compiled files and executables debug/ @@ -15,6 +17,7 @@ _src _visualizer.json *.kubi +*.kbi /*_log*.txt /*.log @@ -37,4 +40,4 @@ _visualizer.json /mods -.direnv + diff --git a/Cargo.lock b/Cargo.lock index 7c6d0dd..af7ada8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1307,8 +1307,10 @@ dependencies = [ "bincode", "bytemuck", "fastnoise-lite", + "flume", "glam", "hashbrown 0.14.5", + "log", "nohash-hasher", "num_enum", "nz", diff --git a/Server.toml b/Server.toml index 48065e9..a1dc9c7 100644 --- a/Server.toml +++ b/Server.toml @@ -4,6 +4,7 @@ max_clients = 32 timeout_ms = 10000 [world] +file = "world.kubi" seed = 0xfeb_face_dead_cafe preheat_radius = 8 diff --git a/kubi-server/src/client.rs b/kubi-server/src/client.rs index 2b60bf6..d4d2121 100644 --- a/kubi-server/src/client.rs +++ b/kubi-server/src/client.rs @@ -50,7 +50,7 @@ pub fn sync_client_positions( }; //log movement (annoying duh) - log::debug!("dbg: player moved id: {} coords: {} quat: {}", message.client_id, position, direction); + // log::debug!("dbg: player moved id: {} coords: {} quat: {}", message.client_id, position, direction); //Apply position to server-side client let mut trans = (&mut transforms).get(message.entity_id).unwrap(); diff --git a/kubi-server/src/config.rs b/kubi-server/src/config.rs index 561403d..079e577 100644 --- a/kubi-server/src/config.rs +++ b/kubi-server/src/config.rs @@ -1,6 +1,6 @@ use shipyard::{AllStoragesView, Unique}; use serde::{Serialize, Deserialize}; -use std::{fs, net::SocketAddr}; +use std::{fs, net::SocketAddr, path::PathBuf}; #[derive(Serialize, Deserialize)] pub struct ConfigTableServer { @@ -12,6 +12,7 @@ pub struct ConfigTableServer { #[derive(Serialize, Deserialize)] pub struct ConfigTableWorld { + pub file: Option, pub seed: u64, pub preheat_radius: u32, } diff --git a/kubi-server/src/main.rs b/kubi-server/src/main.rs index 790388a..eca8753 100644 --- a/kubi-server/src/main.rs +++ b/kubi-server/src/main.rs @@ -1,5 +1,6 @@ -use shipyard::{IntoWorkload, Workload, WorkloadModificator, World}; +use shipyard::{IntoWorkload, SystemModificator, Workload, WorkloadModificator, World}; use std::{thread, time::Duration}; +use kubi_shared::fixed_timestamp::{FixedTimestamp, init_fixed_timestamp_storage}; mod util; mod config; @@ -12,10 +13,11 @@ use config::read_config; use server::{bind_server, update_server, log_server_errors}; use client::{init_client_maps, on_client_disconnect, sync_client_positions}; use auth::authenticate_players; -use world::{update_world, init_world}; +use world::{init_world, save::save_modified, update_world}; fn initialize() -> Workload { ( + init_fixed_timestamp_storage, read_config, bind_server, init_client_maps, @@ -32,7 +34,10 @@ fn update() -> Workload { update_world, sync_client_positions, on_client_disconnect, - ).into_workload() + ).into_workload(), + save_modified + .into_workload() + .make_fixed(10000, 0), ).into_sequential_workload() } diff --git a/kubi-server/src/world.rs b/kubi-server/src/world.rs index 841b11a..0e05b27 100644 --- a/kubi-server/src/world.rs +++ b/kubi-server/src/world.rs @@ -24,12 +24,13 @@ use crate::{ pub mod chunk; pub mod tasks; +pub mod save; use chunk::Chunk; use self::{ tasks::{ChunkTaskManager, ChunkTask, ChunkTaskResponse, init_chunk_task_manager}, - chunk::ChunkState + chunk::ChunkState, }; #[derive(Unique, Default)] @@ -106,7 +107,7 @@ fn process_chunk_requests( 
chunk.state = ChunkState::Loading; chunk.subscriptions.insert(message.client_id); chunk_manager.chunks.insert(chunk_position, chunk); - task_manager.spawn_task(ChunkTask::LoadChunk { + task_manager.run(ChunkTask::LoadChunk { position: chunk_position, seed: config.world.seed, }); @@ -249,7 +250,11 @@ fn process_block_queue( let Some(blocks) = &mut chunk.blocks else { return true }; - blocks[block_position.x as usize][block_position.y as usize][block_position.z as usize] = item.block_type; + let block = &mut blocks[block_position.x as usize][block_position.y as usize][block_position.z as usize]; + if item.block_type != *block { + *block = item.block_type; + chunk.data_modified = true; + } false }); if initial_len != queue.queue.len() { @@ -278,7 +283,7 @@ pub fn preheat_world( let mut chunk = Chunk::new(); chunk.state = ChunkState::Loading; chunk_manager.chunks.insert(chunk_position, chunk); - task_manager.spawn_task(ChunkTask::LoadChunk { + task_manager.run(ChunkTask::LoadChunk { position: chunk_position, seed: config.world.seed, }); @@ -292,7 +297,7 @@ pub fn init_world() -> Workload { init_chunk_manager_and_block_queue.before_all(preheat_world), init_chunk_task_manager.before_all(preheat_world), preheat_world, - ).into_workload() + ).into_sequential_workload() } pub fn update_world() -> Workload { diff --git a/kubi-server/src/world/chunk.rs b/kubi-server/src/world/chunk.rs index 7287d19..3fdce80 100644 --- a/kubi-server/src/world/chunk.rs +++ b/kubi-server/src/world/chunk.rs @@ -16,13 +16,16 @@ pub struct Chunk { pub state: ChunkState, pub blocks: Option, pub subscriptions: HashSet>, + pub data_modified: bool, } + impl Chunk { pub fn new() -> Self { Self { state: ChunkState::Nothing, blocks: None, subscriptions: HashSet::with_capacity_and_hasher(4, BuildNoHashHasher::default()), + data_modified: false, } } } diff --git a/kubi-server/src/world/save.rs b/kubi-server/src/world/save.rs new file mode 100644 index 0000000..9fda708 --- /dev/null +++ b/kubi-server/src/world/save.rs @@ -0,0 +1,43 @@ +use kubi_shared::data::{io_thread::IOThreadManager, open_local_save_file}; +use shipyard::{AllStoragesView, UniqueView, UniqueViewMut}; +use crate::config::ConfigTable; +use super::{ + tasks::{ChunkTask, ChunkTaskManager}, + ChunkManager, +}; + +pub fn init_save_file(storages: &AllStoragesView) -> Option { + let config = storages.borrow::>().unwrap(); + if let Some(file_path) = &config.world.file { + log::info!("Initializing save file from {:?}", file_path); + let save = open_local_save_file(&file_path).unwrap(); + Some(IOThreadManager::new(save)) + } else { + log::warn!("No save file specified, world will not be saved"); + None + } +} + +pub fn save_modified( + mut chunks: UniqueViewMut, + ctm: UniqueView, +) { + log::info!("Saving..."); + let mut amount_saved = 0; + for (position, chunk) in chunks.chunks.iter_mut() { + if chunk.data_modified { + let Some(data) = chunk.blocks.clone() else { + continue + }; + ctm.run(ChunkTask::SaveChunk { + position: *position, + data: data, + }); + chunk.data_modified = false; + amount_saved += 1; + } + } + if amount_saved > 0 { + log::info!("Queued {} chunks for saving", amount_saved); + } +} \ No newline at end of file diff --git a/kubi-server/src/world/tasks.rs b/kubi-server/src/world/tasks.rs index 40aa615..cae2d56 100644 --- a/kubi-server/src/world/tasks.rs +++ b/kubi-server/src/world/tasks.rs @@ -4,16 +4,19 @@ use glam::IVec3; use rayon::{ThreadPool, ThreadPoolBuilder}; use anyhow::Result; use kubi_shared::{ - chunk::BlockData, - worldgen::generate_world, - 
queue::QueuedBlock, + chunk::BlockData, data::io_thread::{IOCommand, IOResponse, IOThreadManager}, queue::QueuedBlock, worldgen::generate_world }; +use super::save::init_save_file; pub enum ChunkTask { LoadChunk { position: IVec3, seed: u64, - } + }, + SaveChunk { + position: IVec3, + data: BlockData, + }, } pub enum ChunkTaskResponse { @@ -28,33 +31,74 @@ pub enum ChunkTaskResponse { pub struct ChunkTaskManager { channel: (Sender, Receiver), pool: ThreadPool, + iota: Option, } + impl ChunkTaskManager { - pub fn new() -> Result { + pub fn new(iota: Option) -> Result { Ok(Self { channel: unbounded(), - pool: ThreadPoolBuilder::new().build()? + pool: ThreadPoolBuilder::new().build()?, + iota, }) } - pub fn spawn_task(&self, task: ChunkTask) { - let sender = self.channel.0.clone(); - self.pool.spawn(move || { - sender.send(match task { - ChunkTask::LoadChunk { position: chunk_position, seed } => { - //unwrap is fine because abort is not possible - let (blocks, queue) = generate_world(chunk_position, seed, None).unwrap(); - ChunkTaskResponse::ChunkLoaded { chunk_position, blocks, queue } + + pub fn run(&self, task: ChunkTask) { + match task { + ChunkTask::LoadChunk { position: chunk_position, seed } => { + // 1. Check if the chunk exists in the save file + if let ChunkTask::LoadChunk { position, .. } = &task { + if let Some(iota) = &self.iota { + if iota.chunk_exists(*position) { + iota.send(IOCommand::LoadChunk { position: *position }); + return + } + } } - }).unwrap() - }) + + // 2. Generate the chunk if it doesn't exist + let sender = self.channel.0.clone(); + self.pool.spawn(move || { + sender.send({ + //unwrap is fine because abort is not possible + let (blocks, queue) = generate_world(chunk_position, seed, None).unwrap(); + ChunkTaskResponse::ChunkLoaded { chunk_position, blocks, queue } + }).unwrap() + }); + }, + ChunkTask::SaveChunk { position, data } => { + // Save the chunk to the save file + if let Some(iota) = &self.iota { + iota.send(IOCommand::SaveChunk { position, data }); + } + }, + } } + pub fn receive(&self) -> Option { - self.channel.1.try_recv().ok() + // Try to receive IO results first + // If there are none, try to receive worldgen results + self.iota.as_ref().map(|iota| { + iota.poll_single().map(|response| match response { + IOResponse::ChunkLoaded { position, data } => ChunkTaskResponse::ChunkLoaded { + chunk_position: position, + blocks: data.expect("chunk data exists in the header, but was not loaded"), + queue: Vec::with_capacity(0) + }, + _ => panic!("Unexpected response from IO thread"), + }) + }).flatten().or_else(|| { + self.channel.1.try_recv().ok() + }) } } pub fn init_chunk_task_manager( storages: AllStoragesView ) { - storages.add_unique(ChunkTaskManager::new().expect("ChunkTaskManager Init failed")); + let iota = init_save_file(&storages); + storages.add_unique( + ChunkTaskManager::new(iota) + .expect("ChunkTaskManager Init failed") + ); } diff --git a/kubi-shared/Cargo.toml b/kubi-shared/Cargo.toml index d55cbf6..1105d48 100644 --- a/kubi-shared/Cargo.toml +++ b/kubi-shared/Cargo.toml @@ -14,6 +14,7 @@ serde = { version = "1.0", default-features = false, features = ["alloc", "deriv serde_with = "3.4" bincode = "1.3" anyhow = "1.0" +flume = "0.11" fastnoise-lite = { version = "1.1", features = ["std", "f64"] } rand = { version = "0.8", default_features = false, features = ["std", "min_const_gen"] } rand_xoshiro = "0.6" @@ -23,6 +24,7 @@ bytemuck = { version = "1.14", features = ["derive"] } static_assertions = "1.1" nz = "0.4" atomic = "0.6" +log = "0.4" 
[features] default = [] diff --git a/kubi-shared/src/data.rs b/kubi-shared/src/data.rs index 5330bf0..1274130 100644 --- a/kubi-shared/src/data.rs +++ b/kubi-shared/src/data.rs @@ -1,7 +1,8 @@ use std::{ - fs::File, mem::size_of, + fs::{File, OpenOptions}, io::{Read, Seek, SeekFrom, Write}, + path::Path, borrow::Cow, sync::{Arc, RwLock} }; @@ -17,6 +18,8 @@ use crate::{ chunk::{CHUNK_SIZE, BlockDataRef, BlockData} }; +pub mod io_thread; + const SECTOR_SIZE: usize = CHUNK_SIZE * CHUNK_SIZE * CHUNK_SIZE * size_of::(); const RESERVED_SIZE: usize = 1048576; //~1mb (16 sectors assuming 32x32x32 world of 1byte blocks) const RESERVED_SECTOR_COUNT: usize = RESERVED_SIZE / SECTOR_SIZE; @@ -47,19 +50,19 @@ impl Default for WorldSaveDataHeader { } } +pub type SharedHeader = Arc>; + #[derive(Unique)] pub struct WorldSaveFile { pub file: File, - pub header: WorldSaveDataHeader, + pub header: SharedHeader, } -pub type SharedSaveFile = Arc>; - impl WorldSaveFile { pub fn new(file: File) -> Self { WorldSaveFile { file, - header: WorldSaveDataHeader::default() + header: Arc::new(RwLock::new(WorldSaveDataHeader::default())), } } @@ -76,7 +79,7 @@ impl WorldSaveFile { } let limit = (RESERVED_SIZE - SUBHEADER_SIZE) as u64; - self.header = bincode::deserialize_from((&self.file).take(limit))?; + *self.header.write().unwrap() = bincode::deserialize_from((&self.file).take(limit))?; Ok(()) } @@ -88,7 +91,7 @@ impl WorldSaveFile { //XXX: this can cause the header to destroy chunk data (if it's WAY too long) // read has checks against this, but write doesn't // 1mb is pretty generous tho, so it's not a *big* deal - bincode::serialize_into(&self.file, &self.header)?; + bincode::serialize_into(&self.file, &*self.header.read().unwrap())?; Ok(()) } @@ -102,19 +105,28 @@ impl WorldSaveFile { Ok(()) } - fn allocate_sector(&mut self) -> u32 { - let value = self.header.sector_count + 1; - self.header.sector_count += 1; - value - } + // fn allocate_sector(&mut self) -> u32 { + // let mut lock = self.header.write().unwrap(); + // let value = lock.sector_count + 1; + // lock.sector_count += 1; + // value + // } pub fn save_chunk(&mut self, position: IVec3, data: &BlockDataRef) -> Result<()> { + let mut header_lock = self.header.write().unwrap(); + let mut header_modified = false; - let sector = self.header.chunk_map.get(&position).copied().unwrap_or_else(|| { + let sector = header_lock.chunk_map.get(&position).copied().unwrap_or_else(|| { header_modified = true; - self.allocate_sector() + let sector = header_lock.sector_count; + header_lock.sector_count += 1; + header_lock.chunk_map.insert(position, sector); + sector + // self.allocate_sector() }); + drop(header_lock); + let offset = sector as u64 * SECTOR_SIZE as u64; const_assert_eq!(size_of::(), 1); @@ -136,11 +148,11 @@ impl WorldSaveFile { } pub fn chunk_exists(&self, position: IVec3) -> bool { - self.header.chunk_map.contains_key(&position) + self.header.read().unwrap().chunk_map.contains_key(&position) } pub fn load_chunk(&mut self, position: IVec3) -> Result> { - let Some(§or) = self.header.chunk_map.get(&position) else { + let Some(§or) = self.header.read().unwrap().chunk_map.get(&position) else { return Ok(None); }; @@ -166,4 +178,26 @@ impl WorldSaveFile { Ok(Some(data)) } + + pub fn get_shared_header(&self) -> SharedHeader { + Arc::clone(&self.header) + } } + +/// Utility function to open a local save file, creating it if it doesn't exist +pub fn open_local_save_file(path: &Path) -> Result { + let mut save_file = WorldSaveFile::new({ + OpenOptions::new() + 
.read(true) + .write(true) + .create(true) + .open(path)? + }); + if save_file.file.metadata().unwrap().len() == 0 { + save_file.initialize()?; + } else { + save_file.load_data()?; + } + Ok(save_file) +} + diff --git a/kubi-shared/src/data/io_thread.rs b/kubi-shared/src/data/io_thread.rs new file mode 100644 index 0000000..eac20fd --- /dev/null +++ b/kubi-shared/src/data/io_thread.rs @@ -0,0 +1,249 @@ +use glam::IVec3; +use flume::{Receiver, Sender, TryIter}; +use shipyard::Unique; +use crate::chunk::BlockData; +use super::{SharedHeader, WorldSaveFile}; + +// Maximum amount of chunks to save in a single batch before checking if there are any pending read requests +// may be broken, so currently disabled +const MAX_SAVE_BATCH_SIZE: usize = usize::MAX; + +pub enum IOCommand { + SaveChunk { + position: IVec3, + data: BlockData, + }, + + /// Load a chunk from the disk and send it to the main thread + LoadChunk { + position: IVec3, + }, + + /// Process all pending write commands and make the thread end itself + /// LoadChunk commands will be ignored after this command is received + Kys, +} + +#[derive(Debug)] +pub enum IOResponse { + /// A chunk has been loaded from the disk + /// Or not, in which case the data will be None + /// and chunk should be generated + ChunkLoaded { + position: IVec3, + data: Option, + }, + + /// The IO thread has been terminated + Terminated, +} + +struct IOThreadContext { + tx: Sender, + rx: Receiver, + save: WorldSaveFile, + save_queue: Vec<(IVec3, BlockData)>, +} + +//TODO: Implement proper error handling (I/O errors are rlly common) + +impl IOThreadContext { + /// Should be called ON the IO thread + /// + /// Initializes the IO thread context, opening the file at the given path + /// If there's an error opening the file, the thread will panic (TODO: handle this more gracefully) + pub fn initialize( + tx: Sender, + rx: Receiver, + save: WorldSaveFile, + ) -> Self { + // save.load_data().unwrap(); + let save_queue = Vec::new(); + Self { tx, rx, save, save_queue } + } + + pub fn run(mut self) { + loop { + // because were waiting for the next command, we can't process the save_queue + // which breaks batching, so we need to check if there are any pending save requests + // and if there are, use non-blocking recv to give them a chance to be processed + 'rx: while let Some(command) = { + if self.save_queue.len() > 0 { + self.rx.try_recv().ok() + } else { + self.rx.recv().ok() + } + } { + match command { + IOCommand::SaveChunk { position, data } => { + // if chunk already has a save request, overwrite it + for (pos, old_data) in self.save_queue.iter_mut() { + if *pos == position { + *old_data = data; + continue 'rx; + } + } + // if not, save to the queue + self.save_queue.push((position, data)); + //log::trace!("amt of unsaved chunks: {}", self.save_queue.len()); + } + IOCommand::LoadChunk { position } => { + // HOLD ON + // first check if the chunk is already in the save queue + // if it is, send it and continue + // (NOT doing this WILL result in data loss if the user returns to the chunk too quickly) + for (pos, data) in self.save_queue.iter() { + if *pos == position { + self.tx.send(IOResponse::ChunkLoaded { position, data: Some(data.clone()) }).unwrap(); + continue 'rx; + } + } + let data = self.save.load_chunk(position).unwrap(); + self.tx.send(IOResponse::ChunkLoaded { position, data }).unwrap(); + } + IOCommand::Kys => { + // Process all pending write commands + log::info!("info: queue has {} chunks", self.save_queue.len()); + let mut saved_amount = 0; + for 
(pos, data) in self.save_queue.drain(..) { + self.save.save_chunk(pos, &data).unwrap(); + saved_amount += 1; + } + log::debug!("now, moving on to the rx queue..."); + for cmd in self.rx.try_iter() { + let IOCommand::SaveChunk { position, data } = cmd else { + continue; + }; + self.save.save_chunk(position, &data).unwrap(); + saved_amount += 1; + } + log::info!("saved {} chunks on exit", saved_amount); + self.tx.send(IOResponse::Terminated).unwrap(); + return; + } + } + } + // between every betch of requests, check if there are any pending save requests + if self.save_queue.len() > 0 { + let will_drain = MAX_SAVE_BATCH_SIZE.min(self.save_queue.len()); + log::info!("saving {}/{} chunks with batch size {}...", will_drain, self.save_queue.len(), MAX_SAVE_BATCH_SIZE); + for (pos, data) in self.save_queue.drain(..will_drain) { + self.save.save_chunk(pos, &data).unwrap(); + } + } + } + } +} + +pub struct IOSingleThread { + tx: Sender, + rx: Receiver, + handle: std::thread::JoinHandle<()>, + header: SharedHeader, +} + +impl IOSingleThread { + pub fn spawn(save: WorldSaveFile) -> Self { + // Create channels + let (command_tx, command_rx) = flume::unbounded(); + let (response_tx, response_rx) = flume::unbounded(); + + // Grab a handle to the header + let header = save.get_shared_header(); + + // Spawn the thread + let builder = std::thread::Builder::new() + .name("io-thread".into()); + let handle = builder.spawn(move || { + let context = IOThreadContext::initialize(response_tx, command_rx, save); + context.run(); + }).unwrap(); + + IOSingleThread { + tx: command_tx, + rx: response_rx, + handle, + header, + } + } + + /// Send a command to the IO thread + pub fn send(&self, cmd: IOCommand) { + self.tx.send(cmd).unwrap(); + } + + /// Poll the IO thread for a single response (non-blocking) + pub fn poll_single(&self) -> Option { + self.rx.try_recv().ok() + } + + /// Poll the IO thread for responses (non-blocking) + pub fn poll(&self) -> TryIter { + self.rx.try_iter() + } + + /// Signal the IO thread to process the remaining requests and wait for it to terminate + pub fn stop_sync(&self) { + log::debug!("Stopping IO thread (sync)"); + + // Tell the thread to terminate and wait for it to finish + self.tx.send(IOCommand::Kys).unwrap(); + while !matches!(self.rx.recv().unwrap(), IOResponse::Terminated) {} + + // HACK "we have .join at home" + while !self.handle.is_finished() {} + + log::debug!("IO thread stopped"); //almost lol + } + + /// Same as stop_sync but doesn't wait for the IO thread to terminate + pub fn stop_async(&self) { + log::debug!("Stopping IO thread (async)"); + self.tx.send(IOCommand::Kys).unwrap(); + } + + pub fn chunk_exists(&self, position: IVec3) -> bool { + self.header.read().unwrap().chunk_map.contains_key(&position) + } +} + +impl Drop for IOSingleThread { + fn drop(&mut self) { + log::trace!("IOSingleThread dropped, about to sync unsaved data..."); + self.stop_sync(); + } +} + + +/// This is a stub for future implemntation that may use multiple IO threads +#[derive(Unique)] +pub struct IOThreadManager { + thread: IOSingleThread, +} + +impl IOThreadManager { + pub fn new(save: WorldSaveFile) -> Self { + Self { + thread: IOSingleThread::spawn(save) + } + } + + pub fn send(&self, cmd: IOCommand) { + self.thread.send(cmd); + } + + pub fn poll_single(&self) -> Option { + self.thread.poll_single() + } + + pub fn poll(&self) -> TryIter { + self.thread.poll() + } + + pub fn chunk_exists(&self, position: IVec3) -> bool { + self.thread.chunk_exists(position) + } +} + +// i think im a girl :3 
(noone will ever read this right? :p) + diff --git a/kubi/src/fixed_timestamp.rs b/kubi-shared/src/fixed_timestamp.rs similarity index 100% rename from kubi/src/fixed_timestamp.rs rename to kubi-shared/src/fixed_timestamp.rs diff --git a/kubi-shared/src/lib.rs b/kubi-shared/src/lib.rs index f262514..0c8b4c3 100644 --- a/kubi-shared/src/lib.rs +++ b/kubi-shared/src/lib.rs @@ -8,3 +8,4 @@ pub mod entity; pub mod player; pub mod queue; pub mod data; +pub mod fixed_timestamp; diff --git a/kubi/src/init.rs b/kubi/src/init.rs index 0dfb513..fba8a51 100644 --- a/kubi/src/init.rs +++ b/kubi/src/init.rs @@ -5,33 +5,25 @@ use crate::{ networking::{GameType, ServerAddress}, state::{GameState, NextState} }; -use kubi_shared::data::WorldSaveFile; - -fn open_local_save_file(path: &Path) -> Result { - let mut save_file = WorldSaveFile::new({ - OpenOptions::new() - .read(true) - .write(true) - .open("world.kbi")? - }); - if save_file.file.metadata().unwrap().len() == 0 { - save_file.initialize()?; - } else { - save_file.load_data()?; - } - Ok(save_file) -} +use kubi_shared::data::{io_thread::IOThreadManager, WorldSaveFile, open_local_save_file}; pub fn initialize_from_args( all_storages: AllStoragesView, ) { + // If an address is provided, we're in multiplayer mode (the first argument is the address) + // Otherwise, we're in singleplayer mode and working with local stuff let args: Vec = env::args().collect(); if args.len() > 1 { + // Parse the address and switch the state to connecting let address = args[1].parse::().expect("invalid address"); all_storages.add_unique(GameType::Muliplayer); all_storages.add_unique(ServerAddress(address)); all_storages.borrow::>().unwrap().0 = Some(GameState::Connecting); } else { + // Open the local save file + let save_file = open_local_save_file(Path::new("./world.kubi")).expect("failed to open save file"); + all_storages.add_unique(IOThreadManager::new(save_file)); + // Switch the state and kick off the world loading all_storages.add_unique(GameType::Singleplayer); all_storages.borrow::>().unwrap().0 = Some(GameState::LoadingWorld); } diff --git a/kubi/src/lib.rs b/kubi/src/lib.rs index 1bcd7cc..27c7074 100644 --- a/kubi/src/lib.rs +++ b/kubi/src/lib.rs @@ -23,7 +23,9 @@ use winit::{ use glam::vec3; use std::time::Instant; +//TODO remove these re-exports pub(crate) use kubi_shared::transform; +pub(crate) use kubi_shared::fixed_timestamp; mod ui; pub(crate) use ui::{ @@ -51,17 +53,12 @@ pub(crate) mod hui_integration; pub(crate) mod networking; pub(crate) mod init; pub(crate) mod color; -pub(crate) mod fixed_timestamp; pub(crate) mod filesystem; pub(crate) mod client_physics; pub(crate) mod chat; use world::{ - init_game_world, - loading::update_loaded_world_around_player, - raycast::update_raycasts, - queue::apply_queued_blocks, - tasks::ChunkTaskManager, + init_game_world, loading::{save_on_exit, update_loaded_world_around_player}, queue::apply_queued_blocks, raycast::update_raycasts, tasks::ChunkTaskManager }; use player::{spawn_player, MainPlayer}; use prefabs::load_prefabs; @@ -157,7 +154,6 @@ fn update() -> Workload { kubi_ui_end, update_state, exit_on_esc, - disconnect_on_exit.run_if(is_multiplayer), update_rendering_late, ).into_sequential_workload() } @@ -183,6 +179,13 @@ fn after_render() -> Workload { ).into_sequential_workload() } +fn on_exit() -> Workload{ + ( + disconnect_on_exit.run_if(is_multiplayer), + save_on_exit.run_if(is_singleplayer), + ).into_sequential_workload().run_if(is_ingame_or_loading) +} + #[cfg(all(windows, not(debug_assertions)))] fn 
attach_console() { use winapi::um::wincon::{AttachConsole, ATTACH_PARENT_PROCESS}; @@ -243,6 +246,7 @@ pub fn kubi_main( world.add_workload(update); //world.add_workload(render); world.add_workload(after_render); + world.add_workload(on_exit); //Save _visualizer.json #[cfg(feature = "generate_visualizer_data")] { @@ -350,6 +354,11 @@ pub fn kubi_main( window_target.exit(); } }, + + Event::LoopExiting => { + world.run_workload(on_exit).unwrap(); + }, + _ => (), }; }).unwrap(); diff --git a/kubi/src/networking.rs b/kubi/src/networking.rs index 56f5fde..e9e5b5e 100644 --- a/kubi/src/networking.rs +++ b/kubi/src/networking.rs @@ -159,19 +159,15 @@ pub fn update_networking_late() -> Workload { } pub fn disconnect_on_exit( - exit: UniqueView, mut client: UniqueViewMut, ) { - //TODO check if this works - if exit.0 { - if client.0.is_active() { - client.0.flush(); - client.0.disconnect(); - while client.0.is_active() { client.0.step().for_each(|_|()); } - log::info!("Client disconnected"); - } else { - log::info!("Client inactive") - } + if client.0.is_active() { + client.0.flush(); + client.0.disconnect(); + while client.0.is_active() { client.0.step().for_each(|_|()); } + log::info!("Client disconnected"); + } else { + log::info!("Client inactive") } } diff --git a/kubi/src/networking/world.rs b/kubi/src/networking/world.rs index efb2cde..b7feb2a 100644 --- a/kubi/src/networking/world.rs +++ b/kubi/src/networking/world.rs @@ -37,7 +37,7 @@ pub fn inject_network_responses_into_manager_queue( let ServerToClientMessage::ChunkResponse { chunk, data, queued } = packet else { unreachable!() }; - manager.add_sussy_response(ChunkTaskResponse::LoadedChunk { + manager.add_sussy_response(ChunkTaskResponse::ChunkWorldgenDone { position: chunk, chunk_data: data, queued diff --git a/kubi/src/world.rs b/kubi/src/world.rs index 1626d17..fe1f8ff 100644 --- a/kubi/src/world.rs +++ b/kubi/src/world.rs @@ -62,10 +62,10 @@ impl ChunkStorage { } } -#[derive(Unique)] -pub struct WorldInfo { - pub seed: u32, -} +// #[derive(Unique)] +// pub struct WorldInfo { +// pub seed: u32, +// } #[derive(Default, Unique)] pub struct ChunkMeshStorage { diff --git a/kubi/src/world/chunk.rs b/kubi/src/world/chunk.rs index 27c52a9..f728f28 100644 --- a/kubi/src/world/chunk.rs +++ b/kubi/src/world/chunk.rs @@ -57,6 +57,7 @@ pub struct Chunk { pub desired_state: DesiredChunkState, pub abortion: Option>>, pub mesh_dirty: bool, + pub data_modified: bool, } impl Chunk { @@ -69,6 +70,7 @@ impl Chunk { desired_state: Default::default(), abortion: None, mesh_dirty: false, + data_modified: false, } } } diff --git a/kubi/src/world/loading.rs b/kubi/src/world/loading.rs index 7ee4313..de3898d 100644 --- a/kubi/src/world/loading.rs +++ b/kubi/src/world/loading.rs @@ -1,14 +1,18 @@ use std::sync::Arc; use atomic::{Atomic, Ordering}; use glam::{IVec3, ivec3}; -use kubi_shared::{networking::{channels::Channel, messages::ClientToServerMessage}, worldgen::AbortState}; +use kubi_shared::{ + data::io_thread::{IOCommand, IOResponse, IOThreadManager}, + networking::{channels::Channel, messages::ClientToServerMessage}, + worldgen::AbortState, +}; use shipyard::{View, UniqueView, UniqueViewMut, IntoIter, Workload, IntoWorkload, NonSendSync, track}; use uflow::SendMode; use wgpu::util::DeviceExt; use crate::{ networking::UdpClient, player::MainPlayer, - rendering::{world::ChunkVertex, BufferPair, Renderer}, + rendering::{BufferPair, Renderer}, settings::GameSettings, state::GameState, transform::Transform, @@ -16,10 +20,12 @@ use crate::{ use super::{ 
ChunkStorage, ChunkMeshStorage, chunk::{Chunk, DesiredChunkState, CHUNK_SIZE, ChunkMesh, CurrentChunkState, ChunkData}, - tasks::{ChunkTaskManager, ChunkTaskResponse, ChunkTask}, + tasks::{ChunkTaskManager, ChunkTaskResponse, ChunkTask}, queue::BlockUpdateQueue, }; +const WORLD_SEED: u64 = 0xfeb_face_dead_cafe; + const MAX_CHUNK_OPS_INGAME: usize = 8; const MAX_CHUNK_OPS: usize = 32; @@ -92,6 +98,7 @@ pub fn update_chunks_if_player_moved( fn process_state_changes( task_manager: UniqueView, + io: Option>, mut udp_client: Option>, mut world: UniqueViewMut, mut vm_meshes: NonSendSync>, @@ -135,7 +142,7 @@ fn process_state_changes( chunk.current_state, CurrentChunkState::Loaded | CurrentChunkState::CalculatingMesh, ) => { - chunk.block_data = None; + // chunk.block_data = None; //HACK when downgrading, keep the data so we can save it chunk.current_state = CurrentChunkState::Nothing; }, @@ -184,18 +191,38 @@ fn process_state_changes( SendMode::Reliable ); } else { - let atomic = Arc::new(Atomic::new(AbortState::Continue)); - task_manager.spawn_task(ChunkTask::LoadChunk { - seed: 0xbeef_face_dead_cafe, - position, - abortion: Some(Arc::clone(&atomic)), - }); - abortion = Some(atomic); + + // If the chunk exists in the save file (and save file is there in the first place), + // ... we'll try to load it + // Otherwise, we'll run worldgen + + let mut should_run_worldgen = true; + + if let Some(io) = &io { + if io.chunk_exists(position) { + // Try to load the chunk from the save file + // In case that fails, we will run worldgen once the IO thread responds + io.send(IOCommand::LoadChunk { position }); + should_run_worldgen = false; + } + } + + if should_run_worldgen { + let atomic = Arc::new(Atomic::new(AbortState::Continue)); + task_manager.spawn_task(ChunkTask::ChunkWorldgen { + seed: WORLD_SEED, + position, + abortion: Some(Arc::clone(&atomic)), + }); + abortion = Some(atomic); + } } + //Update chunk state let chunk = world.chunks.get_mut(&position).unwrap(); chunk.current_state = CurrentChunkState::Loading; chunk.abortion = abortion; + // =========== //log::trace!("Started loading chunk {position}"); }, @@ -254,7 +281,29 @@ fn process_state_changes( return false } - //HACK, since save files are not implemented, just unload immediately + // If in singleplayer and have an open save file, we need to save the chunk to the disk + + // ========================================================== + //TODO IMPORTANT: WAIT FOR CHUNK TO FINISH SAVING FIRST BEFORE TRANSITIONING TO UNLOADED + // OTHERWISE WE WILL LOSE THE SAVE DATA IF THE USER COMES BACK TO THE CHUNK TOO QUICKLY + // ========================================================== + //XXX: CHECK IF WE REALLY NEED THIS OR IF WE CAN JUST KILL THE CHUNK RIGHT AWAY + //CHANGES TO CHUNK SAVING LOGIC SHOULD HAVE MADE THE ABOVE COMMENT OBSOLETE + + if let Some(io) = &io { + if let Some(block_data) = &chunk.block_data { + // Only save the chunk if it has been modified + if chunk.data_modified { + // log::debug!("issue save command"); + chunk.data_modified = false; + io.send(IOCommand::SaveChunk { + position, + data: block_data.blocks.clone(), + }); + } + } + } + return false } true @@ -264,6 +313,7 @@ fn process_state_changes( fn process_completed_tasks( task_manager: UniqueView, + io: Option>, mut world: UniqueViewMut, mut meshes: NonSendSync>, renderer: UniqueView, @@ -271,9 +321,69 @@ fn process_completed_tasks( mut queue: UniqueViewMut, ) { let mut ops: usize = 0; - while let Some(res) = task_manager.receive() { + + //TODO reduce code duplication 
between loaded/generated chunks + + // Process IO first + if let Some(io) = &io { + for response in io.poll() { + let IOResponse::ChunkLoaded { position, data } = response else { + //TODO this is bad + panic!("Unexpected IO response: {:?}", response); + }; + + //check if chunk exists + let Some(chunk) = world.chunks.get_mut(&position) else { + log::warn!("LOADED blocks data discarded: chunk doesn't exist"); + continue + }; + + //we cannot have abortion here but just in case, reset it + chunk.abortion = None; + + //check if chunk still wants it + if !matches!(chunk.desired_state, DesiredChunkState::Loaded | DesiredChunkState::Rendered) { + log::warn!("LOADED block data discarded: state undesirable: {:?}", chunk.desired_state); + continue + } + + // check if we actually got the data + if let Some(data) = data { + // If we did get the data, yay :3 + chunk.block_data = Some(ChunkData { + blocks: data + }); + chunk.current_state = CurrentChunkState::Loaded; + } else { + // If we didn't get the data, we need to run worldgen + // XXX: will this ever happen? we should always have the data in the save file + let atomic = Arc::new(Atomic::new(AbortState::Continue)); + task_manager.spawn_task(ChunkTask::ChunkWorldgen { + seed: WORLD_SEED, + position, + abortion: Some(Arc::clone(&atomic)), + }); + chunk.abortion = Some(atomic); + } + + ops += 1; + } + + //return early if we've reached the limit + if ops >= match *state { + GameState::InGame => MAX_CHUNK_OPS_INGAME, + _ => MAX_CHUNK_OPS, + } { return } + // XXX: this will completely skip polling the task manager if we've reached the limit + // this is probably fine, but it might be worth revisiting later + } + + for res in task_manager.poll() { match res { - ChunkTaskResponse::LoadedChunk { position, chunk_data, mut queued } => { + ChunkTaskResponse::ChunkWorldgenDone { position, chunk_data, mut queued } => { + //TODO this can fuck shit up really badly if io op gets overwritten by worldgen chunk + //TODO only accept if loading stage, not loaded + //If unwanted chunk is already loaded //It would be ~~...unethical~~ impossible to abort the operation at this point //Instead, we'll just throw it away @@ -308,7 +418,7 @@ fn process_completed_tasks( //increase ops counter ops += 1; }, - ChunkTaskResponse::GeneratedMesh { + ChunkTaskResponse::GenerateMeshDone { position, vertices, indices, trans_vertices, trans_indices, @@ -392,3 +502,20 @@ fn process_completed_tasks( } { break } } } + +/// Save all modified chunks to the disk +pub fn save_on_exit( + io: UniqueView, + world: UniqueView, +) { + for (&position, chunk) in &world.chunks { + if let Some(block_data) = &chunk.block_data { + if chunk.data_modified { + io.send(IOCommand::SaveChunk { + position, + data: block_data.blocks.clone(), + }); + } + } + } +} diff --git a/kubi/src/world/queue.rs b/kubi/src/world/queue.rs index 76d6b02..100d380 100644 --- a/kubi/src/world/queue.rs +++ b/kubi/src/world/queue.rs @@ -22,11 +22,15 @@ pub fn apply_queued_blocks( if event.soft && *block != Block::Air { return false } + if event.block_type == *block { + return false + } *block = event.block_type; //mark chunk as dirty let (chunk_pos, block_pos) = ChunkStorage::to_chunk_coords(event.position); let chunk = world.chunks.get_mut(&chunk_pos).expect("This error should never happen, if it does then something is super fucked up and the whole project needs to be burnt down."); chunk.mesh_dirty = true; + chunk.data_modified = true; //If block pos is close to the border, some neighbors may be dirty! 
const DIRECTIONS: [IVec3; 6] = [ ivec3(1, 0, 0), diff --git a/kubi/src/world/tasks.rs b/kubi/src/world/tasks.rs index a481f5f..eebc663 100644 --- a/kubi/src/world/tasks.rs +++ b/kubi/src/world/tasks.rs @@ -1,6 +1,6 @@ use std::sync::Arc; use atomic::Atomic; -use flume::{Sender, Receiver}; +use flume::{Receiver, Sender, TryIter}; use glam::IVec3; use kubi_shared::{queue::QueuedBlock, worldgen::AbortState}; use shipyard::Unique; @@ -13,7 +13,7 @@ use super::{ use crate::rendering::world::ChunkVertex; pub enum ChunkTask { - LoadChunk { + ChunkWorldgen { seed: u64, position: IVec3, abortion: Option>>, @@ -23,13 +23,14 @@ pub enum ChunkTask { data: MeshGenData } } + pub enum ChunkTaskResponse { - LoadedChunk { + ChunkWorldgenDone { position: IVec3, chunk_data: BlockData, queued: Vec }, - GeneratedMesh { + GenerateMeshDone { position: IVec3, vertices: Vec, indices: Vec, @@ -43,6 +44,7 @@ pub struct ChunkTaskManager { channel: (Sender, Receiver), pool: ThreadPool, } + impl ChunkTaskManager { pub fn new() -> Self { Self { @@ -50,11 +52,17 @@ impl ChunkTaskManager { pool: ThreadPoolBuilder::new().num_threads(4).build().unwrap() } } + + //TODO get rid of add_sussy_response + + /// Add a response to the queue, to be picked up by the main thread + /// Used by the multiplayer netcode, a huge hack pub fn add_sussy_response(&self, response: ChunkTaskResponse) { // this WILL get stuck if the channel is bounded // don't make the channel bounded ever self.channel.0.send(response).unwrap() } + pub fn spawn_task(&self, task: ChunkTask) { let sender = self.channel.0.clone(); self.pool.spawn(move || { @@ -64,23 +72,29 @@ impl ChunkTaskManager { (vertices, indices), (trans_vertices, trans_indices), ) = generate_mesh(position, data); - ChunkTaskResponse::GeneratedMesh { + ChunkTaskResponse::GenerateMeshDone { position, vertices, indices, trans_vertices, trans_indices, } }, - ChunkTask::LoadChunk { position, seed, abortion } => { + ChunkTask::ChunkWorldgen { position, seed, abortion } => { let Some((chunk_data, queued)) = generate_world(position, seed, abortion) else { log::warn!("aborted operation"); return }; - ChunkTaskResponse::LoadedChunk { position, chunk_data, queued } + ChunkTaskResponse::ChunkWorldgenDone { position, chunk_data, queued } } }); }); } + + #[deprecated(note = "use poll instead")] pub fn receive(&self) -> Option { self.channel.1.try_recv().ok() } + + pub fn poll(&self) -> TryIter { + self.channel.1.try_iter() + } }
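
For reference, a minimal usage sketch of the save-file API added in this patch. The world path, the `main` wrapper, and the commented-out BlockData value are illustrative placeholders; the calls themselves (open_local_save_file, IOThreadManager, IOCommand, IOResponse, chunk_exists, poll) are the ones introduced above.

use std::path::Path;
use glam::IVec3;
use kubi_shared::data::{
  open_local_save_file,
  io_thread::{IOCommand, IOResponse, IOThreadManager},
};

fn main() -> anyhow::Result<()> {
  // Open the save file (created and initialized if missing) and hand it to the IO thread.
  let save = open_local_save_file(Path::new("world.kubi"))?;
  let io = IOThreadManager::new(save);

  let position = IVec3::new(0, 0, 0);

  // Queue a chunk for saving; writes are batched on the IO thread.
  // `block_data` stands in for a real BlockData array produced by worldgen:
  // io.send(IOCommand::SaveChunk { position, data: block_data });

  // Only request a load if the shared header says the chunk was ever written.
  if io.chunk_exists(position) {
    io.send(IOCommand::LoadChunk { position });
  }

  // Non-blocking poll; `data` is None if the chunk is not present in the file.
  for response in io.poll() {
    if let IOResponse::ChunkLoaded { position, data } = response {
      println!("chunk {} loaded from disk: {}", position, data.is_some());
    }
  }

  // Dropping the manager drops the inner IOSingleThread, which sends
  // IOCommand::Kys and waits until the remaining save queue is flushed.
  drop(io);
  Ok(())
}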