Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add s3 test #248

Merged
merged 3 commits into from
Dec 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 7 additions & 6 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ object-store = ["fusio/object_store"]
opfs = [
"dep:wasm-bindgen-futures",
"fusio-dispatch/opfs",
"fusio-parquet/opfs",
"fusio-parquet/web",
"fusio/opfs",
]
redb = ["dep:redb"]
Expand All @@ -38,7 +38,8 @@ tokio = [
"tokio/fs",
]
tokio-http = ["fusio/tokio-http"]
wasm = ["aws", "bytes", "opfs"]
wasm = ["aws", "bytes", "opfs", "wasm-http"]
wasm-http = ["fusio/wasm-http"]

[[example]]
name = "declare"
Expand Down Expand Up @@ -77,14 +78,14 @@ async-trait = { version = "0.1", optional = true }
bytes = { version = "1.7", optional = true }
crc32fast = "1"
crossbeam-skiplist = "0.1"
datafusion = { version = "43", optional = true }
datafusion = { version = "42.2.0", optional = true }
flume = { version = "0.11", features = ["async"] }
fusio = { git = "https://github.com/tonbo-io/fusio.git", rev = "8038993675591f87dd65b88ffdade31dc0a254b7", package = "fusio", version = "0.3.3", features = [
fusio = { git = "https://github.com/tonbo-io/fusio.git", rev = "216eb446fb0a0c6e5e85bfac51a6f6ed8e5ed606", package = "fusio", version = "0.3.3", features = [
"dyn",
"fs",
] }
fusio-dispatch = { git = "https://github.com/tonbo-io/fusio.git", rev = "8038993675591f87dd65b88ffdade31dc0a254b7", package = "fusio-dispatch", version = "0.2.1" }
fusio-parquet = { git = "https://github.com/tonbo-io/fusio.git", rev = "8038993675591f87dd65b88ffdade31dc0a254b7", package = "fusio-parquet", version = "0.2.1" }
fusio-dispatch = { git = "https://github.com/tonbo-io/fusio.git", rev = "216eb446fb0a0c6e5e85bfac51a6f6ed8e5ed606", package = "fusio-dispatch", version = "0.2.1" }
fusio-parquet = { git = "https://github.com/tonbo-io/fusio.git", rev = "216eb446fb0a0c6e5e85bfac51a6f6ed8e5ed606", package = "fusio-parquet", version = "0.2.1" }
futures-core = "0.3"
futures-io = "0.3"
futures-util = "0.3"
Expand Down
4 changes: 2 additions & 2 deletions bindings/python/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,11 @@ crate-type = ["cdylib"]
[workspace]

[dependencies]
fusio = { git = "https://github.com/tonbo-io/fusio.git", rev = "8038993675591f87dd65b88ffdade31dc0a254b7", package = "fusio", version = "0.3.1", features = [
fusio = { git = "https://github.com/tonbo-io/fusio.git", rev = "216eb446fb0a0c6e5e85bfac51a6f6ed8e5ed606", package = "fusio", version = "0.3.1", features = [
"aws",
"tokio",
] }
fusio-dispatch = { git = "https://github.com/tonbo-io/fusio.git", rev = "8038993675591f87dd65b88ffdade31dc0a254b7", package = "fusio-dispatch", version = "0.2.0", features = [
fusio-dispatch = { git = "https://github.com/tonbo-io/fusio.git", rev = "216eb446fb0a0c6e5e85bfac51a6f6ed8e5ed606", package = "fusio-dispatch", version = "0.2.0", features = [
"aws",
"tokio",
] }
Expand Down
1 change: 1 addition & 0 deletions bindings/python/python/tonbo/fs.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ class FsOptions:
region: str | None = None,
sign_payload: bool | None = None,
checksum: bool | None = None,
endpoint: str | None = None,
) -> None: ...

def parse(path: str) -> str: ...
Expand Down
63 changes: 63 additions & 0 deletions bindings/python/tests/test_s3.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
import os

import pytest
import tempfile
from tonbo import DbOption, Column, DataType, Record, TonboDB
from tonbo.fs import FsOptions, AwsCredential, from_url_path


# Record schema used by the S3 read/write test below. Five columns are
# declared; what the `Record` decorator generates from them is defined by the
# tonbo package and is not visible here.
@Record
class User:
    # 64-bit integer column, flagged as the primary key.
    id = Column(DataType.Int64, name="id", primary_key=True)
    # String column.
    name = Column(DataType.String, name="name")
    # String column; the only one declared with nullable=True.
    email = Column(DataType.String, name="email", nullable=True)
    # Unsigned 8-bit integer column.
    age = Column(DataType.UInt8, name="age")
    # Bytes column.
    data = Column(DataType.Bytes, name="data")


@pytest.mark.asyncio
@pytest.mark.skipif("S3" not in os.environ, reason="s3")
async def test_s3_read_write():
    """Insert 500 ``User`` records with S3-backed level paths and read one back.

    Skipped unless the ``S3`` environment variable is set. Credentials are
    read from ``AWS_ACCESS_KEY_ID`` and ``AWS_SECRET_ACCESS_KEY``; a missing
    variable raises ``KeyError``.
    """
    # Local scratch directory handed to DbOption as the database path.
    workdir = tempfile.TemporaryDirectory()

    s3_options = FsOptions.S3(
        "wasm-data",
        AwsCredential(
            os.environ['AWS_ACCESS_KEY_ID'],
            os.environ['AWS_SECRET_ACCESS_KEY'],
        ),
        "ap-southeast-2",
        None,
        None,
        None,
    )

    db_option = DbOption(workdir.name)
    # Levels 0, 1 and 2 each get their own URL path, all using the S3 options.
    for level, url_path in enumerate(("l0", "l1", "l2")):
        db_option.level_path(level, from_url_path(url_path), s3_options)

    # Small limits; presumably chosen so that flushing/compaction is triggered
    # by this modest data set -- confirm against DbOption's documentation.
    db_option.immutable_chunk_num = 1
    db_option.major_threshold_with_sst_size = 3
    db_option.level_sst_magnification = 1
    db_option.max_sst_file_size = 1 * 1024

    db = TonboDB(db_option, User())
    for key in range(500):
        # Explicit flush before every 100th insert (including the very first).
        if key % 100 == 0:
            await db.flush()
        record = User(
            id=key,
            age=key % 128,
            name=str(key * 10),
            email=str(key * 20),
            data=b"Hello Tonbo!",
        )
        await db.insert(record)

    fetched = await db.get(10)
    expected = {
        "id": 10,
        "name": str(10 * 10),
        "email": str(10 * 20),
        "age": 10,
        "data": b"Hello Tonbo!",
    }
    assert fetched == expected




5 changes: 4 additions & 1 deletion src/compaction/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -276,13 +276,16 @@ where
inner: level_scan_ll,
});
}

let level_l_path = option.level_fs_path(level + 1).unwrap_or(&option.base_path);
let level_l_fs = manager.get_fs(level_l_path);
Self::build_tables(
option,
version_edits,
level + 1,
streams,
instance,
level_fs,
level_l_fs,
)
.await?;

Expand Down
115 changes: 115 additions & 0 deletions tests/wasm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -302,4 +302,119 @@ mod tests {
drop(db);
remove("opfs_dir").await;
}

/// Writes the dynamic test items to a `DB` whose level 0-2 paths use S3
/// `FsOptions`, then scans everything back and checks each returned row.
///
/// The test is `#[ignore]`d by default. Credentials are captured at *compile
/// time* via `option_env!`; if either `AWS_ACCESS_KEY_ID` or
/// `AWS_SECRET_ACCESS_KEY` was not set when the test was built, the test
/// returns early instead of panicking.
#[ignore]
#[wasm_bindgen_test]
async fn test_s3_read_write() {
    use fusio::remotes::aws::AwsCredential;
    use fusio_dispatch::FsOptions;

    // Require BOTH credentials; checking only the key id would panic on the
    // secret's `unwrap()` when just one of the two variables is provided.
    let (key_id, secret_key) = match (
        option_env!("AWS_ACCESS_KEY_ID"),
        option_env!("AWS_SECRET_ACCESS_KEY"),
    ) {
        (Some(key_id), Some(secret_key)) => (key_id.to_string(), secret_key.to_string()),
        _ => return,
    };

    let (cols_desc, primary_key_index) = test_dyn_item_schema();

    let fs_option = FsOptions::S3 {
        bucket: "wasm-data".to_string(),
        credential: Some(AwsCredential {
            key_id,
            secret_key,
            token: None,
        }),
        endpoint: None,
        sign_payload: None,
        checksum: None,
        region: Some("ap-southeast-2".to_string()),
    };

    // Base path is an OPFS path; levels 0, 1 and 2 are each given a URL path
    // together with the S3 options above.
    let option = DbOption::with_path(
        Path::from_opfs_path("s3_rw").unwrap(),
        "id".to_string(),
        primary_key_index,
    )
    .level_path(
        0,
        Path::from_url_path("tonbo/l0").unwrap(),
        fs_option.clone(),
    )
    .unwrap()
    .level_path(
        1,
        Path::from_url_path("tonbo/l1").unwrap(),
        fs_option.clone(),
    )
    .unwrap()
    .level_path(2, Path::from_url_path("tonbo/l2").unwrap(), fs_option)
    .unwrap()
    .major_threshold_with_sst_size(3)
    .level_sst_magnification(1)
    .max_sst_file_size(1024);

    let db: DB<DynRecord, OpfsExecutor> =
        DB::with_schema(option, OpfsExecutor::new(), cols_desc, primary_key_index)
            .await
            .unwrap();

    // Insert every test item, flushing after every 5th one (starting with the
    // first, since index 0 satisfies `i % 5 == 0`).
    for (i, item) in test_dyn_items().into_iter().enumerate() {
        db.insert(item).await.unwrap();
        if i % 5 == 0 {
            db.flush().await.unwrap();
        }
    }

    {
        let tx = db.transaction().await;

        let mut scan = tx
            .scan((Bound::Unbounded, Bound::Unbounded))
            .projection(vec![0, 1, 2])
            .take()
            .await
            .unwrap();

        // Expected primary key of the next row; rows are asserted to arrive
        // with ids 0, 1, 2, ... in that order.
        let mut i = 0;
        while let Some(entry) = scan.next().await.transpose().unwrap() {
            let columns = entry.value().unwrap().columns;

            let primary_key_col = columns.first().unwrap();
            assert_eq!(primary_key_col.datatype, Datatype::Int64);
            assert_eq!(primary_key_col.name, "id".to_string());
            assert_eq!(
                *primary_key_col
                    .value
                    .as_ref()
                    .downcast_ref::<i64>()
                    .unwrap(),
                i
            );

            let col = columns.get(1).unwrap();
            assert_eq!(col.datatype, Datatype::Int8);
            assert_eq!(col.name, "age".to_string());
            let age = col.value.as_ref().downcast_ref::<Option<i8>>();
            assert!(age.is_some());
            assert_eq!(age.unwrap(), &Some(i as i8));

            let col = columns.get(2).unwrap();
            assert_eq!(col.datatype, Datatype::String);
            assert_eq!(col.name, "name".to_string());
            let name = col.value.as_ref().downcast_ref::<Option<String>>();
            assert!(name.is_some());
            assert_eq!(name.unwrap(), &Some(i.to_string()));

            // Column 4 is not in the projection `[0, 1, 2]`; the test expects
            // it to still be present but with a `None` value.
            let col = columns.get(4).unwrap();
            assert_eq!(col.datatype, Datatype::Bytes);
            assert_eq!(col.name, "bytes".to_string());
            let bytes = col.value.as_ref().downcast_ref::<Option<Vec<u8>>>();
            assert!(bytes.unwrap().is_none());
            i += 1;
        }
    }
    drop(db);

    remove("s3_rw").await;
}
}
Loading