do partial impl

Author: griffi-gh, 2024-09-02 00:22:54 +02:00
parent 2466c02937
commit 570382520c
3 changed files with 125 additions and 14 deletions

File 1 of 3: the IO thread

@@ -21,7 +21,7 @@ pub enum IOCommand {
   Kys,
 }
+#[derive(Debug)]
 pub enum IOResponse {
   /// A chunk has been loaded from the disk
   /// Or not, in which case the data will be None
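
(Note: the added #[derive(Debug)] is what lets the new loading code in the second file format an unexpected IOResponse with {:?} in its panic!.)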
@@ -69,7 +69,10 @@ impl IOThreadContext {
         }
         IOCommand::Kys => {
           // Process all pending write commands
-          while let IOCommand::SaveChunk { position, data } = self.rx.recv().unwrap() {
+          for cmd in self.rx.try_iter() {
+            let IOCommand::SaveChunk { position, data } = cmd else {
+              continue;
+            };
             self.save.save_chunk(position, &data).unwrap();
           }
           self.tx.send(IOResponse::Terminated).unwrap();
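
Why this change matters: the old while-let called recv(), which blocks, so shutdown could hang forever waiting for a command that never arrives, and the loop also stopped draining at the first command that wasn't SaveChunk. try_iter() only yields messages already sitting in the channel and never blocks. A minimal sketch of the pattern, using flume (illustrative names, not from the commit):

    // Drain whatever is already queued; skip anything that isn't a save.
    enum Cmd {
      Save(u32),
      Other,
    }

    fn drain_saves(rx: &flume::Receiver<Cmd>) {
      // try_iter is non-blocking: the loop ends once the queue is empty
      for cmd in rx.try_iter() {
        let Cmd::Save(id) = cmd else { continue };
        println!("saving chunk {id}");
      }
    }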
@@ -119,22 +122,28 @@ impl IOSingleThread {
   /// Signal the IO thread to process the remaining requests and wait for it to terminate
   pub fn stop_sync(&self) {
+    log::debug!("Stopping IO thread (sync)");
+
     // Tell the thread to terminate and wait for it to finish
     self.tx.send(IOCommand::Kys).unwrap();
     while !matches!(self.rx.recv().unwrap(), IOResponse::Terminated) {}

     // HACK "we have .join at home"
     while !self.handle.is_finished() {}
+
+    log::debug!("IO thread stopped"); //almost lol
   }

   /// Same as stop_sync but doesn't wait for the IO thread to terminate
   pub fn stop_async(&self) {
+    log::debug!("Stopping IO thread (async)");
     self.tx.send(IOCommand::Kys).unwrap();
   }
 }

 impl Drop for IOSingleThread {
   fn drop(&mut self) {
+    log::trace!("IOSingleThread dropped, about to sync unsaved data...");
     self.stop_sync();
   }
 }
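
The "we have .join at home" spin loop exists because stop_sync takes &self, while std's JoinHandle::join consumes the handle by value. A common workaround, sketched here under the assumption that the handle could be stored in an Option (the IoThread name is hypothetical):

    use std::thread::JoinHandle;

    struct IoThread {
      handle: Option<JoinHandle<()>>,
    }

    impl Drop for IoThread {
      fn drop(&mut self) {
        // take() moves the JoinHandle out from behind &mut self,
        // so the by-value join() can block properly instead of spinning.
        if let Some(handle) = self.handle.take() {
          handle.join().expect("IO thread panicked");
        }
      }
    }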

File 2 of 3: chunk loading systems

@@ -1,7 +1,11 @@
 use std::sync::Arc;
 use atomic::{Atomic, Ordering};
 use glam::{IVec3, ivec3};
-use kubi_shared::{networking::{channels::Channel, messages::ClientToServerMessage}, worldgen::AbortState};
+use kubi_shared::{
+  data::io_thread::{IOCommand, IOResponse, IOThreadManager},
+  networking::{channels::Channel, messages::ClientToServerMessage},
+  worldgen::AbortState,
+};
 use shipyard::{View, UniqueView, UniqueViewMut, IntoIter, Workload, IntoWorkload, NonSendSync, track};
 use uflow::SendMode;
 use wgpu::util::DeviceExt;
@@ -20,6 +24,8 @@ use super::{
   queue::BlockUpdateQueue,
 };
+const WORLD_SEED: u64 = 0xbeef_face_dead_cafe;
+
 const MAX_CHUNK_OPS_INGAME: usize = 8;
 const MAX_CHUNK_OPS: usize = 32;
@@ -92,6 +98,7 @@ pub fn update_chunks_if_player_moved(
 fn process_state_changes(
   task_manager: UniqueView<ChunkTaskManager>,
+  io: Option<UniqueView<IOThreadManager>>,
   mut udp_client: Option<UniqueViewMut<UdpClient>>,
   mut world: UniqueViewMut<ChunkStorage>,
   mut vm_meshes: NonSendSync<UniqueViewMut<ChunkMeshStorage>>,
@@ -135,7 +142,7 @@ fn process_state_changes(
         chunk.current_state,
         CurrentChunkState::Loaded | CurrentChunkState::CalculatingMesh,
       ) => {
-        chunk.block_data = None;
+        // chunk.block_data = None; //HACK when downgrading, keep the data so we can save it
         chunk.current_state = CurrentChunkState::Nothing;
       },
@@ -184,18 +191,33 @@ fn process_state_changes(
             SendMode::Reliable
           );
         } else {
-          let atomic = Arc::new(Atomic::new(AbortState::Continue));
-          task_manager.spawn_task(ChunkTask::ChunkWorldgen {
-            seed: 0xbeef_face_dead_cafe,
-            position,
-            abortion: Some(Arc::clone(&atomic)),
-          });
-          abortion = Some(atomic);
+          // ============================================================
+          // TODO IMPORTANT: DO NOT WAIT FOR THE IO THREAD TO RESPOND, THIS WILL CAUSE A TERRIBLE BOTTLENECK
+          // find a way to check the world save header from the main thread instead!
+          // ============================================================
+          if let Some(io) = &io {
+            // Try to load the chunk from the save file
+            // In case that fails, we will run worldgen once the IO thread responds
+            io.send(IOCommand::LoadChunk { position });
+          } else {
+            // If there's no IO thread, we'll just run worldgen right away
+            let atomic = Arc::new(Atomic::new(AbortState::Continue));
+            task_manager.spawn_task(ChunkTask::ChunkWorldgen {
+              seed: WORLD_SEED,
+              position,
+              abortion: Some(Arc::clone(&atomic)),
+            });
+            abortion = Some(atomic);
+          }
         }
         //Update chunk state
         let chunk = world.chunks.get_mut(&position).unwrap();
         chunk.current_state = CurrentChunkState::Loading;
         chunk.abortion = abortion;
         // ===========
         //log::trace!("Started loading chunk {position}");
       },
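
The all-caps TODO is about latency: as committed, every chunk request round-trips through the IO thread before worldgen can start, even for chunks that were never saved. Acting on the TODO's suggestion might look roughly like this, with the save header mirrored in memory at startup (SaveHeader and its field are assumptions, not part of the commit):

    use std::collections::HashSet;
    use glam::IVec3;

    // Hypothetical: populated once from the save file's header at load time.
    struct SaveHeader {
      saved_chunks: HashSet<IVec3>,
    }

    impl SaveHeader {
      // Lets the main thread decide load-vs-worldgen with no IO round trip.
      fn chunk_exists(&self, position: IVec3) -> bool {
        self.saved_chunks.contains(&position)
      }
    }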
@@ -254,7 +276,22 @@ fn process_state_changes(
           return false
         }
-        //HACK, since save files are not implemented, just unload immediately
+        // If in singleplayer and have an open save file, we need to save the chunk to the disk
+
+        // ==========================================================
+        //TODO IMPORTANT: WAIT FOR CHUNK TO FINISH SAVING FIRST BEFORE TRANSITIONING TO UNLOADED
+        // OTHERWISE WE WILL LOSE THE SAVE DATA IF THE USER COMES BACK TO THE CHUNK TOO QUICKLY
+        // ==========================================================
+        if let Some(io) = &io {
+          if let Some(block_data) = &chunk.block_data {
+            // log::debug!("issue save command");
+            io.send(IOCommand::SaveChunk {
+              position,
+              data: block_data.blocks.clone(),
+            });
+          }
+        }
+
         return false
       }
       true
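
The save-on-unload above is fire-and-forget, which is exactly the race the TODO banner warns about: if the player returns before the write lands, the follow-up LoadChunk can read stale data. One fix is to track in-flight saves and delay the unload until the IO thread confirms completion; a sketch, assuming a hypothetical ChunkSaved response (not in this commit):

    use std::collections::HashSet;
    use glam::IVec3;

    #[derive(Default)]
    struct PendingSaves {
      in_flight: HashSet<IVec3>,
    }

    impl PendingSaves {
      fn on_save_issued(&mut self, pos: IVec3) {
        self.in_flight.insert(pos);
      }
      // Call when the IO thread reports the (hypothetical) ChunkSaved response.
      fn on_save_done(&mut self, pos: IVec3) {
        self.in_flight.remove(&pos);
      }
      // Only transition a chunk to Unloaded once its save has landed.
      fn can_unload(&self, pos: IVec3) -> bool {
        !self.in_flight.contains(&pos)
      }
    }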
@@ -264,6 +301,7 @@ fn process_state_changes(
 fn process_completed_tasks(
   task_manager: UniqueView<ChunkTaskManager>,
+  io: Option<UniqueView<IOThreadManager>>,
   mut world: UniqueViewMut<ChunkStorage>,
   mut meshes: NonSendSync<UniqueViewMut<ChunkMeshStorage>>,
   renderer: UniqueView<Renderer>,
@@ -271,9 +309,68 @@ fn process_completed_tasks(
   mut queue: UniqueViewMut<BlockUpdateQueue>,
 ) {
   let mut ops: usize = 0;
-  while let Some(res) = task_manager.receive() {
+  //TODO reduce code duplication between loaded/generated chunks
+
+  // Process IO first
+  if let Some(io) = &io {
+    for response in io.poll() {
+      let IOResponse::ChunkLoaded { position, data } = response else {
+        //TODO this is bad
+        panic!("Unexpected IO response: {:?}", response);
+      };
+
+      //check if chunk exists
+      let Some(chunk) = world.chunks.get_mut(&position) else {
+        log::warn!("LOADED blocks data discarded: chunk doesn't exist");
+        continue
+      };
+
+      //we cannot have abortion here but just in case, reset it
+      chunk.abortion = None;
+
+      //check if chunk still wants it
+      if !matches!(chunk.desired_state, DesiredChunkState::Loaded | DesiredChunkState::Rendered) {
+        log::warn!("LOADED block data discarded: state undesirable: {:?}", chunk.desired_state);
+        continue
+      }
+
+      // check if we actually got the data
+      if let Some(data) = data {
+        // If we did get the data, yay :3
+        chunk.block_data = Some(ChunkData {
+          blocks: data
+        });
+        chunk.current_state = CurrentChunkState::Loaded;
+        ops += 1;
+      } else {
+        // If we didn't get the data, we need to run worldgen
+        //TODO: this is a terrible bottleneck
+        let atomic = Arc::new(Atomic::new(AbortState::Continue));
+        task_manager.spawn_task(ChunkTask::ChunkWorldgen {
+          seed: WORLD_SEED,
+          position,
+          abortion: Some(Arc::clone(&atomic)),
+        });
+        chunk.abortion = Some(atomic);
+      }
+    }
+
+    //return early if we've reached the limit
+    if ops >= match *state {
+      GameState::InGame => MAX_CHUNK_OPS_INGAME,
+      _ => MAX_CHUNK_OPS,
+    } { return }
+
+    // XXX: this will completely skip polling the task manager if we've reached the limit
+    // this is probably fine, but it might be worth revisiting later
+  }
+
+  for res in task_manager.poll() {
     match res {
       ChunkTaskResponse::ChunkWorldgenDone { position, chunk_data, mut queued } => {
+        //TODO this can fuck shit up really badly if io op gets overwritten by worldgen chunk
+        //TODO only accept if loading stage, not loaded
         //If unwanted chunk is already loaded
         //It would be ~~...unethical~~ impossible to abort the operation at this point
         //Instead, we'll just throw it away
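
The two fresh TODOs in this hunk flag a real race: an IO load and a worldgen task can both complete for the same position, and whichever result arrives second silently overwrites the first. The guard the second TODO asks for might be as small as this (a sketch; it assumes chunk is in scope as in the surrounding arm):

    // Accept a worldgen result only while the chunk is still waiting for data.
    if !matches!(chunk.current_state, CurrentChunkState::Loading) {
      log::warn!("worldgen result for chunk {position} discarded: no longer loading");
      continue;
    }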

File 3 of 3: the chunk task manager

@@ -1,6 +1,6 @@
 use std::sync::Arc;
 use atomic::Atomic;
-use flume::{Sender, Receiver};
+use flume::{Receiver, Sender, TryIter};
 use glam::IVec3;
 use kubi_shared::{queue::QueuedBlock, worldgen::AbortState};
 use shipyard::Unique;
@@ -89,7 +89,12 @@ impl ChunkTaskManager {
     });
   }

+  #[deprecated(note = "use poll instead")]
   pub fn receive(&self) -> Option<ChunkTaskResponse> {
     self.channel.1.try_recv().ok()
   }
+
+  pub fn poll(&self) -> TryIter<ChunkTaskResponse> {
+    self.channel.1.try_iter()
+  }
 }
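
Callers migrate off the deprecated receive() by swapping the manual loop for iteration; poll() hands back flume's non-blocking TryIter, so the loop simply ends when the queue is empty. Roughly (handle stands in for the real match on ChunkTaskResponse):

    // Before (now warns at compile time thanks to #[deprecated]):
    // while let Some(res) = task_manager.receive() { handle(res); }

    // After:
    for res in task_manager.poll() {
      handle(res);
    }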