Sync client disconnects

This commit is contained in:
griffi-gh 2024-04-25 12:30:25 +02:00
parent fd8ec3478a
commit ee6a5dd2f9
5 changed files with 429 additions and 338 deletions

View file

@ -1,81 +1,134 @@
use glam::Mat4; use glam::Mat4;
use shipyard::{Component, EntityId, Unique, AllStoragesView, UniqueView, NonSendSync, View, ViewMut, Get, IntoIter}; use shipyard::{AllStoragesView, AllStoragesViewMut, Component, EntitiesViewMut, EntityId, Get, IntoIter, NonSendSync, Remove, Unique, UniqueView, UniqueViewMut, View, ViewMut};
use hashbrown::HashMap; use hashbrown::HashMap;
use uflow::SendMode; use uflow::{server::Event, SendMode};
use std::net::SocketAddr; use std::net::SocketAddr;
use kubi_shared::{ use kubi_shared::{
networking::{ networking::{
client::{ClientIdMap, Client}, client::{ClientIdMap, Client},
messages::{ClientToServerMessage, ServerToClientMessage, ClientToServerMessageType}, messages::{ClientToServerMessage, ServerToClientMessage, ClientToServerMessageType},
channels::Channel channels::Channel
}, },
transform::Transform transform::Transform
}; };
use crate::{ use crate::{
server::{ServerEvents, UdpServer}, server::{ServerEvents, UdpServer},
util::check_message_auth util::check_message_auth, world::ChunkManager
}; };
#[derive(Component, Clone, Copy)] #[derive(Component, Clone, Copy)]
pub struct ClientAddress(pub SocketAddr); pub struct ClientAddress(pub SocketAddr);
#[derive(Unique, Default)] #[derive(Unique, Default)]
pub struct ClientAddressMap(pub HashMap<SocketAddr, EntityId>); pub struct ClientAddressMap(pub HashMap<SocketAddr, EntityId>);
impl ClientAddressMap { impl ClientAddressMap {
pub fn new() -> Self { Self::default() } pub fn new() -> Self { Self::default() }
} }
pub fn init_client_maps( pub fn init_client_maps(
storages: AllStoragesView storages: AllStoragesView
) { ) {
storages.add_unique(ClientIdMap::new()); storages.add_unique(ClientIdMap::new());
storages.add_unique(ClientAddressMap::new()); storages.add_unique(ClientAddressMap::new());
} }
pub fn sync_client_positions( pub fn sync_client_positions(
server: NonSendSync<UniqueView<UdpServer>>, server: NonSendSync<UniqueView<UdpServer>>,
events: UniqueView<ServerEvents>, events: UniqueView<ServerEvents>,
addr_map: UniqueView<ClientAddressMap>, addr_map: UniqueView<ClientAddressMap>,
clients: View<Client>, clients: View<Client>,
mut transforms: ViewMut<Transform>, mut transforms: ViewMut<Transform>,
addrs: View<ClientAddress>, addrs: View<ClientAddress>,
) { ) {
for event in &events.0 { for event in &events.0 {
let Some(message) = check_message_auth let Some(message) = check_message_auth
::<{ClientToServerMessageType::PositionChanged as u8}> ::<{ClientToServerMessageType::PositionChanged as u8}>
(&server, event, &clients, &addr_map) else { continue }; (&server, event, &clients, &addr_map) else { continue };
let ClientToServerMessage::PositionChanged { position, velocity: _, direction } = message.message else { let ClientToServerMessage::PositionChanged { position, velocity: _, direction } = message.message else {
unreachable!() unreachable!()
}; };
//log movement (annoying duh) //log movement (annoying duh)
log::debug!("dbg: player moved id: {} coords: {} quat: {}", message.client_id, position, direction); log::debug!("dbg: player moved id: {} coords: {} quat: {}", message.client_id, position, direction);
//Apply position to server-side client //Apply position to server-side client
let mut trans = (&mut transforms).get(message.entity_id).unwrap(); let mut trans = (&mut transforms).get(message.entity_id).unwrap();
trans.0 = Mat4::from_rotation_translation(direction, position); trans.0 = Mat4::from_rotation_translation(direction, position);
//Transmit the change to other players //Transmit the change to other players
for (other_client, other_client_address) in (&clients, &addrs).iter() { for (other_client, other_client_address) in (&clients, &addrs).iter() {
if other_client.0 == message.client_id { if other_client.0 == message.client_id {
continue continue
} }
let Some(client) = server.0.client(&other_client_address.0) else { let Some(client) = server.0.client(&other_client_address.0) else {
log::error!("Client with address not found"); log::error!("Client with address not found");
continue continue
}; };
client.borrow_mut().send( client.borrow_mut().send(
postcard::to_allocvec( postcard::to_allocvec(
&ServerToClientMessage::PlayerPositionChanged { &ServerToClientMessage::PlayerPositionChanged {
client_id: message.client_id, client_id: message.client_id,
position, position,
direction direction
} }
).unwrap().into_boxed_slice(), ).unwrap().into_boxed_slice(),
Channel::Move as usize, Channel::Move as usize,
SendMode::Reliable SendMode::Reliable
); );
} }
} }
} }
pub fn on_client_disconnect(
mut all_storages: AllStoragesViewMut,
) {
let mut to_delete = Vec::new();
{
let server = all_storages.borrow::<NonSendSync<UniqueView<UdpServer>>>().unwrap();
let events = all_storages.borrow::<UniqueView<ServerEvents>>().unwrap();
let mut addr_map = all_storages.borrow::<UniqueViewMut<ClientAddressMap>>().unwrap();
let mut id_map = all_storages.borrow::<UniqueViewMut<ClientIdMap>>().unwrap();
let clients = all_storages.borrow::<View<Client>>().unwrap();
let mut chunk_manager = all_storages.borrow::<UniqueViewMut<ChunkManager>>().unwrap();
let addrs = all_storages.borrow::<View<ClientAddress>>().unwrap();
for event in &events.0 {
if let Event::Disconnect(addr) = event {
let net_client = server.0.client(addr).unwrap();
let Some(&entity_id) = addr_map.0.get(addr) else {
log::error!("Disconnected client not authenticated, moving on");
continue;
};
let client_id = clients.get(entity_id).unwrap().0;
log::info!("Client disconnected: ID {}", client_id);
addr_map.0.remove(addr);
id_map.0.remove(&client_id);
to_delete.push(entity_id);
//unsubscribe from chunks
chunk_manager.unsubscribe_all(client_id);
//send disconnect message to other clients
for (_, other_client_address) in (&clients, &addrs).iter() {
let Some(client) = server.0.client(&other_client_address.0) else {
log::error!("Client with address not found");
continue
};
client.borrow_mut().send(
postcard::to_allocvec(
&ServerToClientMessage::PlayerDisconnected { id: client_id }
).unwrap().into_boxed_slice(),
Channel::SysEvt as usize,
SendMode::Reliable
);
}
}
}
}
for entity_id in to_delete {
all_storages.delete_entity(entity_id);
}
}

