diff --git a/Cargo.lock b/Cargo.lock index 74b146e..d95aeb0 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -939,6 +939,7 @@ dependencies = [ "kubi-logging", "kubi-shared", "log", + "lz4_flex", "nohash-hasher", "postcard", "rayon", @@ -968,6 +969,7 @@ dependencies = [ "kubi-logging", "kubi-shared", "log", + "lz4_flex", "nohash-hasher", "postcard", "rand", @@ -1056,6 +1058,12 @@ dependencies = [ "cfg-if", ] +[[package]] +name = "lz4_flex" +version = "0.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8b8c72594ac26bfd34f2d99dfced2edfaddfe8a476e3ff2ca0eb293d925c4f83" + [[package]] name = "mach" version = "0.3.2" diff --git a/kubi-server/Cargo.toml b/kubi-server/Cargo.toml index 6fb5e3d..d903b93 100644 --- a/kubi-server/Cargo.toml +++ b/kubi-server/Cargo.toml @@ -20,6 +20,7 @@ flume = "0.10" rand = "0.8" uflow = "0.7" postcard = { version = "1.0", features = ["alloc"] } +lz4_flex = { version = "0.10", default-features = false, features = ["checked-decode"] } [features] default = [] diff --git a/kubi-server/src/world.rs b/kubi-server/src/world.rs index d261b4e..faad774 100644 --- a/kubi-server/src/world.rs +++ b/kubi-server/src/world.rs @@ -7,9 +7,12 @@ use kubi_shared::networking::{ client::Client, }; use uflow::{ - server::Event as ServerEvent, + server::{Event as ServerEvent, RemoteClient}, SendMode }; +use lz4_flex::compress_prepend_size as lz4_compress; +use anyhow::Result; +use std::{rc::Rc, cell::RefCell}; use crate::{ server::{UdpServer, ServerEvents, IsMessageOfType}, config::ConfigTable, @@ -33,8 +36,23 @@ impl ChunkManager { } } +///Sends a compressed chunk packet +pub fn send_chunk_compressed( + client: &Rc>, + message: &ServerToClientMessage +) -> Result<()> { + let mut ser_message = postcard::to_allocvec(&message)?; + let (_, data) = ser_message.split_at_mut(1); + let mut compressed = lz4_compress(&data); + ser_message.truncate(1); + ser_message.append(&mut compressed); + let ser_message = ser_message.into_boxed_slice(); + client.borrow_mut().send(ser_message, CHANNEL_WORLD, SendMode::Reliable); + Ok(()) +} + fn process_chunk_requests( - mut server: NonSendSync>, + server: NonSendSync>, events: UniqueView, mut chunk_manager: UniqueViewMut, task_manager: UniqueView, @@ -73,15 +91,14 @@ fn process_chunk_requests( chunk.subscriptions.insert(client_id); //TODO Start task here if status is "Nothing" if let Some(blocks) = &chunk.blocks { - client.borrow_mut().send( - postcard::to_allocvec(&ServerToClientMessage::ChunkResponse { + send_chunk_compressed( + &client, + &ServerToClientMessage::ChunkResponse { chunk: chunk_position, data: blocks.clone(), queued: Vec::with_capacity(0) - }).unwrap().into_boxed_slice(), - CHANNEL_WORLD, - SendMode::Reliable, - ); + } + ).unwrap(); } } else { let mut chunk = Chunk::new(chunk_position); @@ -97,13 +114,12 @@ fn process_chunk_requests( } fn process_finished_tasks( - mut server: NonSendSync>, + server: NonSendSync>, task_manager: UniqueView, mut chunk_manager: UniqueViewMut, id_map: UniqueView, client_addr: View, ) { - let mut limit: usize = 8; 'outer: while let Some(res) = task_manager.receive() { let ChunkTaskResponse::ChunkLoaded { chunk_position, blocks, queue } = res; let Some(chunk) = chunk_manager.chunks.get_mut(&chunk_position) else { @@ -116,7 +132,15 @@ fn process_finished_tasks( } chunk.state = ChunkState::Loaded; chunk.blocks = Some(blocks.clone()); + log::debug!("Chunk {chunk_position} loaded, {} subs", chunk.subscriptions.len()); + + let chunk_packet = &ServerToClientMessage::ChunkResponse { + chunk: chunk_position, + data: blocks, + queued: queue + }; + for &subscriber in &chunk.subscriptions { let Some(&entity_id) = id_map.0.get(&subscriber) else { log::error!("Invalid subscriber client id"); @@ -130,15 +154,12 @@ fn process_finished_tasks( log::error!("Client not connected"); continue 'outer; }; - client.borrow_mut().send( - postcard::to_allocvec(&ServerToClientMessage::ChunkResponse { - chunk: chunk_position, - data: blocks.clone(), - queued: queue.clone() - }).unwrap().into_boxed_slice(), - CHANNEL_WORLD, - SendMode::Reliable, - ); + send_chunk_compressed(client, chunk_packet).unwrap(); + // client.borrow_mut().send( + // chunk_packet.clone(), + // CHANNEL_WORLD, + // SendMode::Reliable, + // ); } } } diff --git a/kubi-shared/src/networking/messages.rs b/kubi-shared/src/networking/messages.rs index 35dbbea..58df9f2 100644 --- a/kubi-shared/src/networking/messages.rs +++ b/kubi-shared/src/networking/messages.rs @@ -3,7 +3,7 @@ use serde::{Serialize, Deserialize}; use crate::{chunk::BlockData, queue::QueuedBlock}; use super::client::ClientId; -pub const PROTOCOL_ID: u16 = 2; +pub const PROTOCOL_ID: u16 = 3; pub const C_CLIENT_HELLO: u8 = 0; pub const C_POSITION_CHANGED: u8 = 1; @@ -45,6 +45,7 @@ pub enum ServerToClientMessage { position: Vec3, direction: Quat, } = S_PLAYER_POSITION_CHANGED, + ///WARNING: THIS IS COMPRESSED ChunkResponse { chunk: IVec3, data: BlockData, diff --git a/kubi/Cargo.toml b/kubi/Cargo.toml index b761204..8f2eb1d 100644 --- a/kubi/Cargo.toml +++ b/kubi/Cargo.toml @@ -22,6 +22,7 @@ gilrs = { version = "0.10", default_features = false, features = ["xinput"] } uflow = "0.7" postcard = { version = "1.0", features = ["alloc"] } serde_json = { version = "1.0", optional = true } +lz4_flex = { version = "0.10", default-features = false, features = ["checked-decode"] } [target.'cfg(target_os = "windows")'.dependencies] winapi = { version = "0.3" }