Optimize chunk loading
- Chunk loading is now done in a simpler manner. Instead of grouping chunks by region and sharing one file handle among all chunks in the same region, each chunk now opens its own file handle for its region file; the bookkeeping overhead of the old approach was not worth it, and removing it seems to improve performance.
- Speed-up of ~15%.
- The `.read_chunks` function no longer returns anything; instead it sends each result as an event to an mpsc channel (a usage sketch follows the changed-file summary below).
- Dramatically decreased memory footprint.
lukas0008 committed Aug 13, 2024
1 parent 96b86f1 commit 03f9b37
Showing 4 changed files with 131 additions and 142 deletions.
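For orientation, here is a minimal sketch of the new call pattern described in the commit message, assembled from the `server.rs` hunk further down: the caller hands `read_chunks` an `mpsc::Sender` and drains the receiving end as results arrive. The channel capacity of 1024 and the `Dimension::OverWorld` / `RadialIterator` calls are taken from that hunk; the function name and error handling here are illustrative only, not code from the commit.

```rust
use pumpkin_world::dimension::Dimension;
use pumpkin_world::radial_chunk_iterator::RadialIterator;
use tokio::sync::mpsc;

async fn load_spawn_chunks() {
    // Bounded channel: chunk results stream out as they are read instead of
    // being collected into one large Vec first.
    let (sender, mut receiver) = mpsc::channel(1024);

    // Reader side: read_chunks pushes each ((x, z), Result<ChunkData, WorldError>)
    // pair into the channel from rayon worker threads.
    tokio::spawn(async move {
        let level = Dimension::OverWorld.into_level("./world".parse().unwrap());
        level
            .read_chunks(RadialIterator::new(32).collect(), sender)
            .await;
    });

    // Consumer side: each chunk can be processed as soon as it arrives.
    while let Some((pos, chunk)) = receiver.recv().await {
        match chunk {
            Ok(_data) => { /* encode and send the chunk packet here */ }
            Err(err) => log::warn!("failed to load chunk {:?}: {:?}", pos, err),
        }
    }
}
```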
2 changes: 1 addition & 1 deletion pumpkin-world/Cargo.toml
@@ -8,11 +8,11 @@ edition.workspace = true
fastnbt = { git = "https://github.com/owengage/fastnbt.git" }
fastsnbt = "0.2"
tokio.workspace = true
rayon.workspace = true
itertools = "0.13.0"
thiserror = "1.0.63"
futures = "0.3.30"
flate2 = "1.0.31"
serde = { version = "1.0.205", features = ["derive"] }
lazy_static = "1.5.0"
serde_json = "1.0.122"
rayon = "1.10.0"
3 changes: 2 additions & 1 deletion pumpkin-world/src/chunk.rs
@@ -32,7 +32,7 @@ use fastnbt::LongArray;
use crate::{world::WorldError, WORLD_HEIGHT};

pub struct ChunkData {
pub blocks: Box<[i64; 16 * 16 * WORLD_HEIGHT]>,
pub blocks: Box<[i32; 16 * 16 * WORLD_HEIGHT]>,
pub position: (i32, i32),
pub heightmaps: ChunkHeightmaps,
}
@@ -96,6 +96,7 @@ impl ChunkData {
&entry.name,
entry.properties.as_ref(),
)
.map(|v| v as i32)
})
.collect::<Result<Vec<_>, _>>()?;
let block_data = match block_states.data {
231 changes: 105 additions & 126 deletions pumpkin-world/src/world.rs
@@ -1,4 +1,8 @@
use std::{io::Read, path::PathBuf, sync::Arc};
use std::{
io::{Read, Seek},
path::PathBuf,
sync::{atomic::AtomicUsize, Arc},
};

use flate2::bufread::ZlibDecoder;
use itertools::Itertools;
@@ -7,7 +11,9 @@ use thiserror::Error;
use tokio::{
fs::File,
io::{AsyncReadExt, AsyncSeekExt},
sync::Mutex,
runtime::Handle,
sync::{mpsc, oneshot, Mutex},
task::spawn_blocking,
};

use crate::chunk::ChunkData;
@@ -50,107 +56,81 @@ impl Level {
Level { root_folder }
}

/// Read one chunk in the world
///
/// Do not use this function if reading many chunks is required, since in case those two chunks which are read separately using `.read_chunk` are in the same region file, it will need to be opened and closed separately for both of them, leading to a performance loss.
pub async fn read_chunk(&self, chunk: (i32, i32)) -> Result<ChunkData, WorldError> {
self.read_chunks(vec![chunk])
.await
.pop()
.expect("Read chunks must return a chunk")
.1
}
// /// Read one chunk in the world
// ///
// /// Do not use this function if reading many chunks is required, since in case those two chunks which are read separately using `.read_chunk` are in the same region file, it will need to be opened and closed separately for both of them, leading to a performance loss.
// pub async fn read_chunk(&self, chunk: (i32, i32)) -> Result<ChunkData, WorldError> {
// self.read_chunks(vec![chunk])
// .await
// .pop()
// .expect("Read chunks must return a chunk")
// .1
// }

/// Read many chunks in a world
/// MUST be called from a tokio runtime thread
///
/// Note: The output chunks will almost never be in the same order as the input chunks
pub async fn read_chunks(
&self,
chunks: Vec<(i32, i32)>,
) -> Vec<((i32, i32), Result<ChunkData, WorldError>)> {
futures::future::join_all(
chunks
.into_iter()
// split chunks into their corresponding region files to be able to read all of them at once, instead of reopening the file multiple times
.chunk_by(|chunk| {
(
((chunk.0 as f32) / 32.0).floor() as i32,
((chunk.1 as f32) / 32.0).floor() as i32,
)
})
.into_iter()
.map(|(region, chunk_vec)| {
let mut path = self.root_folder.clone();
let chunk_vec = chunk_vec.collect_vec();
path.push("region");
path.push(format!("r.{}.{}.mca", region.0, region.1));
self.read_region_chunks(path, chunk_vec)
}),
)
.await
.into_iter()
.flatten()
.collect_vec()
}
async fn read_region_chunks(
&self,
region_file_path: PathBuf,
chunks: Vec<(i32, i32)>,
) -> Vec<((i32, i32), Result<ChunkData, WorldError>)> {
// dbg!(at);
// return different error when file is not found (because that means that the chunks have just not been generated yet)
let mut region_file = match File::open(&region_file_path).await {
Ok(f) => f,
Err(err) => match err.kind() {
std::io::ErrorKind::NotFound => {
return chunks
.into_iter()
.map(|c| (c, Err(WorldError::RegionNotFound)))
.collect_vec()
}
_ => {
return chunks
.into_iter()
.map(|c| (c, Err(WorldError::IoError(err.kind()))))
.collect_vec()
}
},
};

let mut location_table: [u8; 4096] = [0; 4096];
let mut timestamp_table: [u8; 4096] = [0; 4096];

// fill the location and timestamp tables
{
match region_file.read_exact(&mut location_table).await {
Ok(_) => {}
Err(err) => {
return chunks
.into_iter()
.map(|c| (c, Err(WorldError::IoError(err.kind()))))
.collect_vec()
}
}
match region_file.read_exact(&mut timestamp_table).await {
Ok(_) => {}
Err(err) => {
return chunks
.into_iter()
.map(|c| (c, Err(WorldError::IoError(err.kind()))))
.collect_vec()
channel: mpsc::Sender<((i32, i32), Result<ChunkData, WorldError>)>,
) {
chunks
.into_par_iter()
.map(|chunk| {
let region = (
((chunk.0 as f32) / 32.0).floor() as i32,
((chunk.1 as f32) / 32.0).floor() as i32,
);
let channel = channel.clone();
let mut region_file_path = self.root_folder.clone();
region_file_path.push("region");
region_file_path.push(format!("r.{}.{}.mca", region.0, region.1));

// return different error when file is not found (because that means that the chunks have just not been generated yet)
let mut region_file = match std::fs::File::options().read(true).write(false).open(&region_file_path) {
Ok(f) => f,
Err(err) => match err.kind() {
std::io::ErrorKind::NotFound => {
let _ = channel.blocking_send((chunk, Err(WorldError::RegionNotFound)));
return;
}
_ => {
let _ = channel
.blocking_send((chunk, Err(WorldError::IoError(err.kind()))));
return;
}
},
};

let mut location_table: [u8; 4096] = [0; 4096];
let mut timestamp_table: [u8; 4096] = [0; 4096];

// fill the location and timestamp tables
{
match region_file.read_exact(&mut location_table) {
Ok(_) => {}
Err(err) => {
let _ = channel
.blocking_send((chunk, Err(WorldError::IoError(err.kind()))));
return;
}
}
match region_file.read_exact(&mut timestamp_table) {
Ok(_) => {}
Err(err) => {
let _ = channel
.blocking_send((chunk, Err(WorldError::IoError(err.kind()))));
return;
}
}
}
}
}
// println!("Location table: {:?}", &location_table);

// wrap file with arc mutex to allow for multithreading
let region_file = Arc::new(Mutex::new(region_file));
futures::future::join_all(chunks.into_iter().map(|(old_chunk_x, old_chunk_z)| {
let region_file = region_file.clone();
let modulus = |a: i32, b: i32| ((a % b) + b) % b;
let chunk_x = modulus(old_chunk_x, 32) as u32;
let chunk_z = modulus(old_chunk_z, 32) as u32;
async move {

let modulus = |a: i32, b: i32| ((a % b) + b) % b;
let chunk_x = modulus(chunk.0, 32) as u32;
let chunk_z = modulus(chunk.1, 32) as u32;
let channel = channel.clone();
let table_entry = (chunk_x + chunk_z * 32) * 4;

let mut offset = vec![0u8];
@@ -161,19 +141,24 @@
let size = location_table[table_entry as usize + 3] as usize * 4096;

if offset == 0 && size == 0 {
return ((old_chunk_x, old_chunk_z), Err(WorldError::ChunkNotFound));
let _ =
channel.blocking_send(((chunk.0, chunk.1), Err(WorldError::ChunkNotFound)));
return;
}
// Read the file using the offset and size
let mut file_buf = {
let mut region_file = region_file.lock().await;
let seek_result = region_file.seek(std::io::SeekFrom::Start(offset)).await;
let seek_result = region_file.seek(std::io::SeekFrom::Start(offset));
if seek_result.is_err() {
return ((old_chunk_x, old_chunk_z), Err(WorldError::RegionIsInvalid));
let _ = channel
.blocking_send(((chunk.0, chunk.1), Err(WorldError::RegionIsInvalid)));
return;
}
let mut out = vec![0; size];
let read_result = region_file.read_exact(&mut out).await;
let read_result = region_file.read_exact(&mut out);
if read_result.is_err() {
return ((old_chunk_x, old_chunk_z), Err(WorldError::RegionIsInvalid));
let _ = channel
.blocking_send(((chunk.0, chunk.1), Err(WorldError::RegionIsInvalid)));
return;
}
out
};
@@ -186,7 +171,11 @@ impl Server {
2 => Compression::Zlib,
3 => Compression::None,
4 => Compression::LZ4,
_ => return ((old_chunk_x, old_chunk_z), Err(WorldError::RegionIsInvalid)),
_ => {
let _ =
channel.blocking_send(((chunk.0, chunk.1), Err(WorldError::RegionIsInvalid)));
return;
}
};

match compression {
@@ -195,32 +184,22 @@
}

let size = u32::from_be_bytes(header[0..4].try_into().unwrap());

// size includes the compression scheme byte, so we need to subtract 1
let chunk_data = file_buf.drain(0..size as usize - 1).collect_vec();

((old_chunk_x, old_chunk_z), Ok(chunk_data))
}
}))
.await
.into_par_iter()
.map(|((old_chunk_x, old_chunk_z), chunk_data)| {
let chunk_data = match chunk_data {
Ok(c) => c,
Err(e) => return ((old_chunk_x, old_chunk_z), Err(e)),
};
let mut z = ZlibDecoder::new(&chunk_data[..]);
let mut chunk_data = Vec::new();
match z.read_to_end(&mut chunk_data) {
Ok(_) => {}
Err(err) => return ((old_chunk_x, old_chunk_z), Err(WorldError::ZlibError(err))),
}

(
(old_chunk_x, old_chunk_z),
ChunkData::from_bytes(chunk_data, (old_chunk_x, old_chunk_z)),
)
})
.collect()
let mut z = ZlibDecoder::new(&chunk_data[..]);
let mut chunk_data = Vec::new();
match z.read_to_end(&mut chunk_data) {
Ok(_) => {}
Err(e) => {
let _ = channel.blocking_send((chunk, Err(WorldError::ZlibError(e))));
return;
}
}

let _ = channel.blocking_send((chunk, ChunkData::from_bytes(chunk_data, chunk)));
})
.collect::<Vec<()>>();
}
}
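The seek arithmetic in the hunk above follows the standard Anvil region layout: an `.mca` file starts with a 4 KiB location table of 1024 four-byte entries (one per chunk of the 32×32 region), each holding a 3-byte big-endian offset plus a 1-byte length, both measured in 4096-byte sectors. A standalone sketch of that lookup, written to mirror the `table_entry` math in `read_chunks` (the helper name is illustrative, not part of the commit):

```rust
/// Locate a chunk's data inside an .mca region file from the 4 KiB location
/// table at the start of the file. Returns (byte offset, byte size), or None
/// if the chunk has not been generated yet.
fn chunk_location(location_table: &[u8; 4096], chunk_x: i32, chunk_z: i32) -> Option<(u64, usize)> {
    // Region-local coordinates: chunk coordinates modulo 32, kept non-negative.
    let modulus = |a: i32, b: i32| ((a % b) + b) % b;
    let x = modulus(chunk_x, 32) as usize;
    let z = modulus(chunk_z, 32) as usize;

    // Each of the 1024 entries is 4 bytes: a 3-byte big-endian offset in
    // 4096-byte sectors, followed by a 1-byte sector count.
    let entry = (x + z * 32) * 4;
    let offset = (u64::from(location_table[entry]) << 16
        | u64::from(location_table[entry + 1]) << 8
        | u64::from(location_table[entry + 2]))
        * 4096;
    let size = location_table[entry + 3] as usize * 4096;

    // An all-zero entry means the chunk does not exist in this region file.
    if offset == 0 && size == 0 {
        None
    } else {
        Some((offset, size))
    }
}
```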
37 changes: 23 additions & 14 deletions pumpkin/src/server.rs
@@ -30,6 +30,7 @@ use pumpkin_world::{dimension::Dimension, radial_chunk_iterator::RadialIterator}
use pumpkin_registry::Registry;
use rsa::{traits::PublicKeyParts, RsaPrivateKey, RsaPublicKey};
use serde::{Deserialize, Serialize};
use tokio::sync::mpsc;

use crate::{
client::Client,
@@ -330,24 +331,33 @@ impl Server {

// TODO: do this in a world
async fn spawn_test_chunk(client: &mut Client) {
let chunks = Dimension::OverWorld
.into_level(
let inst = std::time::Instant::now();
let (sender, mut chunk_receiver) = mpsc::channel(1024);
tokio::spawn(async move {
let level = Dimension::OverWorld.into_level(
// TODO: load from config
"./world".parse().unwrap(),
)
.read_chunks(RadialIterator::new(32).collect())
.await;
);
level
.read_chunks(RadialIterator::new(32).collect(), sender)
.await;
});

client.send_packet(&CCenterChunk {
chunk_x: 0.into(),
chunk_z: 0.into(),
});

chunks.iter().for_each(|chunk| {

while let Some((chunk_pos, chunk_data)) = chunk_receiver.recv().await {
// dbg!(chunk_pos);
let chunk_data = match chunk_data {
Ok(d) => d,
Err(_) => continue,
};
#[cfg(debug_assertions)]
if chunk.0 == (0, 0) {
if chunk_pos == (0, 0) {
let mut test = ByteBuffer::empty();
CChunkData(chunk.1.as_ref().unwrap()).write(&mut test);
CChunkData(&chunk_data).write(&mut test);
let len = test.buf().len();
log::debug!(
"Chunk packet size: {}B {}KB {}MB",
Expand All @@ -356,11 +366,10 @@ impl Server {
len / (1024 * 1024)
);
}
match &chunk.1 {
Err(err) => {},
Ok(data) => client.send_packet(&CChunkData(data)),
}
});
client.send_packet(&CChunkData(&chunk_data));
}
let t = std::time::Instant::now().duration_since(inst);
dbg!("DONE", t);
}

// move to world
