diff --git a/kubi-udp/Cargo.toml b/kubi-udp/Cargo.toml deleted file mode 100644 index 605363e..0000000 --- a/kubi-udp/Cargo.toml +++ /dev/null @@ -1,17 +0,0 @@ -[package] -name = "kubi-udp" -version = "0.1.0" -edition = "2021" -publish = false - -# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html - -[dependencies] -bincode = "2.0.0-rc" -anyhow = "1.0" -hashbrown = "0.13" -nohash-hasher = "0.2.0" -log = "0.4" - -[dev-dependencies] -kubi-logging = { path = "../kubi-logging" } diff --git a/kubi-udp/src/client.rs b/kubi-udp/src/client.rs deleted file mode 100644 index 1431542..0000000 --- a/kubi-udp/src/client.rs +++ /dev/null @@ -1,270 +0,0 @@ -use anyhow::{Result, bail}; -use std::{ - net::{UdpSocket, SocketAddr}, - time::{Instant, Duration}, - marker::PhantomData, - collections::{VecDeque, vec_deque::Drain as DrainDeque}, - io::ErrorKind, -}; -use crate::{ - BINCODE_CONFIG, - packet::{ClientPacket, IdClientPacket, IdServerPacket, ServerPacket, Message}, - common::{ClientId, PROTOCOL_ID, DEFAULT_USER_PROTOCOL_ID, PACKET_SIZE} -}; - -#[derive(Default, Clone, Debug)] -#[repr(u8)] -pub enum DisconnectReason { - #[default] - NotConnected, - ClientDisconnected, - KickedByServer(Option), - Timeout, - ConnectionReset, - InvalidProtocolId, -} - -#[derive(Clone, Copy, Debug, PartialEq, Eq)] -pub enum ClientStatus { - Disconnected, - Connecting, - Connected, -} - -#[derive(Clone, Copy, Debug)] -pub struct ClientConfig { - pub protocol_id: u16, - pub timeout: Duration, - pub heartbeat_interval: Duration, -} -impl Default for ClientConfig { - fn default() -> Self { - Self { - protocol_id: DEFAULT_USER_PROTOCOL_ID, - timeout: Duration::from_secs(5), - heartbeat_interval: Duration::from_secs(3), - } - } -} - -pub enum ClientEvent where T: Message { - Connected(ClientId), - Disconnected(DisconnectReason), - MessageReceived(T) -} - -pub struct Client where S: Message, R: Message { - config: ClientConfig, - addr: SocketAddr, - socket: UdpSocket, - status: ClientStatus, - timeout: Instant, - last_heartbeat: Instant, - client_id: Option, - disconnect_reason: DisconnectReason, - event_queue: VecDeque>, - _s: PhantomData, -} -impl Client where S: Message, R: Message { - #[inline] - pub fn new(addr: SocketAddr, config: ClientConfig) -> Result { - if config.protocol_id == 0 { - log::warn!("Warning: using 0 as protocol_id is not recommended"); - } - if config.protocol_id == DEFAULT_USER_PROTOCOL_ID { - log::warn!("Warning: using default protocol_id is not recommended"); - } - let bind_addr: SocketAddr = "0.0.0.0:0".parse().unwrap(); - let socket = UdpSocket::bind(bind_addr)?; - socket.set_nonblocking(true)?; - Ok(Self { - addr, - config, - socket, - status: ClientStatus::Disconnected, - timeout: Instant::now(), - last_heartbeat: Instant::now(), - client_id: None, - disconnect_reason: DisconnectReason::default(), - event_queue: VecDeque::new(), - _s: PhantomData, - }) - } - - fn send_raw_packet(&self, packet: ClientPacket) -> Result<()> { - let id_packet = IdClientPacket(self.client_id, packet); - let bytes = bincode::encode_to_vec(id_packet, BINCODE_CONFIG)?; - self.socket.send(&bytes)?; - Ok(()) - } - - fn disconnect_inner(&mut self, reason: DisconnectReason, silent: bool) -> Result<()> { - log::info!("client disconnected because {reason:?}"); - if !silent { - self.send_raw_packet(ClientPacket::Disconnect)?; - } - self.client_id = None; - self.status = ClientStatus::Disconnected; - self.disconnect_reason = reason; - self.event_queue.push_back(ClientEvent::Disconnected(self.disconnect_reason.clone())); - Ok(()) - } - - fn reset_timeout(&mut self) { - self.timeout = Instant::now(); - } - - #[inline] - pub fn connect(&mut self) -> Result<()> { - log::info!("Client connecting.."); - if self.status != ClientStatus::Disconnected { - bail!("Not Disconnected"); - } - self.status = ClientStatus::Connecting; - self.last_heartbeat = Instant::now(); - self.reset_timeout(); - self.socket.connect(self.addr)?; - self.send_raw_packet(ClientPacket::Connect{ - user_protocol: self.config.protocol_id, - inner_protocol: PROTOCOL_ID, - })?; - Ok(()) - } - - #[inline] - pub fn disconnect(&mut self) -> Result<()> { - if self.status != ClientStatus::Connected { - bail!("Not Connected"); - } - self.disconnect_inner(DisconnectReason::ClientDisconnected, false)?; - Ok(()) - } - - #[inline] - pub fn set_nonblocking(&mut self, is_nonblocking: bool) -> Result<()> { - self.socket.set_nonblocking(is_nonblocking)?; - Ok(()) - } - - #[inline] - pub fn get_status(&self) -> ClientStatus { - self.status - } - - #[inline] - pub fn is_connected(&self) -> bool { - self.status == ClientStatus::Connected - } - - #[inline] - pub fn is_connecting(&self) -> bool { - self.status == ClientStatus::Connecting - } - - #[inline] - pub fn is_disconnected(&self) -> bool { - self.status == ClientStatus::Disconnected - } - - //Return true if the client has not made any connection attempts yet - #[inline] - pub fn has_not_made_connection_attempts(&self) -> bool { - matches!(self.status, ClientStatus::Disconnected) && - matches!(self.disconnect_reason, DisconnectReason::NotConnected) - } - - #[inline] - pub fn send_message(&self, message: S) -> Result<()> { - if self.status != ClientStatus::Connected { - bail!("Not Connected"); - } - self.send_raw_packet(ClientPacket::Data(message))?; - Ok(()) - } - - #[inline] - pub fn update(&mut self) -> Result<()> { // , callback: fn(ClientEvent) -> Result<()> - if self.status == ClientStatus::Disconnected { - return Ok(()) - } - if self.timeout.elapsed() > self.config.timeout { - log::warn!("Client timed out"); - //We don't care if this packet actually gets sent because the server is likely dead - let _ = self.disconnect_inner(DisconnectReason::Timeout, false).map_err(|_| { - log::warn!("Failed to send disconnect packet"); - }); - return Ok(()) - } - if self.last_heartbeat.elapsed() > self.config.heartbeat_interval { - log::trace!("Sending heartbeat packet"); - self.send_raw_packet(ClientPacket::Heartbeat)?; - self.last_heartbeat = Instant::now(); - } - //receive - let mut buf = [0; PACKET_SIZE]; - loop { - match self.socket.recv(&mut buf) { - Ok(length) => { - //TODO check the first byte of the raw data instead of decoding? - let (packet, _): (IdServerPacket, _) = bincode::decode_from_slice(&buf[..length], BINCODE_CONFIG)?; - let IdServerPacket(user_id, packet) = packet; - if self.client_id.map(|x| Some(x) != user_id).unwrap_or_default() { - return Ok(()) - } - self.reset_timeout(); - match packet { - ServerPacket::Connected(client_id) => { - log::info!("client connected with id {client_id}"); - self.client_id = Some(client_id); - self.status = ClientStatus::Connected; - self.event_queue.push_back(ClientEvent::Connected(client_id)); - return Ok(()) - }, - ServerPacket::Disconnected(reason) => { - log::info!("client kicked: {reason}"); - let reason = DisconnectReason::KickedByServer(Some(reason)); - self.disconnect_inner(reason, true)?; //this should never fail but we're handling the error anyway - return Ok(()) - }, - ServerPacket::Data(message) => { - self.event_queue.push_back(ClientEvent::MessageReceived(message)); - self.timeout = Instant::now(); - }, - ServerPacket::Heartbeat => { - self.timeout = Instant::now(); - }, - ServerPacket::ProtoDisconnect => { - let reason = DisconnectReason::InvalidProtocolId; - self.disconnect_inner(reason, true)?; //this should never fail but we're handling the error anyway - return Ok(()); - } - } - }, - Err(error) if error.kind() != ErrorKind::WouldBlock => { - match error.kind() { - ErrorKind::ConnectionReset => { - log::error!("Connection interrupted"); - self.disconnect_inner(DisconnectReason::ConnectionReset, true)?; - }, - _ => { - log::error!("IO error {error}"); - return Err(error.into()); - }, - } - }, - _ => break, - } - } - Ok(()) - } - - #[inline] - pub fn pop_event(&mut self) -> Option> { - self.event_queue.pop_front() - } - - #[inline] - pub fn process_events(&mut self) -> DrainDeque> { - self.event_queue.drain(..) - } -} diff --git a/kubi-udp/src/common.rs b/kubi-udp/src/common.rs deleted file mode 100644 index be598a7..0000000 --- a/kubi-udp/src/common.rs +++ /dev/null @@ -1,11 +0,0 @@ -use std::num::NonZeroU8; - -pub type ClientId = NonZeroU8; -pub type ClientIdRepr = u8; - -pub const MAX_CLIENTS: usize = u8::MAX as _; - -pub const PROTOCOL_ID: u16 = 1; -pub const DEFAULT_USER_PROTOCOL_ID: u16 = 0xffff; - -pub const PACKET_SIZE: usize = u16::MAX as usize; diff --git a/kubi-udp/src/lib.rs b/kubi-udp/src/lib.rs deleted file mode 100644 index 82588e3..0000000 --- a/kubi-udp/src/lib.rs +++ /dev/null @@ -1,13 +0,0 @@ -pub mod client; -pub mod server; -pub(crate) mod packet; -pub(crate) mod common; -pub use common::ClientId; -pub use common::ClientIdRepr; -pub use common::MAX_CLIENTS; - -//pub(crate) trait Serializable: bincode::Encode + bincode::Decode {} -pub(crate) const BINCODE_CONFIG: bincode::config::Configuration = bincode::config::standard() - .with_little_endian() - .with_variable_int_encoding() - .skip_fixed_array_length(); diff --git a/kubi-udp/src/packet.rs b/kubi-udp/src/packet.rs deleted file mode 100644 index b5eff11..0000000 --- a/kubi-udp/src/packet.rs +++ /dev/null @@ -1,33 +0,0 @@ -use bincode::{Encode, Decode}; -use crate::common::ClientId; - -pub trait Message: Encode + Decode + Clone {} -impl Message for T {} - -#[repr(u8)] -#[derive(Encode, Decode)] -pub enum ClientPacket where T: Message { - Connect { - inner_protocol: u16, - user_protocol: u16, - }, //should always stay 0! - Data(T), - Disconnect, - Heartbeat, -} - -#[derive(Encode, Decode)] -pub struct IdClientPacket(pub Option, pub ClientPacket); - -#[repr(u8)] -#[derive(Encode, Decode)] -pub enum ServerPacket where T: Message { - ProtoDisconnect = 0, - Data(T), - Disconnected(String), - Connected(ClientId), - Heartbeat, -} - -#[derive(Encode, Decode)] -pub struct IdServerPacket(pub Option, pub ServerPacket); diff --git a/kubi-udp/src/server.rs b/kubi-udp/src/server.rs deleted file mode 100644 index 337f292..0000000 --- a/kubi-udp/src/server.rs +++ /dev/null @@ -1,266 +0,0 @@ -use std::{ - net::{UdpSocket, SocketAddr}, - time::{Instant, Duration}, - marker::PhantomData, - collections::{VecDeque, vec_deque::Drain as DrainDeque}, - io::ErrorKind -}; -use anyhow::{Result, Error, bail}; -use hashbrown::HashMap; -use nohash_hasher::BuildNoHashHasher; -use crate::{ - BINCODE_CONFIG, - common::{ClientId, ClientIdRepr, MAX_CLIENTS, PROTOCOL_ID, DEFAULT_USER_PROTOCOL_ID, PACKET_SIZE}, - packet::{IdClientPacket, ClientPacket, ServerPacket, IdServerPacket, Message} -}; - -//i was feeling a bit sick while writing most of this please excuse me for my terrible code :3 - -pub struct ConnectedClient { - id: ClientId, - addr: SocketAddr, - timeout: Instant, -} - -#[derive(Clone, Copy, Debug)] -pub struct ServerConfig { - pub max_clients: usize, - pub client_timeout: Duration, - pub protocol_id: u16, -} -impl Default for ServerConfig { - fn default() -> Self { - Self { - max_clients: MAX_CLIENTS, - client_timeout: Duration::from_secs(5), - protocol_id: DEFAULT_USER_PROTOCOL_ID, - } - } -} - -pub enum ServerEvent where T: Message { - Connected(ClientId), - Disconnected(ClientId), - MessageReceived { - from: ClientId, - message: T - } -} - -pub struct Server where S: Message, R: Message { - socket: UdpSocket, - clients: HashMap>, - config: ServerConfig, - event_queue: VecDeque>, - _s: PhantomData, -} -impl Server where S: Message, R: Message { - pub fn bind(addr: SocketAddr, config: ServerConfig) -> anyhow::Result { - assert!(config.max_clients <= MAX_CLIENTS, "max_clients value exceeds the maximum allowed amount of clients"); - if config.protocol_id == 0 { - log::warn!("Warning: using 0 as protocol_id is not recommended"); - } - if config.protocol_id == DEFAULT_USER_PROTOCOL_ID { - log::warn!("Warning: using default protocol_id is not recommended"); - } - let socket = UdpSocket::bind(addr)?; - socket.set_nonblocking(true)?; - Ok(Self { - config, - socket, - clients: HashMap::with_capacity_and_hasher(MAX_CLIENTS, BuildNoHashHasher::default()), - event_queue: VecDeque::new(), - _s: PhantomData, - }) - } - - - fn send_to_addr_inner(socket: &UdpSocket, addr: SocketAddr, packet: IdServerPacket) -> Result<()> { - let bytes = bincode::encode_to_vec(packet, BINCODE_CONFIG)?; - socket.send_to(&bytes, addr)?; - Ok(()) - } - - fn send_to_addr(&self, addr: SocketAddr, packet: IdServerPacket) -> Result<()> { - Self::send_to_addr_inner(&self.socket, addr, packet) - } - - fn send_packet(&self, packet: IdServerPacket) -> Result<()> { - let Some(id) = packet.0 else { - bail!("send_to_client call without id") - }; - let Some(client) = self.clients.get(&id) else { - bail!("client with id {id} doesn't exist") - }; - self.send_to_addr(client.addr, packet)?; - Ok(()) - } - - fn add_client(&mut self, addr: SocketAddr) -> Result { - let Some(id) = (1..=self.config.max_clients) - .map(|x| ClientId::new(x as _).unwrap()) - .find(|i| !self.clients.contains_key(i)) else { - bail!("Server full"); - }; - if self.clients.iter().any(|x| x.1.addr == addr) { - bail!("Already connected from the same address"); - } - self.clients.insert(id, ConnectedClient { - id, - addr, - timeout: Instant::now(), - }); - Ok(id) - } - - fn disconnect_client_inner(&mut self, id: ClientId, reason: String) -> Result<()> { - let result = self.send_packet(IdServerPacket( - Some(id), ServerPacket::Disconnected(reason) - )); - self.clients.remove(&id); - result - } - - pub fn kick_client(&mut self, id: ClientId, reason: String) -> Result<()> { - if !self.clients.contains_key(&id) { - bail!("Already disconnected") - } - self.disconnect_client_inner(id, reason)?; - Ok(()) - } - - pub fn shutdown(mut self) -> Result<()> { - let clients = self.clients.keys().copied().collect::>(); - for id in clients { - self.kick_client(id, "Server is shutting down".into())?; - } - Ok(()) - } - - pub fn send_message(&mut self, id: ClientId, message: S) -> Result<()> { - self.send_packet(IdServerPacket(Some(id), ServerPacket::Data(message)))?; - Ok(()) - } - pub fn multicast_message(&mut self, clients: impl IntoIterator, message: S) -> Vec { - //TODO use actual udp multicast - let mut errors = Vec::with_capacity(0); - for client in clients { - if let Err(error) = self.send_message(client, message.clone()) { - log::error!("Message broadcast failed for id {client}"); - errors.push(error); - } - } - errors - } - pub fn broadcast_message(&mut self, message: S) -> Vec { - let ids = self.clients.keys().copied().collect::>(); - self.multicast_message(ids, message) - } - pub fn broadcast_message_except(&mut self, except: ClientId, message: S) -> Vec { - let ids = self.clients.keys().copied().filter(|&k| k != except).collect::>(); - self.multicast_message(ids, message) - } - - pub fn update(&mut self) -> Result<()> { - //kick inactive clients - self.clients.retain(|&id, client| { - if client.timeout.elapsed() > self.config.client_timeout { - if let Err(_) = Self::send_to_addr_inner(&self.socket, client.addr, IdServerPacket( - Some(id), ServerPacket::Disconnected("Timed out".into()) - )) { - log::warn!("Client {id} timed out and we failed to send the kick packet. This shouldn't reaally matter") - } else { - log::info!("Client {id} timed out"); - } - return false - } - true - }); - - let mut buf = [0; PACKET_SIZE]; - loop { - match self.socket.recv_from(&mut buf) { - Ok((len, addr)) => { - if let Ok(packet) = bincode::decode_from_slice(&buf[..len], BINCODE_CONFIG) { - let (packet, _): (IdClientPacket, _) = packet; - let IdClientPacket(id, packet) = packet; - match id { - Some(id) => { - if !self.clients.contains_key(&id) { - bail!("Client with id {id} doesn't exist"); - } - if self.clients.get(&id).unwrap().addr != addr { - bail!("Client addr doesn't match"); - } - match packet { - ClientPacket::Data(data) => { - self.event_queue.push_back(ServerEvent::MessageReceived { - from: id, - message: data, - }); - } - ClientPacket::Disconnect => { - log::info!("Client {id} disconnected"); - self.event_queue.push_back(ServerEvent::Disconnected(id)); - self.disconnect_client_inner(id, "Disconnected".into())?; - }, - ClientPacket::Heartbeat => { - self.clients.get_mut(&id).unwrap().timeout = Instant::now(); - self.send_packet(IdServerPacket(Some(id), ServerPacket::Heartbeat))?; - }, - ClientPacket::Connect{..} => bail!("Client already connected"), - } - }, - None => { - match packet { - ClientPacket::Connect { user_protocol, inner_protocol } => { - if (inner_protocol != PROTOCOL_ID) || (user_protocol != self.config.protocol_id ) { - log::error!("Client conenction refused: Invalid protocol id"); - self.send_to_addr(addr, - IdServerPacket(None, ServerPacket::ProtoDisconnect) - )?; - continue; - } - - match self.add_client(addr) { - Ok(id) => { - log::info!("Client with id {id} connected"); - self.event_queue.push_back(ServerEvent::Connected(id)); - self.send_to_addr(addr, - IdServerPacket(None, ServerPacket::Connected(id)) - )?; - }, - Err(error) => { - let reason = error.to_string(); - log::error!("Client connection failed: {reason}"); - self.send_to_addr(addr, IdServerPacket( - None, ServerPacket::Disconnected(reason) - ))?; - } - } - }, - _ => bail!("Invalid packet type for non-id packet") - } - } - } - } else { - bail!("Corrupted packet received"); - } - }, - Err(error) if error.kind() != ErrorKind::WouldBlock => { - log::error!("IO error {}", error); - // return Err(error.into()); - }, - _ => break, - } - } - Ok(()) - } - - pub fn pop_event(&mut self) -> Option> { - self.event_queue.pop_front() - } - pub fn process_events(&mut self) -> DrainDeque> { - self.event_queue.drain(..) - } -} diff --git a/kubi-udp/tests/test.rs b/kubi-udp/tests/test.rs deleted file mode 100644 index 7deb52c..0000000 --- a/kubi-udp/tests/test.rs +++ /dev/null @@ -1,93 +0,0 @@ -use kubi_udp::{ - server::{Server, ServerConfig, ServerEvent}, - client::{Client, ClientConfig, ClientEvent}, -}; -use std::{thread, time::Duration}; - -const TEST_ADDR: &str = "127.0.0.1:22342"; - -type CtsMessage = u32; -type StcMessage = u64; - -const CTS_MSG: CtsMessage = 0xbeef_face; -const STC_MSG: StcMessage = 0xdead_beef_cafe_face; - -#[test] -fn test_connection() { - //Init logging - kubi_logging::init(); - - //Create server and client - let mut server: Server = Server::bind( - TEST_ADDR.parse().expect("Invalid TEST_ADDR"), - ServerConfig::default() - ).expect("Failed to create server"); - let mut client: Client = Client::new( - TEST_ADDR.parse().unwrap(), - ClientConfig::default() - ).expect("Failed to create client"); - - //Start server update thread - let server_handle = thread::spawn(move || { - let mut message_received = false; - loop { - server.update().unwrap(); - let events: Vec<_> = server.process_events().collect(); - for event in events { - match event { - ServerEvent::Connected(id) => { - assert_eq!(id.get(), 1, "Unexpected client id"); - server.send_message(id, STC_MSG).unwrap(); - }, - ServerEvent::Disconnected(id) => { - assert!(message_received, "Client {id} disconnected from the server before sending the message"); - return; - }, - ServerEvent::MessageReceived { from, message } => { - log::info!("server received message"); - assert_eq!(message, CTS_MSG, "Received message not equal"); - message_received = true; - }, - _ => () - } - } - } - }); - - //Wait a bit - thread::sleep(Duration::from_secs(1)); - - //Connect client - client.connect().expect("Client connect failed"); - - //Start updating the client - let client_handle = thread::spawn(move || { - let mut message_received = false; - loop { - client.update().unwrap(); - let events: Vec<_> = client.process_events().collect(); - for event in events { - match event { - ClientEvent::Connected(id) => { - assert_eq!(id.get(), 1, "Unexpected client id"); - client.send_message(CTS_MSG).unwrap(); - }, - ClientEvent::Disconnected(reason) => { - assert!(message_received, "Client lost connection to the server before sending the message with reason: {reason:?}"); - return; - }, - ClientEvent::MessageReceived(data) => { - log::info!("client received message"); - assert_eq!(data, STC_MSG, "Received message not equal"); - message_received = true; - client.disconnect().unwrap(); - }, - _ => () - } - } - } - }); - - server_handle.join().unwrap(); - client_handle.join().unwrap(); -}