From 6f84d9014a7e6533f1a3a7b337d1ceb454094cdc Mon Sep 17 00:00:00 2001 From: griffi-gh Date: Thu, 25 Apr 2024 19:13:05 +0200 Subject: [PATCH] implement local server queue --- kubi-server/src/world.rs | 60 ++++++++++++++++++++++++++++++++++------ 1 file changed, 51 insertions(+), 9 deletions(-) diff --git a/kubi-server/src/world.rs b/kubi-server/src/world.rs index 5062e6b..7401238 100644 --- a/kubi-server/src/world.rs +++ b/kubi-server/src/world.rs @@ -1,8 +1,14 @@ use shipyard::{Unique, UniqueView, UniqueViewMut, Workload, IntoWorkload, AllStoragesView, View, Get, NonSendSync, IntoIter}; use glam::IVec3; use hashbrown::HashMap; -use kubi_shared::networking::{ - channels::Channel, client::{Client, ClientId}, messages::{ClientToServerMessage, ClientToServerMessageType, ServerToClientMessage} +use kubi_shared::{ + chunk::CHUNK_SIZE, + queue::QueuedBlock, + networking::{ + channels::Channel, + client::{Client, ClientId}, + messages::{ClientToServerMessage, ClientToServerMessageType, ServerToClientMessage} + }, }; use uflow::{server::RemoteClient, SendMode}; use lz4_flex::compress_prepend_size as lz4_compress; @@ -21,7 +27,15 @@ pub mod tasks; use chunk::Chunk; -use self::{tasks::{ChunkTaskManager, ChunkTask, ChunkTaskResponse, init_chunk_task_manager}, chunk::ChunkState}; +use self::{ + tasks::{ChunkTaskManager, ChunkTask, ChunkTaskResponse, init_chunk_task_manager}, + chunk::ChunkState +}; + +#[derive(Unique, Default)] +pub struct LocalBlockQueue { + pub queue: Vec, +} #[derive(Unique, Default)] pub struct ChunkManager { @@ -106,6 +120,7 @@ fn process_finished_tasks( mut chunk_manager: UniqueViewMut, id_map: UniqueView, client_addr: View, + mut local_queue: UniqueViewMut, ) { 'outer: while let Some(res) = task_manager.receive() { let ChunkTaskResponse::ChunkLoaded { chunk_position, blocks, queue } = res; @@ -120,12 +135,14 @@ fn process_finished_tasks( chunk.state = ChunkState::Loaded; chunk.blocks = Some(blocks.clone()); + local_queue.queue.extend_from_slice(&queue); + log::debug!("Chunk {chunk_position} loaded, {} subs", chunk.subscriptions.len()); let chunk_packet = &ServerToClientMessage::ChunkResponse { chunk: chunk_position, data: blocks, - queued: queue + queued: queue //should this be here? }; for &subscriber in &chunk.subscriptions { @@ -157,6 +174,7 @@ fn process_block_queue_messages( addr_map: UniqueView, clients: View, addrs: View, + mut queue: UniqueViewMut, ) { for event in &events.0 { let Some(message) = check_message_auth @@ -164,7 +182,10 @@ fn process_block_queue_messages( (&server, event, &clients, &addr_map) else { continue }; let ClientToServerMessage::QueueBlock { item } = message.message else { unreachable!() }; - //TODO place in our own queue, for now just send to other clients + + //place in our local world + queue.queue.push(item); + log::info!("Placed block {:?} at {}", item.block_type, item.position); for (other_client, other_client_address) in (&clients, &addrs).iter() { //No need to send the event back @@ -188,23 +209,44 @@ fn process_block_queue_messages( } } -fn init_chunk_manager( +fn process_block_queue( + mut chunk_manager: UniqueViewMut, + mut queue: UniqueViewMut, +) { + queue.queue.retain(|item| { + let chunk_position = item.position.div_euclid(IVec3::splat(CHUNK_SIZE as i32)); + let block_position = item.position.rem_euclid(IVec3::splat(CHUNK_SIZE as i32)); + let Some(chunk) = chunk_manager.chunks.get_mut(&chunk_position) else { + return true + }; + let Some(blocks) = &mut chunk.blocks else { + return true + }; + blocks[block_position.x as usize][block_position.y as usize][block_position.z as usize] = item.block_type; + false + }); +} + +/// init local block queue and chunk manager +fn init_chunk_manager_and_block_queue( storages: AllStoragesView ) { storages.add_unique(ChunkManager::new()); + storages.add_unique(LocalBlockQueue::default()); } pub fn init_world() -> Workload { ( - init_chunk_manager, + init_chunk_manager_and_block_queue, init_chunk_task_manager, ).into_workload() } pub fn update_world() -> Workload { ( - process_chunk_requests, process_finished_tasks, process_block_queue_messages, - ).into_workload() + process_block_queue, + process_chunk_requests, + ).into_sequential_workload() }