diff --git a/kubi-udp/src/client.rs b/kubi-udp/src/client.rs index 31f5d17..07ab09c 100644 --- a/kubi-udp/src/client.rs +++ b/kubi-udp/src/client.rs @@ -51,7 +51,7 @@ pub enum ClientEvent where T: Encode + Decode { } pub struct Client where S: Encode + Decode, R: Encode + Decode { - pub config: ClientConfig, + config: ClientConfig, addr: SocketAddr, socket: UdpSocket, status: ClientStatus, @@ -125,6 +125,28 @@ impl Client where S: Encode + Decode, R: Encode + Decode { Ok(()) } + pub fn get_status(&self) -> ClientStatus { + self.status + } + + pub fn is_connected(&self) -> bool { + self.status == ClientStatus::Connected + } + + pub fn is_connecting(&self) -> bool { + self.status == ClientStatus::Connecting + } + + pub fn is_disconnected(&self) -> bool { + self.status == ClientStatus::Disconnected + } + + //Return true if the client has not made any connection attempts yet + pub fn has_not_made_connection_attempts(&self) -> bool { + matches!(self.status, ClientStatus::Disconnected) && + matches!(self.disconnect_reason, DisconnectReason::NotConnected) + } + pub fn send_message(&self, message: S) -> Result<()> { if self.status != ClientStatus::Connected { bail!("Not Connected"); @@ -152,43 +174,45 @@ impl Client where S: Encode + Decode, R: Encode + Decode { } //receive let mut buf = [0; u16::MAX as usize]; - match self.socket.recv(&mut buf) { - Ok(length) => { - //TODO check the first byte of the raw data instead of decoding? - let (packet, _): (IdServerPacket, _) = bincode::decode_from_slice(&buf[..length], BINCODE_CONFIG)?; - let IdServerPacket(user_id, packet) = packet; - if self.client_id.map(|x| Some(x) != user_id).unwrap_or_default() { - return Ok(()) - } - self.reset_timeout(); - match packet { - ServerPacket::Connected(client_id) => { - log::info!("client connected with id {client_id}"); - self.client_id = Some(client_id); - self.status = ClientStatus::Connected; - self.event_queue.push_back(ClientEvent::Connected(client_id)); + loop { + match self.socket.recv(&mut buf) { + Ok(length) => { + //TODO check the first byte of the raw data instead of decoding? + let (packet, _): (IdServerPacket, _) = bincode::decode_from_slice(&buf[..length], BINCODE_CONFIG)?; + let IdServerPacket(user_id, packet) = packet; + if self.client_id.map(|x| Some(x) != user_id).unwrap_or_default() { return Ok(()) - }, - ServerPacket::Disconnected(reason) => { - log::info!("client kicked: {reason}"); - let reason = DisconnectReason::KickedByServer(Some(reason)); - self.disconnect_inner(reason, true)?; //this should never fail but we're handling the error anyway - return Ok(()) - }, - ServerPacket::Data(message) => { - self.event_queue.push_back(ClientEvent::MessageReceived(message)); } - } - }, - Err(error) if error.kind() != ErrorKind::WouldBlock => { - return Err(error.into()); - }, - _ => (), + self.reset_timeout(); + match packet { + ServerPacket::Connected(client_id) => { + log::info!("client connected with id {client_id}"); + self.client_id = Some(client_id); + self.status = ClientStatus::Connected; + self.event_queue.push_back(ClientEvent::Connected(client_id)); + return Ok(()) + }, + ServerPacket::Disconnected(reason) => { + log::info!("client kicked: {reason}"); + let reason = DisconnectReason::KickedByServer(Some(reason)); + self.disconnect_inner(reason, true)?; //this should never fail but we're handling the error anyway + return Ok(()) + }, + ServerPacket::Data(message) => { + self.event_queue.push_back(ClientEvent::MessageReceived(message)); + } + } + }, + Err(error) if error.kind() != ErrorKind::WouldBlock => { + return Err(error.into()); + }, + _ => break, + } } Ok(()) } - pub fn get_event(&mut self) -> Option> { + pub fn pop_event(&mut self) -> Option> { self.event_queue.pop_front() } pub fn process_events(&mut self) -> DrainDeque> { diff --git a/kubi-udp/src/server.rs b/kubi-udp/src/server.rs index 0296317..5d135a0 100644 --- a/kubi-udp/src/server.rs +++ b/kubi-udp/src/server.rs @@ -52,11 +52,25 @@ pub struct Server where S: Encode + Decode, R: Encode + Decode { _s: PhantomData, } impl Server where S: Encode + Decode, R: Encode + Decode { + pub fn bind(addr: SocketAddr, config: ServerConfig) -> anyhow::Result { + assert!(config.max_clients <= MAX_CLIENTS); + let socket = UdpSocket::bind(addr)?; + socket.set_nonblocking(true)?; + Ok(Self { + config, + socket, + clients: HashMap::with_capacity_and_hasher(MAX_CLIENTS, BuildNoHashHasher::default()), + event_queue: VecDeque::new(), + _s: PhantomData, + }) + } + fn send_to_addr(&self, addr: SocketAddr, packet: IdServerPacket) -> Result<()> { let bytes = bincode::encode_to_vec(packet, BINCODE_CONFIG)?; self.socket.send_to(&bytes, addr)?; Ok(()) } + fn send_packet(&self, packet: IdServerPacket) -> Result<()> { let Some(id) = packet.0 else { bail!("send_to_client call without id") @@ -67,6 +81,7 @@ impl Server where S: Encode + Decode, R: Encode + Decode { self.send_to_addr(client.addr, packet)?; Ok(()) } + fn add_client(&mut self, addr: SocketAddr) -> Result { let Some(id) = (1..=self.config.max_clients) .map(|x| ClientId::new(x as _).unwrap()) @@ -83,6 +98,7 @@ impl Server where S: Encode + Decode, R: Encode + Decode { }); Ok(id) } + fn disconnect_client_inner(&mut self, id: ClientId, reason: String) -> Result<()> { let result = self.send_packet(IdServerPacket( Some(id), ServerPacket::Disconnected(reason) @@ -98,6 +114,7 @@ impl Server where S: Encode + Decode, R: Encode + Decode { self.disconnect_client_inner(id, reason)?; Ok(()) } + pub fn shutdown(mut self) -> Result<()> { let clients = self.clients.keys().copied().collect::>(); for id in clients { @@ -105,90 +122,82 @@ impl Server where S: Encode + Decode, R: Encode + Decode { } Ok(()) } + pub fn send_message(&mut self, id: ClientId, message: S) -> anyhow::Result<()> { self.send_packet(IdServerPacket(Some(id), ServerPacket::Data(message)))?; Ok(()) } - pub fn bind(addr: SocketAddr, config: ServerConfig) -> anyhow::Result { - assert!(config.max_clients <= MAX_CLIENTS); - let socket = UdpSocket::bind(addr)?; - socket.set_nonblocking(true)?; - Ok(Self { - config, - socket, - clients: HashMap::with_capacity_and_hasher(MAX_CLIENTS, BuildNoHashHasher::default()), - event_queue: VecDeque::new(), - _s: PhantomData, - }) - } + pub fn update(&mut self) -> Result<()> { //TODO client timeout let mut buf = [0; u16::MAX as usize]; - match self.socket.recv_from(&mut buf) { - Ok((len, addr)) => { - if let Ok(packet) = bincode::decode_from_slice(&buf[..len], BINCODE_CONFIG) { - let (packet, _): (IdClientPacket, _) = packet; - let IdClientPacket(id, packet) = packet; - match id { - Some(id) => { - if !self.clients.contains_key(&id) { - bail!("Client with id {id} doesn't exist"); - }; - match packet { - ClientPacket::Data(data) => { - self.event_queue.push_back(ServerEvent::MessageReceived { - from: id, - message: data, - }); - } - ClientPacket::Disconnect => { - log::info!("Client {id} disconnected"); - self.event_queue.push_back(ServerEvent::Disconnected(id)); - self.disconnect_client_inner(id, "Disconnected".into())?; - }, - ClientPacket::Heartbeat => { - self.clients.get_mut(&id).unwrap().timeout = Instant::now() - }, - ClientPacket::Connect => bail!("Client already connected"), - } - }, - None => { - match packet { - ClientPacket::Connect => { - match self.add_client(addr) { - Ok(id) => { - log::info!("Client with id {id} connected"); - self.event_queue.push_back(ServerEvent::Connected(id)); - self.send_to_addr(addr, - IdServerPacket(None, ServerPacket::Connected(id) - ))?; - }, - Err(error) => { - let reason = error.to_string(); - log::error!("Client connection failed: {reason}"); - self.send_to_addr(addr, IdServerPacket( - None, ServerPacket::Disconnected(reason) - ))?; - } + loop { + match self.socket.recv_from(&mut buf) { + Ok((len, addr)) => { + if let Ok(packet) = bincode::decode_from_slice(&buf[..len], BINCODE_CONFIG) { + let (packet, _): (IdClientPacket, _) = packet; + let IdClientPacket(id, packet) = packet; + match id { + Some(id) => { + if !self.clients.contains_key(&id) { + bail!("Client with id {id} doesn't exist"); + }; + match packet { + ClientPacket::Data(data) => { + self.event_queue.push_back(ServerEvent::MessageReceived { + from: id, + message: data, + }); } - }, - _ => bail!("Invalid packet type for non-id packet") + ClientPacket::Disconnect => { + log::info!("Client {id} disconnected"); + self.event_queue.push_back(ServerEvent::Disconnected(id)); + self.disconnect_client_inner(id, "Disconnected".into())?; + }, + ClientPacket::Heartbeat => { + self.clients.get_mut(&id).unwrap().timeout = Instant::now() + }, + ClientPacket::Connect => bail!("Client already connected"), + } + }, + None => { + match packet { + ClientPacket::Connect => { + match self.add_client(addr) { + Ok(id) => { + log::info!("Client with id {id} connected"); + self.event_queue.push_back(ServerEvent::Connected(id)); + self.send_to_addr(addr, + IdServerPacket(None, ServerPacket::Connected(id) + ))?; + }, + Err(error) => { + let reason = error.to_string(); + log::error!("Client connection failed: {reason}"); + self.send_to_addr(addr, IdServerPacket( + None, ServerPacket::Disconnected(reason) + ))?; + } + } + }, + _ => bail!("Invalid packet type for non-id packet") + } } } + } else { + bail!("Corrupted packet received"); } - } else { - bail!("Corrupted packet received"); - } - }, - Err(error) if error.kind() != ErrorKind::WouldBlock => { - return Err(error.into()); - }, - _ => (), + }, + Err(error) if error.kind() != ErrorKind::WouldBlock => { + return Err(error.into()); + }, + _ => break, + } } Ok(()) } - pub fn get_event(&mut self) -> Option> { + pub fn pop_event(&mut self) -> Option> { self.event_queue.pop_front() } pub fn process_events(&mut self) -> DrainDeque> { diff --git a/kubi/src/main.rs b/kubi/src/main.rs index 924d481..3ddf83d 100644 --- a/kubi/src/main.rs +++ b/kubi/src/main.rs @@ -63,16 +63,19 @@ use rendering::{ RenderTarget, BackgroundColor, clear_background, + init_window_size, + update_window_size, primitives::init_primitives, selection_box::render_selection_box, world::draw_world, - world::draw_current_chunk_border, init_window_size, update_window_size, + world::draw_current_chunk_border, }; use block_placement::block_placement_system; use delta_time::{DeltaTime, init_delta_time}; use cursor_lock::{insert_lock_state, update_cursor_lock_state, lock_cursor_now}; use control_flow::{exit_on_esc, insert_control_flow_unique, SetControlFlow}; use state::{is_ingame, is_ingame_or_loading, is_loading, init_state, update_state}; +use networking::{update_networking, is_multiplayer}; use init::initialize_from_args; use gui::{render_gui, init_gui, update_gui}; use loading_screen::update_loading_screen; @@ -102,6 +105,9 @@ fn update() -> Workload { update_cursor_lock_state, process_inputs, exit_on_esc, + ( + update_networking + ).into_workload().run_if(is_multiplayer), ( update_loading_screen, ).into_workload().run_if(is_loading), @@ -114,8 +120,8 @@ fn update() -> Workload { update_raycasts, block_placement_system, apply_queued_blocks, + compute_cameras, ).into_workload().run_if(is_ingame), - compute_cameras, update_gui, update_state, ).into_workload() diff --git a/kubi/src/networking.rs b/kubi/src/networking.rs index 136cd49..3254bc4 100644 --- a/kubi/src/networking.rs +++ b/kubi/src/networking.rs @@ -1,11 +1,13 @@ -use shipyard::{Unique, AllStoragesView, UniqueView, UniqueViewMut, Workload, IntoWorkload, WorkloadModificator}; +use shipyard::{Unique, AllStoragesView, UniqueView, UniqueViewMut, Workload, IntoWorkload, WorkloadModificator, EntitiesView, EntitiesViewMut, Component, ViewMut, SystemModificator}; use std::net::SocketAddr; -use kubi_udp::client::{Client, ClientConfig}; +use kubi_udp::client::{Client, ClientConfig, ClientEvent}; use kubi_shared::networking::{ messages::{ClientToServerMessage, ServerToClientMessage}, state::ClientJoinState }; +use crate::events::EventComponent; + #[derive(Unique, Clone, Copy, PartialEq, Eq)] pub enum GameType { Singleplayer, @@ -18,6 +20,9 @@ pub struct ServerAddress(pub SocketAddr); #[derive(Unique)] pub struct UdpClient(pub Client); +#[derive(Component)] +pub struct NetworkEvent(pub ClientEvent); + pub fn create_client( storages: AllStoragesView ) { @@ -29,34 +34,47 @@ pub fn create_client( storages.add_unique(ClientJoinState::Disconnected); } -pub fn client_connect( +pub fn connect_client( mut client: UniqueViewMut ) { client.0.connect().unwrap(); } -pub fn update_client_and_get_events( +pub fn update_client( mut client: UniqueViewMut, ) { client.0.update().unwrap(); - for event in client.0.process_events() { - todo!() - } } -pub fn init_networking() -> Workload { - ( - create_client, - client_connect, - ).into_workload().run_if(is_multiplayer) +pub fn insert_client_events( + mut client: UniqueViewMut, + mut entities: EntitiesViewMut, + mut events: ViewMut, + mut network_events: ViewMut, +) { + entities.bulk_add_entity(( + &mut events, + &mut network_events, + ), client.0.process_events().map(|event| { + (EventComponent, NetworkEvent(event)) + })); } pub fn update_networking() -> Workload { ( - update_client_and_get_events, + create_client.run_if_missing_unique::(), + connect_client.run_if(client_needs_connect_call), + update_client, + insert_client_events, ).into_workload().run_if(is_multiplayer) } +fn client_needs_connect_call( + client: UniqueView, +) -> bool { + client.0.has_not_made_connection_attempts() +} + pub fn is_multiplayer( game_type: UniqueView ) -> bool {