Skip to content

Commit

Permalink
test: Introduce integration test of rest catalog client against docke…
Browse files Browse the repository at this point in the history
…r container
  • Loading branch information
liurenjie1024 committed Dec 8, 2023
1 parent d206c1d commit 13f5d9c
Show file tree
Hide file tree
Showing 13 changed files with 788 additions and 9 deletions.
4 changes: 3 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

[workspace]
resolver = "2"
members = ["crates/catalog/*", "crates/iceberg"]
members = ["crates/catalog/*", "crates/iceberg", "crates/test_utils"]

[workspace.dependencies]
anyhow = "1.0.72"
Expand All @@ -31,6 +31,7 @@ bitvec = "1.0.1"
chrono = "0.4"
derive_builder = "0.12.0"
either = "1"
env_logger = "0.10.0"
futures = "0.3"
iceberg = { path = "./crates/iceberg" }
iceberg-catalog-rest = { path = "./crates/catalog/rest" }
Expand All @@ -43,6 +44,7 @@ once_cell = "1"
opendal = "0.42"
ordered-float = "4.0.0"
pretty_assertions = "1.4.0"
port_scanner = "0.1.5"
reqwest = { version = "^0.11", features = ["json"] }
rust_decimal = "1.31.0"
serde = { version = "^1.0", features = ["rc"] }
Expand Down
6 changes: 5 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,13 @@
# specific language governing permissions and limitations
# under the License.

.EXPORT_ALL_VARIABLES:

RUST_LOG = debug

build:
cargo build

check-fmt:
cargo fmt --all -- --check

Expand Down
3 changes: 3 additions & 0 deletions crates/catalog/rest/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ keywords = ["iceberg", "rest", "catalog"]
async-trait = { workspace = true }
chrono = { workspace = true }
iceberg = { workspace = true }
log = "0.4.20"
reqwest = { workspace = true }
serde = { workspace = true }
serde_derive = { workspace = true }
Expand All @@ -40,5 +41,7 @@ urlencoding = { workspace = true }
uuid = { workspace = true, features = ["v4"] }

[dev-dependencies]
iceberg_test_utils = { path = "../../test_utils", features = ["tests"] }
mockito = { workspace = true }
port_scanner = { workspace = true }
tokio = { workspace = true }
48 changes: 42 additions & 6 deletions crates/catalog/rest/src/catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use std::collections::HashMap;

use async_trait::async_trait;
use reqwest::header::{self, HeaderMap, HeaderName, HeaderValue};
use reqwest::{Client, Request};
use reqwest::{Client, Request, Response, StatusCode};
use serde::de::DeserializeOwned;
use typed_builder::TypedBuilder;
use urlencoding::encode;
Expand Down Expand Up @@ -163,15 +163,45 @@ impl HttpClient {
) -> Result<()> {
let resp = self.0.execute(request).await?;

println!("Status code: {}", resp.status());

if resp.status().as_u16() == SUCCESS_CODE {
Ok(())
} else {
let code = resp.status();
let text = resp.bytes().await?;
let e = serde_json::from_slice::<E>(&text).map_err(|e| {
Error::new(
ErrorKind::Unexpected,
"Failed to parse response from rest catalog server!",
)
.with_context("json", String::from_utf8_lossy(&text))
.with_context("code", code.to_string())
.with_source(e)
})?;
Err(e.into())
}
}

/// More generic logic handling for special cases like head.
async fn do_execute<R, E: DeserializeOwned + Into<Error>>(
&self,
request: Request,
handler: impl FnOnce(&Response) -> Option<R>,
) -> Result<R> {
let resp = self.0.execute(request).await?;

if let Some(ret) = handler(&resp) {
Ok(ret)
} else {
let code = resp.status();
let text = resp.bytes().await?;
let e = serde_json::from_slice::<E>(&text).map_err(|e| {
Error::new(
ErrorKind::Unexpected,
"Failed to parse response from rest catalog server!",
)
.with_context("code", code.to_string())
.with_context("json", String::from_utf8_lossy(&text))
.with_source(e)
})?;
Expand Down Expand Up @@ -273,9 +303,12 @@ impl Catalog for RestCatalog {
.build()?;

self.client
.execute::<ErrorResponse, NO_CONTENT>(request)
.do_execute::<bool, ErrorResponse>(request, |resp| match resp.status() {
StatusCode::NO_CONTENT => Some(true),
StatusCode::NOT_FOUND => Some(false),
_ => None,
})
.await
.map(|_| true)
}

/// Drop a namespace from the catalog.
Expand Down Expand Up @@ -326,7 +359,7 @@ impl Catalog for RestCatalog {
partition_spec: creation.partition_spec,
write_order: creation.sort_order,
// We don't support stage create yet.
stage_create: None,
stage_create: Some(false),
properties: if creation.properties.is_empty() {
None
} else {
Expand Down Expand Up @@ -406,9 +439,12 @@ impl Catalog for RestCatalog {
.build()?;

self.client
.execute::<ErrorResponse, NO_CONTENT>(request)
.do_execute::<bool, ErrorResponse>(request, |resp| match resp.status() {
StatusCode::NO_CONTENT => Some(true),
StatusCode::NOT_FOUND => Some(false),
_ => None,
})
.await
.map(|_| true)
}

/// Rename a table in the catalog.
Expand Down
65 changes: 65 additions & 0 deletions crates/catalog/rest/testdata/rest_catalog/docker-compose.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

version: '3.8'

services:
rest:
image: tabulario/iceberg-rest:0.10.0
environment:
- AWS_ACCESS_KEY_ID=admin
- AWS_SECRET_ACCESS_KEY=password
- AWS_REGION=us-east-1
- CATALOG_CATOLOG__IMPL=org.apache.iceberg.jdbc.JdbcCatalog
- CATALOG_URI=jdbc:sqlite:file:/tmp/iceberg_rest_mode=memory
- CATALOG_WAREHOUSE=s3://icebergdata/demo
- CATALOG_IO__IMPL=org.apache.iceberg.aws.s3.S3FileIO
- CATALOG_S3_ENDPOINT=http://minio:9000
depends_on:
- minio
links:
- minio:icebergdata.minio
expose:
- 8181

minio:
image: minio/minio
environment:
- MINIO_ROOT_USER=admin
- MINIO_ROOT_PASSWORD=password
- MINIO_DOMAIN=minio
expose:
- 9001
- 9000
command: [ "server", "/data", "--console-address", ":9001" ]

mc:
depends_on:
- minio
image: minio/mc
environment:
- AWS_ACCESS_KEY_ID=admin
- AWS_SECRET_ACCESS_KEY=password
- AWS_REGION=us-east-1
entrypoint: >
/bin/sh -c "
until (/usr/bin/mc config host add minio http://minio:9000 admin password) do echo '...waiting...' && sleep 1; done;
/usr/bin/mc rm -r --force minio/icebergdata;
/usr/bin/mc mb minio/icebergdata;
/usr/bin/mc policy set public minio/icebergdata;
tail -f /dev/null
"
Loading

0 comments on commit 13f5d9c

Please sign in to comment.