diff --git a/.gitignore b/.gitignore index ef8c4d1..42f57f8 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,5 @@ +.direnv + # Generated by Cargo # will have compiled files and executables debug/ @@ -15,6 +17,7 @@ _src _visualizer.json *.kubi +*.kbi /*_log*.txt /*.log @@ -37,4 +40,4 @@ _visualizer.json /mods -.direnv + diff --git a/Cargo.lock b/Cargo.lock index 7c6d0dd..af7ada8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1307,8 +1307,10 @@ dependencies = [ "bincode", "bytemuck", "fastnoise-lite", + "flume", "glam", "hashbrown 0.14.5", + "log", "nohash-hasher", "num_enum", "nz", diff --git a/kubi-shared/Cargo.toml b/kubi-shared/Cargo.toml index d55cbf6..1105d48 100644 --- a/kubi-shared/Cargo.toml +++ b/kubi-shared/Cargo.toml @@ -14,6 +14,7 @@ serde = { version = "1.0", default-features = false, features = ["alloc", "deriv serde_with = "3.4" bincode = "1.3" anyhow = "1.0" +flume = "0.11" fastnoise-lite = { version = "1.1", features = ["std", "f64"] } rand = { version = "0.8", default_features = false, features = ["std", "min_const_gen"] } rand_xoshiro = "0.6" @@ -23,6 +24,7 @@ bytemuck = { version = "1.14", features = ["derive"] } static_assertions = "1.1" nz = "0.4" atomic = "0.6" +log = "0.4" [features] default = [] diff --git a/kubi-shared/src/data.rs b/kubi-shared/src/data.rs index 5330bf0..da113fe 100644 --- a/kubi-shared/src/data.rs +++ b/kubi-shared/src/data.rs @@ -17,6 +17,8 @@ use crate::{ chunk::{CHUNK_SIZE, BlockDataRef, BlockData} }; +pub mod io_thread; + const SECTOR_SIZE: usize = CHUNK_SIZE * CHUNK_SIZE * CHUNK_SIZE * size_of::(); const RESERVED_SIZE: usize = 1048576; //~1mb (16 sectors assuming 32x32x32 world of 1byte blocks) const RESERVED_SECTOR_COUNT: usize = RESERVED_SIZE / SECTOR_SIZE; diff --git a/kubi-shared/src/data/io_thread.rs b/kubi-shared/src/data/io_thread.rs new file mode 100644 index 0000000..2386d40 --- /dev/null +++ b/kubi-shared/src/data/io_thread.rs @@ -0,0 +1,166 @@ +use glam::IVec3; +use flume::{Receiver, Sender, TryIter}; +use shipyard::Unique; +use crate::chunk::BlockData; + +use super::WorldSaveFile; + +pub enum IOCommand { + SaveChunk { + position: IVec3, + data: BlockData, + }, + + /// Load a chunk from the disk and send it to the main thread + LoadChunk { + position: IVec3, + }, + + /// Process all pending write commands and make the thread end itself + /// LoadChunk commands will be ignored after this command is received + Kys, +} + + +pub enum IOResponse { + /// A chunk has been loaded from the disk + /// Or not, in which case the data will be None + /// and chunk should be generated + ChunkLoaded { + position: IVec3, + data: Option, + }, + + /// The IO thread has been terminated + Terminated, +} + +struct IOThreadContext { + tx: Sender, + rx: Receiver, + save: WorldSaveFile, +} + +//TODO: Implement proper error handling (I/O errors are rlly common) + +impl IOThreadContext { + /// Should be called ON the IO thread + /// + /// Initializes the IO thread context, opening the file at the given path + /// If there's an error opening the file, the thread will panic (TODO: handle this more gracefully) + pub fn initialize( + tx: Sender, + rx: Receiver, + save: WorldSaveFile, + ) -> Self { + // save.load_data().unwrap(); + Self { tx, rx, save } + } + + pub fn run(mut self) { + loop { + match self.rx.recv().unwrap() { + IOCommand::SaveChunk { position, data } => { + self.save.save_chunk(position, &data).unwrap(); + } + IOCommand::LoadChunk { position } => { + let data = self.save.load_chunk(position).unwrap(); + self.tx.send(IOResponse::ChunkLoaded { position, data }).unwrap(); + } + IOCommand::Kys => { + // Process all pending write commands + while let IOCommand::SaveChunk { position, data } = self.rx.recv().unwrap() { + self.save.save_chunk(position, &data).unwrap(); + } + self.tx.send(IOResponse::Terminated).unwrap(); + return; + } + } + } + } +} + +pub struct IOSingleThread { + tx: Sender, + rx: Receiver, + handle: std::thread::JoinHandle<()>, +} + +impl IOSingleThread { + pub fn spawn(save: WorldSaveFile) -> Self { + // Create channels + let (command_tx, command_rx) = flume::unbounded(); + let (response_tx, response_rx) = flume::unbounded(); + + // Spawn the thread + let builder = std::thread::Builder::new() + .name("io-thread".into()); + let handle = builder.spawn(move || { + let context = IOThreadContext::initialize(response_tx, command_rx, save); + context.run(); + }).unwrap(); + + IOSingleThread { + tx: command_tx, + rx: response_rx, + handle + } + } + + /// Send a command to the IO thread + pub fn send(&self, cmd: IOCommand) { + self.tx.send(cmd).unwrap(); + } + + /// Poll the IO thread for responses (non-blocking) + pub fn poll(&self) -> TryIter { + self.rx.try_iter() + } + + /// Signal the IO thread to process the remaining requests and wait for it to terminate + pub fn stop_sync(&self) { + // Tell the thread to terminate and wait for it to finish + self.tx.send(IOCommand::Kys).unwrap(); + while !matches!(self.rx.recv().unwrap(), IOResponse::Terminated) {} + + // HACK "we have .join at home" + while !self.handle.is_finished() {} + } + + /// Same as stop_sync but doesn't wait for the IO thread to terminate + pub fn stop_async(&self) { + self.tx.send(IOCommand::Kys).unwrap(); + } +} + +impl Drop for IOSingleThread { + fn drop(&mut self) { + self.stop_sync(); + } +} + + +/// This is a stub for future implemntation that may use multiple IO threads +#[derive(Unique)] +pub struct IOThreadManager { + thread: IOSingleThread, +} + +impl IOThreadManager { + pub fn new(save: WorldSaveFile) -> Self { + Self { + thread: IOSingleThread::spawn(save) + } + } + + pub fn send(&self, cmd: IOCommand) { + self.thread.send(cmd); + } + + pub fn poll(&self) -> TryIter { + self.thread.poll() + } +} + +// i think im a girl :3 (noone will ever read this right? :p) + diff --git a/kubi/src/init.rs b/kubi/src/init.rs index 0dfb513..009e9c4 100644 --- a/kubi/src/init.rs +++ b/kubi/src/init.rs @@ -5,14 +5,15 @@ use crate::{ networking::{GameType, ServerAddress}, state::{GameState, NextState} }; -use kubi_shared::data::WorldSaveFile; +use kubi_shared::data::{io_thread::IOThreadManager, WorldSaveFile}; fn open_local_save_file(path: &Path) -> Result { let mut save_file = WorldSaveFile::new({ OpenOptions::new() .read(true) .write(true) - .open("world.kbi")? + .create(true) + .open(path)? }); if save_file.file.metadata().unwrap().len() == 0 { save_file.initialize()?; @@ -25,13 +26,20 @@ fn open_local_save_file(path: &Path) -> Result { pub fn initialize_from_args( all_storages: AllStoragesView, ) { + // If an address is provided, we're in multiplayer mode (the first argument is the address) + // Otherwise, we're in singleplayer mode and working with local stuff let args: Vec = env::args().collect(); if args.len() > 1 { + // Parse the address and switch the state to connecting let address = args[1].parse::().expect("invalid address"); all_storages.add_unique(GameType::Muliplayer); all_storages.add_unique(ServerAddress(address)); all_storages.borrow::>().unwrap().0 = Some(GameState::Connecting); } else { + // Open the local save file + let save_file = open_local_save_file(Path::new("./world.kubi")).expect("failed to open save file"); + all_storages.add_unique(IOThreadManager::new(save_file)); + // Switch the state and kick off the world loading all_storages.add_unique(GameType::Singleplayer); all_storages.borrow::>().unwrap().0 = Some(GameState::LoadingWorld); } diff --git a/kubi/src/networking/world.rs b/kubi/src/networking/world.rs index efb2cde..b7feb2a 100644 --- a/kubi/src/networking/world.rs +++ b/kubi/src/networking/world.rs @@ -37,7 +37,7 @@ pub fn inject_network_responses_into_manager_queue( let ServerToClientMessage::ChunkResponse { chunk, data, queued } = packet else { unreachable!() }; - manager.add_sussy_response(ChunkTaskResponse::LoadedChunk { + manager.add_sussy_response(ChunkTaskResponse::ChunkWorldgenDone { position: chunk, chunk_data: data, queued diff --git a/kubi/src/world/loading.rs b/kubi/src/world/loading.rs index 7ee4313..9880730 100644 --- a/kubi/src/world/loading.rs +++ b/kubi/src/world/loading.rs @@ -8,7 +8,7 @@ use wgpu::util::DeviceExt; use crate::{ networking::UdpClient, player::MainPlayer, - rendering::{world::ChunkVertex, BufferPair, Renderer}, + rendering::{BufferPair, Renderer}, settings::GameSettings, state::GameState, transform::Transform, @@ -16,7 +16,7 @@ use crate::{ use super::{ ChunkStorage, ChunkMeshStorage, chunk::{Chunk, DesiredChunkState, CHUNK_SIZE, ChunkMesh, CurrentChunkState, ChunkData}, - tasks::{ChunkTaskManager, ChunkTaskResponse, ChunkTask}, + tasks::{ChunkTaskManager, ChunkTaskResponse, ChunkTask}, queue::BlockUpdateQueue, }; @@ -185,7 +185,7 @@ fn process_state_changes( ); } else { let atomic = Arc::new(Atomic::new(AbortState::Continue)); - task_manager.spawn_task(ChunkTask::LoadChunk { + task_manager.spawn_task(ChunkTask::ChunkWorldgen { seed: 0xbeef_face_dead_cafe, position, abortion: Some(Arc::clone(&atomic)), @@ -273,7 +273,7 @@ fn process_completed_tasks( let mut ops: usize = 0; while let Some(res) = task_manager.receive() { match res { - ChunkTaskResponse::LoadedChunk { position, chunk_data, mut queued } => { + ChunkTaskResponse::ChunkWorldgenDone { position, chunk_data, mut queued } => { //If unwanted chunk is already loaded //It would be ~~...unethical~~ impossible to abort the operation at this point //Instead, we'll just throw it away @@ -308,7 +308,7 @@ fn process_completed_tasks( //increase ops counter ops += 1; }, - ChunkTaskResponse::GeneratedMesh { + ChunkTaskResponse::GenerateMeshDone { position, vertices, indices, trans_vertices, trans_indices, diff --git a/kubi/src/world/tasks.rs b/kubi/src/world/tasks.rs index a481f5f..90e1de8 100644 --- a/kubi/src/world/tasks.rs +++ b/kubi/src/world/tasks.rs @@ -13,7 +13,7 @@ use super::{ use crate::rendering::world::ChunkVertex; pub enum ChunkTask { - LoadChunk { + ChunkWorldgen { seed: u64, position: IVec3, abortion: Option>>, @@ -23,13 +23,14 @@ pub enum ChunkTask { data: MeshGenData } } + pub enum ChunkTaskResponse { - LoadedChunk { + ChunkWorldgenDone { position: IVec3, chunk_data: BlockData, queued: Vec }, - GeneratedMesh { + GenerateMeshDone { position: IVec3, vertices: Vec, indices: Vec, @@ -43,6 +44,7 @@ pub struct ChunkTaskManager { channel: (Sender, Receiver), pool: ThreadPool, } + impl ChunkTaskManager { pub fn new() -> Self { Self { @@ -50,11 +52,17 @@ impl ChunkTaskManager { pool: ThreadPoolBuilder::new().num_threads(4).build().unwrap() } } + + //TODO get rid of add_sussy_response + + /// Add a response to the queue, to be picked up by the main thread + /// Used by the multiplayer netcode, a huge hack pub fn add_sussy_response(&self, response: ChunkTaskResponse) { // this WILL get stuck if the channel is bounded // don't make the channel bounded ever self.channel.0.send(response).unwrap() } + pub fn spawn_task(&self, task: ChunkTask) { let sender = self.channel.0.clone(); self.pool.spawn(move || { @@ -64,22 +72,23 @@ impl ChunkTaskManager { (vertices, indices), (trans_vertices, trans_indices), ) = generate_mesh(position, data); - ChunkTaskResponse::GeneratedMesh { + ChunkTaskResponse::GenerateMeshDone { position, vertices, indices, trans_vertices, trans_indices, } }, - ChunkTask::LoadChunk { position, seed, abortion } => { + ChunkTask::ChunkWorldgen { position, seed, abortion } => { let Some((chunk_data, queued)) = generate_world(position, seed, abortion) else { log::warn!("aborted operation"); return }; - ChunkTaskResponse::LoadedChunk { position, chunk_data, queued } + ChunkTaskResponse::ChunkWorldgenDone { position, chunk_data, queued } } }); }); } + pub fn receive(&self) -> Option { self.channel.1.try_recv().ok() }