initial impl

This commit is contained in:
griffi-gh 2024-09-01 23:37:49 +02:00
parent be1e24ba0c
commit 2466c02937
9 changed files with 207 additions and 15 deletions

5
.gitignore vendored
View file

@ -1,3 +1,5 @@
.direnv
# Generated by Cargo
# will have compiled files and executables
debug/
@ -15,6 +17,7 @@ _src
_visualizer.json
*.kubi
*.kbi
/*_log*.txt
/*.log
@ -37,4 +40,4 @@ _visualizer.json
/mods
.direnv

2
Cargo.lock generated
View file

@ -1307,8 +1307,10 @@ dependencies = [
"bincode",
"bytemuck",
"fastnoise-lite",
"flume",
"glam",
"hashbrown 0.14.5",
"log",
"nohash-hasher",
"num_enum",
"nz",

View file

@ -14,6 +14,7 @@ serde = { version = "1.0", default-features = false, features = ["alloc", "deriv
serde_with = "3.4"
bincode = "1.3"
anyhow = "1.0"
flume = "0.11"
fastnoise-lite = { version = "1.1", features = ["std", "f64"] }
rand = { version = "0.8", default_features = false, features = ["std", "min_const_gen"] }
rand_xoshiro = "0.6"
@ -23,6 +24,7 @@ bytemuck = { version = "1.14", features = ["derive"] }
static_assertions = "1.1"
nz = "0.4"
atomic = "0.6"
log = "0.4"
[features]
default = []

View file

@ -17,6 +17,8 @@ use crate::{
chunk::{CHUNK_SIZE, BlockDataRef, BlockData}
};
pub mod io_thread;
/// On-disk size of one chunk sector: one `Block` per cell of a cubic chunk.
const SECTOR_SIZE: usize = CHUNK_SIZE * CHUNK_SIZE * CHUNK_SIZE * size_of::<Block>();
/// Space pre-reserved in the save file. 1 MiB = 32 sectors, assuming 32x32x32 chunks of 1-byte blocks.
const RESERVED_SIZE: usize = 1048576;
/// Number of whole sectors that fit in the reserved region.
const RESERVED_SECTOR_COUNT: usize = RESERVED_SIZE / SECTOR_SIZE;

View file

@ -0,0 +1,166 @@
use glam::IVec3;
use flume::{Receiver, Sender, TryIter};
use shipyard::Unique;
use crate::chunk::BlockData;
use super::WorldSaveFile;
/// Commands sent from the main thread to the IO thread.
pub enum IOCommand {
  /// Write a chunk to the save file
  SaveChunk {
    position: IVec3,
    data: BlockData,
  },
  /// Load a chunk from the disk and send it to the main thread
  LoadChunk {
    position: IVec3,
  },
  /// Process all pending write commands and make the thread end itself
  /// LoadChunk commands will be ignored after this command is received
  Kys,
}
/// Responses sent from the IO thread back to the main thread.
pub enum IOResponse {
  /// A chunk has been loaded from the disk
  /// Or not, in which case the data will be None
  /// and chunk should be generated
  ChunkLoaded {
    position: IVec3,
    data: Option<BlockData>,
  },
  /// The IO thread has been terminated
  Terminated,
}
/// State owned by the IO thread itself.
struct IOThreadContext {
  /// Sends responses back to the main thread
  tx: Sender<IOResponse>,
  /// Receives commands from the main thread
  rx: Receiver<IOCommand>,
  /// The save file; owned exclusively by the IO thread
  save: WorldSaveFile,
}
//TODO: Implement proper error handling (I/O errors are really common)
impl IOThreadContext {
  /// Should be called ON the IO thread
  ///
  /// Initializes the IO thread context, taking ownership of the channels and
  /// the save file.
  ///
  /// NOTE(review): the save file is assumed to be already initialized/loaded
  /// by the caller; `load_data` is intentionally not called here — confirm.
  pub fn initialize(
    tx: Sender<IOResponse>,
    rx: Receiver<IOCommand>,
    save: WorldSaveFile,
  ) -> Self {
    Self { tx, rx, save }
  }

  /// Main loop of the IO thread.
  ///
  /// Blocks waiting for commands and services them one by one; returns after
  /// an [`IOCommand::Kys`] has been received and all pending writes flushed.
  pub fn run(mut self) {
    loop {
      match self.rx.recv().unwrap() {
        IOCommand::SaveChunk { position, data } => {
          self.save.save_chunk(position, &data).unwrap();
        }
        IOCommand::LoadChunk { position } => {
          let data = self.save.load_chunk(position).unwrap();
          self.tx.send(IOResponse::ChunkLoaded { position, data }).unwrap();
        }
        IOCommand::Kys => {
          // Flush all *pending* write commands without blocking.
          //
          // BUGFIX: the previous drain used a blocking `recv()`, which
          // deadlocks — after sending Kys the main thread keeps its Sender
          // alive while waiting for `Terminated`, so `recv()` never returns
          // once the queue is empty. It also stopped at the first pending
          // non-SaveChunk command, silently dropping saves queued behind it.
          // `try_iter` consumes exactly what is already queued and stops.
          for command in self.rx.try_iter() {
            if let IOCommand::SaveChunk { position, data } = command {
              self.save.save_chunk(position, &data).unwrap();
            }
            // LoadChunk commands are ignored after Kys, as documented.
          }
          self.tx.send(IOResponse::Terminated).unwrap();
          return;
        }
      }
    }
  }
}
/// Handle to a single spawned IO thread, held by the main thread.
pub struct IOSingleThread {
  /// Sends commands to the IO thread
  tx: Sender<IOCommand>,
  /// Receives responses from the IO thread
  rx: Receiver<IOResponse>,
  /// Join handle of the spawned "io-thread"
  handle: std::thread::JoinHandle<()>,
}
impl IOSingleThread {
  /// Spawn the IO thread, taking ownership of the save file.
  pub fn spawn(save: WorldSaveFile) -> Self {
    // Unbounded channels: senders must never block (the drain-on-Kys logic
    // and `add_sussy_response`-style callers rely on this).
    let (command_tx, command_rx) = flume::unbounded();
    let (response_tx, response_rx) = flume::unbounded();

    // Spawn the thread
    let builder = std::thread::Builder::new()
      .name("io-thread".into());
    let handle = builder.spawn(move || {
      let context = IOThreadContext::initialize(response_tx, command_rx, save);
      context.run();
    }).unwrap();

    IOSingleThread {
      tx: command_tx,
      rx: response_rx,
      handle,
    }
  }

  /// Send a command to the IO thread
  pub fn send(&self, cmd: IOCommand) {
    self.tx.send(cmd).unwrap();
  }

  /// Poll the IO thread for responses (non-blocking)
  pub fn poll(&self) -> TryIter<IOResponse> {
    self.rx.try_iter()
  }

  /// Signal the IO thread to process the remaining requests and wait for it to terminate
  pub fn stop_sync(&self) {
    // Tell the thread to terminate and wait for it to finish
    self.tx.send(IOCommand::Kys).unwrap();
    // BUGFIX: if the IO thread already panicked, `recv()` returns Err; the
    // old `unwrap()` would panic here too — and a panic inside Drop during
    // unwinding aborts the process. Treat a disconnected channel as
    // "terminated" instead.
    loop {
      match self.rx.recv() {
        Ok(IOResponse::Terminated) | Err(_) => break,
        Ok(_) => (), // discard late ChunkLoaded responses
      }
    }
    // HACK "we have .join at home": `JoinHandle::join` needs ownership,
    // which `&self` can't give up. Spin until the thread is gone, but yield
    // each iteration instead of pegging a core (the old loop busy-waited).
    while !self.handle.is_finished() {
      std::thread::yield_now();
    }
  }

  /// Same as stop_sync but doesn't wait for the IO thread to terminate
  pub fn stop_async(&self) {
    self.tx.send(IOCommand::Kys).unwrap();
  }
}
impl Drop for IOSingleThread {
  fn drop(&mut self) {
    // Blocks until the IO thread has processed its remaining commands and
    // exited (see `stop_sync`), so queued saves are not lost on shutdown.
    self.stop_sync();
  }
}
/// This is a stub for a future implementation that may use multiple IO threads
#[derive(Unique)]
pub struct IOThreadManager {
  // Currently always a single thread; the manager only forwards calls to it
  thread: IOSingleThread,
}
impl IOThreadManager {
  /// Create a new manager, spawning a single IO thread for the given save file.
  pub fn new(save: WorldSaveFile) -> Self {
    let thread = IOSingleThread::spawn(save);
    Self { thread }
  }

  /// Forward a command to the underlying IO thread.
  pub fn send(&self, cmd: IOCommand) {
    self.thread.send(cmd);
  }

  /// Drain any responses the IO thread has produced so far (non-blocking).
  pub fn poll(&self) -> TryIter<IOResponse> {
    self.thread.poll()
  }
}
// i think im a girl :3 (noone will ever read this right? :p)

View file

@ -5,14 +5,15 @@ use crate::{
networking::{GameType, ServerAddress},
state::{GameState, NextState}
};
use kubi_shared::data::WorldSaveFile;
use kubi_shared::data::{io_thread::IOThreadManager, WorldSaveFile};
fn open_local_save_file(path: &Path) -> Result<WorldSaveFile> {
let mut save_file = WorldSaveFile::new({
OpenOptions::new()
.read(true)
.write(true)
.open("world.kbi")?
.create(true)
.open(path)?
});
if save_file.file.metadata().unwrap().len() == 0 {
save_file.initialize()?;
@ -25,13 +26,20 @@ fn open_local_save_file(path: &Path) -> Result<WorldSaveFile> {
pub fn initialize_from_args(
all_storages: AllStoragesView,
) {
// If an address is provided, we're in multiplayer mode (the first argument is the address)
// Otherwise, we're in singleplayer mode and working with local stuff
let args: Vec<String> = env::args().collect();
if args.len() > 1 {
// Parse the address and switch the state to connecting
let address = args[1].parse::<SocketAddr>().expect("invalid address");
all_storages.add_unique(GameType::Muliplayer);
all_storages.add_unique(ServerAddress(address));
all_storages.borrow::<UniqueViewMut<NextState>>().unwrap().0 = Some(GameState::Connecting);
} else {
// Open the local save file
let save_file = open_local_save_file(Path::new("./world.kubi")).expect("failed to open save file");
all_storages.add_unique(IOThreadManager::new(save_file));
// Switch the state and kick off the world loading
all_storages.add_unique(GameType::Singleplayer);
all_storages.borrow::<UniqueViewMut<NextState>>().unwrap().0 = Some(GameState::LoadingWorld);
}

View file

@ -37,7 +37,7 @@ pub fn inject_network_responses_into_manager_queue(
let ServerToClientMessage::ChunkResponse {
chunk, data, queued
} = packet else { unreachable!() };
manager.add_sussy_response(ChunkTaskResponse::LoadedChunk {
manager.add_sussy_response(ChunkTaskResponse::ChunkWorldgenDone {
position: chunk,
chunk_data: data,
queued

View file

@ -8,7 +8,7 @@ use wgpu::util::DeviceExt;
use crate::{
networking::UdpClient,
player::MainPlayer,
rendering::{world::ChunkVertex, BufferPair, Renderer},
rendering::{BufferPair, Renderer},
settings::GameSettings,
state::GameState,
transform::Transform,
@ -185,7 +185,7 @@ fn process_state_changes(
);
} else {
let atomic = Arc::new(Atomic::new(AbortState::Continue));
task_manager.spawn_task(ChunkTask::LoadChunk {
task_manager.spawn_task(ChunkTask::ChunkWorldgen {
seed: 0xbeef_face_dead_cafe,
position,
abortion: Some(Arc::clone(&atomic)),
@ -273,7 +273,7 @@ fn process_completed_tasks(
let mut ops: usize = 0;
while let Some(res) = task_manager.receive() {
match res {
ChunkTaskResponse::LoadedChunk { position, chunk_data, mut queued } => {
ChunkTaskResponse::ChunkWorldgenDone { position, chunk_data, mut queued } => {
//If unwanted chunk is already loaded
//It would be ~~...unethical~~ impossible to abort the operation at this point
//Instead, we'll just throw it away
@ -308,7 +308,7 @@ fn process_completed_tasks(
//increase ops counter
ops += 1;
},
ChunkTaskResponse::GeneratedMesh {
ChunkTaskResponse::GenerateMeshDone {
position,
vertices, indices,
trans_vertices, trans_indices,

View file

@ -13,7 +13,7 @@ use super::{
use crate::rendering::world::ChunkVertex;
pub enum ChunkTask {
LoadChunk {
ChunkWorldgen {
seed: u64,
position: IVec3,
abortion: Option<Arc<Atomic<AbortState>>>,
@ -23,13 +23,14 @@ pub enum ChunkTask {
data: MeshGenData
}
}
pub enum ChunkTaskResponse {
LoadedChunk {
ChunkWorldgenDone {
position: IVec3,
chunk_data: BlockData,
queued: Vec<QueuedBlock>
},
GeneratedMesh {
GenerateMeshDone {
position: IVec3,
vertices: Vec<ChunkVertex>,
indices: Vec<u32>,
@ -43,6 +44,7 @@ pub struct ChunkTaskManager {
channel: (Sender<ChunkTaskResponse>, Receiver<ChunkTaskResponse>),
pool: ThreadPool,
}
impl ChunkTaskManager {
pub fn new() -> Self {
Self {
@ -50,11 +52,17 @@ impl ChunkTaskManager {
pool: ThreadPoolBuilder::new().num_threads(4).build().unwrap()
}
}
//TODO get rid of add_sussy_response
/// Add a response to the queue, to be picked up by the main thread
/// Used by the multiplayer netcode, a huge hack
pub fn add_sussy_response(&self, response: ChunkTaskResponse) {
// this WILL get stuck if the channel is bounded
// don't make the channel bounded ever
self.channel.0.send(response).unwrap()
}
pub fn spawn_task(&self, task: ChunkTask) {
let sender = self.channel.0.clone();
self.pool.spawn(move || {
@ -64,22 +72,23 @@ impl ChunkTaskManager {
(vertices, indices),
(trans_vertices, trans_indices),
) = generate_mesh(position, data);
ChunkTaskResponse::GeneratedMesh {
ChunkTaskResponse::GenerateMeshDone {
position,
vertices, indices,
trans_vertices, trans_indices,
}
},
ChunkTask::LoadChunk { position, seed, abortion } => {
ChunkTask::ChunkWorldgen { position, seed, abortion } => {
let Some((chunk_data, queued)) = generate_world(position, seed, abortion) else {
log::warn!("aborted operation");
return
};
ChunkTaskResponse::LoadedChunk { position, chunk_data, queued }
ChunkTaskResponse::ChunkWorldgenDone { position, chunk_data, queued }
}
});
});
}
pub fn receive(&self) -> Option<ChunkTaskResponse> {
self.channel.1.try_recv().ok()
}