implement local server queue

This commit is contained in:
griffi-gh 2024-04-25 19:13:05 +02:00
parent dc1d8db27d
commit af5ed58442

View file

@ -1,8 +1,14 @@
use shipyard::{Unique, UniqueView, UniqueViewMut, Workload, IntoWorkload, AllStoragesView, View, Get, NonSendSync, IntoIter}; use shipyard::{Unique, UniqueView, UniqueViewMut, Workload, IntoWorkload, AllStoragesView, View, Get, NonSendSync, IntoIter};
use glam::IVec3; use glam::IVec3;
use hashbrown::HashMap; use hashbrown::HashMap;
use kubi_shared::networking::{ use kubi_shared::{
channels::Channel, client::{Client, ClientId}, messages::{ClientToServerMessage, ClientToServerMessageType, ServerToClientMessage} chunk::CHUNK_SIZE,
queue::QueuedBlock,
networking::{
channels::Channel,
client::{Client, ClientId},
messages::{ClientToServerMessage, ClientToServerMessageType, ServerToClientMessage}
},
}; };
use uflow::{server::RemoteClient, SendMode}; use uflow::{server::RemoteClient, SendMode};
use lz4_flex::compress_prepend_size as lz4_compress; use lz4_flex::compress_prepend_size as lz4_compress;
@ -21,7 +27,15 @@ pub mod tasks;
use chunk::Chunk; use chunk::Chunk;
use self::{tasks::{ChunkTaskManager, ChunkTask, ChunkTaskResponse, init_chunk_task_manager}, chunk::ChunkState}; use self::{
tasks::{ChunkTaskManager, ChunkTask, ChunkTaskResponse, init_chunk_task_manager},
chunk::ChunkState
};
#[derive(Unique, Default)]
pub struct LocalBlockQueue {
pub queue: Vec<QueuedBlock>,
}
#[derive(Unique, Default)] #[derive(Unique, Default)]
pub struct ChunkManager { pub struct ChunkManager {
@ -106,6 +120,7 @@ fn process_finished_tasks(
mut chunk_manager: UniqueViewMut<ChunkManager>, mut chunk_manager: UniqueViewMut<ChunkManager>,
id_map: UniqueView<ClientIdMap>, id_map: UniqueView<ClientIdMap>,
client_addr: View<ClientAddress>, client_addr: View<ClientAddress>,
mut local_queue: UniqueViewMut<LocalBlockQueue>,
) { ) {
'outer: while let Some(res) = task_manager.receive() { 'outer: while let Some(res) = task_manager.receive() {
let ChunkTaskResponse::ChunkLoaded { chunk_position, blocks, queue } = res; let ChunkTaskResponse::ChunkLoaded { chunk_position, blocks, queue } = res;
@ -120,12 +135,14 @@ fn process_finished_tasks(
chunk.state = ChunkState::Loaded; chunk.state = ChunkState::Loaded;
chunk.blocks = Some(blocks.clone()); chunk.blocks = Some(blocks.clone());
local_queue.queue.extend_from_slice(&queue);
log::debug!("Chunk {chunk_position} loaded, {} subs", chunk.subscriptions.len()); log::debug!("Chunk {chunk_position} loaded, {} subs", chunk.subscriptions.len());
let chunk_packet = &ServerToClientMessage::ChunkResponse { let chunk_packet = &ServerToClientMessage::ChunkResponse {
chunk: chunk_position, chunk: chunk_position,
data: blocks, data: blocks,
queued: queue queued: queue //should this be here?
}; };
for &subscriber in &chunk.subscriptions { for &subscriber in &chunk.subscriptions {
@ -157,6 +174,7 @@ fn process_block_queue_messages(
addr_map: UniqueView<ClientAddressMap>, addr_map: UniqueView<ClientAddressMap>,
clients: View<Client>, clients: View<Client>,
addrs: View<ClientAddress>, addrs: View<ClientAddress>,
mut queue: UniqueViewMut<LocalBlockQueue>,
) { ) {
for event in &events.0 { for event in &events.0 {
let Some(message) = check_message_auth let Some(message) = check_message_auth
@ -164,7 +182,10 @@ fn process_block_queue_messages(
(&server, event, &clients, &addr_map) else { continue }; (&server, event, &clients, &addr_map) else { continue };
let ClientToServerMessage::QueueBlock { item } = message.message else { unreachable!() }; let ClientToServerMessage::QueueBlock { item } = message.message else { unreachable!() };
//TODO place in our own queue, for now just send to other clients
//place in our local world
queue.queue.push(item);
log::info!("Placed block {:?} at {}", item.block_type, item.position); log::info!("Placed block {:?} at {}", item.block_type, item.position);
for (other_client, other_client_address) in (&clients, &addrs).iter() { for (other_client, other_client_address) in (&clients, &addrs).iter() {
//No need to send the event back //No need to send the event back
@ -188,23 +209,44 @@ fn process_block_queue_messages(
} }
} }
fn init_chunk_manager( fn process_block_queue(
mut chunk_manager: UniqueViewMut<ChunkManager>,
mut queue: UniqueViewMut<LocalBlockQueue>,
) {
queue.queue.retain(|item| {
let chunk_position = item.position.div_euclid(IVec3::splat(CHUNK_SIZE as i32));
let block_position = item.position.rem_euclid(IVec3::splat(CHUNK_SIZE as i32));
let Some(chunk) = chunk_manager.chunks.get_mut(&chunk_position) else {
return true
};
let Some(blocks) = &mut chunk.blocks else {
return true
};
blocks[block_position.x as usize][block_position.y as usize][block_position.z as usize] = item.block_type;
false
});
}
/// init local block queue and chunk manager
fn init_chunk_manager_and_block_queue(
storages: AllStoragesView storages: AllStoragesView
) { ) {
storages.add_unique(ChunkManager::new()); storages.add_unique(ChunkManager::new());
storages.add_unique(LocalBlockQueue::default());
} }
pub fn init_world() -> Workload { pub fn init_world() -> Workload {
( (
init_chunk_manager, init_chunk_manager_and_block_queue,
init_chunk_task_manager, init_chunk_task_manager,
).into_workload() ).into_workload()
} }
pub fn update_world() -> Workload { pub fn update_world() -> Workload {
( (
process_chunk_requests,
process_finished_tasks, process_finished_tasks,
process_block_queue_messages, process_block_queue_messages,
).into_workload() process_block_queue,
process_chunk_requests,
).into_sequential_workload()
} }