View file

@ -1,49 +1,50 @@
use shipyard::{World, Workload, IntoWorkload}; use shipyard::{World, Workload, IntoWorkload};
use std::{thread, time::Duration}; use std::{thread, time::Duration};
mod util; mod util;
mod config; mod config;
mod server; mod server;
mod client; mod client;
mod world; mod world;
mod auth; mod auth;
use config::read_config; use config::read_config;
use server::{bind_server, update_server, log_server_errors}; use server::{bind_server, update_server, log_server_errors};
use client::{init_client_maps, sync_client_positions}; use client::{init_client_maps, on_client_disconnect, sync_client_positions};
use auth::authenticate_players; use auth::authenticate_players;
use world::{update_world, init_world}; use world::{update_world, init_world};
fn initialize() -> Workload { fn initialize() -> Workload {
( (
read_config, read_config,
bind_server, bind_server,
init_client_maps, init_client_maps,
init_world, init_world,
).into_workload() ).into_workload()
} }
fn update() -> Workload { fn update() -> Workload {
( (
update_server, update_server,
( (
log_server_errors, log_server_errors,
authenticate_players, authenticate_players,
update_world, update_world,
sync_client_positions, sync_client_positions,
).into_workload() on_client_disconnect,
).into_sequential_workload() ).into_workload()
} ).into_sequential_workload()
}
fn main() {
kubi_logging::init(); fn main() {
let world = World::new(); kubi_logging::init();
world.add_workload(initialize); let world = World::new();
world.add_workload(update); world.add_workload(initialize);
world.run_workload(initialize).unwrap(); world.add_workload(update);
log::info!("The server is now running"); world.run_workload(initialize).unwrap();
loop { log::info!("The server is now running");
world.run_workload(update).unwrap(); loop {
thread::sleep(Duration::from_millis(16)); world.run_workload(update).unwrap();
} thread::sleep(Duration::from_millis(16));
} }
}

