client networking calls

This commit is contained in:
griffi-gh 2023-02-12 20:37:06 +01:00
parent bcd3066c95
commit 8758938e7e
4 changed files with 173 additions and 116 deletions

View file

@ -51,7 +51,7 @@ pub enum ClientEvent<T> where T: Encode + Decode {
} }
pub struct Client<S, R> where S: Encode + Decode, R: Encode + Decode { pub struct Client<S, R> where S: Encode + Decode, R: Encode + Decode {
pub config: ClientConfig, config: ClientConfig,
addr: SocketAddr, addr: SocketAddr,
socket: UdpSocket, socket: UdpSocket,
status: ClientStatus, status: ClientStatus,
@ -125,6 +125,28 @@ impl<S, R> Client<S, R> where S: Encode + Decode, R: Encode + Decode {
Ok(()) Ok(())
} }
pub fn get_status(&self) -> ClientStatus {
self.status
}
pub fn is_connected(&self) -> bool {
self.status == ClientStatus::Connected
}
pub fn is_connecting(&self) -> bool {
self.status == ClientStatus::Connecting
}
pub fn is_disconnected(&self) -> bool {
self.status == ClientStatus::Disconnected
}
//Return true if the client has not made any connection attempts yet
pub fn has_not_made_connection_attempts(&self) -> bool {
matches!(self.status, ClientStatus::Disconnected) &&
matches!(self.disconnect_reason, DisconnectReason::NotConnected)
}
pub fn send_message(&self, message: S) -> Result<()> { pub fn send_message(&self, message: S) -> Result<()> {
if self.status != ClientStatus::Connected { if self.status != ClientStatus::Connected {
bail!("Not Connected"); bail!("Not Connected");
@ -152,43 +174,45 @@ impl<S, R> Client<S, R> where S: Encode + Decode, R: Encode + Decode {
} }
//receive //receive
let mut buf = [0; u16::MAX as usize]; let mut buf = [0; u16::MAX as usize];
match self.socket.recv(&mut buf) { loop {
Ok(length) => { match self.socket.recv(&mut buf) {
//TODO check the first byte of the raw data instead of decoding? Ok(length) => {
let (packet, _): (IdServerPacket<R>, _) = bincode::decode_from_slice(&buf[..length], BINCODE_CONFIG)?; //TODO check the first byte of the raw data instead of decoding?
let IdServerPacket(user_id, packet) = packet; let (packet, _): (IdServerPacket<R>, _) = bincode::decode_from_slice(&buf[..length], BINCODE_CONFIG)?;
if self.client_id.map(|x| Some(x) != user_id).unwrap_or_default() { let IdServerPacket(user_id, packet) = packet;
return Ok(()) if self.client_id.map(|x| Some(x) != user_id).unwrap_or_default() {
}
self.reset_timeout();
match packet {
ServerPacket::Connected(client_id) => {
log::info!("client connected with id {client_id}");
self.client_id = Some(client_id);
self.status = ClientStatus::Connected;
self.event_queue.push_back(ClientEvent::Connected(client_id));
return Ok(()) return Ok(())
},
ServerPacket::Disconnected(reason) => {
log::info!("client kicked: {reason}");
let reason = DisconnectReason::KickedByServer(Some(reason));
self.disconnect_inner(reason, true)?; //this should never fail but we're handling the error anyway
return Ok(())
},
ServerPacket::Data(message) => {
self.event_queue.push_back(ClientEvent::MessageReceived(message));
} }
} self.reset_timeout();
}, match packet {
Err(error) if error.kind() != ErrorKind::WouldBlock => { ServerPacket::Connected(client_id) => {
return Err(error.into()); log::info!("client connected with id {client_id}");
}, self.client_id = Some(client_id);
_ => (), self.status = ClientStatus::Connected;
self.event_queue.push_back(ClientEvent::Connected(client_id));
return Ok(())
},
ServerPacket::Disconnected(reason) => {
log::info!("client kicked: {reason}");
let reason = DisconnectReason::KickedByServer(Some(reason));
self.disconnect_inner(reason, true)?; //this should never fail but we're handling the error anyway
return Ok(())
},
ServerPacket::Data(message) => {
self.event_queue.push_back(ClientEvent::MessageReceived(message));
}
}
},
Err(error) if error.kind() != ErrorKind::WouldBlock => {
return Err(error.into());
},
_ => break,
}
} }
Ok(()) Ok(())
} }
pub fn get_event(&mut self) -> Option<ClientEvent<R>> { pub fn pop_event(&mut self) -> Option<ClientEvent<R>> {
self.event_queue.pop_front() self.event_queue.pop_front()
} }
pub fn process_events(&mut self) -> DrainDeque<ClientEvent<R>> { pub fn process_events(&mut self) -> DrainDeque<ClientEvent<R>> {

View file

@ -52,11 +52,25 @@ pub struct Server<S, R> where S: Encode + Decode, R: Encode + Decode {
_s: PhantomData<S>, _s: PhantomData<S>,
} }
impl<S, R> Server<S, R> where S: Encode + Decode, R: Encode + Decode { impl<S, R> Server<S, R> where S: Encode + Decode, R: Encode + Decode {
pub fn bind(addr: SocketAddr, config: ServerConfig) -> anyhow::Result<Self> {
assert!(config.max_clients <= MAX_CLIENTS);
let socket = UdpSocket::bind(addr)?;
socket.set_nonblocking(true)?;
Ok(Self {
config,
socket,
clients: HashMap::with_capacity_and_hasher(MAX_CLIENTS, BuildNoHashHasher::default()),
event_queue: VecDeque::new(),
_s: PhantomData,
})
}
fn send_to_addr(&self, addr: SocketAddr, packet: IdServerPacket<S>) -> Result<()> { fn send_to_addr(&self, addr: SocketAddr, packet: IdServerPacket<S>) -> Result<()> {
let bytes = bincode::encode_to_vec(packet, BINCODE_CONFIG)?; let bytes = bincode::encode_to_vec(packet, BINCODE_CONFIG)?;
self.socket.send_to(&bytes, addr)?; self.socket.send_to(&bytes, addr)?;
Ok(()) Ok(())
} }
fn send_packet(&self, packet: IdServerPacket<S>) -> Result<()> { fn send_packet(&self, packet: IdServerPacket<S>) -> Result<()> {
let Some(id) = packet.0 else { let Some(id) = packet.0 else {
bail!("send_to_client call without id") bail!("send_to_client call without id")
@ -67,6 +81,7 @@ impl<S, R> Server<S, R> where S: Encode + Decode, R: Encode + Decode {
self.send_to_addr(client.addr, packet)?; self.send_to_addr(client.addr, packet)?;
Ok(()) Ok(())
} }
fn add_client(&mut self, addr: SocketAddr) -> Result<ClientId> { fn add_client(&mut self, addr: SocketAddr) -> Result<ClientId> {
let Some(id) = (1..=self.config.max_clients) let Some(id) = (1..=self.config.max_clients)
.map(|x| ClientId::new(x as _).unwrap()) .map(|x| ClientId::new(x as _).unwrap())
@ -83,6 +98,7 @@ impl<S, R> Server<S, R> where S: Encode + Decode, R: Encode + Decode {
}); });
Ok(id) Ok(id)
} }
fn disconnect_client_inner(&mut self, id: ClientId, reason: String) -> Result<()> { fn disconnect_client_inner(&mut self, id: ClientId, reason: String) -> Result<()> {
let result = self.send_packet(IdServerPacket( let result = self.send_packet(IdServerPacket(
Some(id), ServerPacket::Disconnected(reason) Some(id), ServerPacket::Disconnected(reason)
@ -98,6 +114,7 @@ impl<S, R> Server<S, R> where S: Encode + Decode, R: Encode + Decode {
self.disconnect_client_inner(id, reason)?; self.disconnect_client_inner(id, reason)?;
Ok(()) Ok(())
} }
pub fn shutdown(mut self) -> Result<()> { pub fn shutdown(mut self) -> Result<()> {
let clients = self.clients.keys().copied().collect::<Vec<ClientId>>(); let clients = self.clients.keys().copied().collect::<Vec<ClientId>>();
for id in clients { for id in clients {
@ -105,90 +122,82 @@ impl<S, R> Server<S, R> where S: Encode + Decode, R: Encode + Decode {
} }
Ok(()) Ok(())
} }
pub fn send_message(&mut self, id: ClientId, message: S) -> anyhow::Result<()> { pub fn send_message(&mut self, id: ClientId, message: S) -> anyhow::Result<()> {
self.send_packet(IdServerPacket(Some(id), ServerPacket::Data(message)))?; self.send_packet(IdServerPacket(Some(id), ServerPacket::Data(message)))?;
Ok(()) Ok(())
} }
pub fn bind(addr: SocketAddr, config: ServerConfig) -> anyhow::Result<Self> {
assert!(config.max_clients <= MAX_CLIENTS);
let socket = UdpSocket::bind(addr)?;
socket.set_nonblocking(true)?;
Ok(Self {
config,
socket,
clients: HashMap::with_capacity_and_hasher(MAX_CLIENTS, BuildNoHashHasher::default()),
event_queue: VecDeque::new(),
_s: PhantomData,
})
}
pub fn update(&mut self) -> Result<()> { pub fn update(&mut self) -> Result<()> {
//TODO client timeout //TODO client timeout
let mut buf = [0; u16::MAX as usize]; let mut buf = [0; u16::MAX as usize];
match self.socket.recv_from(&mut buf) { loop {
Ok((len, addr)) => { match self.socket.recv_from(&mut buf) {
if let Ok(packet) = bincode::decode_from_slice(&buf[..len], BINCODE_CONFIG) { Ok((len, addr)) => {
let (packet, _): (IdClientPacket<R>, _) = packet; if let Ok(packet) = bincode::decode_from_slice(&buf[..len], BINCODE_CONFIG) {
let IdClientPacket(id, packet) = packet; let (packet, _): (IdClientPacket<R>, _) = packet;
match id { let IdClientPacket(id, packet) = packet;
Some(id) => { match id {
if !self.clients.contains_key(&id) { Some(id) => {
bail!("Client with id {id} doesn't exist"); if !self.clients.contains_key(&id) {
}; bail!("Client with id {id} doesn't exist");
match packet { };
ClientPacket::Data(data) => { match packet {
self.event_queue.push_back(ServerEvent::MessageReceived { ClientPacket::Data(data) => {
from: id, self.event_queue.push_back(ServerEvent::MessageReceived {
message: data, from: id,
}); message: data,
} });
ClientPacket::Disconnect => {
log::info!("Client {id} disconnected");
self.event_queue.push_back(ServerEvent::Disconnected(id));
self.disconnect_client_inner(id, "Disconnected".into())?;
},
ClientPacket::Heartbeat => {
self.clients.get_mut(&id).unwrap().timeout = Instant::now()
},
ClientPacket::Connect => bail!("Client already connected"),
}
},
None => {
match packet {
ClientPacket::Connect => {
match self.add_client(addr) {
Ok(id) => {
log::info!("Client with id {id} connected");
self.event_queue.push_back(ServerEvent::Connected(id));
self.send_to_addr(addr,
IdServerPacket(None, ServerPacket::Connected(id)
))?;
},
Err(error) => {
let reason = error.to_string();
log::error!("Client connection failed: {reason}");
self.send_to_addr(addr, IdServerPacket(
None, ServerPacket::Disconnected(reason)
))?;
}
} }
}, ClientPacket::Disconnect => {
_ => bail!("Invalid packet type for non-id packet") log::info!("Client {id} disconnected");
self.event_queue.push_back(ServerEvent::Disconnected(id));
self.disconnect_client_inner(id, "Disconnected".into())?;
},
ClientPacket::Heartbeat => {
self.clients.get_mut(&id).unwrap().timeout = Instant::now()
},
ClientPacket::Connect => bail!("Client already connected"),
}
},
None => {
match packet {
ClientPacket::Connect => {
match self.add_client(addr) {
Ok(id) => {
log::info!("Client with id {id} connected");
self.event_queue.push_back(ServerEvent::Connected(id));
self.send_to_addr(addr,
IdServerPacket(None, ServerPacket::Connected(id)
))?;
},
Err(error) => {
let reason = error.to_string();
log::error!("Client connection failed: {reason}");
self.send_to_addr(addr, IdServerPacket(
None, ServerPacket::Disconnected(reason)
))?;
}
}
},
_ => bail!("Invalid packet type for non-id packet")
}
} }
} }
} else {
bail!("Corrupted packet received");
} }
} else { },
bail!("Corrupted packet received"); Err(error) if error.kind() != ErrorKind::WouldBlock => {
} return Err(error.into());
}, },
Err(error) if error.kind() != ErrorKind::WouldBlock => { _ => break,
return Err(error.into()); }
},
_ => (),
} }
Ok(()) Ok(())
} }
pub fn get_event(&mut self) -> Option<ServerEvent<R>> { pub fn pop_event(&mut self) -> Option<ServerEvent<R>> {
self.event_queue.pop_front() self.event_queue.pop_front()
} }
pub fn process_events(&mut self) -> DrainDeque<ServerEvent<R>> { pub fn process_events(&mut self) -> DrainDeque<ServerEvent<R>> {

View file

@ -63,16 +63,19 @@ use rendering::{
RenderTarget, RenderTarget,
BackgroundColor, BackgroundColor,
clear_background, clear_background,
init_window_size,
update_window_size,
primitives::init_primitives, primitives::init_primitives,
selection_box::render_selection_box, selection_box::render_selection_box,
world::draw_world, world::draw_world,
world::draw_current_chunk_border, init_window_size, update_window_size, world::draw_current_chunk_border,
}; };
use block_placement::block_placement_system; use block_placement::block_placement_system;
use delta_time::{DeltaTime, init_delta_time}; use delta_time::{DeltaTime, init_delta_time};
use cursor_lock::{insert_lock_state, update_cursor_lock_state, lock_cursor_now}; use cursor_lock::{insert_lock_state, update_cursor_lock_state, lock_cursor_now};
use control_flow::{exit_on_esc, insert_control_flow_unique, SetControlFlow}; use control_flow::{exit_on_esc, insert_control_flow_unique, SetControlFlow};
use state::{is_ingame, is_ingame_or_loading, is_loading, init_state, update_state}; use state::{is_ingame, is_ingame_or_loading, is_loading, init_state, update_state};
use networking::{update_networking, is_multiplayer};
use init::initialize_from_args; use init::initialize_from_args;
use gui::{render_gui, init_gui, update_gui}; use gui::{render_gui, init_gui, update_gui};
use loading_screen::update_loading_screen; use loading_screen::update_loading_screen;
@ -102,6 +105,9 @@ fn update() -> Workload {
update_cursor_lock_state, update_cursor_lock_state,
process_inputs, process_inputs,
exit_on_esc, exit_on_esc,
(
update_networking
).into_workload().run_if(is_multiplayer),
( (
update_loading_screen, update_loading_screen,
).into_workload().run_if(is_loading), ).into_workload().run_if(is_loading),
@ -114,8 +120,8 @@ fn update() -> Workload {
update_raycasts, update_raycasts,
block_placement_system, block_placement_system,
apply_queued_blocks, apply_queued_blocks,
compute_cameras,
).into_workload().run_if(is_ingame), ).into_workload().run_if(is_ingame),
compute_cameras,
update_gui, update_gui,
update_state, update_state,
).into_workload() ).into_workload()

View file

@ -1,11 +1,13 @@
use shipyard::{Unique, AllStoragesView, UniqueView, UniqueViewMut, Workload, IntoWorkload, WorkloadModificator}; use shipyard::{Unique, AllStoragesView, UniqueView, UniqueViewMut, Workload, IntoWorkload, WorkloadModificator, EntitiesView, EntitiesViewMut, Component, ViewMut, SystemModificator};
use std::net::SocketAddr; use std::net::SocketAddr;
use kubi_udp::client::{Client, ClientConfig}; use kubi_udp::client::{Client, ClientConfig, ClientEvent};
use kubi_shared::networking::{ use kubi_shared::networking::{
messages::{ClientToServerMessage, ServerToClientMessage}, messages::{ClientToServerMessage, ServerToClientMessage},
state::ClientJoinState state::ClientJoinState
}; };
use crate::events::EventComponent;
#[derive(Unique, Clone, Copy, PartialEq, Eq)] #[derive(Unique, Clone, Copy, PartialEq, Eq)]
pub enum GameType { pub enum GameType {
Singleplayer, Singleplayer,
@ -18,6 +20,9 @@ pub struct ServerAddress(pub SocketAddr);
#[derive(Unique)] #[derive(Unique)]
pub struct UdpClient(pub Client<ClientToServerMessage, ServerToClientMessage>); pub struct UdpClient(pub Client<ClientToServerMessage, ServerToClientMessage>);
#[derive(Component)]
pub struct NetworkEvent(pub ClientEvent<ServerToClientMessage>);
pub fn create_client( pub fn create_client(
storages: AllStoragesView storages: AllStoragesView
) { ) {
@ -29,34 +34,47 @@ pub fn create_client(
storages.add_unique(ClientJoinState::Disconnected); storages.add_unique(ClientJoinState::Disconnected);
} }
pub fn client_connect( pub fn connect_client(
mut client: UniqueViewMut<UdpClient> mut client: UniqueViewMut<UdpClient>
) { ) {
client.0.connect().unwrap(); client.0.connect().unwrap();
} }
pub fn update_client_and_get_events( pub fn update_client(
mut client: UniqueViewMut<UdpClient>, mut client: UniqueViewMut<UdpClient>,
) { ) {
client.0.update().unwrap(); client.0.update().unwrap();
for event in client.0.process_events() {
todo!()
}
} }
pub fn init_networking() -> Workload { pub fn insert_client_events(
( mut client: UniqueViewMut<UdpClient>,
create_client, mut entities: EntitiesViewMut,
client_connect, mut events: ViewMut<EventComponent>,
).into_workload().run_if(is_multiplayer) mut network_events: ViewMut<NetworkEvent>,
) {
entities.bulk_add_entity((
&mut events,
&mut network_events,
), client.0.process_events().map(|event| {
(EventComponent, NetworkEvent(event))
}));
} }
pub fn update_networking() -> Workload { pub fn update_networking() -> Workload {
( (
update_client_and_get_events, create_client.run_if_missing_unique::<UdpClient>(),
connect_client.run_if(client_needs_connect_call),
update_client,
insert_client_events,
).into_workload().run_if(is_multiplayer) ).into_workload().run_if(is_multiplayer)
} }
fn client_needs_connect_call(
client: UniqueView<UdpClient>,
) -> bool {
client.0.has_not_made_connection_attempts()
}
pub fn is_multiplayer( pub fn is_multiplayer(
game_type: UniqueView<GameType> game_type: UniqueView<GameType>
) -> bool { ) -> bool {