diff --git a/kubi-server/src/world/tasks.rs b/kubi-server/src/world/tasks.rs index cae2d56..2ed1a09 100644 --- a/kubi-server/src/world/tasks.rs +++ b/kubi-server/src/world/tasks.rs @@ -78,7 +78,7 @@ impl ChunkTaskManager { pub fn receive(&self) -> Option { // Try to receive IO results first // If there are none, try to receive worldgen results - self.iota.as_ref().map(|iota| { + self.iota.as_ref().and_then(|iota| { iota.poll_single().map(|response| match response { IOResponse::ChunkLoaded { position, data } => ChunkTaskResponse::ChunkLoaded { chunk_position: position, @@ -87,10 +87,14 @@ impl ChunkTaskManager { }, _ => panic!("Unexpected response from IO thread"), }) - }).flatten().or_else(|| { + }).or_else(|| { self.channel.1.try_recv().ok() }) } + + pub fn iota(self) -> Option { + self.iota + } } pub fn init_chunk_task_manager( diff --git a/kubi-shared/src/data/io_thread.rs b/kubi-shared/src/data/io_thread.rs index eac20fd..d22e63d 100644 --- a/kubi-shared/src/data/io_thread.rs +++ b/kubi-shared/src/data/io_thread.rs @@ -8,6 +8,17 @@ use super::{SharedHeader, WorldSaveFile}; // may be broken, so currently disabled const MAX_SAVE_BATCH_SIZE: usize = usize::MAX; +#[derive(Clone, Copy, PartialEq, Eq, Debug, PartialOrd, Ord)] +pub enum TerminationStage { + Starting, + SaveQueue { + progress: usize, + total: usize, + }, + ProcessRx, + Terminated, +} + pub enum IOCommand { SaveChunk { position: IVec3, @@ -34,6 +45,9 @@ pub enum IOResponse { data: Option, }, + /// In-progress shutdown info + KysProgressInformational(TerminationStage), + /// The IO thread has been terminated Terminated, } @@ -68,7 +82,7 @@ impl IOThreadContext { // which breaks batching, so we need to check if there are any pending save requests // and if there are, use non-blocking recv to give them a chance to be processed 'rx: while let Some(command) = { - if self.save_queue.len() > 0 { + if !self.save_queue.is_empty() { self.rx.try_recv().ok() } else { self.rx.recv().ok() @@ -102,14 +116,34 @@ impl IOThreadContext { self.tx.send(IOResponse::ChunkLoaded { position, data }).unwrap(); } IOCommand::Kys => { + self.tx.send(IOResponse::KysProgressInformational( + TerminationStage::Starting, + )).unwrap(); + // Process all pending write commands - log::info!("info: queue has {} chunks", self.save_queue.len()); + let save_queue_len = self.save_queue.len(); + + log::info!("info: queue has {} chunks", save_queue_len); let mut saved_amount = 0; for (pos, data) in self.save_queue.drain(..) { self.save.save_chunk(pos, &data).unwrap(); saved_amount += 1; + + // Send kys preflight info + self.tx.send(IOResponse::KysProgressInformational( + TerminationStage::SaveQueue { + progress: saved_amount, + total: save_queue_len, + } + )).unwrap(); } - log::debug!("now, moving on to the rx queue..."); + + log::debug!("now, moving on to the rx queue, ..."); + + self.tx.send(IOResponse::KysProgressInformational( + TerminationStage::ProcessRx + )).unwrap(); + for cmd in self.rx.try_iter() { let IOCommand::SaveChunk { position, data } = cmd else { continue; @@ -118,13 +152,18 @@ impl IOThreadContext { saved_amount += 1; } log::info!("saved {} chunks on exit", saved_amount); + + self.tx.send(IOResponse::KysProgressInformational( + TerminationStage::Terminated + )).unwrap(); self.tx.send(IOResponse::Terminated).unwrap(); + return; } } } // between every betch of requests, check if there are any pending save requests - if self.save_queue.len() > 0 { + if !self.save_queue.is_empty() { let will_drain = MAX_SAVE_BATCH_SIZE.min(self.save_queue.len()); log::info!("saving {}/{} chunks with batch size {}...", will_drain, self.save_queue.len(), MAX_SAVE_BATCH_SIZE); for (pos, data) in self.save_queue.drain(..will_drain) { @@ -138,8 +177,9 @@ impl IOThreadContext { pub struct IOSingleThread { tx: Sender, rx: Receiver, - handle: std::thread::JoinHandle<()>, + handle: Option>, header: SharedHeader, + exit_requested: bool, } impl IOSingleThread { @@ -162,8 +202,9 @@ impl IOSingleThread { IOSingleThread { tx: command_tx, rx: response_rx, - handle, + handle: Some(handle), header, + exit_requested: false, } } @@ -183,25 +224,35 @@ impl IOSingleThread { } /// Signal the IO thread to process the remaining requests and wait for it to terminate - pub fn stop_sync(&self) { + #[deprecated(note = "Use stop_sync instead")] + pub fn deprecated_stop_sync(&mut self) { log::debug!("Stopping IO thread (sync)"); // Tell the thread to terminate and wait for it to finish - self.tx.send(IOCommand::Kys).unwrap(); - while !matches!(self.rx.recv().unwrap(), IOResponse::Terminated) {} + if !self.exit_requested { + self.exit_requested = true; + self.tx.send(IOCommand::Kys).unwrap(); + } + // while !matches!(self.rx.recv().unwrap(), IOResponse::Terminated) {} - // HACK "we have .join at home" - while !self.handle.is_finished() {} + // // HACK "we have .join at home" + // while !self.handle.is_finished() {} + self.stop_async_block_on(); log::debug!("IO thread stopped"); //almost lol } /// Same as stop_sync but doesn't wait for the IO thread to terminate - pub fn stop_async(&self) { + pub fn stop_async(&mut self) { log::debug!("Stopping IO thread (async)"); + self.exit_requested = true; self.tx.send(IOCommand::Kys).unwrap(); } + pub fn stop_async_block_on(&mut self) { + self.handle.take().unwrap().join().unwrap(); + } + pub fn chunk_exists(&self, position: IVec3) -> bool { self.header.read().unwrap().chunk_map.contains_key(&position) } @@ -209,12 +260,15 @@ impl IOSingleThread { impl Drop for IOSingleThread { fn drop(&mut self) { - log::trace!("IOSingleThread dropped, about to sync unsaved data..."); - self.stop_sync(); + if self.handle.is_some() { + log::warn!("IOSingleThread dropped without being stopped first. (about to sync unsaved data...)"); + self.deprecated_stop_sync(); + } else { + log::trace!("IOSingleThread dropped, already stopped"); + } } } - /// This is a stub for future implemntation that may use multiple IO threads #[derive(Unique)] pub struct IOThreadManager { @@ -243,6 +297,20 @@ impl IOThreadManager { pub fn chunk_exists(&self, position: IVec3) -> bool { self.thread.chunk_exists(position) } + + #[allow(deprecated)] + #[deprecated(note = "Use stop_async and block_on_termination instead")] + pub fn deprecated_stop_sync(&mut self) { + self.thread.deprecated_stop_sync(); + } + + pub fn stop_async_block_on(&mut self) { + self.thread.stop_async_block_on(); + } + + pub fn stop_async(&mut self) { + self.thread.stop_async(); + } } // i think im a girl :3 (noone will ever read this right? :p) diff --git a/kubi/src/lib.rs b/kubi/src/lib.rs index 27c7074..bc45e2f 100644 --- a/kubi/src/lib.rs +++ b/kubi/src/lib.rs @@ -34,6 +34,7 @@ pub(crate) use ui::{ chat_ui, crosshair_ui, settings_ui, + shutdown_screen, }; pub(crate) mod rendering; pub(crate) mod world; @@ -58,7 +59,11 @@ pub(crate) mod client_physics; pub(crate) mod chat; use world::{ - init_game_world, loading::{save_on_exit, update_loaded_world_around_player}, queue::apply_queued_blocks, raycast::update_raycasts, tasks::ChunkTaskManager + init_game_world, + loading::{save_on_exit, update_loaded_world_around_player}, + queue::apply_queued_blocks, + raycast::update_raycasts, + tasks::ChunkTaskManager, }; use player::{spawn_player, MainPlayer}; use prefabs::load_prefabs; @@ -72,11 +77,12 @@ use block_placement::update_block_placement; use delta_time::{DeltaTime, init_delta_time}; use cursor_lock::{debug_toggle_lock, insert_lock_state, lock_cursor_now, update_cursor_lock_state}; use control_flow::{exit_on_esc, insert_control_flow_unique, RequestExit}; -use state::{is_ingame, is_ingame_or_loading, is_loading, init_state, update_state, is_connecting}; +use state::{init_state, is_connecting, is_ingame, is_ingame_or_loading, is_loading, is_shutting_down, update_state}; use networking::{update_networking, update_networking_late, is_multiplayer, disconnect_on_exit, is_singleplayer}; use init::initialize_from_args; use hui_integration::{kubi_ui_begin, /*kubi_ui_draw,*/ kubi_ui_end, kubi_ui_init}; use loading_screen::update_loading_screen; +use shutdown_screen::update_shutdown_screen; use connecting_screen::update_connecting_screen; use fixed_timestamp::init_fixed_timestamp_storage; use filesystem::AssetManager; @@ -149,11 +155,15 @@ fn update() -> Workload { draw_crosshair, render_settings_ui, ).into_sequential_workload().run_if(is_ingame), + ( + update_shutdown_screen, + ).into_sequential_workload().run_if(is_shutting_down), update_networking_late.run_if(is_multiplayer), compute_cameras, kubi_ui_end, - update_state, exit_on_esc, + shutdown_screen::late_intercept, + update_state, update_rendering_late, ).into_sequential_workload() } diff --git a/kubi/src/state.rs b/kubi/src/state.rs index fbe8d50..0c2a294 100644 --- a/kubi/src/state.rs +++ b/kubi/src/state.rs @@ -7,7 +7,8 @@ pub enum GameState { Initial, Connecting, LoadingWorld, - InGame + InGame, + ShuttingDown, } #[derive(Unique, PartialEq, Eq, Default, Clone, Copy)] @@ -51,8 +52,20 @@ pub fn is_loading( matches!(*state, GameState::LoadingWorld) } +pub fn is_shutting_down( + state: UniqueView +) -> bool { + *state == GameState::ShuttingDown +} + pub fn is_ingame_or_loading( state: UniqueView ) -> bool { matches!(*state, GameState::InGame | GameState::LoadingWorld) } + +pub fn is_ingame_or_shutting_down( + state: UniqueView +) -> bool { + matches!(*state, GameState::InGame | GameState::ShuttingDown) +} diff --git a/kubi/src/ui.rs b/kubi/src/ui.rs index bdf5613..7a2c58b 100644 --- a/kubi/src/ui.rs +++ b/kubi/src/ui.rs @@ -1,5 +1,6 @@ pub(crate) mod loading_screen; pub(crate) mod connecting_screen; +pub(crate) mod shutdown_screen; pub(crate) mod chat_ui; pub(crate) mod crosshair_ui; -pub(crate) mod settings_ui; +pub(crate) mod settings_ui; \ No newline at end of file diff --git a/kubi/src/ui/shutdown_screen.rs b/kubi/src/ui/shutdown_screen.rs new file mode 100644 index 0000000..5fb6f0c --- /dev/null +++ b/kubi/src/ui/shutdown_screen.rs @@ -0,0 +1,150 @@ +use hui::element::{progress_bar::ProgressBar, text::Text, UiElementExt}; +use shipyard::{AllStoragesView, AllStoragesViewMut, IntoWorkload, NonSendSync, SystemModificator, Unique, UniqueView, UniqueViewMut, Workload, WorkloadModificator}; +use kubi_shared::data::io_thread::{IOResponse, IOThreadManager, TerminationStage}; +use crate::{ + control_flow::RequestExit, cursor_lock::CursorLock, hui_integration::UiState, loading_screen::loading_screen_base, networking::is_singleplayer, rendering::Renderer, state::{is_ingame, is_ingame_or_shutting_down, is_shutting_down, GameState, NextState} +}; + +// TODO move shutdown non-UI logic to a separate file + +#[derive(Unique)] +struct ShutdownState { + // iota: IOThreadManager, + termination: TerminationStage, +} + +fn intercept_exit( + mut exit: UniqueViewMut, + mut state: UniqueViewMut, + cur_state: UniqueView, + termination_state: Option>, +) { + if exit.0 { + if *cur_state == GameState::ShuttingDown { + // If we're already shutting down, check if we're done + // If not, ignore the exit request + if let Some(termination_state) = termination_state { + if termination_state.termination != TerminationStage::Terminated { + log::warn!("Exit request intercepted, ignoring as we're still shutting down"); + exit.0 = false; + } + } + } else if state.0 != Some(GameState::ShuttingDown) { + log::info!("Exit request intercepted, transitioning to shutdown state"); + exit.0 = false; + state.0 = Some(GameState::ShuttingDown); + } + } +} + +pub fn init_shutdown_state( + storages: AllStoragesView, +) { + storages.add_unique(ShutdownState { + termination: TerminationStage::Starting, + }); + + // HACK: Tell iota to kys (todo do on state transition instead) + log::info!("IO Thread stopping gracefully... (stop_async)"); + let mut iota = storages.borrow::>().unwrap(); + iota.stop_async(); + + // ..and make sure to disable cursor lock + let mut lock = storages.borrow::>().unwrap(); + lock.0 = false; +} + +pub fn update_shutdown_state( + storages: AllStoragesViewMut, +) { + let Ok(iota) = storages.borrow::>() else { + log::warn!("IO Thread not found, skipping shutdown state update"); + return; + }; + let mut state = storages.borrow::>().unwrap(); + + let mut do_drop_iota = false; + + //poll the IO thread for progress + for response in iota.poll() { + match response { + IOResponse::KysProgressInformational(stage) => { + state.termination = stage; + }, + IOResponse::Terminated => { + state.termination = TerminationStage::Terminated; + + do_drop_iota = true; + + // Request exit + let mut exit = storages.borrow::>().unwrap(); + exit.0 = true; + }, + _ => {} + } + } + + drop(iota); + + // Hard-stop and drop the iota + if do_drop_iota { + let mut iota = storages.remove_unique::().unwrap(); + iota.stop_async_block_on(); + log::info!("IO Thread terminated on stop_async_block_on"); + drop(iota); + } +} + +fn render_shutdown_ui( + mut ui: NonSendSync>, + ren: UniqueView, + state: UniqueView, +) { + loading_screen_base(1., |ui| { + Text::new("Shutting down...") + .with_text_size(16) + .add_child(ui); + match state.termination { + TerminationStage::Starting => { + Text::new("Please wait...") + .with_text_size(16) + .add_child(ui); + }, + TerminationStage::SaveQueue { progress, total } => { + Text::new(format!("Saving chunks: {}/{}", progress, total)) + .with_text_size(16) + .add_child(ui); + ProgressBar::default() + .with_value(progress as f32 / total as f32) + .add_child(ui); + }, + TerminationStage::ProcessRx => { + Text::new("Processing remaining save requests...") + .with_text_size(16) + .add_child(ui); + }, + TerminationStage::Terminated => { + Text::new("Terminated.") + .with_text_size(16) + .add_child(ui); + } + } + }).add_root(&mut ui.hui, ren.size_vec2()) +} + +pub fn update_shutdown_screen() -> Workload { + ( + init_shutdown_state + .run_if_missing_unique::(), + update_shutdown_state, + render_shutdown_ui, + ).into_sequential_workload() +} + +pub fn late_intercept() -> Workload { + ( + intercept_exit, + ).into_workload() + .run_if(is_singleplayer) + .run_if(is_ingame_or_shutting_down) +}