send block updates across network

This commit is contained in:
griffi-gh 2023-03-12 17:58:24 +01:00
parent e29ca7d202
commit 32155ff531
5 changed files with 141 additions and 18 deletions

View file

@ -47,6 +47,7 @@ pub fn update_server(
mut server: NonSendSync<UniqueViewMut<UdpServer>>,
mut events: UniqueViewMut<ServerEvents>,
) {
server.0.flush();
events.0.clear();
events.0.extend(server.0.step());
}

View file

@ -1,9 +1,9 @@
use shipyard::{Unique, UniqueView, UniqueViewMut, Workload, IntoWorkload, AllStoragesView, View, Get, NonSendSync};
use shipyard::{Unique, UniqueView, UniqueViewMut, Workload, IntoWorkload, AllStoragesView, View, Get, NonSendSync, IntoIter};
use glam::IVec3;
use hashbrown::HashMap;
use kubi_shared::networking::{
messages::{ClientToServerMessage, ServerToClientMessage, C_CHUNK_SUB_REQUEST},
channels::CHANNEL_WORLD,
messages::{ClientToServerMessage, ServerToClientMessage, C_CHUNK_SUB_REQUEST, C_QUEUE_BLOCK},
channels::{CHANNEL_WORLD, CHANNEL_BLOCK},
client::Client,
};
use uflow::{
@ -163,6 +163,59 @@ fn process_finished_tasks(
}
}
fn process_block_queue_messages(
server: NonSendSync<UniqueView<UdpServer>>,
events: UniqueView<ServerEvents>,
addr_map: UniqueView<ClientAddressMap>,
clients: View<Client>,
addrs: View<ClientAddress>,
) {
for event in &events.0 {
let ServerEvent::Receive(client_addr, data) = event else{
continue
};
if !event.is_message_of_type::<C_QUEUE_BLOCK>() {
continue
}
let Some(&entity_id) = addr_map.0.get(client_addr) else {
log::error!("Client not authenticated");
continue
};
let Ok(&Client(client_id)) = (&clients).get(entity_id) else {
log::error!("Entity ID is invalid");
continue
};
let Ok(parsed_message) = postcard::from_bytes(data) else {
log::error!("Malformed message");
continue
};
let ClientToServerMessage::QueueBlock { item } = parsed_message else {
unreachable!()
};
//TODO place in our own queue, for now just send to other clients
log::info!("Placed block {:?} at {}", item.block_type, item.position);
for (other_client, other_client_address) in (&clients, &addrs).iter() {
//No need to send the event back
if client_id == other_client.0 {
continue
}
//Get client
let Some(client) = server.0.client(&other_client_address.0) else {
log::error!("Client with address not found");
continue
};
//Send the message
client.borrow_mut().send(
postcard::to_allocvec(
&ServerToClientMessage::QueueBlock { item }
).unwrap().into_boxed_slice(),
CHANNEL_BLOCK,
SendMode::Reliable,
);
}
}
}
fn init_chunk_manager(
storages: AllStoragesView
) {
@ -180,5 +233,6 @@ pub fn update_world() -> Workload {
(
process_chunk_requests,
process_finished_tasks,
process_block_queue_messages,
).into_workload()
}

View file

@ -1,3 +1,4 @@
pub const CHANNEL_GENERIC: usize = 0;
pub const CHANNEL_AUTH: usize = 1;
pub const CHANNEL_WORLD: usize = 2;
pub const CHANNEL_BLOCK: usize = 3;

View file

@ -47,7 +47,7 @@ use world::{
loading::update_loaded_world_around_player,
raycast::update_raycasts,
queue::apply_queued_blocks,
tasks::{ChunkTaskManager}, ChunkStorage
tasks::{ChunkTaskManager},
};
use player::{spawn_player, MainPlayer};
use prefabs::load_prefabs;
@ -78,7 +78,7 @@ use delta_time::{DeltaTime, init_delta_time};
use cursor_lock::{insert_lock_state, update_cursor_lock_state, lock_cursor_now};
use control_flow::{exit_on_esc, insert_control_flow_unique, SetControlFlow};
use state::{is_ingame, is_ingame_or_loading, is_loading, init_state, update_state, is_connecting};
use networking::{update_networking, is_multiplayer, disconnect_on_exit};
use networking::{update_networking, update_networking_late, is_multiplayer, disconnect_on_exit};
use init::initialize_from_args;
use gui::{render_gui, init_gui, update_gui};
use loading_screen::update_loading_screen;
@ -109,10 +109,8 @@ fn update() -> Workload {
(
init_game_world.run_if_missing_unique::<ChunkTaskManager>(),
spawn_player.run_if_storage_empty::<MainPlayer>(),
).into_sequential_workload().run_if(is_ingame_or_loading).tag("game_init"),
(
update_networking,
).into_sequential_workload().run_if(is_multiplayer).tag("networking"),
).into_sequential_workload().run_if(is_ingame_or_loading),
update_networking.run_if(is_multiplayer),
(
switch_to_loading_if_connected
).into_sequential_workload().run_if(is_connecting),
@ -129,6 +127,7 @@ fn update() -> Workload {
update_block_placement,
apply_queued_blocks,
).into_sequential_workload().run_if(is_ingame),
update_networking_late.run_if(is_multiplayer),
compute_cameras,
update_gui,
update_state,

View file

@ -1,15 +1,23 @@
use shipyard::{Unique, AllStoragesView, UniqueView, UniqueViewMut, Workload, IntoWorkload, EntitiesViewMut, Component, ViewMut, SystemModificator, View, IntoIter, WorkloadModificator};
use glium::glutin::event_loop::ControlFlow;
use std::net::SocketAddr;
use uflow::client::{Client, Config as ClientConfig, Event as ClientEvent};
use uflow::{client::{Client, Config as ClientConfig, Event as ClientEvent}, SendMode};
use lz4_flex::decompress_size_prepended;
use anyhow::{Result, Context};
use kubi_shared::networking::{
messages::{ClientToServerMessage, ServerToClientMessage, S_SERVER_HELLO, S_CHUNK_RESPONSE},
state::ClientJoinState,
channels::CHANNEL_AUTH,
use kubi_shared::{
networking::{
messages::{ClientToServerMessage, ServerToClientMessage, S_SERVER_HELLO, S_CHUNK_RESPONSE, S_QUEUE_BLOCK},
state::ClientJoinState,
channels::{CHANNEL_AUTH, CHANNEL_BLOCK},
},
queue::QueuedBlock
};
use crate::{
events::{EventComponent, player_actions::PlayerActionEvent},
control_flow::SetControlFlow,
world::{tasks::{ChunkTaskResponse, ChunkTaskManager}, queue::BlockUpdateQueue},
state::is_ingame_or_loading
};
use crate::{events::EventComponent, control_flow::SetControlFlow, world::tasks::{ChunkTaskResponse, ChunkTaskManager}, state::is_ingame_or_loading};
#[derive(Unique, Clone, Copy, PartialEq, Eq)]
pub enum GameType {
@ -62,6 +70,12 @@ fn poll_client(
}));
}
fn flush_client(
mut client: UniqueViewMut<UdpClient>,
) {
client.0.flush();
}
fn set_client_join_state_to_connected(
mut join_state: UniqueViewMut<ClientJoinState>
) {
@ -138,6 +152,50 @@ fn inject_network_responses_into_manager_queue(
}
}
fn send_block_place_events(
action_events: View<PlayerActionEvent>,
mut client: UniqueViewMut<UdpClient>,
) {
for event in action_events.iter() {
let PlayerActionEvent::UpdatedBlock { position, block } = event else {
continue
};
client.0.send(
postcard::to_allocvec(&ClientToServerMessage::QueueBlock {
item: QueuedBlock {
position: *position,
block_type: *block,
soft: false
}
}).unwrap().into_boxed_slice(),
CHANNEL_BLOCK,
SendMode::Reliable,
);
}
}
fn recv_block_place_events(
mut queue: UniqueViewMut<BlockUpdateQueue>,
network_events: View<NetworkEvent>,
) {
for event in network_events.iter() {
let ClientEvent::Receive(data) = &event.0 else {
continue
};
if !event.is_message_of_type::<S_QUEUE_BLOCK>() {
continue
}
let Ok(parsed_message) = postcard::from_bytes(data) else {
log::error!("Malformed message");
continue
};
let ServerToClientMessage::QueueBlock { item } = parsed_message else {
unreachable!()
};
queue.push(item);
}
}
pub fn update_networking() -> Workload {
(
connect_client.run_if_missing_unique::<UdpClient>(),
@ -147,10 +205,20 @@ pub fn update_networking() -> Workload {
say_hello,
).into_sequential_workload().run_if(if_just_connected),
(
check_server_hello_response,
).into_sequential_workload().run_if(is_join_state::<{ClientJoinState::Connected as u8}>),
check_server_hello_response
).run_if(is_join_state::<{ClientJoinState::Connected as u8}>),
(
recv_block_place_events
).run_if(is_join_state::<{ClientJoinState::Joined as u8}>).run_if(is_ingame_or_loading),
inject_network_responses_into_manager_queue.run_if(is_ingame_or_loading).skip_if_missing_unique::<ChunkTaskManager>(),
).into_sequential_workload() //HACK Weird issues with shipyard removed
).into_sequential_workload()
}
pub fn update_networking_late() -> Workload {
(
send_block_place_events.run_if(is_join_state::<{ClientJoinState::Joined as u8}>),
flush_client,
).into_sequential_workload()
}
pub fn disconnect_on_exit(