From af01ae333546ef06c1b082bc13215c06a7659cb3 Mon Sep 17 00:00:00 2001 From: Felipe Cardozo Date: Wed, 27 Nov 2024 19:13:58 -0300 Subject: [PATCH] test: improve e2e workspace --- .github/workflows/ci.yml | 2 +- Cargo.lock | 92 +-- Cargo.toml | 16 +- Makefile | 18 +- crates/integration-tests/src/main.rs | 718 ------------------ .../integration}/Cargo.toml | 3 +- .../configs/test_postgres_all_data_types.yaml | 0 .../test_postgres_consumer_offsets.yaml | 0 ...ostgres_with_json_sql_transformations.yaml | 0 tests/integration/src/all_data_types.rs | 174 +++++ tests/integration/src/consumer_offsets.rs | 101 +++ .../src/json_sql_transformations.rs | 67 ++ tests/integration/src/main.rs | 43 ++ tests/integration/src/utils/cdk.rs | 80 ++ tests/integration/src/utils/ctx.rs | 57 ++ tests/integration/src/utils/db.rs | 113 +++ tests/integration/src/utils/docker_conn.rs | 15 + tests/integration/src/utils/fluvio_conn.rs | 59 ++ tests/integration/src/utils/mod.rs | 204 +++++ tests/integration/src/utils/smdk.rs | 29 + 20 files changed, 1017 insertions(+), 774 deletions(-) delete mode 100644 crates/integration-tests/src/main.rs rename {crates/integration-tests => tests/integration}/Cargo.toml (94%) rename {crates/integration-tests => tests/integration}/configs/test_postgres_all_data_types.yaml (100%) rename {crates/integration-tests => tests/integration}/configs/test_postgres_consumer_offsets.yaml (100%) rename {crates/integration-tests => tests/integration}/configs/test_postgres_with_json_sql_transformations.yaml (100%) create mode 100644 tests/integration/src/all_data_types.rs create mode 100644 tests/integration/src/consumer_offsets.rs create mode 100644 tests/integration/src/json_sql_transformations.rs create mode 100644 tests/integration/src/main.rs create mode 100644 tests/integration/src/utils/cdk.rs create mode 100644 tests/integration/src/utils/ctx.rs create mode 100644 tests/integration/src/utils/db.rs create mode 100644 tests/integration/src/utils/docker_conn.rs create mode 100644 tests/integration/src/utils/fluvio_conn.rs create mode 100644 tests/integration/src/utils/mod.rs create mode 100644 tests/integration/src/utils/smdk.rs diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index e119bdf..9a7bc58 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -99,7 +99,7 @@ jobs: run: rustup target add wasm32-wasi - name: Run Integration Test timeout-minutes: 30 - run: make integration_tests + run: START_FLUVIO=false make integration_tests done: name: Done diff --git a/Cargo.lock b/Cargo.lock index 45b1f2b..2546f5f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1278,7 +1278,7 @@ dependencies = [ [[package]] name = "fluvio" version = "0.24.1" -source = "git+https://github.com/infinyon/fluvio?branch=fix_shutdown_cdk#ddcb0638981b27a79125ff2764d4935204771e70" +source = "git+https://github.com/infinyon/fluvio?branch=master#ddcb0638981b27a79125ff2764d4935204771e70" dependencies = [ "anyhow", "async-channel 1.9.0", @@ -1314,7 +1314,7 @@ dependencies = [ [[package]] name = "fluvio-compression" version = "0.3.4" -source = "git+https://github.com/infinyon/fluvio?branch=fix_shutdown_cdk#ddcb0638981b27a79125ff2764d4935204771e70" +source = "git+https://github.com/infinyon/fluvio?branch=master#ddcb0638981b27a79125ff2764d4935204771e70" dependencies = [ "bytes", "flate2", @@ -1329,7 +1329,7 @@ dependencies = [ [[package]] name = "fluvio-connector-common" version = "0.0.0" -source = "git+https://github.com/infinyon/fluvio?branch=fix_shutdown_cdk#ddcb0638981b27a79125ff2764d4935204771e70" 
+source = "git+https://github.com/infinyon/fluvio?branch=master#ddcb0638981b27a79125ff2764d4935204771e70" dependencies = [ "anyhow", "async-channel 1.9.0", @@ -1352,7 +1352,7 @@ dependencies = [ [[package]] name = "fluvio-connector-derive" version = "0.0.0" -source = "git+https://github.com/infinyon/fluvio?branch=fix_shutdown_cdk#ddcb0638981b27a79125ff2764d4935204771e70" +source = "git+https://github.com/infinyon/fluvio?branch=master#ddcb0638981b27a79125ff2764d4935204771e70" dependencies = [ "proc-macro2", "quote", @@ -1362,7 +1362,7 @@ dependencies = [ [[package]] name = "fluvio-connector-package" version = "0.0.0" -source = "git+https://github.com/infinyon/fluvio?branch=fix_shutdown_cdk#ddcb0638981b27a79125ff2764d4935204771e70" +source = "git+https://github.com/infinyon/fluvio?branch=master#ddcb0638981b27a79125ff2764d4935204771e70" dependencies = [ "anyhow", "bytesize", @@ -1382,7 +1382,7 @@ dependencies = [ [[package]] name = "fluvio-controlplane-metadata" version = "0.30.0" -source = "git+https://github.com/infinyon/fluvio?branch=fix_shutdown_cdk#ddcb0638981b27a79125ff2764d4935204771e70" +source = "git+https://github.com/infinyon/fluvio?branch=master#ddcb0638981b27a79125ff2764d4935204771e70" dependencies = [ "anyhow", "base64 0.22.1", @@ -1440,7 +1440,7 @@ dependencies = [ [[package]] name = "fluvio-protocol" version = "0.12.0" -source = "git+https://github.com/infinyon/fluvio?branch=fix_shutdown_cdk#ddcb0638981b27a79125ff2764d4935204771e70" +source = "git+https://github.com/infinyon/fluvio?branch=master#ddcb0638981b27a79125ff2764d4935204771e70" dependencies = [ "bytes", "cfg-if", @@ -1462,7 +1462,7 @@ dependencies = [ [[package]] name = "fluvio-protocol-derive" version = "0.5.4" -source = "git+https://github.com/infinyon/fluvio?branch=fix_shutdown_cdk#ddcb0638981b27a79125ff2764d4935204771e70" +source = "git+https://github.com/infinyon/fluvio?branch=master#ddcb0638981b27a79125ff2764d4935204771e70" dependencies = [ "proc-macro2", "quote", @@ -1473,7 +1473,7 @@ dependencies = [ [[package]] name = "fluvio-sc-schema" version = "0.25.0" -source = "git+https://github.com/infinyon/fluvio?branch=fix_shutdown_cdk#ddcb0638981b27a79125ff2764d4935204771e70" +source = "git+https://github.com/infinyon/fluvio?branch=master#ddcb0638981b27a79125ff2764d4935204771e70" dependencies = [ "anyhow", "fluvio-controlplane-metadata", @@ -1489,7 +1489,7 @@ dependencies = [ [[package]] name = "fluvio-smartengine" version = "0.8.3" -source = "git+https://github.com/infinyon/fluvio?branch=fix_shutdown_cdk#ddcb0638981b27a79125ff2764d4935204771e70" +source = "git+https://github.com/infinyon/fluvio?branch=master#ddcb0638981b27a79125ff2764d4935204771e70" dependencies = [ "anyhow", "cfg-if", @@ -1510,7 +1510,7 @@ dependencies = [ [[package]] name = "fluvio-smartmodule" version = "0.8.0" -source = "git+https://github.com/infinyon/fluvio?branch=fix_shutdown_cdk#ddcb0638981b27a79125ff2764d4935204771e70" +source = "git+https://github.com/infinyon/fluvio?branch=master#ddcb0638981b27a79125ff2764d4935204771e70" dependencies = [ "eyre", "fluvio-protocol", @@ -1522,7 +1522,7 @@ dependencies = [ [[package]] name = "fluvio-smartmodule-derive" version = "0.6.4" -source = "git+https://github.com/infinyon/fluvio?branch=fix_shutdown_cdk#ddcb0638981b27a79125ff2764d4935204771e70" +source = "git+https://github.com/infinyon/fluvio?branch=master#ddcb0638981b27a79125ff2764d4935204771e70" dependencies = [ "proc-macro2", "quote", @@ -1532,7 +1532,7 @@ dependencies = [ [[package]] name = "fluvio-socket" version = "0.15.0" -source = 
"git+https://github.com/infinyon/fluvio?branch=fix_shutdown_cdk#ddcb0638981b27a79125ff2764d4935204771e70" +source = "git+https://github.com/infinyon/fluvio?branch=master#ddcb0638981b27a79125ff2764d4935204771e70" dependencies = [ "async-channel 1.9.0", "async-lock", @@ -1557,7 +1557,7 @@ dependencies = [ [[package]] name = "fluvio-spu-schema" version = "0.17.0" -source = "git+https://github.com/infinyon/fluvio?branch=fix_shutdown_cdk#ddcb0638981b27a79125ff2764d4935204771e70" +source = "git+https://github.com/infinyon/fluvio?branch=master#ddcb0638981b27a79125ff2764d4935204771e70" dependencies = [ "bytes", "derive_builder", @@ -1575,7 +1575,7 @@ dependencies = [ [[package]] name = "fluvio-stream-dispatcher" version = "0.13.6" -source = "git+https://github.com/infinyon/fluvio?branch=fix_shutdown_cdk#ddcb0638981b27a79125ff2764d4935204771e70" +source = "git+https://github.com/infinyon/fluvio?branch=master#ddcb0638981b27a79125ff2764d4935204771e70" dependencies = [ "anyhow", "async-channel 1.9.0", @@ -1598,7 +1598,7 @@ dependencies = [ [[package]] name = "fluvio-stream-model" version = "0.11.4" -source = "git+https://github.com/infinyon/fluvio?branch=fix_shutdown_cdk#ddcb0638981b27a79125ff2764d4935204771e70" +source = "git+https://github.com/infinyon/fluvio?branch=master#ddcb0638981b27a79125ff2764d4935204771e70" dependencies = [ "async-lock", "event-listener 5.3.1", @@ -1611,7 +1611,7 @@ dependencies = [ [[package]] name = "fluvio-types" version = "0.5.2" -source = "git+https://github.com/infinyon/fluvio?branch=fix_shutdown_cdk#ddcb0638981b27a79125ff2764d4935204771e70" +source = "git+https://github.com/infinyon/fluvio?branch=master#ddcb0638981b27a79125ff2764d4935204771e70" dependencies = [ "event-listener 5.3.1", "serde", @@ -1910,9 +1910,9 @@ dependencies = [ [[package]] name = "hashbrown" -version = "0.15.1" +version = "0.15.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3a9bfc1af68b1726ea47d3d5109de126281def866b33970e10fbab11b5dafab3" +checksum = "bf151400ff0baff5465007dd2f3e717f3fe502074ca563069ce3a6629d07b289" dependencies = [ "foldhash", ] @@ -2306,7 +2306,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "707907fe3c25f5424cce2cb7e1cbcafee6bdbe735ca90ef77c29e84591e5b9da" dependencies = [ "equivalent", - "hashbrown 0.15.1", + "hashbrown 0.15.2", "serde", ] @@ -2336,6 +2336,7 @@ dependencies = [ "fluvio-model-sql", "futures-util", "log", + "once_cell", "rust_decimal", "serde", "serde_json", @@ -2392,9 +2393,9 @@ dependencies = [ [[package]] name = "itoa" -version = "1.0.13" +version = "1.0.14" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "540654e97a3f4470a492cd30ff187bc95d89557a903a2bbf112e2fae98104ef2" +checksum = "d75a2a4b1b190afb6f5425f10f6a8f959d2ea0b9c2b1d79553551850539e4674" [[package]] name = "ittapi" @@ -2508,9 +2509,9 @@ dependencies = [ [[package]] name = "libc" -version = "0.2.164" +version = "0.2.166" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "433bfe06b8c75da9b2e3fbea6e5329ff87748f0b144ef75306e674c3f6f7c13f" +checksum = "c2ccc108bbc0b1331bd061864e7cd823c0cab660bbe6970e66e2c0614decde36" [[package]] name = "libm" @@ -2738,7 +2739,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "aedf0a2d09c573ed1d8d85b30c119153926a2b36dce0ab28322c09a117a4683e" dependencies = [ "crc32fast", - "hashbrown 0.15.1", + "hashbrown 0.15.2", "indexmap 2.6.0", "memchr", ] @@ -2962,9 +2963,9 @@ dependencies = [ [[package]] name = "postcard" -version 
= "1.0.10" +version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5f7f0a8d620d71c457dd1d47df76bb18960378da56af4527aaa10f515eee732e" +checksum = "f63d01def49fc815900a83e7a4a5083d2abc81b7ddd569a3fa0477778ae9b3ec" dependencies = [ "cobs", "embedded-io 0.4.0", @@ -4020,9 +4021,9 @@ checksum = "8df9b6e13f2d32c91b9bd719c00d1958837bc7dec474d94952798cc8e69eeec3" [[package]] name = "tracing" -version = "0.1.40" +version = "0.1.41" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c3523ab5a71916ccf420eebdf5521fcef02141234bbc0b8a49f2fdc4544364ef" +checksum = "784e0ac535deb450455cbfa28a6f0df145ea1bb7ae51b821cf5e7927fdcfbdd0" dependencies = [ "log", "pin-project-lite", @@ -4032,9 +4033,9 @@ dependencies = [ [[package]] name = "tracing-attributes" -version = "0.1.27" +version = "0.1.28" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "34704c8d6ebcbc939824180af020566b01a7c01f80641264eba0999f6c2b6be7" +checksum = "395ae124c09f9e6918a2310af6038fba074bcf474ac352496d5910dd59a2226d" dependencies = [ "proc-macro2", "quote", @@ -4043,9 +4044,9 @@ dependencies = [ [[package]] name = "tracing-core" -version = "0.1.32" +version = "0.1.33" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c06d3da6113f116aaee68e4d601191614c9053067f9ab7f6edbcb161237daa54" +checksum = "e672c95779cf947c5311f83787af4fa8fffd12fb27e4993211a84bdfd9610f9c" dependencies = [ "once_cell", "valuable", @@ -4359,12 +4360,12 @@ dependencies = [ [[package]] name = "wasm-encoder" -version = "0.220.0" +version = "0.221.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ebf48234b389415b226a4daef6562933d38c7b28a8b8f64c5c4130dad1561ab7" +checksum = "de35b6c3ef1f53ac7a31b5e69bc00f1542ea337e7e7162dc34c68b537ff82690" dependencies = [ "leb128", - "wasmparser 0.220.0", + "wasmparser 0.221.0", ] [[package]] @@ -4383,12 +4384,13 @@ dependencies = [ [[package]] name = "wasmparser" -version = "0.220.0" +version = "0.221.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e246c2772ce3ebc83f89a2d4487ac5794cad6c309b2071818a88c7db7c36d87b" +checksum = "8659e755615170cfe20da468865c989da78c5da16d8652e69a75acda02406a92" dependencies = [ "bitflags 2.6.0", "indexmap 2.6.0", + "semver", ] [[package]] @@ -4657,24 +4659,24 @@ dependencies = [ [[package]] name = "wast" -version = "220.0.0" +version = "221.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4e708c8de08751fd66e70961a32bae9d71901f14a70871e181cb8461a3bb3165" +checksum = "9d8eb1933d493dd07484a255c3f52236123333f5befaa3be36182a50d393ec54" dependencies = [ "bumpalo", "leb128", "memchr", "unicode-width", - "wasm-encoder 0.220.0", + "wasm-encoder 0.221.0", ] [[package]] name = "wat" -version = "1.220.0" +version = "1.221.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "de4f1d7d59614ba690541360102b995c4eb1b9ed373701d5102cc1a968b1c5a3" +checksum = "c813fd4e5b2b97242830b56e7b7dc5479bc17aaa8730109be35e61909af83993" dependencies = [ - "wast 220.0.0", + "wast 221.0.0", ] [[package]] @@ -4781,7 +4783,7 @@ version = "0.1.9" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "cf221c93e13a30d793f7645a0e7762c55d169dbb0a49671918a2319d289b10bb" dependencies = [ - "windows-sys 0.48.0", + "windows-sys 0.59.0", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index 170fdfc..6c6ca1b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -2,8 +2,8 @@ members = [ 
"crates/sql-sink", "crates/fluvio-model-sql", - "crates/integration-tests", "crates/json-sql", + "tests/integration", ] resolver = "2" @@ -12,7 +12,15 @@ inherits = "release" lto = true [workspace.dependencies] -fluvio = { git = "https://github.com/infinyon/fluvio", branch = "fix_shutdown_cdk" } -fluvio-connector-common = { git = "https://github.com/infinyon/fluvio", branch = "fix_shutdown_cdk" } +fluvio = { git = "https://github.com/infinyon/fluvio", branch = "master" } +fluvio-connector-common = { git = "https://github.com/infinyon/fluvio", branch = "master" } fluvio-future = "0.7.0" -fluvio-smartmodule = { git = "https://github.com/infinyon/fluvio", branch = "fix_shutdown_cdk" } +fluvio-smartmodule = { git = "https://github.com/infinyon/fluvio", branch = "master" } + + +# fluvio = { path = "../../fluvio/crates/fluvio/" } +# fluvio-connector-common = { path = "../../fluvio/crates/fluvio-connector-common/" } +# fluvio-smartmodule = { path = "../../fluvio/crates/fluvio-smartmodule/" } + +# Internal dependencies +fluvio-model-sql = { path = "crates/fluvio-model-sql" } diff --git a/Makefile b/Makefile index 1d50341..8afa44b 100644 --- a/Makefile +++ b/Makefile @@ -1,6 +1,14 @@ -integration_tests: - cdk build -p sql-sink - smdk build -p json-sql - smdk load -p json-sql - RUST_LOG=warn,integration_tests=info cargo run --release -p integration-tests +# Default paths to binaries +BINARIES_PATH?= +FLUVIO_BIN?=$(BINARIES_PATH)fluvio +SMDK_BIN?=$(BINARIES_PATH)smdk +CDK_BIN?=$(BINARIES_PATH)cdk +START_FLUVIO?=true +# Integration tests target +integration_tests: + $(CDK_BIN) build -p sql-sink + $(SMDK_BIN) build -p json-sql + FLUVIO_BIN=$(FLUVIO_BIN) CDK_BIN=$(CDK_BIN) SMDK_BIN=$(SMDK_BIN) \ + START_FLUVIO=$(START_FLUVIO) RUST_LOG=warn,integration_tests=info \ + cargo run --release -p integration-tests diff --git a/crates/integration-tests/src/main.rs b/crates/integration-tests/src/main.rs deleted file mode 100644 index 24b181f..0000000 --- a/crates/integration-tests/src/main.rs +++ /dev/null @@ -1,718 +0,0 @@ -use std::{ - collections::HashMap, - path::{Path, PathBuf}, - process::Command, - time::Duration, -}; - -use anyhow::{Context, Result}; -use async_std::task; -use bollard::{ - container::{Config, CreateContainerOptions, RemoveContainerOptions, StartContainerOptions}, - image::CreateImageOptions, - service::{HostConfig, PortBinding}, - Docker, -}; -use futures_util::stream::TryStreamExt; -use log::{debug, info}; -use serde::Deserialize; -use serde_json::json; -use sqlx::{postgres::PgRow, Connection, FromRow, PgConnection}; - -use fluvio::{metadata::topic::TopicSpec, Fluvio, RecordKey}; -use fluvio_future::{ - retry::{retry, ExponentialBackoff}, - timer::sleep, -}; -use fluvio_model_sql::{Insert, Operation, Type, Upsert, Value}; - -const POSTGRES_IMAGE: &str = "postgres:15.2"; -const POSTGRES_HOST_PORT: &str = "5432"; -const POSTGRES_PASSWORD: &str = "passpass"; -const POSTGRES_USER: &str = "pguser"; -const POSTGRES_DB: &str = POSTGRES_USER; -const CDK_BIN: &str = "cdk"; - -#[async_std::main] -async fn main() -> Result<()> { - env_logger::init(); - - info!("preparing environment"); - let docker = connect_docker() - .await - .context("unable to connect to docker engine")?; - - let fluvio = connect_fluvio() - .await - .context("unable to connect to fluvio cluster")?; - - let mut pg_conn = run_postgres(&docker) - .await - .context("unable to run posgres container")?; - - info!("running sql-connector integration tests"); - - let result1 = test_postgres_all_data_types(&fluvio, &mut pg_conn) - .await 
- .context("test_postgres_all_data_types failed"); - - let result2 = test_postgres_with_json_sql_transformations(&fluvio, &mut pg_conn) - .await - .context("test_postgres_with_json_sql_transformations failed"); - - let result3 = test_postgres_consumer_offsets(&fluvio, &mut pg_conn) - .await - .context("test_postgres_consumer_offsets failed"); - let _ = remove_postgres(&docker).await; - - result1?; - result2?; - result3?; - - info!("ALL PASSED"); - - Ok(()) -} - -async fn test_postgres_all_data_types(fluvio: &Fluvio, pg_conn: &mut PgConnection) -> Result<()> { - #[derive(sqlx::FromRow, Debug)] - #[allow(dead_code)] - struct TestRecord { - bool_col: bool, - smallint_col: i16, - int_col: i32, - bigint_col: i64, - float_col: f32, - double_col: f64, - text_col: String, - bytes_col: Vec, - numeric_col: rust_decimal::Decimal, - timestamp_col: chrono::NaiveDateTime, - date_col: chrono::NaiveDate, - time_col: chrono::NaiveTime, - uuid_col: uuid::Uuid, - json_col: serde_json::Value, - char_col: i8, - } - - info!("running 'test_postgres_all_data_types' test"); - let config_path = new_config_path("test_postgres_all_data_types.yaml")?; - debug!("{config_path:?}"); - let config: TestConfig = serde_yaml::from_reader(std::fs::File::open(&config_path)?)?; - let table = "test_postgres_all_data_types"; - sqlx::query(&format!( - "CREATE TABLE {} ( - bool_col bool NOT NULL, - smallint_col smallint NOT NULL, - int_col int NOT NULL, - bigint_col bigint NOT NULL, - float_col float4 NOT NULL, - double_col float8 NOT NULL, - text_col varchar NOT NULL, - bytes_col bytea NOT NULL, - numeric_col numeric NOT NULL, - timestamp_col timestamp NOT NULL, - date_col date NOT NULL, - time_col time NOT NULL, - uuid_col uuid NOT NULL PRIMARY KEY, - json_col json NOT NULL, - char_col \"char\" NOT NULL - )", - table - )) - .execute(&mut *pg_conn) - .await - .context(format!("unable to create table {table})"))?; - - cdk_deploy_start(&config_path, None).await?; - let connector_name = &config.meta.name; - let connector_status = cdk_deploy_status(connector_name)?; - info!("connector: {connector_name}, status: {connector_status:?}"); - - let count = 10; - let records = generate_records(table, count)?; - - assert_eq!(records.len(), count); - - { - produce_to_fluvio( - fluvio, - &config.meta.topic, - records - .iter() - .map(|op| serde_json::to_string(&Operation::Insert(op.clone()))) - .collect::>()?, - ) - .await?; - let mut received_records: Vec = read_from_postgres(table, count).await?; - - received_records.sort_by_key(|r| r.smallint_col); - - assert_eq!(received_records.len(), count); - for (i, record) in received_records.into_iter().enumerate() { - assert_eq!(record.int_col as usize, i); - assert_eq!(record.smallint_col as usize, i); - assert_eq!(record.bigint_col as usize, i); - } - } - - // first upsert should do nothing - { - let records = records - .iter() - .map(|op| { - let op = Upsert { - table: op.table.clone(), - values: op.values.clone(), - uniq_idx: "uuid_col".into(), - }; - serde_json::to_string(&Operation::Upsert(op)) - }) - .collect::>()?; - produce_to_fluvio(fluvio, &config.meta.topic, records).await?; - - task::sleep(Duration::from_secs(3)).await; - - let mut received_records: Vec = read_from_postgres(table, count).await?; - - received_records.sort_by_key(|r| r.smallint_col); - - assert_eq!(received_records.len(), count); - for (i, record) in received_records.into_iter().enumerate() { - assert_eq!(record.int_col as usize, i); - assert_eq!(record.smallint_col as usize, i); - assert_eq!(record.bigint_col as usize, i); 
- } - } - - // second upsert should do update - { - let records = records - .iter() - .enumerate() - .map(|(i, op)| { - let mut op = Upsert { - table: op.table.clone(), - values: op.values.clone(), - uniq_idx: "uuid_col".into(), - }; - op.values[2].raw_value = (i + 1).to_string(); - op.values[3].raw_value = (i + 2).to_string(); - op.values[4].raw_value = (i + 4).to_string(); - serde_json::to_string(&Operation::Upsert(op)) - }) - .collect::>()?; - produce_to_fluvio(fluvio, &config.meta.topic, records).await?; - - task::sleep(Duration::from_secs(3)).await; - - let mut received_records: Vec = read_from_postgres(table, count).await?; - - received_records.sort_by_key(|r| r.smallint_col); - - assert_eq!(received_records.len(), count); - for (i, record) in received_records.into_iter().enumerate() { - assert_eq!(record.smallint_col as usize, i + 1); - assert_eq!(record.int_col as usize, i + 2); - assert_eq!(record.bigint_col as usize, i + 4); - } - } - - cdk_deploy_shutdown(connector_name)?; - remove_topic(fluvio, &config.meta.topic).await?; - - info!("test 'test_postgres_all_data_types' passed"); - Ok(()) -} - -async fn test_postgres_with_json_sql_transformations( - fluvio: &Fluvio, - pg_conn: &mut PgConnection, -) -> Result<()> { - // given - info!("running 'test_postgres_with_json_sql_transformations' test"); - let config_path = new_config_path("test_postgres_with_json_sql_transformations.yaml")?; - debug!("{config_path:?}"); - let config: TestConfig = serde_yaml::from_reader(std::fs::File::open(&config_path)?)?; - let table = "test_postgres_with_json_sql_transformations"; - sqlx::query(&format!( - "CREATE TABLE {} (device_id int, record json)", - table - )) - .execute(&mut *pg_conn) - .await - .context(format!("unable to create table {table})"))?; - - cdk_deploy_start(&config_path, None).await?; - let connector_name = &config.meta.name; - let connector_status = cdk_deploy_status(connector_name)?; - info!("connector: {connector_name}, status: {connector_status:?}"); - - let count = 10; - let records = generate_json_records(count); - - produce_to_fluvio(fluvio, &config.meta.topic, records.clone()).await?; - - // when - let read_result = read_from_postgres(table, count).await; - cdk_deploy_shutdown(connector_name)?; - remove_topic(fluvio, &config.meta.topic).await?; - let received_records: Vec = read_result?; - - // then - assert_eq!(received_records.len(), count); - for (i, record) in received_records.into_iter().enumerate() { - assert_eq!(record.device_id as usize, i); - assert_eq!(record.record, json!({"device": { "device_id" : i }})); - } - info!("test 'test_postgres_with_json_sql_transformations' passed"); - Ok(()) -} - -async fn test_postgres_consumer_offsets(fluvio: &Fluvio, pg_conn: &mut PgConnection) -> Result<()> { - // given - info!("running 'test_postgres_consumer_offsets' test"); - let config_path = new_config_path("test_postgres_consumer_offsets.yaml")?; - debug!("{config_path:?}"); - let config: TestConfig = serde_yaml::from_reader(std::fs::File::open(&config_path)?)?; - let table = "test_postgres_consumer_offsets"; - sqlx::query(&format!( - "CREATE TABLE {} (device_id int, record json)", - table - )) - .execute(&mut *pg_conn) - .await - .context(format!("unable to create table {table})"))?; - - cdk_deploy_start(&config_path, None).await?; - let connector_name = &config.meta.name; - let connector_status = cdk_deploy_status(connector_name)?; - info!("connector: {connector_name}, status: {connector_status:?}"); - - sleep(Duration::from_secs(3)).await; - let records = 
generate_raw_records(table, 0, 2)?; - produce_to_fluvio(fluvio, &config.meta.topic, records).await?; - let records = generate_raw_records(table, 2, 4)?; - produce_to_fluvio(fluvio, &config.meta.topic, records).await?; - info!("waiting for connector to catch up"); - sleep(Duration::from_secs(3)).await; - - // when - info!("shutting down connector"); - cdk_deploy_shutdown(connector_name)?; - - info!("producing more records with connector down"); - sleep(Duration::from_secs(3)).await; - let records = generate_raw_records(table, 4, 6)?; - produce_to_fluvio(fluvio, &config.meta.topic, records).await?; - - info!("restarting connector"); - cdk_deploy_start(&config_path, None).await?; - let records = generate_raw_records(table, 6, 8)?; - produce_to_fluvio(fluvio, &config.meta.topic, records).await?; - sleep(Duration::from_secs(3)).await; - let connector_name = &config.meta.name; - let connector_status = cdk_deploy_status(connector_name)?; - info!("connector: {connector_name}, status: {connector_status:?}"); - - let read_result = read_from_postgres(table, 8).await; - let received_records: Vec = read_result?; - remove_topic(fluvio, &config.meta.topic).await?; - cdk_deploy_shutdown(connector_name)?; - - // then - assert_eq!(received_records.len(), 8); - for (i, record) in received_records.into_iter().enumerate() { - assert_eq!(record.device_id as usize, i); - assert_eq!(record.record, json!({"device": { "device_id" : i }})); - } - - let consumer = fluvio - .consumer_offsets() - .await? - .into_iter() - .find(|c| c.consumer_id.eq("test-postgres-consumer-offsets")); - - // then - assert!(consumer.is_some()); - assert!(consumer.unwrap().offset >= 0); - info!("test 'test_postgres_consumer_offsets' passed"); - Ok(()) -} - -async fn connect_docker() -> Result { - info!("checking docker engine availability"); - - let docker = Docker::connect_with_local_defaults()?; - let version = docker.version().await?; - info!( - "connected to docker version: {:?}, api_version: {:?}", - &version.version, &version.api_version - ); - Ok(docker) -} - -async fn connect_fluvio() -> Result { - info!("checking fluvio cluster availability"); - let fluvio = fluvio::Fluvio::connect().await?; - info!("connected to fluvio version: {}", fluvio.platform_version()); - Ok(fluvio) -} - -async fn run_postgres(docker: &Docker) -> Result { - info!("starting postgres container"); - - let config: Config = Config { - image: Some(POSTGRES_IMAGE.to_owned()), - exposed_ports: Some(HashMap::from([( - POSTGRES_HOST_PORT.to_owned(), - Default::default(), - )])), - host_config: Some(HostConfig { - port_bindings: Some(HashMap::from([( - POSTGRES_HOST_PORT.to_owned(), - Some(vec![PortBinding { - host_ip: Some("0.0.0.0".to_owned()), - host_port: Some(POSTGRES_HOST_PORT.to_owned()), - }]), - )])), - ..Default::default() - }), - env: Some(vec![ - format!("POSTGRES_PASSWORD={POSTGRES_PASSWORD}"), - format!("POSTGRES_USER={POSTGRES_USER}"), - ]), - ..Default::default() - }; - let _ = &docker - .create_image( - Some(CreateImageOptions { - from_image: POSTGRES_IMAGE, - ..Default::default() - }), - None, - None, - ) - .try_collect::>() - .await?; - - let _ = &docker - .create_container( - Some(CreateContainerOptions { - name: "postgres", - platform: None, - }), - config, - ) - .await?; - - let _ = &docker - .start_container("postgres", None::>) - .await?; - - info!("postgres container created, waiting for readiness"); - let conn = retry(ExponentialBackoff::from_millis(10).take(6), || { - connect_postgres() - }) - .await?; - - info!("postgres container started 
with {POSTGRES_IMAGE} image"); - - Ok(conn) -} - -async fn connect_postgres() -> Result { - let connection_str = postgres_connection_str(); - debug!("connecting to {connection_str}"); - let mut conn = PgConnection::connect(&connection_str).await?; - - sqlx::query("SELECT count(*) FROM pg_catalog.pg_tables") - .fetch_one(&mut conn) - .await?; - Ok(conn) -} - -fn postgres_connection_str() -> String { - format!( - "postgresql://{POSTGRES_USER}:{POSTGRES_PASSWORD}@127.0.0.1:{POSTGRES_HOST_PORT}/{POSTGRES_DB}" - ) -} - -fn new_config_path(name: &str) -> Result { - let package_dir = std::env::var("CARGO_MANIFEST_DIR")?; - let mut path = PathBuf::new(); - path.push(package_dir); - path.push("configs"); - path.push(name); - Ok(path) -} - -fn connector_dir() -> Result { - let package_dir = std::env::var("CARGO_MANIFEST_DIR")?; - let mut path = PathBuf::new(); - path.push(package_dir); - path.push(".."); - path.push("sql-sink"); - Ok(path.canonicalize()?) -} - -async fn cdk_deploy_start(config_path: &Path, env: Option<(&str, &str)>) -> Result<()> { - let connector_dir = connector_dir()?; - info!( - "deploying connector with config from {config_path:?}, connector_dir: {}", - connector_dir.to_string_lossy() - ); - let mut command = Command::new(CDK_BIN); - command.current_dir(&connector_dir); - command - .arg("deploy") - .arg("start") - .arg("--config") - .arg(config_path); - if let Some((env_name, env_value)) = env { - command.env(env_name, env_value); - } - let output = command.output()?; - if !output.status.success() { - anyhow::bail!( - "`cdk deploy start` failed with:\n {}", - String::from_utf8_lossy(output.stderr.as_slice()) - ) - } - task::sleep(Duration::from_secs(10)).await; // time for connector to start - Ok(()) -} - -fn cdk_deploy_shutdown(connector_name: &str) -> Result<()> { - info!("shutting down connector {connector_name}"); - let output = Command::new(CDK_BIN) - .arg("deploy") - .arg("shutdown") - .arg("--name") - .arg(connector_name) - .output()?; - if !output.status.success() { - anyhow::bail!( - "`cdk deploy shutdown` failed with:\n {}", - String::from_utf8_lossy(output.stderr.as_slice()) - ) - } - Ok(()) -} - -fn cdk_deploy_status(connector_name: &str) -> Result> { - let output = Command::new(CDK_BIN).arg("deploy").arg("list").output()?; - if !output.status.success() { - anyhow::bail!( - "`cdk deploy list` failed with:\n {}", - String::from_utf8_lossy(output.stderr.as_slice()) - ) - } - for line in String::from_utf8_lossy(output.stdout.as_slice()) - .lines() - .skip(1) - { - let mut column_iter = line.split_whitespace(); - match column_iter.next() { - Some(name) if name.eq(connector_name) => { - return Ok(column_iter.next().map(|s| s.to_owned())) - } - _ => {} - } - } - Ok(None) -} - -fn generate_records(table: &str, count: usize) -> Result> { - let mut result = Vec::with_capacity(count); - for i in 0..count { - let op = Insert { - table: table.to_string(), - values: vec![ - Value { - column: "json_col".to_string(), - raw_value: "{\"json_key\":\"json_value\"}".to_string(), - type_: Type::Json, - }, - Value { - column: "bool_col".to_string(), - raw_value: "true".to_string(), - type_: Type::Bool, - }, - Value { - column: "smallint_col".to_string(), - raw_value: i.to_string(), - type_: Type::SmallInt, - }, - Value { - column: "int_col".to_string(), - raw_value: i.to_string(), - type_: Type::Int, - }, - Value { - column: "bigint_col".to_string(), - raw_value: i.to_string(), - type_: Type::BigInt, - }, - Value { - column: "text_col".to_string(), - raw_value: "some text".to_string(), - 
type_: Type::Text, - }, - Value { - column: "bytes_col".to_string(), - raw_value: "some bytes".to_string(), - type_: Type::Bytes, - }, - Value { - column: "float_col".to_string(), - raw_value: "3.123".to_string(), - type_: Type::Float, - }, - Value { - column: "double_col".to_string(), - raw_value: "3.333333333".to_string(), - type_: Type::DoublePrecision, - }, - Value { - column: "numeric_col".to_string(), - raw_value: rust_decimal::Decimal::TEN.to_string(), - type_: Type::Numeric, - }, - Value { - column: "timestamp_col".to_string(), - raw_value: chrono::Utc::now().naive_local().to_string(), - type_: Type::Timestamp, - }, - Value { - column: "date_col".to_string(), - raw_value: chrono::Utc::now().naive_local().date().to_string(), - type_: Type::Date, - }, - Value { - column: "time_col".to_string(), - raw_value: chrono::Utc::now() - .naive_local() - .time() - .format("%H:%M:%S") - .to_string(), - type_: Type::Time, - }, - Value { - column: "uuid_col".to_string(), - raw_value: uuid::Uuid::new_v4().to_string(), - type_: Type::Uuid, - }, - Value { - column: "char_col".to_string(), - raw_value: "126".to_string(), - type_: Type::Char, - }, - ], - }; - result.push(op); - } - Ok(result) -} - -fn generate_raw_records(table: &str, start: usize, end: usize) -> Result> { - let mut result = Vec::with_capacity(end); - for i in start..end { - let op = Insert { - table: table.to_string(), - values: vec![ - Value { - column: "device_id".to_string(), - raw_value: i.to_string(), - type_: Type::Int, - }, - Value { - column: "record".to_string(), - raw_value: format!("{{\"device\":{{\"device_id\":{i}}}}}"), - type_: Type::Json, - }, - ], - }; - result.push(op); - } - Ok(result - .iter() - .map(|op| serde_json::to_string(&Operation::Insert(op.clone()))) - .collect::>()?) 
-} - -fn generate_json_records(count: usize) -> Vec { - (0..count) - .map(|i| format!("{{\"device\":{{\"device_id\":{i}}}}}")) - .collect() -} - -async fn produce_to_fluvio> + std::fmt::Debug + Send + Sync + 'static>( - fluvio: &Fluvio, - fluvio_topic: &str, - records: Vec, -) -> Result<()> { - let producer = fluvio.topic_producer(fluvio_topic).await?; - for record in records { - producer.send(RecordKey::NULL, record).await?; - } - producer.flush().await?; - Ok(()) -} - -async fn remove_topic(fluvio: &Fluvio, topic: &str) -> Result<()> { - fluvio.admin().await.delete::(topic).await?; - Ok(()) -} - -async fn read_from_postgres(table: &str, count: usize) -> Result> -where - R: for<'r> FromRow<'r, PgRow> + Unpin + Send, -{ - let result = retry(ExponentialBackoff::from_millis(10).take(5), || { - let sql = format!("SELECT * FROM {table} LIMIT {count}"); - async move { - let mut pg_conn = connect_postgres().await?; - let result: Vec = sqlx::query_as(&sql).fetch_all(&mut pg_conn).await?; - if result.len() != count { - anyhow::bail!("not all expected records are ready yet"); - } - Ok::, anyhow::Error>(result) - } - }) - .await?; - - Ok(result) -} - -async fn remove_postgres(docker: &Docker) -> Result<()> { - let _ = &docker - .remove_container( - "postgres", - Some(RemoveContainerOptions { - v: true, - force: true, - ..Default::default() - }), - ) - .await?; - info!("postgres container removed"); - Ok(()) -} - -#[derive(Debug, Deserialize)] -struct MetaConfig { - name: String, - topic: String, -} - -#[derive(Debug, Deserialize)] -struct TestConfig { - meta: MetaConfig, -} - -#[derive(sqlx::FromRow, Debug)] -struct TestRecord { - device_id: i32, - record: serde_json::Value, -} diff --git a/crates/integration-tests/Cargo.toml b/tests/integration/Cargo.toml similarity index 94% rename from crates/integration-tests/Cargo.toml rename to tests/integration/Cargo.toml index a3d66b2..34160a3 100644 --- a/crates/integration-tests/Cargo.toml +++ b/tests/integration/Cargo.toml @@ -20,8 +20,9 @@ serde_json = { version = "1.0", default-features = false } sqlx = { version = "0.6.3", features = ["postgres", "runtime-async-std-rustls", "decimal", "chrono", "uuid"] } rust_decimal = { version = "1.36" } chrono = { version = "0.4" } +once_cell = "1.20.2" uuid = { version = "1.11", features = ["v4"] } fluvio = { workspace = true } fluvio-future = { workspace = true } -fluvio-model-sql = { path = "../fluvio-model-sql"} +fluvio-model-sql = { workspace = true } diff --git a/crates/integration-tests/configs/test_postgres_all_data_types.yaml b/tests/integration/configs/test_postgres_all_data_types.yaml similarity index 100% rename from crates/integration-tests/configs/test_postgres_all_data_types.yaml rename to tests/integration/configs/test_postgres_all_data_types.yaml diff --git a/crates/integration-tests/configs/test_postgres_consumer_offsets.yaml b/tests/integration/configs/test_postgres_consumer_offsets.yaml similarity index 100% rename from crates/integration-tests/configs/test_postgres_consumer_offsets.yaml rename to tests/integration/configs/test_postgres_consumer_offsets.yaml diff --git a/crates/integration-tests/configs/test_postgres_with_json_sql_transformations.yaml b/tests/integration/configs/test_postgres_with_json_sql_transformations.yaml similarity index 100% rename from crates/integration-tests/configs/test_postgres_with_json_sql_transformations.yaml rename to tests/integration/configs/test_postgres_with_json_sql_transformations.yaml diff --git a/tests/integration/src/all_data_types.rs 
diff --git a/tests/integration/src/all_data_types.rs b/tests/integration/src/all_data_types.rs
new file mode 100644
index 0000000..ff133f2
--- /dev/null
+++ b/tests/integration/src/all_data_types.rs
@@ -0,0 +1,174 @@
+use std::time::Duration;
+
+use anyhow::{Context, Result};
+use async_std::task;
+use fluvio_model_sql::{Operation, Upsert};
+use log::{debug, info};
+use serde::Deserialize;
+
+use crate::utils::{
+    self, ctx::TestContext, generate_records, new_config_path, produce_to_fluvio,
+    read_from_postgres,
+};
+
+pub async fn test_postgres_all_data_types(ctx: &mut TestContext) -> Result<()> {
+    #[derive(sqlx::FromRow, Debug)]
+    #[allow(dead_code)]
+    struct TestRecord {
+        bool_col: bool,
+        smallint_col: i16,
+        int_col: i32,
+        bigint_col: i64,
+        float_col: f32,
+        double_col: f64,
+        text_col: String,
+        bytes_col: Vec<u8>,
+        numeric_col: rust_decimal::Decimal,
+        timestamp_col: chrono::NaiveDateTime,
+        date_col: chrono::NaiveDate,
+        time_col: chrono::NaiveTime,
+        uuid_col: uuid::Uuid,
+        json_col: serde_json::Value,
+        char_col: i8,
+    }
+
+    info!("running 'test_postgres_all_data_types' test");
+    let config_path = new_config_path("test_postgres_all_data_types.yaml")?;
+    debug!("{config_path:?}");
+    let config: TestConfig = serde_yaml::from_reader(std::fs::File::open(&config_path)?)?;
+    let table = "test_postgres_all_data_types";
+    sqlx::query(&format!(
+        "CREATE TABLE {} (
+            bool_col bool NOT NULL,
+            smallint_col smallint NOT NULL,
+            int_col int NOT NULL,
+            bigint_col bigint NOT NULL,
+            float_col float4 NOT NULL,
+            double_col float8 NOT NULL,
+            text_col varchar NOT NULL,
+            bytes_col bytea NOT NULL,
+            numeric_col numeric NOT NULL,
+            timestamp_col timestamp NOT NULL,
+            date_col date NOT NULL,
+            time_col time NOT NULL,
+            uuid_col uuid NOT NULL PRIMARY KEY,
+            json_col json NOT NULL,
+            char_col \"char\" NOT NULL
+        )",
+        table
+    ))
+    .execute(&mut ctx.pg_conn)
+    .await
+    .context(format!("unable to create table {table}"))?;
+
+    utils::cdk::cdk_deploy_start(&config_path, None).await?;
+    let connector_name = &config.meta.name;
+    let connector_status = utils::cdk::cdk_deploy_status(connector_name)?;
+    info!("connector: {connector_name}, status: {connector_status:?}");
+
+    let count = 10;
+    let records = generate_records(table, count)?;
+
+    assert_eq!(records.len(), count);
+
+    {
+        produce_to_fluvio(
+            &ctx.fluvio,
+            &config.meta.topic,
+            records
+                .iter()
+                .map(|op| serde_json::to_string(&Operation::Insert(op.clone())))
+                .collect::<Result<Vec<String>, _>>()?,
+        )
+        .await?;
+        let mut received_records: Vec<TestRecord> = read_from_postgres(table, count).await?;
+
+        received_records.sort_by_key(|r| r.smallint_col);
+
+        assert_eq!(received_records.len(), count);
+        for (i, record) in received_records.into_iter().enumerate() {
+            assert_eq!(record.int_col as usize, i);
+            assert_eq!(record.smallint_col as usize, i);
+            assert_eq!(record.bigint_col as usize, i);
+        }
+    }
+
+    // first upsert should do nothing
+    {
+        let records = records
+            .iter()
+            .map(|op| {
+                let op = Upsert {
+                    table: op.table.clone(),
+                    values: op.values.clone(),
+                    uniq_idx: "uuid_col".into(),
+                };
+                serde_json::to_string(&Operation::Upsert(op))
+            })
+            .collect::<Result<Vec<String>, _>>()?;
+        produce_to_fluvio(&ctx.fluvio, &config.meta.topic, records).await?;
+
+        task::sleep(Duration::from_secs(3)).await;
+
+        let mut received_records: Vec<TestRecord> = read_from_postgres(table, count).await?;
+
+        received_records.sort_by_key(|r| r.smallint_col);
+
+        assert_eq!(received_records.len(), count);
+        for (i, record) in received_records.into_iter().enumerate() {
+            assert_eq!(record.int_col as usize, i);
+            assert_eq!(record.smallint_col as usize, i);
+            assert_eq!(record.bigint_col as usize, i);
+        }
+    }
+
+    // second upsert should do update
+    {
+        let records = records
+            .iter()
+            .enumerate()
+            .map(|(i, op)| {
+                let mut op = Upsert {
+                    table: op.table.clone(),
+                    values: op.values.clone(),
+                    uniq_idx: "uuid_col".into(),
+                };
+                op.values[2].raw_value = (i + 1).to_string();
+                op.values[3].raw_value = (i + 2).to_string();
+                op.values[4].raw_value = (i + 4).to_string();
+                serde_json::to_string(&Operation::Upsert(op))
+            })
+            .collect::<Result<Vec<String>, _>>()?;
+        produce_to_fluvio(&ctx.fluvio, &config.meta.topic, records).await?;
+
+        task::sleep(Duration::from_secs(3)).await;
+
+        let mut received_records: Vec<TestRecord> = read_from_postgres(table, count).await?;
+
+        received_records.sort_by_key(|r| r.smallint_col);
+
+        assert_eq!(received_records.len(), count);
+        for (i, record) in received_records.into_iter().enumerate() {
+            assert_eq!(record.smallint_col as usize, i + 1);
+            assert_eq!(record.int_col as usize, i + 2);
+            assert_eq!(record.bigint_col as usize, i + 4);
+        }
+    }
+
+    utils::cdk::cdk_deploy_shutdown(connector_name)?;
+    utils::fluvio_conn::remove_topic(&ctx.fluvio, &config.meta.topic).await?;
+
+    info!("test 'test_postgres_all_data_types' passed");
+    Ok(())
+}
+
+#[derive(Debug, Deserialize)]
+struct MetaConfig {
+    name: String,
+    topic: String,
+}
+
+#[derive(Debug, Deserialize)]
+struct TestConfig {
+    meta: MetaConfig,
+}
diff --git a/tests/integration/src/consumer_offsets.rs b/tests/integration/src/consumer_offsets.rs
new file mode 100644
index 0000000..a352767
--- /dev/null
+++ b/tests/integration/src/consumer_offsets.rs
@@ -0,0 +1,101 @@
+use std::time::Duration;
+
+use anyhow::{Context, Result};
+use fluvio_future::timer::sleep;
+use log::{debug, info};
+use serde::Deserialize;
+use serde_json::json;
+
+use crate::utils::{
+    self, ctx::TestContext, generate_raw_records, new_config_path, produce_to_fluvio,
+    read_from_postgres,
+};
+
+pub async fn test_postgres_consumer_offsets(ctx: &mut TestContext) -> Result<()> {
+    // given
+    info!("running 'test_postgres_consumer_offsets' test");
+    let config_path = new_config_path("test_postgres_consumer_offsets.yaml")?;
+    debug!("{config_path:?}");
+    let config: TestConfig = serde_yaml::from_reader(std::fs::File::open(&config_path)?)?;
+    let table = "test_postgres_consumer_offsets";
+    sqlx::query(&format!(
+        "CREATE TABLE {} (device_id int, record json)",
+        table
+    ))
+    .execute(&mut ctx.pg_conn)
+    .await
+    .context(format!("unable to create table {table}"))?;
+
+    utils::cdk::cdk_deploy_start(&config_path, None).await?;
+    let connector_name = &config.meta.name;
+    let connector_status = utils::cdk::cdk_deploy_status(connector_name)?;
+    info!("connector: {connector_name}, status: {connector_status:?}");
+
+    sleep(Duration::from_secs(3)).await;
+    let records = generate_raw_records(table, 0, 2)?;
+    produce_to_fluvio(&ctx.fluvio, &config.meta.topic, records).await?;
+    let records = generate_raw_records(table, 2, 4)?;
+    produce_to_fluvio(&ctx.fluvio, &config.meta.topic, records).await?;
+    info!("waiting for connector to catch up");
+    sleep(Duration::from_secs(3)).await;
+
+    // when
+    info!("shutting down connector");
+    utils::cdk::cdk_deploy_shutdown(connector_name)?;
+
+    info!("producing more records with connector down");
+    sleep(Duration::from_secs(3)).await;
+    let records = generate_raw_records(table, 4, 6)?;
+    produce_to_fluvio(&ctx.fluvio, &config.meta.topic, records).await?;
+
+    info!("restarting connector");
+    utils::cdk::cdk_deploy_start(&config_path, None).await?;
+    let records = generate_raw_records(table, 6, 8)?;
+    produce_to_fluvio(&ctx.fluvio, &config.meta.topic, records).await?;
+    sleep(Duration::from_secs(3)).await;
+    let connector_name = &config.meta.name;
+    let connector_status = utils::cdk::cdk_deploy_status(connector_name)?;
+    info!("connector: {connector_name}, status: {connector_status:?}");
+
+    let read_result = read_from_postgres(table, 8).await;
+    let received_records: Vec<TestRecord> = read_result?;
+    utils::cdk::cdk_deploy_shutdown(connector_name)?;
+    utils::fluvio_conn::remove_topic(&ctx.fluvio, &config.meta.topic).await?;
+
+    // then
+    assert_eq!(received_records.len(), 8);
+    for (i, record) in received_records.into_iter().enumerate() {
+        assert_eq!(record.device_id as usize, i);
+        assert_eq!(record.record, json!({"device": { "device_id" : i }}));
+    }
+
+    let consumer = ctx
+        .fluvio
+        .consumer_offsets()
+        .await?
+        .into_iter()
+        .find(|c| c.consumer_id.eq("test-postgres-consumer-offsets"));
+
+    // then
+    assert!(consumer.is_some());
+    assert!(consumer.unwrap().offset >= 0);
+    info!("test 'test_postgres_consumer_offsets' passed");
+    Ok(())
+}
+
+#[derive(Debug, Deserialize)]
+struct MetaConfig {
+    name: String,
+    topic: String,
+}
+
+#[derive(Debug, Deserialize)]
+struct TestConfig {
+    meta: MetaConfig,
+}
+
+#[derive(sqlx::FromRow, Debug)]
+struct TestRecord {
+    device_id: i32,
+    record: serde_json::Value,
+}
diff --git a/tests/integration/src/json_sql_transformations.rs b/tests/integration/src/json_sql_transformations.rs
new file mode 100644
index 0000000..ed40892
--- /dev/null
+++ b/tests/integration/src/json_sql_transformations.rs
@@ -0,0 +1,67 @@
+use anyhow::{Context, Result};
+use log::{debug, info};
+use serde::Deserialize;
+use serde_json::json;
+
+use crate::utils::{
+    self, ctx::TestContext, generate_json_records, new_config_path, produce_to_fluvio,
+    read_from_postgres,
+};
+
+pub async fn test_postgres_with_json_sql_transformations(ctx: &mut TestContext) -> Result<()> {
+    // given
+    info!("running 'test_postgres_with_json_sql_transformations' test");
+    let config_path = new_config_path("test_postgres_with_json_sql_transformations.yaml")?;
+    debug!("{config_path:?}");
+    let config: TestConfig = serde_yaml::from_reader(std::fs::File::open(&config_path)?)?;
+    let table = "test_postgres_with_json_sql_transformations";
+    sqlx::query(&format!(
+        "CREATE TABLE {} (device_id int, record json)",
+        table
+    ))
+    .execute(&mut ctx.pg_conn)
+    .await
+    .context(format!("unable to create table {table}"))?;
+
+    utils::cdk::cdk_deploy_start(&config_path, None).await?;
+    let connector_name = &config.meta.name;
+    let connector_status = utils::cdk::cdk_deploy_status(connector_name)?;
+    info!("connector: {connector_name}, status: {connector_status:?}");
+
+    let count = 10;
+    let records = generate_json_records(count);
+
+    produce_to_fluvio(&ctx.fluvio, &config.meta.topic, records.clone()).await?;
+
+    // when
+    let read_result = read_from_postgres(table, count).await;
+    utils::cdk::cdk_deploy_shutdown(connector_name)?;
+    utils::fluvio_conn::remove_topic(&ctx.fluvio, &config.meta.topic).await?;
+    let received_records: Vec<TestRecord> = read_result?;
+
+    // then
+    assert_eq!(received_records.len(), count);
+    for (i, record) in received_records.into_iter().enumerate() {
+        assert_eq!(record.device_id as usize, i);
+        assert_eq!(record.record, json!({"device": { "device_id" : i }}));
+    }
+    info!("test 'test_postgres_with_json_sql_transformations' passed");
+    Ok(())
+}
+
+#[derive(Debug, Deserialize)]
+struct MetaConfig {
+    name: String,
+    topic: String,
+}
+
+#[derive(Debug, Deserialize)]
+struct TestConfig {
+    meta: MetaConfig,
+}
+
+#[derive(sqlx::FromRow, Debug)]
+struct TestRecord {
+    device_id: i32,
+    record: serde_json::Value,
+}
diff --git a/tests/integration/src/main.rs b/tests/integration/src/main.rs
new file mode 100644
index 0000000..9172946
--- /dev/null
+++ b/tests/integration/src/main.rs
@@ -0,0 +1,43 @@
+mod all_data_types;
+mod consumer_offsets;
+mod json_sql_transformations;
+mod utils;
+
+use anyhow::{Context, Result};
+use log::{error, info};
+
+#[async_std::main]
+async fn main() -> Result<()> {
+    env_logger::init();
+
+    info!("preparing environment");
+    let mut ctx = utils::ctx::TestContext::setup().await?;
+
+    info!("running sql-connector integration tests");
+
+    let result = run_tests(&mut ctx).await;
+    if let Err(e) = &result {
+        error!("test failed: {:#?}", e);
+    }
+    ctx.teardown().await?;
+    result?;
+
+    info!("ALL PASSED");
+
+    Ok(())
+}
+
+async fn run_tests(ctx: &mut utils::ctx::TestContext) -> Result<()> {
+    all_data_types::test_postgres_all_data_types(ctx)
+        .await
+        .context("test_postgres_all_data_types failed")?;
+
+    json_sql_transformations::test_postgres_with_json_sql_transformations(ctx)
+        .await
+        .context("test_postgres_with_json_sql_transformations failed")?;
+
+    consumer_offsets::test_postgres_consumer_offsets(ctx)
+        .await
+        .context("test_postgres_consumer_offsets failed")?;
+
+    Ok(())
+}
diff --git a/tests/integration/src/utils/cdk.rs b/tests/integration/src/utils/cdk.rs
new file mode 100644
index 0000000..105f1d4
--- /dev/null
+++ b/tests/integration/src/utils/cdk.rs
@@ -0,0 +1,80 @@
+use std::{path::Path, process::Command, time::Duration};
+
+use anyhow::Result;
+use async_std::task;
+use log::info;
+use once_cell::sync::Lazy;
+
+use crate::utils::connector_dir;
+
+static CDK_BIN: Lazy<String> = Lazy::new(|| std::env::var("CDK_BIN").unwrap_or("cdk".to_string()));
+
+pub async fn cdk_deploy_start(config_path: &Path, env: Option<(&str, &str)>) -> Result<()> {
+    let connector_dir = connector_dir()?;
+    info!(
+        "deploying connector with config from {config_path:?}, connector_dir: {}",
+        connector_dir.to_string_lossy()
+    );
+    let mut command = Command::new(CDK_BIN.to_string());
+    command.current_dir(&connector_dir);
+    command
+        .arg("deploy")
+        .arg("start")
+        .arg("--config")
+        .arg(config_path);
+    if let Some((env_name, env_value)) = env {
+        command.env(env_name, env_value);
+    }
+    let output = command.output()?;
+    if !output.status.success() {
+        anyhow::bail!(
+            "`cdk deploy start` failed with:\n {}",
+            String::from_utf8_lossy(output.stderr.as_slice())
+        )
+    }
+    task::sleep(Duration::from_secs(10)).await; // time for connector to start
+    Ok(())
+}
+
+pub fn cdk_deploy_shutdown(connector_name: &str) -> Result<()> {
+    info!("shutting down connector {connector_name}");
+    let output = Command::new(CDK_BIN.to_string())
+        .arg("deploy")
+        .arg("shutdown")
+        .arg("--name")
+        .arg(connector_name)
+        .output()?;
+    if !output.status.success() {
+        anyhow::bail!(
+            "`cdk deploy shutdown` failed with:\n {}",
+            String::from_utf8_lossy(output.stderr.as_slice())
+        )
+    }
+    Ok(())
+}
+
+pub fn cdk_deploy_status(connector_name: &str) -> Result<Option<String>> {
+    let output = Command::new(CDK_BIN.to_string())
+        .arg("deploy")
+        .arg("list")
+        .output()?;
+    if !output.status.success() {
+        anyhow::bail!(
+            "`cdk deploy list` failed with:\n {}",
+            String::from_utf8_lossy(output.stderr.as_slice())
+        )
+    }
+    for line in String::from_utf8_lossy(output.stdout.as_slice())
+        .lines()
+        .skip(1)
+    {
+        let mut column_iter = line.split_whitespace();
+        match column_iter.next() {
+            Some(name) if name.eq(connector_name) => {
+                return Ok(column_iter.next().map(|s| s.to_owned()))
+            }
+            _ => {}
+        }
+    }
+    Ok(None)
+}
diff --git a/tests/integration/src/utils/ctx.rs b/tests/integration/src/utils/ctx.rs
new file mode 100644
index 0000000..775cd27
--- /dev/null
+++ b/tests/integration/src/utils/ctx.rs
@@ -0,0 +1,57 @@
+use anyhow::{Context, Result};
+use bollard::Docker;
+use log::info;
+use sqlx::PgConnection;
+
+use fluvio::Fluvio;
+
+use super::{db, docker_conn, fluvio_conn, smdk};
+
+pub struct TestContext {
+    pub fluvio: Fluvio,
+    pub pg_conn: PgConnection,
+    pub docker: Docker,
+}
+
+impl TestContext {
+    pub async fn setup() -> Result<Self> {
+        let docker = docker_conn::connect_docker()
+            .await
+            .context("unable to connect to docker engine")?;
+
+        let start_fluvio = std::env::var("START_FLUVIO")
+            .unwrap_or("true".to_owned())
+            .parse::<bool>()
+            .context("unable to parse START_FLUVIO env")?;
+
+        if start_fluvio {
+            fluvio_conn::start_cluster()
+                .await
+                .context("unable to start fluvio cluster")?;
+        }
+
+        smdk::load("json-sql").await?;
+
+        let fluvio = fluvio_conn::connect_fluvio()
+            .await
+            .context("unable to connect to fluvio cluster")?;
+
+        let pg_conn = db::run_postgres(&docker)
+            .await
+            .context("unable to run postgres container")?;
+
+        Ok(Self {
+            fluvio,
+            pg_conn,
+            docker,
+        })
+    }
+
+    pub async fn teardown(&mut self) -> Result<()> {
+        info!("tearing down environment");
+        db::remove_postgres(&self.docker).await?;
+        fluvio_conn::delete_cluster().await?;
+        info!("environment teardown finished");
+        Ok(())
+    }
+}
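(Not part of the diff: a minimal sketch of how a further case would plug into the TestContext harness above. The test name, table, topic, and YAML file are hypothetical; the helpers are the ones this patch introduces under tests/integration/src/utils.)

    use anyhow::{Context, Result};

    use crate::utils::{self, ctx::TestContext, generate_json_records, new_config_path, produce_to_fluvio};

    // Hypothetical additional case following the same pattern as the three real tests:
    // create a table, deploy the connector from a config, produce records, then clean up.
    pub async fn test_postgres_smoke(ctx: &mut TestContext) -> Result<()> {
        let config_path = new_config_path("test_postgres_smoke.yaml")?; // hypothetical config file
        sqlx::query("CREATE TABLE test_postgres_smoke (device_id int, record json)")
            .execute(&mut ctx.pg_conn)
            .await
            .context("unable to create table test_postgres_smoke")?;

        utils::cdk::cdk_deploy_start(&config_path, None).await?;
        produce_to_fluvio(&ctx.fluvio, "test-postgres-smoke-topic", generate_json_records(1)).await?;

        utils::cdk::cdk_deploy_shutdown("test-postgres-smoke")?; // meta.name from the config
        utils::fluvio_conn::remove_topic(&ctx.fluvio, "test-postgres-smoke-topic").await?;
        Ok(())
    }

A case like this would be registered with one more line in run_tests in tests/integration/src/main.rs, so a failure in any step is logged and propagated after teardown.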
connect_postgres() + }) + .await?; + + info!("postgres container started with {POSTGRES_IMAGE} image"); + + Ok(conn) +} + +pub async fn connect_postgres() -> Result { + let connection_str = postgres_connection_str(); + debug!("connecting to {connection_str}"); + let mut conn = PgConnection::connect(&connection_str).await?; + + sqlx::query("SELECT count(*) FROM pg_catalog.pg_tables") + .fetch_one(&mut conn) + .await?; + Ok(conn) +} + +fn postgres_connection_str() -> String { + format!( + "postgresql://{POSTGRES_USER}:{POSTGRES_PASSWORD}@127.0.0.1:{POSTGRES_HOST_PORT}/{POSTGRES_DB}" + ) +} +pub async fn remove_postgres(docker: &Docker) -> Result<()> { + let _ = &docker + .remove_container( + "postgres", + Some(RemoveContainerOptions { + v: true, + force: true, + ..Default::default() + }), + ) + .await?; + info!("postgres container removed"); + Ok(()) +} diff --git a/tests/integration/src/utils/docker_conn.rs b/tests/integration/src/utils/docker_conn.rs new file mode 100644 index 0000000..6d85463 --- /dev/null +++ b/tests/integration/src/utils/docker_conn.rs @@ -0,0 +1,15 @@ +use anyhow::Result; +use bollard::Docker; +use log::info; + +pub async fn connect_docker() -> Result { + info!("checking docker engine availability"); + + let docker = Docker::connect_with_local_defaults()?; + let version = docker.version().await?; + info!( + "connected to docker version: {:?}, api_version: {:?}", + &version.version, &version.api_version + ); + Ok(docker) +} diff --git a/tests/integration/src/utils/fluvio_conn.rs b/tests/integration/src/utils/fluvio_conn.rs new file mode 100644 index 0000000..881045f --- /dev/null +++ b/tests/integration/src/utils/fluvio_conn.rs @@ -0,0 +1,59 @@ +use std::process::Command; + +use anyhow::Result; +use log::info; +use once_cell::sync::Lazy; + +use fluvio::{metadata::topic::TopicSpec, Fluvio}; + +static FLUVIO_BIN: Lazy = + Lazy::new(|| std::env::var("FLUVIO_BIN").unwrap_or("fluvio".to_string())); + +pub async fn connect_fluvio() -> Result { + info!("checking fluvio cluster availability"); + let fluvio = fluvio::Fluvio::connect().await?; + info!("connected to fluvio version: {}", fluvio.platform_version()); + Ok(fluvio) +} + +pub async fn remove_topic(fluvio: &Fluvio, topic: &str) -> Result<()> { + info!("removing topic: {}", topic); + fluvio.admin().await.delete::(topic).await?; + info!("topic removed: {}", topic); + Ok(()) +} + +pub async fn start_cluster() -> Result<()> { + info!("starting fluvio cluster"); + let output = Command::new(FLUVIO_BIN.to_string()) + .arg("cluster") + .arg("start") + .arg("--local") + .output()?; + if !output.status.success() { + anyhow::bail!( + "`fluvio cluster start` failed with:\n {}", + String::from_utf8_lossy(output.stderr.as_slice()) + ) + } + info!("fluvio cluster started"); + Ok(()) +} + +pub async fn delete_cluster() -> Result<()> { + info!("deleting fluvio cluster"); + + let output = Command::new(FLUVIO_BIN.to_string()) + .arg("cluster") + .arg("delete") + .arg("--force") + .output()?; + if !output.status.success() { + anyhow::bail!( + "`fluvio cluster delete` failed with:\n {}", + String::from_utf8_lossy(output.stderr.as_slice()) + ) + } + info!("fluvio cluster deleted"); + Ok(()) +} diff --git a/tests/integration/src/utils/mod.rs b/tests/integration/src/utils/mod.rs new file mode 100644 index 0000000..025de92 --- /dev/null +++ b/tests/integration/src/utils/mod.rs @@ -0,0 +1,204 @@ +pub mod cdk; +pub mod ctx; +pub mod db; +pub mod docker_conn; +pub mod fluvio_conn; +pub mod smdk; + +use std::path::PathBuf; + +use anyhow::Result; 
+use sqlx::{postgres::PgRow, FromRow};
+
+use fluvio::{Fluvio, RecordKey};
+use fluvio_future::retry::{retry, ExponentialBackoff};
+use fluvio_model_sql::{Insert, Operation, Type, Value};
+
+pub fn new_config_path(name: &str) -> Result<PathBuf> {
+    let package_dir = std::env::var("CARGO_MANIFEST_DIR")?;
+    println!("package_dir: {}", package_dir);
+    let mut path = PathBuf::new();
+    path.push(package_dir);
+    path.push("configs");
+    path.push(name);
+    Ok(path)
+}
+
+fn connector_dir() -> Result<PathBuf> {
+    let package_dir = std::env::var("CARGO_MANIFEST_DIR")?;
+    let mut path = PathBuf::new();
+    path.push(package_dir);
+    path.push("..");
+    path.push("..");
+    path.push("crates");
+    path.push("sql-sink");
+    Ok(path.canonicalize()?)
+}
+
+fn sm_dir() -> Result<PathBuf> {
+    let package_dir = std::env::var("CARGO_MANIFEST_DIR")?;
+    let mut path = PathBuf::new();
+    path.push(package_dir);
+    path.push("..");
+    path.push("..");
+    path.push("crates");
+    path.push("json-sql");
+    Ok(path.canonicalize()?)
+}
+
+pub fn generate_records(table: &str, count: usize) -> Result<Vec<Insert>> {
+    let mut result = Vec::with_capacity(count);
+    for i in 0..count {
+        let op = Insert {
+            table: table.to_string(),
+            values: vec![
+                Value {
+                    column: "json_col".to_string(),
+                    raw_value: "{\"json_key\":\"json_value\"}".to_string(),
+                    type_: Type::Json,
+                },
+                Value {
+                    column: "bool_col".to_string(),
+                    raw_value: "true".to_string(),
+                    type_: Type::Bool,
+                },
+                Value {
+                    column: "smallint_col".to_string(),
+                    raw_value: i.to_string(),
+                    type_: Type::SmallInt,
+                },
+                Value {
+                    column: "int_col".to_string(),
+                    raw_value: i.to_string(),
+                    type_: Type::Int,
+                },
+                Value {
+                    column: "bigint_col".to_string(),
+                    raw_value: i.to_string(),
+                    type_: Type::BigInt,
+                },
+                Value {
+                    column: "text_col".to_string(),
+                    raw_value: "some text".to_string(),
+                    type_: Type::Text,
+                },
+                Value {
+                    column: "bytes_col".to_string(),
+                    raw_value: "some bytes".to_string(),
+                    type_: Type::Bytes,
+                },
+                Value {
+                    column: "float_col".to_string(),
+                    raw_value: "3.123".to_string(),
+                    type_: Type::Float,
+                },
+                Value {
+                    column: "double_col".to_string(),
+                    raw_value: "3.333333333".to_string(),
+                    type_: Type::DoublePrecision,
+                },
+                Value {
+                    column: "numeric_col".to_string(),
+                    raw_value: rust_decimal::Decimal::TEN.to_string(),
+                    type_: Type::Numeric,
+                },
+                Value {
+                    column: "timestamp_col".to_string(),
+                    raw_value: chrono::Utc::now().naive_local().to_string(),
+                    type_: Type::Timestamp,
+                },
+                Value {
+                    column: "date_col".to_string(),
+                    raw_value: chrono::Utc::now().naive_local().date().to_string(),
+                    type_: Type::Date,
+                },
+                Value {
+                    column: "time_col".to_string(),
+                    raw_value: chrono::Utc::now()
+                        .naive_local()
+                        .time()
+                        .format("%H:%M:%S")
+                        .to_string(),
+                    type_: Type::Time,
+                },
+                Value {
+                    column: "uuid_col".to_string(),
+                    raw_value: uuid::Uuid::new_v4().to_string(),
+                    type_: Type::Uuid,
+                },
+                Value {
+                    column: "char_col".to_string(),
+                    raw_value: "126".to_string(),
+                    type_: Type::Char,
+                },
+            ],
+        };
+        result.push(op);
+    }
+    Ok(result)
+}
+
+pub fn generate_raw_records(table: &str, start: usize, end: usize) -> Result<Vec<String>> {
+    let mut result = Vec::with_capacity(end - start);
+    for i in start..end {
+        let op = Insert {
+            table: table.to_string(),
+            values: vec![
+                Value {
+                    column: "device_id".to_string(),
+                    raw_value: i.to_string(),
+                    type_: Type::Int,
+                },
+                Value {
+                    column: "record".to_string(),
+                    raw_value: format!("{{\"device\":{{\"device_id\":{i}}}}}"),
+                    type_: Type::Json,
+                },
+            ],
+        };
+        result.push(op);
+    }
+    Ok(result
+        .iter()
+        .map(|op| serde_json::to_string(&Operation::Insert(op.clone())))
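+        // serialize each Insert as a fluvio_model_sql::Operation JSON payload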
+        .collect::<Result<Vec<String>, _>>()?)
+}
+
+pub fn generate_json_records(count: usize) -> Vec<String> {
+    (0..count)
+        .map(|i| format!("{{\"device\":{{\"device_id\":{i}}}}}"))
+        .collect()
+}
+
+pub async fn produce_to_fluvio<R: Into<Vec<u8>> + std::fmt::Debug + Send + Sync + 'static>(
+    fluvio: &Fluvio,
+    fluvio_topic: &str,
+    records: Vec<R>,
+) -> Result<()> {
+    let producer = fluvio.topic_producer(fluvio_topic).await?;
+    for record in records {
+        producer.send(RecordKey::NULL, record).await?;
+    }
+    producer.flush().await?;
+    Ok(())
+}
+
+pub async fn read_from_postgres<R>(table: &str, count: usize) -> Result<Vec<R>>
+where
+    R: for<'r> FromRow<'r, PgRow> + Unpin + Send,
+{
+    let result = retry(ExponentialBackoff::from_millis(10).take(5), || {
+        let sql = format!("SELECT * FROM {table} LIMIT {count}");
+        async move {
+            let mut pg_conn = db::connect_postgres().await?;
+            let result: Vec<R> = sqlx::query_as(&sql).fetch_all(&mut pg_conn).await?;
+            if result.len() != count {
+                anyhow::bail!("not all expected records are ready yet");
+            }
+            Ok::<Vec<R>, anyhow::Error>(result)
+        }
+    })
+    .await?;
+
+    Ok(result)
+}
diff --git a/tests/integration/src/utils/smdk.rs b/tests/integration/src/utils/smdk.rs
new file mode 100644
index 0000000..a88236a
--- /dev/null
+++ b/tests/integration/src/utils/smdk.rs
@@ -0,0 +1,29 @@
+use std::{process::Command, time::Duration};
+
+use anyhow::Result;
+use async_std::task;
+use log::info;
+use once_cell::sync::Lazy;
+
+use crate::utils::sm_dir;
+
+static SMDK_BIN: Lazy<String> =
+    Lazy::new(|| std::env::var("SMDK_BIN").unwrap_or("smdk".to_string()));
+
+pub async fn load(smdk_name: &str) -> Result<()> {
+    let smartmodule_dir = sm_dir()?;
+    info!("loading SmartModule {smdk_name}");
+    let mut command = Command::new(SMDK_BIN.to_string());
+    command.current_dir(&smartmodule_dir);
+    command.arg("load").arg("-p").arg(smdk_name);
+    let output = command.output()?;
+    if !output.status.success() {
+        anyhow::bail!(
+            "`smdk load` failed with:\n {}",
+            String::from_utf8_lossy(output.stderr.as_slice())
+        )
+    }
+    task::sleep(Duration::from_secs(5)).await;
+    info!("SmartModule loaded");
+    Ok(())
+}