View file

@ -1,207 +1,210 @@
use shipyard::{Unique, UniqueView, UniqueViewMut, Workload, IntoWorkload, AllStoragesView, View, Get, NonSendSync, IntoIter}; use shipyard::{Unique, UniqueView, UniqueViewMut, Workload, IntoWorkload, AllStoragesView, View, Get, NonSendSync, IntoIter};
use glam::IVec3; use glam::IVec3;
use hashbrown::HashMap; use hashbrown::HashMap;
use kubi_shared::networking::{ use kubi_shared::networking::{
messages::{ClientToServerMessage, ServerToClientMessage, ClientToServerMessageType}, channels::Channel, client::{Client, ClientId}, messages::{ClientToServerMessage, ClientToServerMessageType, ServerToClientMessage}
channels::Channel, };
client::Client, use uflow::{server::RemoteClient, SendMode};
}; use lz4_flex::compress_prepend_size as lz4_compress;
use uflow::{server::RemoteClient, SendMode}; use anyhow::Result;
use lz4_flex::compress_prepend_size as lz4_compress; use std::{rc::Rc, cell::RefCell};
use anyhow::Result; use kubi_shared::networking::client::ClientIdMap;
use std::{rc::Rc, cell::RefCell}; use crate::{
use kubi_shared::networking::client::ClientIdMap; server::{UdpServer, ServerEvents},
use crate::{ config::ConfigTable,
server::{UdpServer, ServerEvents}, client::{ClientAddress, ClientAddressMap},
config::ConfigTable, util::check_message_auth,
client::{ClientAddress, ClientAddressMap}, };
util::check_message_auth,
}; pub mod chunk;
pub mod tasks;
pub mod chunk;
pub mod tasks; use chunk::Chunk;
use chunk::Chunk; use self::{tasks::{ChunkTaskManager, ChunkTask, ChunkTaskResponse, init_chunk_task_manager}, chunk::ChunkState};
use self::{tasks::{ChunkTaskManager, ChunkTask, ChunkTaskResponse, init_chunk_task_manager}, chunk::ChunkState}; #[derive(Unique, Default)]
pub struct ChunkManager {
#[derive(Unique, Default)] pub chunks: HashMap<IVec3, Chunk>
pub struct ChunkManager { }
pub chunks: HashMap<IVec3, Chunk> impl ChunkManager {
} pub fn unsubscribe_all(&mut self, client_id: ClientId) {
impl ChunkManager { for chunk in self.chunks.values_mut() {
pub fn new() -> Self { chunk.subscriptions.remove(&client_id);
Self::default() }
} }
} pub fn new() -> Self {
Self::default()
///Sends a compressed chunk packet }
pub fn send_chunk_compressed( }
client: &Rc<RefCell<RemoteClient>>,
message: &ServerToClientMessage ///Sends a compressed chunk packet
) -> Result<()> { pub fn send_chunk_compressed(
let mut ser_message = postcard::to_allocvec(&message)?; client: &Rc<RefCell<RemoteClient>>,
let mut compressed = lz4_compress(&ser_message[1..]); message: &ServerToClientMessage
ser_message.truncate(1); ) -> Result<()> {
ser_message.append(&mut compressed); let mut ser_message = postcard::to_allocvec(&message)?;
let ser_message = ser_message.into_boxed_slice(); let mut compressed = lz4_compress(&ser_message[1..]);
client.borrow_mut().send( ser_message.truncate(1);
ser_message, ser_message.append(&mut compressed);
Channel::World as usize, let ser_message = ser_message.into_boxed_slice();
SendMode::Reliable client.borrow_mut().send(
); ser_message,
Ok(()) Channel::World as usize,
} SendMode::Reliable
);
fn process_chunk_requests( Ok(())
server: NonSendSync<UniqueView<UdpServer>>, }
events: UniqueView<ServerEvents>,
mut chunk_manager: UniqueViewMut<ChunkManager>, fn process_chunk_requests(
task_manager: UniqueView<ChunkTaskManager>, server: NonSendSync<UniqueView<UdpServer>>,
config: UniqueView<ConfigTable>, events: UniqueView<ServerEvents>,
addr_map: UniqueView<ClientAddressMap>, mut chunk_manager: UniqueViewMut<ChunkManager>,
clients: View<Client> task_manager: UniqueView<ChunkTaskManager>,
) { config: UniqueView<ConfigTable>,
for event in &events.0 { addr_map: UniqueView<ClientAddressMap>,
let Some(message) = check_message_auth clients: View<Client>
::<{ClientToServerMessageType::ChunkSubRequest as u8}> ) {
(&server, event, &clients, &addr_map) else { continue }; for event in &events.0 {
let Some(message) = check_message_auth
let ClientToServerMessage::ChunkSubRequest { chunk: chunk_position } = message.message else { ::<{ClientToServerMessageType::ChunkSubRequest as u8}>
unreachable!() (&server, event, &clients, &addr_map) else { continue };
};
let ClientToServerMessage::ChunkSubRequest { chunk: chunk_position } = message.message else {
if let Some(chunk) = chunk_manager.chunks.get_mut(&chunk_position) { unreachable!()
chunk.subscriptions.insert(message.client_id); };
//TODO Start task here if status is "Nothing"
if let Some(blocks) = &chunk.blocks { if let Some(chunk) = chunk_manager.chunks.get_mut(&chunk_position) {
send_chunk_compressed( chunk.subscriptions.insert(message.client_id);
message.client, //TODO Start task here if status is "Nothing"
&ServerToClientMessage::ChunkResponse { if let Some(blocks) = &chunk.blocks {
chunk: chunk_position, send_chunk_compressed(
data: blocks.clone(), message.client,
queued: Vec::with_capacity(0) &ServerToClientMessage::ChunkResponse {
} chunk: chunk_position,
).unwrap(); data: blocks.clone(),
} queued: Vec::with_capacity(0)
} else { }
let mut chunk = Chunk::new(chunk_position); ).unwrap();
chunk.state = ChunkState::Loading; }
chunk.subscriptions.insert(message.client_id); } else {
chunk_manager.chunks.insert(chunk_position, chunk); let mut chunk = Chunk::new(chunk_position);
task_manager.spawn_task(ChunkTask::LoadChunk { chunk.state = ChunkState::Loading;
position: chunk_position, chunk.subscriptions.insert(message.client_id);
seed: config.world.seed, chunk_manager.chunks.insert(chunk_position, chunk);
}); task_manager.spawn_task(ChunkTask::LoadChunk {
} position: chunk_position,
} seed: config.world.seed,
} });
}
fn process_finished_tasks( }
server: NonSendSync<UniqueView<UdpServer>>, }
task_manager: UniqueView<ChunkTaskManager>,
mut chunk_manager: UniqueViewMut<ChunkManager>, fn process_finished_tasks(
id_map: UniqueView<ClientIdMap>, server: NonSendSync<UniqueView<UdpServer>>,
client_addr: View<ClientAddress>, task_manager: UniqueView<ChunkTaskManager>,
) { mut chunk_manager: UniqueViewMut<ChunkManager>,
'outer: while let Some(res) = task_manager.receive() { id_map: UniqueView<ClientIdMap>,
let ChunkTaskResponse::ChunkLoaded { chunk_position, blocks, queue } = res; client_addr: View<ClientAddress>,
let Some(chunk) = chunk_manager.chunks.get_mut(&chunk_position) else { ) {
log::warn!("Chunk discarded: Doesn't exist"); 'outer: while let Some(res) = task_manager.receive() {
continue let ChunkTaskResponse::ChunkLoaded { chunk_position, blocks, queue } = res;
}; let Some(chunk) = chunk_manager.chunks.get_mut(&chunk_position) else {
if chunk.state != ChunkState::Loading { log::warn!("Chunk discarded: Doesn't exist");
log::warn!("Chunk discarded: Not Loading"); continue
continue };
} if chunk.state != ChunkState::Loading {
chunk.state = ChunkState::Loaded; log::warn!("Chunk discarded: Not Loading");
chunk.blocks = Some(blocks.clone()); continue
}
log::debug!("Chunk {chunk_position} loaded, {} subs", chunk.subscriptions.len()); chunk.state = ChunkState::Loaded;
chunk.blocks = Some(blocks.clone());
let chunk_packet = &ServerToClientMessage::ChunkResponse {
chunk: chunk_position, log::debug!("Chunk {chunk_position} loaded, {} subs", chunk.subscriptions.len());
data: blocks,
queued: queue let chunk_packet = &ServerToClientMessage::ChunkResponse {
}; chunk: chunk_position,
data: blocks,
for &subscriber in &chunk.subscriptions { queued: queue
let Some(&entity_id) = id_map.0.get(&subscriber) else { };
log::error!("Invalid subscriber client id");
continue 'outer; for &subscriber in &chunk.subscriptions {
}; let Some(&entity_id) = id_map.0.get(&subscriber) else {
let Ok(&ClientAddress(client_addr)) = (&client_addr).get(entity_id) else { log::error!("Invalid subscriber client id");
log::error!("Invalid subscriber entity id"); continue 'outer;
continue 'outer; };
}; let Ok(&ClientAddress(client_addr)) = (&client_addr).get(entity_id) else {
let Some(client) = server.0.client(&client_addr) else { log::error!("Invalid subscriber entity id");
log::error!("Client not connected"); continue 'outer;
continue 'outer; };
}; let Some(client) = server.0.client(&client_addr) else {
send_chunk_compressed(client, chunk_packet).unwrap(); log::error!("Client not connected");
// client.borrow_mut().send( continue 'outer;
// chunk_packet.clone(), };
// CHANNEL_WORLD, send_chunk_compressed(client, chunk_packet).unwrap();
// SendMode::Reliable, // client.borrow_mut().send(
// ); // chunk_packet.clone(),
} // CHANNEL_WORLD,
} // SendMode::Reliable,
} // );
}
fn process_block_queue_messages( }
server: NonSendSync<UniqueView<UdpServer>>, }
events: UniqueView<ServerEvents>,
addr_map: UniqueView<ClientAddressMap>, fn process_block_queue_messages(
clients: View<Client>, server: NonSendSync<UniqueView<UdpServer>>,
addrs: View<ClientAddress>, events: UniqueView<ServerEvents>,
) { addr_map: UniqueView<ClientAddressMap>,
for event in &events.0 { clients: View<Client>,
let Some(message) = check_message_auth addrs: View<ClientAddress>,
::<{ClientToServerMessageType::QueueBlock as u8}> ) {
(&server, event, &clients, &addr_map) else { continue }; for event in &events.0 {
let Some(message) = check_message_auth
let ClientToServerMessage::QueueBlock { item } = message.message else { unreachable!() }; ::<{ClientToServerMessageType::QueueBlock as u8}>
//TODO place in our own queue, for now just send to other clients (&server, event, &clients, &addr_map) else { continue };
log::info!("Placed block {:?} at {}", item.block_type, item.position);
for (other_client, other_client_address) in (&clients, &addrs).iter() { let ClientToServerMessage::QueueBlock { item } = message.message else { unreachable!() };
//No need to send the event back //TODO place in our own queue, for now just send to other clients
if message.client_id == other_client.0 { log::info!("Placed block {:?} at {}", item.block_type, item.position);
continue for (other_client, other_client_address) in (&clients, &addrs).iter() {
} //No need to send the event back
//Get client if message.client_id == other_client.0 {
let Some(client) = server.0.client(&other_client_address.0) else { continue
log::error!("Client with address not found"); }
continue //Get client
}; let Some(client) = server.0.client(&other_client_address.0) else {
//Send the message log::error!("Client with address not found");
client.borrow_mut().send( continue
postcard::to_allocvec( };
&ServerToClientMessage::QueueBlock { item } //Send the message
).unwrap().into_boxed_slice(), client.borrow_mut().send(
Channel::Block as usize, postcard::to_allocvec(
SendMode::Reliable, &ServerToClientMessage::QueueBlock { item }
); ).unwrap().into_boxed_slice(),
} Channel::Block as usize,
} SendMode::Reliable,
} );
}
fn init_chunk_manager( }
storages: AllStoragesView }
) {
storages.add_unique(ChunkManager::new()); fn init_chunk_manager(
} storages: AllStoragesView
) {
pub fn init_world() -> Workload { storages.add_unique(ChunkManager::new());
( }
init_chunk_manager,
init_chunk_task_manager, pub fn init_world() -> Workload {
).into_workload() (
} init_chunk_manager,
init_chunk_task_manager,
pub fn update_world() -> Workload { ).into_workload()
( }
process_chunk_requests,
process_finished_tasks, pub fn update_world() -> Workload {
process_block_queue_messages, (
).into_workload() process_chunk_requests,
} process_finished_tasks,
process_block_queue_messages,
).into_workload()
}

