world compression (server only for now)

This commit is contained in:
griffi-gh 2023-03-09 04:06:46 +01:00
parent 5a75e44beb
commit f6eef9457b
5 changed files with 52 additions and 20 deletions

8
Cargo.lock generated
View file

@ -939,6 +939,7 @@ dependencies = [
"kubi-logging",
"kubi-shared",
"log",
"lz4_flex",
"nohash-hasher",
"postcard",
"rayon",
@ -968,6 +969,7 @@ dependencies = [
"kubi-logging",
"kubi-shared",
"log",
"lz4_flex",
"nohash-hasher",
"postcard",
"rand",
@ -1056,6 +1058,12 @@ dependencies = [
"cfg-if",
]
[[package]]
name = "lz4_flex"
version = "0.10.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8b8c72594ac26bfd34f2d99dfced2edfaddfe8a476e3ff2ca0eb293d925c4f83"
[[package]]
name = "mach"
version = "0.3.2"

View file

@ -20,6 +20,7 @@ flume = "0.10"
rand = "0.8"
uflow = "0.7"
postcard = { version = "1.0", features = ["alloc"] }
lz4_flex = { version = "0.10", default-features = false, features = ["checked-decode"] }
[features]
default = []

View file

@ -7,9 +7,12 @@ use kubi_shared::networking::{
client::Client,
};
use uflow::{
server::Event as ServerEvent,
server::{Event as ServerEvent, RemoteClient},
SendMode
};
use lz4_flex::compress_prepend_size as lz4_compress;
use anyhow::Result;
use std::{rc::Rc, cell::RefCell};
use crate::{
server::{UdpServer, ServerEvents, IsMessageOfType},
config::ConfigTable,
@ -33,8 +36,23 @@ impl ChunkManager {
}
}
///Sends a compressed chunk packet
pub fn send_chunk_compressed(
client: &Rc<RefCell<RemoteClient>>,
message: &ServerToClientMessage
) -> Result<()> {
let mut ser_message = postcard::to_allocvec(&message)?;
let (_, data) = ser_message.split_at_mut(1);
let mut compressed = lz4_compress(&data);
ser_message.truncate(1);
ser_message.append(&mut compressed);
let ser_message = ser_message.into_boxed_slice();
client.borrow_mut().send(ser_message, CHANNEL_WORLD, SendMode::Reliable);
Ok(())
}
fn process_chunk_requests(
mut server: NonSendSync<UniqueViewMut<UdpServer>>,
server: NonSendSync<UniqueView<UdpServer>>,
events: UniqueView<ServerEvents>,
mut chunk_manager: UniqueViewMut<ChunkManager>,
task_manager: UniqueView<ChunkTaskManager>,
@ -73,15 +91,14 @@ fn process_chunk_requests(
chunk.subscriptions.insert(client_id);
//TODO Start task here if status is "Nothing"
if let Some(blocks) = &chunk.blocks {
client.borrow_mut().send(
postcard::to_allocvec(&ServerToClientMessage::ChunkResponse {
send_chunk_compressed(
&client,
&ServerToClientMessage::ChunkResponse {
chunk: chunk_position,
data: blocks.clone(),
queued: Vec::with_capacity(0)
}).unwrap().into_boxed_slice(),
CHANNEL_WORLD,
SendMode::Reliable,
);
}
).unwrap();
}
} else {
let mut chunk = Chunk::new(chunk_position);
@ -97,13 +114,12 @@ fn process_chunk_requests(
}
fn process_finished_tasks(
mut server: NonSendSync<UniqueViewMut<UdpServer>>,
server: NonSendSync<UniqueView<UdpServer>>,
task_manager: UniqueView<ChunkTaskManager>,
mut chunk_manager: UniqueViewMut<ChunkManager>,
id_map: UniqueView<ClientIdMap>,
client_addr: View<ClientAddress>,
) {
let mut limit: usize = 8;
'outer: while let Some(res) = task_manager.receive() {
let ChunkTaskResponse::ChunkLoaded { chunk_position, blocks, queue } = res;
let Some(chunk) = chunk_manager.chunks.get_mut(&chunk_position) else {
@ -116,7 +132,15 @@ fn process_finished_tasks(
}
chunk.state = ChunkState::Loaded;
chunk.blocks = Some(blocks.clone());
log::debug!("Chunk {chunk_position} loaded, {} subs", chunk.subscriptions.len());
let chunk_packet = &ServerToClientMessage::ChunkResponse {
chunk: chunk_position,
data: blocks,
queued: queue
};
for &subscriber in &chunk.subscriptions {
let Some(&entity_id) = id_map.0.get(&subscriber) else {
log::error!("Invalid subscriber client id");
@ -130,15 +154,12 @@ fn process_finished_tasks(
log::error!("Client not connected");
continue 'outer;
};
client.borrow_mut().send(
postcard::to_allocvec(&ServerToClientMessage::ChunkResponse {
chunk: chunk_position,
data: blocks.clone(),
queued: queue.clone()
}).unwrap().into_boxed_slice(),
CHANNEL_WORLD,
SendMode::Reliable,
);
send_chunk_compressed(client, chunk_packet).unwrap();
// client.borrow_mut().send(
// chunk_packet.clone(),
// CHANNEL_WORLD,
// SendMode::Reliable,
// );
}
}
}

View file

@ -3,7 +3,7 @@ use serde::{Serialize, Deserialize};
use crate::{chunk::BlockData, queue::QueuedBlock};
use super::client::ClientId;
pub const PROTOCOL_ID: u16 = 2;
pub const PROTOCOL_ID: u16 = 3;
pub const C_CLIENT_HELLO: u8 = 0;
pub const C_POSITION_CHANGED: u8 = 1;
@ -45,6 +45,7 @@ pub enum ServerToClientMessage {
position: Vec3,
direction: Quat,
} = S_PLAYER_POSITION_CHANGED,
///WARNING: THIS IS COMPRESSED
ChunkResponse {
chunk: IVec3,
data: BlockData,

View file

@ -22,6 +22,7 @@ gilrs = { version = "0.10", default_features = false, features = ["xinput"] }
uflow = "0.7"
postcard = { version = "1.0", features = ["alloc"] }
serde_json = { version = "1.0", optional = true }
lz4_flex = { version = "0.10", default-features = false, features = ["checked-decode"] }
[target.'cfg(target_os = "windows")'.dependencies]
winapi = { version = "0.3" }