From 6375b397e93ace72e82b1a7a7cd16b2020639832 Mon Sep 17 00:00:00 2001 From: griffi-gh Date: Sun, 12 Mar 2023 17:58:24 +0100 Subject: [PATCH] send block updates across network --- kubi-server/src/server.rs | 1 + kubi-server/src/world.rs | 60 +++++++++++++++++- kubi-shared/src/networking/channels.rs | 1 + kubi/src/main.rs | 11 ++-- kubi/src/networking.rs | 86 +++++++++++++++++++++++--- 5 files changed, 141 insertions(+), 18 deletions(-) diff --git a/kubi-server/src/server.rs b/kubi-server/src/server.rs index 8b84264..22b37a4 100644 --- a/kubi-server/src/server.rs +++ b/kubi-server/src/server.rs @@ -47,6 +47,7 @@ pub fn update_server( mut server: NonSendSync>, mut events: UniqueViewMut, ) { + server.0.flush(); events.0.clear(); events.0.extend(server.0.step()); } diff --git a/kubi-server/src/world.rs b/kubi-server/src/world.rs index de7e522..fa34ac2 100644 --- a/kubi-server/src/world.rs +++ b/kubi-server/src/world.rs @@ -1,9 +1,9 @@ -use shipyard::{Unique, UniqueView, UniqueViewMut, Workload, IntoWorkload, AllStoragesView, View, Get, NonSendSync}; +use shipyard::{Unique, UniqueView, UniqueViewMut, Workload, IntoWorkload, AllStoragesView, View, Get, NonSendSync, IntoIter}; use glam::IVec3; use hashbrown::HashMap; use kubi_shared::networking::{ - messages::{ClientToServerMessage, ServerToClientMessage, C_CHUNK_SUB_REQUEST}, - channels::CHANNEL_WORLD, + messages::{ClientToServerMessage, ServerToClientMessage, C_CHUNK_SUB_REQUEST, C_QUEUE_BLOCK}, + channels::{CHANNEL_WORLD, CHANNEL_BLOCK}, client::Client, }; use uflow::{ @@ -163,6 +163,59 @@ fn process_finished_tasks( } } +fn process_block_queue_messages( + server: NonSendSync>, + events: UniqueView, + addr_map: UniqueView, + clients: View, + addrs: View, +) { + for event in &events.0 { + let ServerEvent::Receive(client_addr, data) = event else{ + continue + }; + if !event.is_message_of_type::() { + continue + } + let Some(&entity_id) = addr_map.0.get(client_addr) else { + log::error!("Client not authenticated"); + continue + }; + let Ok(&Client(client_id)) = (&clients).get(entity_id) else { + log::error!("Entity ID is invalid"); + continue + }; + let Ok(parsed_message) = postcard::from_bytes(data) else { + log::error!("Malformed message"); + continue + }; + let ClientToServerMessage::QueueBlock { item } = parsed_message else { + unreachable!() + }; + //TODO place in our own queue, for now just send to other clients + log::info!("Placed block {:?} at {}", item.block_type, item.position); + for (other_client, other_client_address) in (&clients, &addrs).iter() { + //No need to send the event back + if client_id == other_client.0 { + continue + } + //Get client + let Some(client) = server.0.client(&other_client_address.0) else { + log::error!("Client with address not found"); + continue + }; + //Send the message + client.borrow_mut().send( + postcard::to_allocvec( + &ServerToClientMessage::QueueBlock { item } + ).unwrap().into_boxed_slice(), + CHANNEL_BLOCK, + SendMode::Reliable, + ); + } + } +} + fn init_chunk_manager( storages: AllStoragesView ) { @@ -180,5 +233,6 @@ pub fn update_world() -> Workload { ( process_chunk_requests, process_finished_tasks, + process_block_queue_messages, ).into_workload() } diff --git a/kubi-shared/src/networking/channels.rs b/kubi-shared/src/networking/channels.rs index 62182af..170fa68 100644 --- a/kubi-shared/src/networking/channels.rs +++ b/kubi-shared/src/networking/channels.rs @@ -1,3 +1,4 @@ pub const CHANNEL_GENERIC: usize = 0; pub const CHANNEL_AUTH: usize = 1; pub const CHANNEL_WORLD: usize = 2; +pub const CHANNEL_BLOCK: usize = 3; diff --git a/kubi/src/main.rs b/kubi/src/main.rs index 178e974..5e3ea5b 100644 --- a/kubi/src/main.rs +++ b/kubi/src/main.rs @@ -47,7 +47,7 @@ use world::{ loading::update_loaded_world_around_player, raycast::update_raycasts, queue::apply_queued_blocks, - tasks::{ChunkTaskManager}, ChunkStorage + tasks::{ChunkTaskManager}, }; use player::{spawn_player, MainPlayer}; use prefabs::load_prefabs; @@ -78,7 +78,7 @@ use delta_time::{DeltaTime, init_delta_time}; use cursor_lock::{insert_lock_state, update_cursor_lock_state, lock_cursor_now}; use control_flow::{exit_on_esc, insert_control_flow_unique, SetControlFlow}; use state::{is_ingame, is_ingame_or_loading, is_loading, init_state, update_state, is_connecting}; -use networking::{update_networking, is_multiplayer, disconnect_on_exit}; +use networking::{update_networking, update_networking_late, is_multiplayer, disconnect_on_exit}; use init::initialize_from_args; use gui::{render_gui, init_gui, update_gui}; use loading_screen::update_loading_screen; @@ -109,10 +109,8 @@ fn update() -> Workload { ( init_game_world.run_if_missing_unique::(), spawn_player.run_if_storage_empty::(), - ).into_sequential_workload().run_if(is_ingame_or_loading).tag("game_init"), - ( - update_networking, - ).into_sequential_workload().run_if(is_multiplayer).tag("networking"), + ).into_sequential_workload().run_if(is_ingame_or_loading), + update_networking.run_if(is_multiplayer), ( switch_to_loading_if_connected ).into_sequential_workload().run_if(is_connecting), @@ -129,6 +127,7 @@ fn update() -> Workload { update_block_placement, apply_queued_blocks, ).into_sequential_workload().run_if(is_ingame), + update_networking_late.run_if(is_multiplayer), compute_cameras, update_gui, update_state, diff --git a/kubi/src/networking.rs b/kubi/src/networking.rs index 7914039..0ef3ee5 100644 --- a/kubi/src/networking.rs +++ b/kubi/src/networking.rs @@ -1,15 +1,23 @@ use shipyard::{Unique, AllStoragesView, UniqueView, UniqueViewMut, Workload, IntoWorkload, EntitiesViewMut, Component, ViewMut, SystemModificator, View, IntoIter, WorkloadModificator}; use glium::glutin::event_loop::ControlFlow; use std::net::SocketAddr; -use uflow::client::{Client, Config as ClientConfig, Event as ClientEvent}; +use uflow::{client::{Client, Config as ClientConfig, Event as ClientEvent}, SendMode}; use lz4_flex::decompress_size_prepended; use anyhow::{Result, Context}; -use kubi_shared::networking::{ - messages::{ClientToServerMessage, ServerToClientMessage, S_SERVER_HELLO, S_CHUNK_RESPONSE}, - state::ClientJoinState, - channels::CHANNEL_AUTH, +use kubi_shared::{ + networking::{ + messages::{ClientToServerMessage, ServerToClientMessage, S_SERVER_HELLO, S_CHUNK_RESPONSE, S_QUEUE_BLOCK}, + state::ClientJoinState, + channels::{CHANNEL_AUTH, CHANNEL_BLOCK}, + }, + queue::QueuedBlock +}; +use crate::{ + events::{EventComponent, player_actions::PlayerActionEvent}, + control_flow::SetControlFlow, + world::{tasks::{ChunkTaskResponse, ChunkTaskManager}, queue::BlockUpdateQueue}, + state::is_ingame_or_loading }; -use crate::{events::EventComponent, control_flow::SetControlFlow, world::tasks::{ChunkTaskResponse, ChunkTaskManager}, state::is_ingame_or_loading}; #[derive(Unique, Clone, Copy, PartialEq, Eq)] pub enum GameType { @@ -62,6 +70,12 @@ fn poll_client( })); } +fn flush_client( + mut client: UniqueViewMut, +) { + client.0.flush(); +} + fn set_client_join_state_to_connected( mut join_state: UniqueViewMut ) { @@ -138,6 +152,50 @@ fn inject_network_responses_into_manager_queue( } } +fn send_block_place_events( + action_events: View, + mut client: UniqueViewMut, +) { + for event in action_events.iter() { + let PlayerActionEvent::UpdatedBlock { position, block } = event else { + continue + }; + client.0.send( + postcard::to_allocvec(&ClientToServerMessage::QueueBlock { + item: QueuedBlock { + position: *position, + block_type: *block, + soft: false + } + }).unwrap().into_boxed_slice(), + CHANNEL_BLOCK, + SendMode::Reliable, + ); + } +} + +fn recv_block_place_events( + mut queue: UniqueViewMut, + network_events: View, +) { + for event in network_events.iter() { + let ClientEvent::Receive(data) = &event.0 else { + continue + }; + if !event.is_message_of_type::() { + continue + } + let Ok(parsed_message) = postcard::from_bytes(data) else { + log::error!("Malformed message"); + continue + }; + let ServerToClientMessage::QueueBlock { item } = parsed_message else { + unreachable!() + }; + queue.push(item); + } +} + pub fn update_networking() -> Workload { ( connect_client.run_if_missing_unique::(), @@ -147,10 +205,20 @@ pub fn update_networking() -> Workload { say_hello, ).into_sequential_workload().run_if(if_just_connected), ( - check_server_hello_response, - ).into_sequential_workload().run_if(is_join_state::<{ClientJoinState::Connected as u8}>), + check_server_hello_response + ).run_if(is_join_state::<{ClientJoinState::Connected as u8}>), + ( + recv_block_place_events + ).run_if(is_join_state::<{ClientJoinState::Joined as u8}>).run_if(is_ingame_or_loading), inject_network_responses_into_manager_queue.run_if(is_ingame_or_loading).skip_if_missing_unique::(), - ).into_sequential_workload() //HACK Weird issues with shipyard removed + ).into_sequential_workload() +} + +pub fn update_networking_late() -> Workload { + ( + send_block_place_events.run_if(is_join_state::<{ClientJoinState::Joined as u8}>), + flush_client, + ).into_sequential_workload() } pub fn disconnect_on_exit(