View file

@ -38,7 +38,8 @@ use player::{
init_client_map, init_client_map,
send_player_movement_events, send_player_movement_events,
receive_player_movement_events, receive_player_movement_events,
receive_player_connect_events receive_player_connect_events,
receive_player_disconnect_events,
}; };
const NET_TICKRATE: u16 = 33; const NET_TICKRATE: u16 = 33;
@ -137,6 +138,7 @@ pub fn update_networking() -> Workload {
( (
( (
receive_player_connect_events, receive_player_connect_events,
receive_player_disconnect_events,
).into_workload(), ).into_workload(),
( (
recv_block_place_events, recv_block_place_events,

View file

@ -99,3 +99,35 @@ pub fn receive_player_connect_events(
spawn_remote_player_multiplayer(&mut storages, init); spawn_remote_player_multiplayer(&mut storages, init);
} }
} }
pub fn receive_player_disconnect_events(
mut storages: AllStoragesViewMut,
) {
let messages: Vec<ServerToClientMessage> = storages.borrow::<View<NetworkEvent>>().unwrap().iter().filter_map(|event| {
let ClientEvent::Receive(data) = &event.0 else {
return None
};
if !event.is_message_of_type::<{ServerToClientMessageType::PlayerDisconnected as u8}>() {
return None
};
let Ok(parsed_message) = postcard::from_bytes(data) else {
log::error!("Malformed message");
return None
};
Some(parsed_message)
}).collect();
for message in messages {
let ServerToClientMessage::PlayerDisconnected { id } = message else { unreachable!() };
log::info!("player disconnected: {}", id);
let mut id_map = storages.borrow::<UniqueViewMut<ClientIdMap>>().unwrap();
let Some(ent_id) = id_map.0.remove(&id) else {
log::warn!("Disconnected player entity not found in client-id map");
continue
};
drop(id_map);
if !storages.delete_entity(ent_id) {
log::warn!("Disconnected player entity not found in storage");
}
}
}