add async shutdown screen

This commit is contained in:
griffi-gh 2024-12-08 20:48:04 +01:00
parent 7a375cf3c0
commit 0349cd53c1
6 changed files with 268 additions and 22 deletions

View file

@ -78,7 +78,7 @@ impl ChunkTaskManager {
pub fn receive(&self) -> Option<ChunkTaskResponse> { pub fn receive(&self) -> Option<ChunkTaskResponse> {
// Try to receive IO results first // Try to receive IO results first
// If there are none, try to receive worldgen results // If there are none, try to receive worldgen results
self.iota.as_ref().map(|iota| { self.iota.as_ref().and_then(|iota| {
iota.poll_single().map(|response| match response { iota.poll_single().map(|response| match response {
IOResponse::ChunkLoaded { position, data } => ChunkTaskResponse::ChunkLoaded { IOResponse::ChunkLoaded { position, data } => ChunkTaskResponse::ChunkLoaded {
chunk_position: position, chunk_position: position,
@ -87,10 +87,14 @@ impl ChunkTaskManager {
}, },
_ => panic!("Unexpected response from IO thread"), _ => panic!("Unexpected response from IO thread"),
}) })
}).flatten().or_else(|| { }).or_else(|| {
self.channel.1.try_recv().ok() self.channel.1.try_recv().ok()
}) })
} }
pub fn iota(self) -> Option<IOThreadManager> {
self.iota
}
} }
pub fn init_chunk_task_manager( pub fn init_chunk_task_manager(

View file

@ -8,6 +8,17 @@ use super::{SharedHeader, WorldSaveFile};
// may be broken, so currently disabled // may be broken, so currently disabled
const MAX_SAVE_BATCH_SIZE: usize = usize::MAX; const MAX_SAVE_BATCH_SIZE: usize = usize::MAX;
#[derive(Clone, Copy, PartialEq, Eq, Debug, PartialOrd, Ord)]
pub enum TerminationStage {
Starting,
SaveQueue {
progress: usize,
total: usize,
},
ProcessRx,
Terminated,
}
pub enum IOCommand { pub enum IOCommand {
SaveChunk { SaveChunk {
position: IVec3, position: IVec3,
@ -34,6 +45,9 @@ pub enum IOResponse {
data: Option<BlockData>, data: Option<BlockData>,
}, },
/// In-progress shutdown info
KysProgressInformational(TerminationStage),
/// The IO thread has been terminated /// The IO thread has been terminated
Terminated, Terminated,
} }
@ -68,7 +82,7 @@ impl IOThreadContext {
// which breaks batching, so we need to check if there are any pending save requests // which breaks batching, so we need to check if there are any pending save requests
// and if there are, use non-blocking recv to give them a chance to be processed // and if there are, use non-blocking recv to give them a chance to be processed
'rx: while let Some(command) = { 'rx: while let Some(command) = {
if self.save_queue.len() > 0 { if !self.save_queue.is_empty() {
self.rx.try_recv().ok() self.rx.try_recv().ok()
} else { } else {
self.rx.recv().ok() self.rx.recv().ok()
@ -102,14 +116,34 @@ impl IOThreadContext {
self.tx.send(IOResponse::ChunkLoaded { position, data }).unwrap(); self.tx.send(IOResponse::ChunkLoaded { position, data }).unwrap();
} }
IOCommand::Kys => { IOCommand::Kys => {
self.tx.send(IOResponse::KysProgressInformational(
TerminationStage::Starting,
)).unwrap();
// Process all pending write commands // Process all pending write commands
log::info!("info: queue has {} chunks", self.save_queue.len()); let save_queue_len = self.save_queue.len();
log::info!("info: queue has {} chunks", save_queue_len);
let mut saved_amount = 0; let mut saved_amount = 0;
for (pos, data) in self.save_queue.drain(..) { for (pos, data) in self.save_queue.drain(..) {
self.save.save_chunk(pos, &data).unwrap(); self.save.save_chunk(pos, &data).unwrap();
saved_amount += 1; saved_amount += 1;
// Send kys preflight info
self.tx.send(IOResponse::KysProgressInformational(
TerminationStage::SaveQueue {
progress: saved_amount,
total: save_queue_len,
} }
log::debug!("now, moving on to the rx queue..."); )).unwrap();
}
log::debug!("now, moving on to the rx queue, ...");
self.tx.send(IOResponse::KysProgressInformational(
TerminationStage::ProcessRx
)).unwrap();
for cmd in self.rx.try_iter() { for cmd in self.rx.try_iter() {
let IOCommand::SaveChunk { position, data } = cmd else { let IOCommand::SaveChunk { position, data } = cmd else {
continue; continue;
@ -118,13 +152,18 @@ impl IOThreadContext {
saved_amount += 1; saved_amount += 1;
} }
log::info!("saved {} chunks on exit", saved_amount); log::info!("saved {} chunks on exit", saved_amount);
self.tx.send(IOResponse::KysProgressInformational(
TerminationStage::Terminated
)).unwrap();
self.tx.send(IOResponse::Terminated).unwrap(); self.tx.send(IOResponse::Terminated).unwrap();
return; return;
} }
} }
} }
// between every betch of requests, check if there are any pending save requests // between every betch of requests, check if there are any pending save requests
if self.save_queue.len() > 0 { if !self.save_queue.is_empty() {
let will_drain = MAX_SAVE_BATCH_SIZE.min(self.save_queue.len()); let will_drain = MAX_SAVE_BATCH_SIZE.min(self.save_queue.len());
log::info!("saving {}/{} chunks with batch size {}...", will_drain, self.save_queue.len(), MAX_SAVE_BATCH_SIZE); log::info!("saving {}/{} chunks with batch size {}...", will_drain, self.save_queue.len(), MAX_SAVE_BATCH_SIZE);
for (pos, data) in self.save_queue.drain(..will_drain) { for (pos, data) in self.save_queue.drain(..will_drain) {
@ -138,8 +177,9 @@ impl IOThreadContext {
pub struct IOSingleThread { pub struct IOSingleThread {
tx: Sender<IOCommand>, tx: Sender<IOCommand>,
rx: Receiver<IOResponse>, rx: Receiver<IOResponse>,
handle: std::thread::JoinHandle<()>, handle: Option<std::thread::JoinHandle<()>>,
header: SharedHeader, header: SharedHeader,
exit_requested: bool,
} }
impl IOSingleThread { impl IOSingleThread {
@ -162,8 +202,9 @@ impl IOSingleThread {
IOSingleThread { IOSingleThread {
tx: command_tx, tx: command_tx,
rx: response_rx, rx: response_rx,
handle, handle: Some(handle),
header, header,
exit_requested: false,
} }
} }
@ -183,25 +224,35 @@ impl IOSingleThread {
} }
/// Signal the IO thread to process the remaining requests and wait for it to terminate /// Signal the IO thread to process the remaining requests and wait for it to terminate
pub fn stop_sync(&self) { #[deprecated(note = "Use stop_sync instead")]
pub fn deprecated_stop_sync(&mut self) {
log::debug!("Stopping IO thread (sync)"); log::debug!("Stopping IO thread (sync)");
// Tell the thread to terminate and wait for it to finish // Tell the thread to terminate and wait for it to finish
if !self.exit_requested {
self.exit_requested = true;
self.tx.send(IOCommand::Kys).unwrap(); self.tx.send(IOCommand::Kys).unwrap();
while !matches!(self.rx.recv().unwrap(), IOResponse::Terminated) {} }
// while !matches!(self.rx.recv().unwrap(), IOResponse::Terminated) {}
// HACK "we have .join at home" // // HACK "we have .join at home"
while !self.handle.is_finished() {} // while !self.handle.is_finished() {}
self.stop_async_block_on();
log::debug!("IO thread stopped"); //almost lol log::debug!("IO thread stopped"); //almost lol
} }
/// Same as stop_sync but doesn't wait for the IO thread to terminate /// Same as stop_sync but doesn't wait for the IO thread to terminate
pub fn stop_async(&self) { pub fn stop_async(&mut self) {
log::debug!("Stopping IO thread (async)"); log::debug!("Stopping IO thread (async)");
self.exit_requested = true;
self.tx.send(IOCommand::Kys).unwrap(); self.tx.send(IOCommand::Kys).unwrap();
} }
pub fn stop_async_block_on(&mut self) {
self.handle.take().unwrap().join().unwrap();
}
pub fn chunk_exists(&self, position: IVec3) -> bool { pub fn chunk_exists(&self, position: IVec3) -> bool {
self.header.read().unwrap().chunk_map.contains_key(&position) self.header.read().unwrap().chunk_map.contains_key(&position)
} }
@ -209,12 +260,15 @@ impl IOSingleThread {
impl Drop for IOSingleThread { impl Drop for IOSingleThread {
fn drop(&mut self) { fn drop(&mut self) {
log::trace!("IOSingleThread dropped, about to sync unsaved data..."); if self.handle.is_some() {
self.stop_sync(); log::warn!("IOSingleThread dropped without being stopped first. (about to sync unsaved data...)");
self.deprecated_stop_sync();
} else {
log::trace!("IOSingleThread dropped, already stopped");
}
} }
} }
/// This is a stub for future implemntation that may use multiple IO threads /// This is a stub for future implemntation that may use multiple IO threads
#[derive(Unique)] #[derive(Unique)]
pub struct IOThreadManager { pub struct IOThreadManager {
@ -243,6 +297,20 @@ impl IOThreadManager {
pub fn chunk_exists(&self, position: IVec3) -> bool { pub fn chunk_exists(&self, position: IVec3) -> bool {
self.thread.chunk_exists(position) self.thread.chunk_exists(position)
} }
#[allow(deprecated)]
#[deprecated(note = "Use stop_async and block_on_termination instead")]
pub fn deprecated_stop_sync(&mut self) {
self.thread.deprecated_stop_sync();
}
pub fn stop_async_block_on(&mut self) {
self.thread.stop_async_block_on();
}
pub fn stop_async(&mut self) {
self.thread.stop_async();
}
} }
// i think im a girl :3 (noone will ever read this right? :p) // i think im a girl :3 (noone will ever read this right? :p)

View file

@ -34,6 +34,7 @@ pub(crate) use ui::{
chat_ui, chat_ui,
crosshair_ui, crosshair_ui,
settings_ui, settings_ui,
shutdown_screen,
}; };
pub(crate) mod rendering; pub(crate) mod rendering;
pub(crate) mod world; pub(crate) mod world;
@ -58,7 +59,11 @@ pub(crate) mod client_physics;
pub(crate) mod chat; pub(crate) mod chat;
use world::{ use world::{
init_game_world, loading::{save_on_exit, update_loaded_world_around_player}, queue::apply_queued_blocks, raycast::update_raycasts, tasks::ChunkTaskManager init_game_world,
loading::{save_on_exit, update_loaded_world_around_player},
queue::apply_queued_blocks,
raycast::update_raycasts,
tasks::ChunkTaskManager,
}; };
use player::{spawn_player, MainPlayer}; use player::{spawn_player, MainPlayer};
use prefabs::load_prefabs; use prefabs::load_prefabs;
@ -72,11 +77,12 @@ use block_placement::update_block_placement;
use delta_time::{DeltaTime, init_delta_time}; use delta_time::{DeltaTime, init_delta_time};
use cursor_lock::{debug_toggle_lock, insert_lock_state, lock_cursor_now, update_cursor_lock_state}; use cursor_lock::{debug_toggle_lock, insert_lock_state, lock_cursor_now, update_cursor_lock_state};
use control_flow::{exit_on_esc, insert_control_flow_unique, RequestExit}; use control_flow::{exit_on_esc, insert_control_flow_unique, RequestExit};
use state::{is_ingame, is_ingame_or_loading, is_loading, init_state, update_state, is_connecting}; use state::{init_state, is_connecting, is_ingame, is_ingame_or_loading, is_loading, is_shutting_down, update_state};
use networking::{update_networking, update_networking_late, is_multiplayer, disconnect_on_exit, is_singleplayer}; use networking::{update_networking, update_networking_late, is_multiplayer, disconnect_on_exit, is_singleplayer};
use init::initialize_from_args; use init::initialize_from_args;
use hui_integration::{kubi_ui_begin, /*kubi_ui_draw,*/ kubi_ui_end, kubi_ui_init}; use hui_integration::{kubi_ui_begin, /*kubi_ui_draw,*/ kubi_ui_end, kubi_ui_init};
use loading_screen::update_loading_screen; use loading_screen::update_loading_screen;
use shutdown_screen::update_shutdown_screen;
use connecting_screen::update_connecting_screen; use connecting_screen::update_connecting_screen;
use fixed_timestamp::init_fixed_timestamp_storage; use fixed_timestamp::init_fixed_timestamp_storage;
use filesystem::AssetManager; use filesystem::AssetManager;
@ -149,11 +155,15 @@ fn update() -> Workload {
draw_crosshair, draw_crosshair,
render_settings_ui, render_settings_ui,
).into_sequential_workload().run_if(is_ingame), ).into_sequential_workload().run_if(is_ingame),
(
update_shutdown_screen,
).into_sequential_workload().run_if(is_shutting_down),
update_networking_late.run_if(is_multiplayer), update_networking_late.run_if(is_multiplayer),
compute_cameras, compute_cameras,
kubi_ui_end, kubi_ui_end,
update_state,
exit_on_esc, exit_on_esc,
shutdown_screen::late_intercept,
update_state,
update_rendering_late, update_rendering_late,
).into_sequential_workload() ).into_sequential_workload()
} }

View file

@ -7,7 +7,8 @@ pub enum GameState {
Initial, Initial,
Connecting, Connecting,
LoadingWorld, LoadingWorld,
InGame InGame,
ShuttingDown,
} }
#[derive(Unique, PartialEq, Eq, Default, Clone, Copy)] #[derive(Unique, PartialEq, Eq, Default, Clone, Copy)]
@ -51,8 +52,20 @@ pub fn is_loading(
matches!(*state, GameState::LoadingWorld) matches!(*state, GameState::LoadingWorld)
} }
pub fn is_shutting_down(
state: UniqueView<GameState>
) -> bool {
*state == GameState::ShuttingDown
}
pub fn is_ingame_or_loading( pub fn is_ingame_or_loading(
state: UniqueView<GameState> state: UniqueView<GameState>
) -> bool { ) -> bool {
matches!(*state, GameState::InGame | GameState::LoadingWorld) matches!(*state, GameState::InGame | GameState::LoadingWorld)
} }
pub fn is_ingame_or_shutting_down(
state: UniqueView<GameState>
) -> bool {
matches!(*state, GameState::InGame | GameState::ShuttingDown)
}

View file

@ -1,5 +1,6 @@
pub(crate) mod loading_screen; pub(crate) mod loading_screen;
pub(crate) mod connecting_screen; pub(crate) mod connecting_screen;
pub(crate) mod shutdown_screen;
pub(crate) mod chat_ui; pub(crate) mod chat_ui;
pub(crate) mod crosshair_ui; pub(crate) mod crosshair_ui;
pub(crate) mod settings_ui; pub(crate) mod settings_ui;

View file

@ -0,0 +1,150 @@
use hui::element::{progress_bar::ProgressBar, text::Text, UiElementExt};
use shipyard::{AllStoragesView, AllStoragesViewMut, IntoWorkload, NonSendSync, SystemModificator, Unique, UniqueView, UniqueViewMut, Workload, WorkloadModificator};
use kubi_shared::data::io_thread::{IOResponse, IOThreadManager, TerminationStage};
use crate::{
control_flow::RequestExit, cursor_lock::CursorLock, hui_integration::UiState, loading_screen::loading_screen_base, networking::is_singleplayer, rendering::Renderer, state::{is_ingame, is_ingame_or_shutting_down, is_shutting_down, GameState, NextState}
};
// TODO move shutdown non-UI logic to a separate file
#[derive(Unique)]
struct ShutdownState {
// iota: IOThreadManager,
termination: TerminationStage,
}
fn intercept_exit(
mut exit: UniqueViewMut<RequestExit>,
mut state: UniqueViewMut<NextState>,
cur_state: UniqueView<GameState>,
termination_state: Option<UniqueView<ShutdownState>>,
) {
if exit.0 {
if *cur_state == GameState::ShuttingDown {
// If we're already shutting down, check if we're done
// If not, ignore the exit request
if let Some(termination_state) = termination_state {
if termination_state.termination != TerminationStage::Terminated {
log::warn!("Exit request intercepted, ignoring as we're still shutting down");
exit.0 = false;
}
}
} else if state.0 != Some(GameState::ShuttingDown) {
log::info!("Exit request intercepted, transitioning to shutdown state");
exit.0 = false;
state.0 = Some(GameState::ShuttingDown);
}
}
}
pub fn init_shutdown_state(
storages: AllStoragesView,
) {
storages.add_unique(ShutdownState {
termination: TerminationStage::Starting,
});
// HACK: Tell iota to kys (todo do on state transition instead)
log::info!("IO Thread stopping gracefully... (stop_async)");
let mut iota = storages.borrow::<UniqueViewMut<IOThreadManager>>().unwrap();
iota.stop_async();
// ..and make sure to disable cursor lock
let mut lock = storages.borrow::<UniqueViewMut<CursorLock>>().unwrap();
lock.0 = false;
}
pub fn update_shutdown_state(
storages: AllStoragesViewMut,
) {
let Ok(iota) = storages.borrow::<UniqueViewMut<IOThreadManager>>() else {
log::warn!("IO Thread not found, skipping shutdown state update");
return;
};
let mut state = storages.borrow::<UniqueViewMut<ShutdownState>>().unwrap();
let mut do_drop_iota = false;
//poll the IO thread for progress
for response in iota.poll() {
match response {
IOResponse::KysProgressInformational(stage) => {
state.termination = stage;
},
IOResponse::Terminated => {
state.termination = TerminationStage::Terminated;
do_drop_iota = true;
// Request exit
let mut exit = storages.borrow::<UniqueViewMut<RequestExit>>().unwrap();
exit.0 = true;
},
_ => {}
}
}
drop(iota);
// Hard-stop and drop the iota
if do_drop_iota {
let mut iota = storages.remove_unique::<IOThreadManager>().unwrap();
iota.stop_async_block_on();
log::info!("IO Thread terminated on stop_async_block_on");
drop(iota);
}
}
fn render_shutdown_ui(
mut ui: NonSendSync<UniqueViewMut<UiState>>,
ren: UniqueView<Renderer>,
state: UniqueView<ShutdownState>,
) {
loading_screen_base(1., |ui| {
Text::new("Shutting down...")
.with_text_size(16)
.add_child(ui);
match state.termination {
TerminationStage::Starting => {
Text::new("Please wait...")
.with_text_size(16)
.add_child(ui);
},
TerminationStage::SaveQueue { progress, total } => {
Text::new(format!("Saving chunks: {}/{}", progress, total))
.with_text_size(16)
.add_child(ui);
ProgressBar::default()
.with_value(progress as f32 / total as f32)
.add_child(ui);
},
TerminationStage::ProcessRx => {
Text::new("Processing remaining save requests...")
.with_text_size(16)
.add_child(ui);
},
TerminationStage::Terminated => {
Text::new("Terminated.")
.with_text_size(16)
.add_child(ui);
}
}
}).add_root(&mut ui.hui, ren.size_vec2())
}
pub fn update_shutdown_screen() -> Workload {
(
init_shutdown_state
.run_if_missing_unique::<ShutdownState>(),
update_shutdown_state,
render_shutdown_ui,
).into_sequential_workload()
}
pub fn late_intercept() -> Workload {
(
intercept_exit,
).into_workload()
.run_if(is_singleplayer)
.run_if(is_ingame_or_shutting_down)
}