diff --git a/Cargo.lock b/Cargo.lock index ce9e901..8245711 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -928,6 +928,7 @@ name = "kubi-server" version = "0.1.0" dependencies = [ "anyhow", + "flume", "glam", "hashbrown 0.13.2", "kubi-logging", @@ -935,6 +936,7 @@ dependencies = [ "kubi-udp", "log", "nohash-hasher", + "rayon", "serde", "shipyard", "toml", diff --git a/Server.toml b/Server.toml index 14ae320..69c01e8 100644 --- a/Server.toml +++ b/Server.toml @@ -2,3 +2,6 @@ address = "0.0.0.0:12345" max_clients = 254 timeout_ms = 10000 + +[world] +seed = 0xbeef_face_dead_cafe diff --git a/kubi-server/Cargo.toml b/kubi-server/Cargo.toml index 9e406c1..f3b9d0b 100644 --- a/kubi-server/Cargo.toml +++ b/kubi-server/Cargo.toml @@ -16,6 +16,8 @@ glam = { version = "0.23", features = ["debug-glam-assert", "fast-math"] } hashbrown = "0.13" nohash-hasher = "0.2.0" anyhow = "1.0" +rayon = "1.6" +flume = "0.10" [features] default = [] diff --git a/kubi-server/src/chunk.rs b/kubi-server/src/chunk.rs deleted file mode 100644 index 2396e84..0000000 --- a/kubi-server/src/chunk.rs +++ /dev/null @@ -1,19 +0,0 @@ -use glam::IVec3; -use hashbrown::HashMap; -use kubi_shared::chunk::BlockData; -use shipyard::Unique; - -pub struct Chunk { - pub blocks: BlockData -} - -#[derive(Unique)] -pub struct ChunkManager { - pub chunks: HashMap -} - -pub fn server_chunk_response( - -) { - -} diff --git a/kubi-server/src/config.rs b/kubi-server/src/config.rs index 2ae114f..3a6abf1 100644 --- a/kubi-server/src/config.rs +++ b/kubi-server/src/config.rs @@ -10,9 +10,15 @@ pub struct ConfigTableServer { pub password: Option, } +#[derive(Serialize, Deserialize)] +pub struct ConfigTableWorld { + pub seed: u64, +} + #[derive(Unique, Serialize, Deserialize)] pub struct ConfigTable { - pub server: ConfigTableServer + pub server: ConfigTableServer, + pub world: ConfigTableWorld, } pub fn read_config( diff --git a/kubi-server/src/main.rs b/kubi-server/src/main.rs index 926c6b5..fc24b57 100644 --- a/kubi-server/src/main.rs +++ b/kubi-server/src/main.rs @@ -1,4 +1,3 @@ - use shipyard::{World, Workload, IntoWorkload}; use std::{thread, time::Duration}; @@ -6,17 +5,19 @@ pub(crate) mod util; pub(crate) mod config; pub(crate) mod server; pub(crate) mod client; -pub(crate) mod chunk; +pub(crate) mod world; pub(crate) mod auth; use config::read_config; use server::{bind_server, update_server, update_server_events}; use auth::authenticate_players; +use world::{update_world, init_world}; fn initialize() -> Workload { ( read_config, bind_server, + init_world, ).into_workload() } @@ -25,6 +26,7 @@ fn update() -> Workload { update_server, update_server_events, authenticate_players, + update_world, ).into_workload() } diff --git a/kubi-server/src/world.rs b/kubi-server/src/world.rs new file mode 100644 index 0000000..b555576 --- /dev/null +++ b/kubi-server/src/world.rs @@ -0,0 +1,111 @@ +use shipyard::{Unique, UniqueView, UniqueViewMut, Workload, IntoWorkload, AllStoragesView}; +use glam::IVec3; +use hashbrown::HashMap; +use kubi_shared::networking::messages::{ClientToServerMessage, ServerToClientMessage}; +use kubi_udp::server::ServerEvent; +use crate::{ + server::{UdpServer, ServerEvents}, + config::ConfigTable, + util::log_error, +}; + +pub mod chunk; +pub mod tasks; + +use chunk::Chunk; + +use self::{tasks::{ChunkTaskManager, ChunkTask, ChunkTaskResponse}, chunk::ChunkState}; + +#[derive(Unique, Default)] +pub struct ChunkManager { + pub chunks: HashMap +} +impl ChunkManager { + pub fn new() -> Self { + Self::default() + } +} + +fn process_chunk_requests( + mut server: UniqueViewMut, + events: UniqueView, + mut chunk_manager: UniqueViewMut, + task_manager: UniqueView, + config: UniqueView +) { + for event in &events.0 { + if let ServerEvent::MessageReceived { + from: client_id, + message: ClientToServerMessage::ChunkSubRequest { + chunk: chunk_position + } + } = event { + let chunk_position = IVec3::from_array(*chunk_position); + if let Some(chunk) = chunk_manager.chunks.get_mut(&chunk_position) { + chunk.subscriptions.insert(*client_id); + //TODO Start task here if status is "Nothing" + if let Some(blocks) = &chunk.blocks { + server.0.send_message(*client_id, kubi_shared::networking::messages::ServerToClientMessage::ChunkResponse { + chunk: chunk_position.to_array(), + data: blocks.clone(), + queued: Vec::with_capacity(0) + }).map_err(log_error).ok(); + } + } else { + let mut chunk = Chunk::new(chunk_position); + chunk.state = ChunkState::Loading; + chunk_manager.chunks.insert(chunk_position, chunk); + task_manager.spawn_task(ChunkTask::LoadChunk { + position: chunk_position, + seed: config.world.seed, + }); + } + } + } +} + +fn process_finished_tasks( + mut server: UniqueViewMut, + task_manager: UniqueView, + mut chunk_manager: UniqueViewMut, +) { + while let Some(res) = task_manager.receive() { + let ChunkTaskResponse::ChunkLoaded { chunk_position, blocks, queue } = res; + let Some(chunk) = chunk_manager.chunks.get_mut(&chunk_position) else { + log::warn!("Chunk discarded: Doesn't exist"); + continue + }; + if chunk.state != ChunkState::Loading { + log::warn!("Chunk discarded: Not Loading"); + continue + } + chunk.state = ChunkState::Loaded; + chunk.blocks = Some(blocks.clone()); + for &subscriber in &chunk.subscriptions { + server.0.send_message(subscriber, ServerToClientMessage::ChunkResponse { + chunk: chunk_position.to_array(), + data: blocks.clone(), + queued: queue.iter().map(|item| (item.position.to_array(), item.block_type)).collect() + }).map_err(log_error).ok(); + } + } +} + +fn init_chunk_manager( + storages: AllStoragesView +) { + storages.add_unique(ChunkManager::new()); +} + +pub fn init_world() -> Workload { + ( + init_chunk_manager + ).into_workload() +} + +pub fn update_world() -> Workload { + ( + process_chunk_requests, + process_finished_tasks, + ).into_workload() +} diff --git a/kubi-server/src/world/chunk.rs b/kubi-server/src/world/chunk.rs new file mode 100644 index 0000000..a83b0ee --- /dev/null +++ b/kubi-server/src/world/chunk.rs @@ -0,0 +1,29 @@ +use glam::IVec3; +use hashbrown::HashSet; +use nohash_hasher::BuildNoHashHasher; +use kubi_shared::chunk::BlockData; +use kubi_udp::{ClientId, ClientIdRepr}; + +#[derive(Clone, Copy, Debug, PartialEq, Eq)] +pub enum ChunkState { + Nothing, + Loading, + Loaded, +} + +pub struct Chunk { + pub position: IVec3, + pub state: ChunkState, + pub blocks: Option, + pub subscriptions: HashSet>, +} +impl Chunk { + pub fn new(position: IVec3) -> Self { + Self { + position, + state: ChunkState::Nothing, + blocks: None, + subscriptions: HashSet::with_hasher(BuildNoHashHasher::default()), + } + } +} diff --git a/kubi-server/src/world/tasks.rs b/kubi-server/src/world/tasks.rs new file mode 100644 index 0000000..fd2d120 --- /dev/null +++ b/kubi-server/src/world/tasks.rs @@ -0,0 +1,58 @@ +use shipyard::{Unique, AllStoragesView}; +use flume::{unbounded, Sender, Receiver}; +use glam::IVec3; +use rayon::{ThreadPool, ThreadPoolBuilder}; +use anyhow::Result; +use kubi_shared::{ + chunk::BlockData, + worldgen::{QueuedBlock, generate_world} +}; + +pub enum ChunkTask { + LoadChunk { + position: IVec3, + seed: u64, + } +} + +pub enum ChunkTaskResponse { + ChunkLoaded { + chunk_position: IVec3, + blocks: BlockData, + queue: Vec + } +} + +#[derive(Unique)] +pub struct ChunkTaskManager { + channel: (Sender, Receiver), + pool: ThreadPool, +} +impl ChunkTaskManager { + pub fn new() -> Result { + Ok(Self { + channel: unbounded(), + pool: ThreadPoolBuilder::new().build()? + }) + } + pub fn spawn_task(&self, task: ChunkTask) { + let sender = self.channel.0.clone(); + self.pool.spawn(move || { + sender.send(match task { + ChunkTask::LoadChunk { position: chunk_position, seed } => { + let (blocks, queue) = generate_world(chunk_position, seed); + ChunkTaskResponse::ChunkLoaded { chunk_position, blocks, queue } + } + }).unwrap() + }) + } + pub fn receive(&self) -> Option { + self.channel.1.try_recv().ok() + } +} + +pub fn init_chunk_task_manager( + storages: AllStoragesView +) { + storages.add_unique(ChunkTaskManager::new().expect("ChunkTaskManager Init failed")); +} diff --git a/kubi-shared/src/networking/messages.rs b/kubi-shared/src/networking/messages.rs index 0b553c6..c8b17a9 100644 --- a/kubi-shared/src/networking/messages.rs +++ b/kubi-shared/src/networking/messages.rs @@ -1,7 +1,6 @@ use std::num::NonZeroUsize; - use bincode::{Encode, Decode}; -use crate::chunk::BlockData; +use crate::{chunk::BlockData, block::Block}; type IVec3Arr = [i32; 3]; type Vec3Arr = [f32; 3]; @@ -20,7 +19,7 @@ pub enum ClientToServerMessage { velocity: Vec3Arr, direction: QuatArr, }, - ChunkRequest { + ChunkSubRequest { chunk: IVec3Arr, }, } @@ -54,6 +53,7 @@ pub enum ServerToClientMessage { }, ChunkResponse { chunk: IVec3Arr, - data: BlockData + data: BlockData, + queued: Vec<(IVec3Arr, Block)>, } } diff --git a/kubi/src/world/loading.rs b/kubi/src/world/loading.rs index e16a72f..76d67f3 100644 --- a/kubi/src/world/loading.rs +++ b/kubi/src/world/loading.rs @@ -134,7 +134,7 @@ fn start_required_tasks( DesiredChunkState::Loaded | DesiredChunkState::Rendered if chunk.current_state == CurrentChunkState::Nothing => { //start load task if let Some(client) = &udp_client { - client.0.send_message(ClientToServerMessage::ChunkRequest { + client.0.send_message(ClientToServerMessage::ChunkSubRequest { chunk: position.to_array() }).unwrap(); } else { diff --git a/kubi/src/world/tasks.rs b/kubi/src/world/tasks.rs index 705b9a5..5d855e1 100644 --- a/kubi/src/world/tasks.rs +++ b/kubi/src/world/tasks.rs @@ -83,12 +83,15 @@ pub fn inject_network_responses_into_manager_queue( events: View ) { for event in events.iter() { - if let ClientEvent::MessageReceived(ServerToClientMessage::ChunkResponse { chunk, data }) = &event.0 { + if let ClientEvent::MessageReceived(ServerToClientMessage::ChunkResponse { chunk, data, queued }) = &event.0 { let position = IVec3::from_array(*chunk); manager.add_sussy_response(ChunkTaskResponse::LoadedChunk { position, chunk_data: data.clone(), - queued: Vec::with_capacity(0) + queued: queued.iter().map(|&(position, block_type)| QueuedBlock { + position: IVec3::from_array(position), + block_type + }).collect() }); } }