From 043bb873c435f6462a0ef624e0110040db864355 Mon Sep 17 00:00:00 2001 From: griffi-gh Date: Thu, 25 Apr 2024 12:30:25 +0200 Subject: [PATCH] Sync client disconnects --- kubi-server/src/client.rs | 215 +++++++++++------- kubi-server/src/main.rs | 99 ++++---- kubi-server/src/world.rs | 417 +++++++++++++++++----------------- kubi/src/networking.rs | 4 +- kubi/src/networking/player.rs | 32 +++ 5 files changed, 429 insertions(+), 338 deletions(-) diff --git a/kubi-server/src/client.rs b/kubi-server/src/client.rs index 8adc975..4836c8c 100644 --- a/kubi-server/src/client.rs +++ b/kubi-server/src/client.rs @@ -1,81 +1,134 @@ -use glam::Mat4; -use shipyard::{Component, EntityId, Unique, AllStoragesView, UniqueView, NonSendSync, View, ViewMut, Get, IntoIter}; -use hashbrown::HashMap; -use uflow::SendMode; -use std::net::SocketAddr; -use kubi_shared::{ - networking::{ - client::{ClientIdMap, Client}, - messages::{ClientToServerMessage, ServerToClientMessage, ClientToServerMessageType}, - channels::Channel - }, - transform::Transform -}; -use crate::{ - server::{ServerEvents, UdpServer}, - util::check_message_auth -}; - -#[derive(Component, Clone, Copy)] -pub struct ClientAddress(pub SocketAddr); - -#[derive(Unique, Default)] -pub struct ClientAddressMap(pub HashMap); -impl ClientAddressMap { - pub fn new() -> Self { Self::default() } -} - -pub fn init_client_maps( - storages: AllStoragesView -) { - storages.add_unique(ClientIdMap::new()); - storages.add_unique(ClientAddressMap::new()); -} - -pub fn sync_client_positions( - server: NonSendSync>, - events: UniqueView, - addr_map: UniqueView, - clients: View, - mut transforms: ViewMut, - addrs: View, -) { - for event in &events.0 { - let Some(message) = check_message_auth - ::<{ClientToServerMessageType::PositionChanged as u8}> - (&server, event, &clients, &addr_map) else { continue }; - - let ClientToServerMessage::PositionChanged { position, velocity: _, direction } = message.message else { - unreachable!() - }; - - //log movement (annoying duh) - log::debug!("dbg: player moved id: {} coords: {} quat: {}", message.client_id, position, direction); - - //Apply position to server-side client - let mut trans = (&mut transforms).get(message.entity_id).unwrap(); - trans.0 = Mat4::from_rotation_translation(direction, position); - - //Transmit the change to other players - for (other_client, other_client_address) in (&clients, &addrs).iter() { - if other_client.0 == message.client_id { - continue - } - let Some(client) = server.0.client(&other_client_address.0) else { - log::error!("Client with address not found"); - continue - }; - client.borrow_mut().send( - postcard::to_allocvec( - &ServerToClientMessage::PlayerPositionChanged { - client_id: message.client_id, - position, - direction - } - ).unwrap().into_boxed_slice(), - Channel::Move as usize, - SendMode::Reliable - ); - } - } -} +use glam::Mat4; +use shipyard::{AllStoragesView, AllStoragesViewMut, Component, EntitiesViewMut, EntityId, Get, IntoIter, NonSendSync, Remove, Unique, UniqueView, UniqueViewMut, View, ViewMut}; +use hashbrown::HashMap; +use uflow::{server::Event, SendMode}; +use std::net::SocketAddr; +use kubi_shared::{ + networking::{ + client::{ClientIdMap, Client}, + messages::{ClientToServerMessage, ServerToClientMessage, ClientToServerMessageType}, + channels::Channel + }, + transform::Transform +}; +use crate::{ + server::{ServerEvents, UdpServer}, + util::check_message_auth, world::ChunkManager +}; + +#[derive(Component, Clone, Copy)] +pub struct ClientAddress(pub SocketAddr); + +#[derive(Unique, Default)] +pub struct ClientAddressMap(pub HashMap); +impl ClientAddressMap { + pub fn new() -> Self { Self::default() } +} + +pub fn init_client_maps( + storages: AllStoragesView +) { + storages.add_unique(ClientIdMap::new()); + storages.add_unique(ClientAddressMap::new()); +} + +pub fn sync_client_positions( + server: NonSendSync>, + events: UniqueView, + addr_map: UniqueView, + clients: View, + mut transforms: ViewMut, + addrs: View, +) { + for event in &events.0 { + let Some(message) = check_message_auth + ::<{ClientToServerMessageType::PositionChanged as u8}> + (&server, event, &clients, &addr_map) else { continue }; + + let ClientToServerMessage::PositionChanged { position, velocity: _, direction } = message.message else { + unreachable!() + }; + + //log movement (annoying duh) + log::debug!("dbg: player moved id: {} coords: {} quat: {}", message.client_id, position, direction); + + //Apply position to server-side client + let mut trans = (&mut transforms).get(message.entity_id).unwrap(); + trans.0 = Mat4::from_rotation_translation(direction, position); + + //Transmit the change to other players + for (other_client, other_client_address) in (&clients, &addrs).iter() { + if other_client.0 == message.client_id { + continue + } + let Some(client) = server.0.client(&other_client_address.0) else { + log::error!("Client with address not found"); + continue + }; + client.borrow_mut().send( + postcard::to_allocvec( + &ServerToClientMessage::PlayerPositionChanged { + client_id: message.client_id, + position, + direction + } + ).unwrap().into_boxed_slice(), + Channel::Move as usize, + SendMode::Reliable + ); + } + } +} + +pub fn on_client_disconnect( + mut all_storages: AllStoragesViewMut, +) { + let mut to_delete = Vec::new(); + { + let server = all_storages.borrow::>>().unwrap(); + let events = all_storages.borrow::>().unwrap(); + let mut addr_map = all_storages.borrow::>().unwrap(); + let mut id_map = all_storages.borrow::>().unwrap(); + let clients = all_storages.borrow::>().unwrap(); + let mut chunk_manager = all_storages.borrow::>().unwrap(); + let addrs = all_storages.borrow::>().unwrap(); + + for event in &events.0 { + if let Event::Disconnect(addr) = event { + let net_client = server.0.client(addr).unwrap(); + let Some(&entity_id) = addr_map.0.get(addr) else { + log::error!("Disconnected client not authenticated, moving on"); + continue; + }; + let client_id = clients.get(entity_id).unwrap().0; + log::info!("Client disconnected: ID {}", client_id); + + addr_map.0.remove(addr); + id_map.0.remove(&client_id); + to_delete.push(entity_id); + + //unsubscribe from chunks + chunk_manager.unsubscribe_all(client_id); + + //send disconnect message to other clients + for (_, other_client_address) in (&clients, &addrs).iter() { + let Some(client) = server.0.client(&other_client_address.0) else { + log::error!("Client with address not found"); + continue + }; + client.borrow_mut().send( + postcard::to_allocvec( + &ServerToClientMessage::PlayerDisconnected { id: client_id } + ).unwrap().into_boxed_slice(), + Channel::SysEvt as usize, + SendMode::Reliable + ); + } + } + } + + } + for entity_id in to_delete { + all_storages.delete_entity(entity_id); + } +} diff --git a/kubi-server/src/main.rs b/kubi-server/src/main.rs index 476b81d..8a216b1 100644 --- a/kubi-server/src/main.rs +++ b/kubi-server/src/main.rs @@ -1,49 +1,50 @@ -use shipyard::{World, Workload, IntoWorkload}; -use std::{thread, time::Duration}; - -mod util; -mod config; -mod server; -mod client; -mod world; -mod auth; - -use config::read_config; -use server::{bind_server, update_server, log_server_errors}; -use client::{init_client_maps, sync_client_positions}; -use auth::authenticate_players; -use world::{update_world, init_world}; - -fn initialize() -> Workload { - ( - read_config, - bind_server, - init_client_maps, - init_world, - ).into_workload() -} - -fn update() -> Workload { - ( - update_server, - ( - log_server_errors, - authenticate_players, - update_world, - sync_client_positions, - ).into_workload() - ).into_sequential_workload() -} - -fn main() { - kubi_logging::init(); - let world = World::new(); - world.add_workload(initialize); - world.add_workload(update); - world.run_workload(initialize).unwrap(); - log::info!("The server is now running"); - loop { - world.run_workload(update).unwrap(); - thread::sleep(Duration::from_millis(16)); - } -} +use shipyard::{World, Workload, IntoWorkload}; +use std::{thread, time::Duration}; + +mod util; +mod config; +mod server; +mod client; +mod world; +mod auth; + +use config::read_config; +use server::{bind_server, update_server, log_server_errors}; +use client::{init_client_maps, on_client_disconnect, sync_client_positions}; +use auth::authenticate_players; +use world::{update_world, init_world}; + +fn initialize() -> Workload { + ( + read_config, + bind_server, + init_client_maps, + init_world, + ).into_workload() +} + +fn update() -> Workload { + ( + update_server, + ( + log_server_errors, + authenticate_players, + update_world, + sync_client_positions, + on_client_disconnect, + ).into_workload() + ).into_sequential_workload() +} + +fn main() { + kubi_logging::init(); + let world = World::new(); + world.add_workload(initialize); + world.add_workload(update); + world.run_workload(initialize).unwrap(); + log::info!("The server is now running"); + loop { + world.run_workload(update).unwrap(); + thread::sleep(Duration::from_millis(16)); + } +} diff --git a/kubi-server/src/world.rs b/kubi-server/src/world.rs index 62c0667..5062e6b 100644 --- a/kubi-server/src/world.rs +++ b/kubi-server/src/world.rs @@ -1,207 +1,210 @@ -use shipyard::{Unique, UniqueView, UniqueViewMut, Workload, IntoWorkload, AllStoragesView, View, Get, NonSendSync, IntoIter}; -use glam::IVec3; -use hashbrown::HashMap; -use kubi_shared::networking::{ - messages::{ClientToServerMessage, ServerToClientMessage, ClientToServerMessageType}, - channels::Channel, - client::Client, -}; -use uflow::{server::RemoteClient, SendMode}; -use lz4_flex::compress_prepend_size as lz4_compress; -use anyhow::Result; -use std::{rc::Rc, cell::RefCell}; -use kubi_shared::networking::client::ClientIdMap; -use crate::{ - server::{UdpServer, ServerEvents}, - config::ConfigTable, - client::{ClientAddress, ClientAddressMap}, - util::check_message_auth, -}; - -pub mod chunk; -pub mod tasks; - -use chunk::Chunk; - -use self::{tasks::{ChunkTaskManager, ChunkTask, ChunkTaskResponse, init_chunk_task_manager}, chunk::ChunkState}; - -#[derive(Unique, Default)] -pub struct ChunkManager { - pub chunks: HashMap -} -impl ChunkManager { - pub fn new() -> Self { - Self::default() - } -} - -///Sends a compressed chunk packet -pub fn send_chunk_compressed( - client: &Rc>, - message: &ServerToClientMessage -) -> Result<()> { - let mut ser_message = postcard::to_allocvec(&message)?; - let mut compressed = lz4_compress(&ser_message[1..]); - ser_message.truncate(1); - ser_message.append(&mut compressed); - let ser_message = ser_message.into_boxed_slice(); - client.borrow_mut().send( - ser_message, - Channel::World as usize, - SendMode::Reliable - ); - Ok(()) -} - -fn process_chunk_requests( - server: NonSendSync>, - events: UniqueView, - mut chunk_manager: UniqueViewMut, - task_manager: UniqueView, - config: UniqueView, - addr_map: UniqueView, - clients: View -) { - for event in &events.0 { - let Some(message) = check_message_auth - ::<{ClientToServerMessageType::ChunkSubRequest as u8}> - (&server, event, &clients, &addr_map) else { continue }; - - let ClientToServerMessage::ChunkSubRequest { chunk: chunk_position } = message.message else { - unreachable!() - }; - - if let Some(chunk) = chunk_manager.chunks.get_mut(&chunk_position) { - chunk.subscriptions.insert(message.client_id); - //TODO Start task here if status is "Nothing" - if let Some(blocks) = &chunk.blocks { - send_chunk_compressed( - message.client, - &ServerToClientMessage::ChunkResponse { - chunk: chunk_position, - data: blocks.clone(), - queued: Vec::with_capacity(0) - } - ).unwrap(); - } - } else { - let mut chunk = Chunk::new(chunk_position); - chunk.state = ChunkState::Loading; - chunk.subscriptions.insert(message.client_id); - chunk_manager.chunks.insert(chunk_position, chunk); - task_manager.spawn_task(ChunkTask::LoadChunk { - position: chunk_position, - seed: config.world.seed, - }); - } - } -} - -fn process_finished_tasks( - server: NonSendSync>, - task_manager: UniqueView, - mut chunk_manager: UniqueViewMut, - id_map: UniqueView, - client_addr: View, -) { - 'outer: while let Some(res) = task_manager.receive() { - let ChunkTaskResponse::ChunkLoaded { chunk_position, blocks, queue } = res; - let Some(chunk) = chunk_manager.chunks.get_mut(&chunk_position) else { - log::warn!("Chunk discarded: Doesn't exist"); - continue - }; - if chunk.state != ChunkState::Loading { - log::warn!("Chunk discarded: Not Loading"); - continue - } - chunk.state = ChunkState::Loaded; - chunk.blocks = Some(blocks.clone()); - - log::debug!("Chunk {chunk_position} loaded, {} subs", chunk.subscriptions.len()); - - let chunk_packet = &ServerToClientMessage::ChunkResponse { - chunk: chunk_position, - data: blocks, - queued: queue - }; - - for &subscriber in &chunk.subscriptions { - let Some(&entity_id) = id_map.0.get(&subscriber) else { - log::error!("Invalid subscriber client id"); - continue 'outer; - }; - let Ok(&ClientAddress(client_addr)) = (&client_addr).get(entity_id) else { - log::error!("Invalid subscriber entity id"); - continue 'outer; - }; - let Some(client) = server.0.client(&client_addr) else { - log::error!("Client not connected"); - continue 'outer; - }; - send_chunk_compressed(client, chunk_packet).unwrap(); - // client.borrow_mut().send( - // chunk_packet.clone(), - // CHANNEL_WORLD, - // SendMode::Reliable, - // ); - } - } -} - -fn process_block_queue_messages( - server: NonSendSync>, - events: UniqueView, - addr_map: UniqueView, - clients: View, - addrs: View, -) { - for event in &events.0 { - let Some(message) = check_message_auth - ::<{ClientToServerMessageType::QueueBlock as u8}> - (&server, event, &clients, &addr_map) else { continue }; - - let ClientToServerMessage::QueueBlock { item } = message.message else { unreachable!() }; - //TODO place in our own queue, for now just send to other clients - log::info!("Placed block {:?} at {}", item.block_type, item.position); - for (other_client, other_client_address) in (&clients, &addrs).iter() { - //No need to send the event back - if message.client_id == other_client.0 { - continue - } - //Get client - let Some(client) = server.0.client(&other_client_address.0) else { - log::error!("Client with address not found"); - continue - }; - //Send the message - client.borrow_mut().send( - postcard::to_allocvec( - &ServerToClientMessage::QueueBlock { item } - ).unwrap().into_boxed_slice(), - Channel::Block as usize, - SendMode::Reliable, - ); - } - } -} - -fn init_chunk_manager( - storages: AllStoragesView -) { - storages.add_unique(ChunkManager::new()); -} - -pub fn init_world() -> Workload { - ( - init_chunk_manager, - init_chunk_task_manager, - ).into_workload() -} - -pub fn update_world() -> Workload { - ( - process_chunk_requests, - process_finished_tasks, - process_block_queue_messages, - ).into_workload() -} +use shipyard::{Unique, UniqueView, UniqueViewMut, Workload, IntoWorkload, AllStoragesView, View, Get, NonSendSync, IntoIter}; +use glam::IVec3; +use hashbrown::HashMap; +use kubi_shared::networking::{ + channels::Channel, client::{Client, ClientId}, messages::{ClientToServerMessage, ClientToServerMessageType, ServerToClientMessage} +}; +use uflow::{server::RemoteClient, SendMode}; +use lz4_flex::compress_prepend_size as lz4_compress; +use anyhow::Result; +use std::{rc::Rc, cell::RefCell}; +use kubi_shared::networking::client::ClientIdMap; +use crate::{ + server::{UdpServer, ServerEvents}, + config::ConfigTable, + client::{ClientAddress, ClientAddressMap}, + util::check_message_auth, +}; + +pub mod chunk; +pub mod tasks; + +use chunk::Chunk; + +use self::{tasks::{ChunkTaskManager, ChunkTask, ChunkTaskResponse, init_chunk_task_manager}, chunk::ChunkState}; + +#[derive(Unique, Default)] +pub struct ChunkManager { + pub chunks: HashMap +} +impl ChunkManager { + pub fn unsubscribe_all(&mut self, client_id: ClientId) { + for chunk in self.chunks.values_mut() { + chunk.subscriptions.remove(&client_id); + } + } + pub fn new() -> Self { + Self::default() + } +} + +///Sends a compressed chunk packet +pub fn send_chunk_compressed( + client: &Rc>, + message: &ServerToClientMessage +) -> Result<()> { + let mut ser_message = postcard::to_allocvec(&message)?; + let mut compressed = lz4_compress(&ser_message[1..]); + ser_message.truncate(1); + ser_message.append(&mut compressed); + let ser_message = ser_message.into_boxed_slice(); + client.borrow_mut().send( + ser_message, + Channel::World as usize, + SendMode::Reliable + ); + Ok(()) +} + +fn process_chunk_requests( + server: NonSendSync>, + events: UniqueView, + mut chunk_manager: UniqueViewMut, + task_manager: UniqueView, + config: UniqueView, + addr_map: UniqueView, + clients: View +) { + for event in &events.0 { + let Some(message) = check_message_auth + ::<{ClientToServerMessageType::ChunkSubRequest as u8}> + (&server, event, &clients, &addr_map) else { continue }; + + let ClientToServerMessage::ChunkSubRequest { chunk: chunk_position } = message.message else { + unreachable!() + }; + + if let Some(chunk) = chunk_manager.chunks.get_mut(&chunk_position) { + chunk.subscriptions.insert(message.client_id); + //TODO Start task here if status is "Nothing" + if let Some(blocks) = &chunk.blocks { + send_chunk_compressed( + message.client, + &ServerToClientMessage::ChunkResponse { + chunk: chunk_position, + data: blocks.clone(), + queued: Vec::with_capacity(0) + } + ).unwrap(); + } + } else { + let mut chunk = Chunk::new(chunk_position); + chunk.state = ChunkState::Loading; + chunk.subscriptions.insert(message.client_id); + chunk_manager.chunks.insert(chunk_position, chunk); + task_manager.spawn_task(ChunkTask::LoadChunk { + position: chunk_position, + seed: config.world.seed, + }); + } + } +} + +fn process_finished_tasks( + server: NonSendSync>, + task_manager: UniqueView, + mut chunk_manager: UniqueViewMut, + id_map: UniqueView, + client_addr: View, +) { + 'outer: while let Some(res) = task_manager.receive() { + let ChunkTaskResponse::ChunkLoaded { chunk_position, blocks, queue } = res; + let Some(chunk) = chunk_manager.chunks.get_mut(&chunk_position) else { + log::warn!("Chunk discarded: Doesn't exist"); + continue + }; + if chunk.state != ChunkState::Loading { + log::warn!("Chunk discarded: Not Loading"); + continue + } + chunk.state = ChunkState::Loaded; + chunk.blocks = Some(blocks.clone()); + + log::debug!("Chunk {chunk_position} loaded, {} subs", chunk.subscriptions.len()); + + let chunk_packet = &ServerToClientMessage::ChunkResponse { + chunk: chunk_position, + data: blocks, + queued: queue + }; + + for &subscriber in &chunk.subscriptions { + let Some(&entity_id) = id_map.0.get(&subscriber) else { + log::error!("Invalid subscriber client id"); + continue 'outer; + }; + let Ok(&ClientAddress(client_addr)) = (&client_addr).get(entity_id) else { + log::error!("Invalid subscriber entity id"); + continue 'outer; + }; + let Some(client) = server.0.client(&client_addr) else { + log::error!("Client not connected"); + continue 'outer; + }; + send_chunk_compressed(client, chunk_packet).unwrap(); + // client.borrow_mut().send( + // chunk_packet.clone(), + // CHANNEL_WORLD, + // SendMode::Reliable, + // ); + } + } +} + +fn process_block_queue_messages( + server: NonSendSync>, + events: UniqueView, + addr_map: UniqueView, + clients: View, + addrs: View, +) { + for event in &events.0 { + let Some(message) = check_message_auth + ::<{ClientToServerMessageType::QueueBlock as u8}> + (&server, event, &clients, &addr_map) else { continue }; + + let ClientToServerMessage::QueueBlock { item } = message.message else { unreachable!() }; + //TODO place in our own queue, for now just send to other clients + log::info!("Placed block {:?} at {}", item.block_type, item.position); + for (other_client, other_client_address) in (&clients, &addrs).iter() { + //No need to send the event back + if message.client_id == other_client.0 { + continue + } + //Get client + let Some(client) = server.0.client(&other_client_address.0) else { + log::error!("Client with address not found"); + continue + }; + //Send the message + client.borrow_mut().send( + postcard::to_allocvec( + &ServerToClientMessage::QueueBlock { item } + ).unwrap().into_boxed_slice(), + Channel::Block as usize, + SendMode::Reliable, + ); + } + } +} + +fn init_chunk_manager( + storages: AllStoragesView +) { + storages.add_unique(ChunkManager::new()); +} + +pub fn init_world() -> Workload { + ( + init_chunk_manager, + init_chunk_task_manager, + ).into_workload() +} + +pub fn update_world() -> Workload { + ( + process_chunk_requests, + process_finished_tasks, + process_block_queue_messages, + ).into_workload() +} diff --git a/kubi/src/networking.rs b/kubi/src/networking.rs index 9dcdd88..15a44e3 100644 --- a/kubi/src/networking.rs +++ b/kubi/src/networking.rs @@ -38,7 +38,8 @@ use player::{ init_client_map, send_player_movement_events, receive_player_movement_events, - receive_player_connect_events + receive_player_connect_events, + receive_player_disconnect_events, }; const NET_TICKRATE: u16 = 33; @@ -137,6 +138,7 @@ pub fn update_networking() -> Workload { ( ( receive_player_connect_events, + receive_player_disconnect_events, ).into_workload(), ( recv_block_place_events, diff --git a/kubi/src/networking/player.rs b/kubi/src/networking/player.rs index a70f254..78ad66f 100644 --- a/kubi/src/networking/player.rs +++ b/kubi/src/networking/player.rs @@ -99,3 +99,35 @@ pub fn receive_player_connect_events( spawn_remote_player_multiplayer(&mut storages, init); } } + +pub fn receive_player_disconnect_events( + mut storages: AllStoragesViewMut, +) { + let messages: Vec = storages.borrow::>().unwrap().iter().filter_map(|event| { + let ClientEvent::Receive(data) = &event.0 else { + return None + }; + if !event.is_message_of_type::<{ServerToClientMessageType::PlayerDisconnected as u8}>() { + return None + }; + let Ok(parsed_message) = postcard::from_bytes(data) else { + log::error!("Malformed message"); + return None + }; + Some(parsed_message) + }).collect(); + + for message in messages { + let ServerToClientMessage::PlayerDisconnected { id } = message else { unreachable!() }; + log::info!("player disconnected: {}", id); + let mut id_map = storages.borrow::>().unwrap(); + let Some(ent_id) = id_map.0.remove(&id) else { + log::warn!("Disconnected player entity not found in client-id map"); + continue + }; + drop(id_map); + if !storages.delete_entity(ent_id) { + log::warn!("Disconnected player entity not found in storage"); + } + } +}