diff --git a/.dockerignore b/.dockerignore index 984013bc97e2a..aa0493b541e49 100644 --- a/.dockerignore +++ b/.dockerignore @@ -48,3 +48,6 @@ risedev-components.user.env riselab-components.user.env .git/ + +Dockerfile +.dockerignore diff --git a/Cargo.lock b/Cargo.lock index 6a501a1dc0360..c597d3b778d6f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3682,13 +3682,14 @@ dependencies = [ [[package]] name = "icelake" version = "0.0.10" -source = "git+https://github.com/icelake-io/icelake?rev=85ca0e57bf059b2e84a8bca531f9a8f3bc2f8dfd#85ca0e57bf059b2e84a8bca531f9a8f3bc2f8dfd" +source = "git+https://github.com/icelake-io/icelake?rev=166a36b1a40a64086db09a0e0f2ed6791cec548b#166a36b1a40a64086db09a0e0f2ed6791cec548b" dependencies = [ "anyhow", "apache-avro 0.15.0 (registry+https://github.com/rust-lang/crates.io-index)", "arrow-arith", "arrow-array", "arrow-buffer", + "arrow-cast", "arrow-row", "arrow-schema", "arrow-select", @@ -9323,8 +9324,8 @@ dependencies = [ [[package]] name = "tokio-postgres" -version = "0.7.8" -source = "git+https://github.com/madsim-rs/rust-postgres.git?rev=4538cd6#4538cd6bacd66909a56a8aa3aa3fd7b0b52545b3" +version = "0.7.10" +source = "git+https://github.com/madsim-rs/rust-postgres.git?rev=ac00d88#ac00d8866b8abeede7747587956ef11766b5902f" dependencies = [ "async-trait", "byteorder", @@ -9340,8 +9341,10 @@ dependencies = [ "pin-project-lite", "postgres-protocol", "postgres-types", + "rand", "socket2 0.5.3", "tokio-util", + "whoami", ] [[package]] @@ -9387,9 +9390,9 @@ dependencies = [ [[package]] name = "tokio-util" -version = "0.7.8" +version = "0.7.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "806fe8c2c87eccc8b3267cbae29ed3ab2d0bd37fca70ab622e46aaa9375ddb7d" +checksum = "1d68074620f57a0b21594d9735eb2e98ab38b17f80d3fcb189fca266771ca60d" dependencies = [ "bytes", "futures-core", @@ -10042,6 +10045,10 @@ name = "whoami" version = "1.4.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "22fc3756b8a9133049b26c7f61ab35416c130e8c09b660f5b3958b446f52cc50" +dependencies = [ + "wasm-bindgen", + "web-sys", +] [[package]] name = "winapi" @@ -10363,6 +10370,7 @@ dependencies = [ "unicode-normalization", "url", "uuid", + "whoami", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index ccca32af0b9e8..1898dafcbb3bc 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -106,7 +106,7 @@ hashbrown = { version = "0.14.0", features = [ criterion = { version = "0.5", features = ["async_futures"] } tonic = { package = "madsim-tonic", version = "0.3.1" } tonic-build = { package = "madsim-tonic-build", version = "0.3.1" } -icelake = { git = "https://github.com/icelake-io/icelake", rev = "85ca0e57bf059b2e84a8bca531f9a8f3bc2f8dfd" } +icelake = { git = "https://github.com/icelake-io/icelake", rev = "166a36b1a40a64086db09a0e0f2ed6791cec548b" } arrow-array = "46" arrow-schema = "46" arrow-buffer = "46" @@ -223,6 +223,6 @@ quanta = { git = "https://github.com/madsim-rs/quanta.git", rev = "948bdc3" } getrandom = { git = "https://github.com/madsim-rs/getrandom.git", rev = "8daf97e" } tokio-stream = { git = "https://github.com/madsim-rs/tokio.git", rev = "fe39bb8e" } tokio-retry = { git = "https://github.com/madsim-rs/rust-tokio-retry.git", rev = "95e2fd3" } -tokio-postgres = { git = "https://github.com/madsim-rs/rust-postgres.git", rev = "4538cd6" } +tokio-postgres = { git = "https://github.com/madsim-rs/rust-postgres.git", rev = "ac00d88" } # patch: unlimit 4MB message size for grpc client etcd-client = { git = 
"https://github.com/risingwavelabs/etcd-client.git", rev = "d55550a" } diff --git a/ci/scripts/docker.sh b/ci/scripts/docker.sh index d84cbc39016dc..f7936f9987d70 100755 --- a/ci/scripts/docker.sh +++ b/ci/scripts/docker.sh @@ -7,9 +7,26 @@ ghcraddr="ghcr.io/risingwavelabs/risingwave" dockerhubaddr="risingwavelabs/risingwave" arch="$(uname -m)" +echo "--- ghcr login" +echo "$GHCR_TOKEN" | docker login ghcr.io -u "$GHCR_USERNAME" --password-stdin + +echo "--- dockerhub login" +echo "$DOCKER_TOKEN" | docker login -u "risingwavelabs" --password-stdin + # Build RisingWave docker image ${BUILDKITE_COMMIT}-${arch} echo "--- docker build and tag" -docker build -f docker/Dockerfile --build-arg "GIT_SHA=${BUILDKITE_COMMIT}" -t "${ghcraddr}:${BUILDKITE_COMMIT}-${arch}" --target risingwave . +docker buildx create \ + --name container \ + --driver=docker-container + +docker buildx build -f docker/Dockerfile \ + --build-arg "GIT_SHA=${BUILDKITE_COMMIT}" -t "${ghcraddr}:${BUILDKITE_COMMIT}-${arch}" \ + --progress plain \ + --builder=container \ + --load \ + --cache-to "type=registry,ref=ghcr.io/risingwavelabs/risingwave-build-cache:${arch}" \ + --cache-from "type=registry,ref=ghcr.io/risingwavelabs/risingwave-build-cache:${arch}" \ + . echo "--- check the image can start correctly" container_id=$(docker run -d "${ghcraddr}:${BUILDKITE_COMMIT}-${arch}" playground) @@ -25,12 +42,6 @@ fi echo "--- docker images" docker images -echo "--- ghcr login" -echo "$GHCR_TOKEN" | docker login ghcr.io -u "$GHCR_USERNAME" --password-stdin - -echo "--- dockerhub login" -echo "$DOCKER_TOKEN" | docker login -u "risingwavelabs" --password-stdin - echo "--- docker push to ghcr" docker push "${ghcraddr}:${BUILDKITE_COMMIT}-${arch}" diff --git a/ci/scripts/run-e2e-test.sh b/ci/scripts/run-e2e-test.sh index fc0fb5e648865..a834bf92687f9 100755 --- a/ci/scripts/run-e2e-test.sh +++ b/ci/scripts/run-e2e-test.sh @@ -65,7 +65,11 @@ echo "--- e2e, $mode, batch" RUST_LOG="info,risingwave_stream=info,risingwave_batch=info,risingwave_storage=info" \ cluster_start sqllogictest -p 4566 -d dev './e2e_test/ddl/**/*.slt' --junit "batch-ddl-${profile}" -sqllogictest -p 4566 -d dev './e2e_test/background_ddl/**/*.slt' --junit "batch-ddl-${profile}" +if [[ $mode != "standalone" ]]; then + sqllogictest -p 4566 -d dev './e2e_test/background_ddl/**/*.slt' --junit "batch-ddl-${profile}" +else + echo "Skipping background_ddl test for $mode" +fi sqllogictest -p 4566 -d dev './e2e_test/visibility_mode/*.slt' --junit "batch-${profile}" sqllogictest -p 4566 -d dev './e2e_test/database/prepare.slt' sqllogictest -p 4566 -d test './e2e_test/database/test.slt' diff --git a/docker/Dockerfile b/docker/Dockerfile index 547d7aed477bf..cb86657957b77 100644 --- a/docker/Dockerfile +++ b/docker/Dockerfile @@ -17,32 +17,56 @@ COPY ./proto /risingwave/proto RUN cd /risingwave/dashboard && npm i && npm run build-static && rm -rf node_modules -FROM base AS builder +FROM base AS rust-base -RUN apt-get update && apt-get -y install make cmake protobuf-compiler curl bash lld maven unzip +RUN apt-get -y install make cmake protobuf-compiler curl bash lld unzip SHELL ["/bin/bash", "-c"] RUN curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs | sh -s -- --no-modify-path --default-toolchain none -y +ENV PATH /root/.cargo/bin/:$PATH +ENV CARGO_INCREMENTAL=0 -RUN mkdir -p /risingwave +COPY rust-toolchain rust-toolchain -WORKDIR /risingwave +# We need to add the `rustfmt` dependency, otherwise `risingwave_pb` will not compile +RUN rustup self update \ + && rustup set 
profile minimal \ + && rustup show \ + && rustup component add rustfmt -COPY ./ /risingwave -ENV PATH /root/.cargo/bin/:$PATH +# TODO: cargo-chef doesn't work well now, because we update Cargo.lock very often. +# We may consider sccache instead. -ENV IN_CONTAINER=1 +# RUN cargo install --git https://github.com/xxchan/cargo-chef cargo-chef --locked --rev 11f9fed +# FROM rust-base AS rust-planner + +# RUN mkdir -p /risingwave +# WORKDIR /risingwave +# COPY ./ /risingwave + +# RUN cargo chef prepare --recipe-path recipe.json + +# FROM rust-base AS rust-builder + +# RUN mkdir -p /risingwave +# WORKDIR /risingwave + +# COPY --from=rust-planner /risingwave/recipe.json recipe.json + +# # Build dependencies - this can be cached if the dependencies don't change +# RUN cargo chef cook --release --recipe-path recipe.json + +FROM rust-base AS rust-builder + +# Build application ARG GIT_SHA ENV GIT_SHA=$GIT_SHA -# We need to add the `rustfmt` dependency, otherwise `risingwave_pb` will not compile -RUN rustup self update \ - && rustup set profile minimal \ - && rustup show \ - && rustup component add rustfmt +COPY ./ /risingwave +WORKDIR /risingwave RUN cargo fetch && \ cargo build -p risingwave_cmd_all --release --features "rw-static-link" && \ @@ -53,9 +77,39 @@ RUN cargo fetch && \ chmod +x /risingwave/bin/jeprof && \ mkdir -p /risingwave/lib && cargo clean -RUN cd /risingwave/java && mvn -B package -Dmaven.test.skip=true -Djava.binding.release=true && \ - mkdir -p /risingwave/bin/connector-node && \ - tar -zxvf /risingwave/java/connector-node/assembly/target/risingwave-connector-1.0.0.tar.gz -C /risingwave/bin/connector-node +FROM base AS java-planner + +RUN mkdir -p /risingwave +WORKDIR /risingwave + +COPY java /risingwave/java/ + +# Move java/**/pom.xml to poms/**/pom.xml +RUN find . -name pom.xml -exec bash -c 'mkdir -p poms/$(dirname {}); mv {} poms/{}' \; + +# We use rust-maven-plugin to build java-binding. So it's FROM rust-base +FROM rust-base AS java-builder + +RUN apt-get -y install maven + +RUN mkdir -p /risingwave +WORKDIR /risingwave/java + +# 1. copy only poms +COPY --from=java-planner /risingwave/poms /risingwave/java/ + +# 2. start downloading dependencies +RUN mvn dependency:go-offline --fail-never + +# 3. add all source code and start compiling +# TODO: only add java related code so that changing rust code won't recompile java code +# Currently java-binding depends on the workspace Cargo.toml, which depends on the whole rust codebase +# Besides, rust-maven-plugin sets --target-dir, so the dependencies are built twice. How to dedup? 
+COPY ./ /risingwave + +RUN mvn -B package -Dmaven.test.skip=true -Djava.binding.release=true && \ + mkdir -p /risingwave/bin/connector-node && \ + tar -zxvf /risingwave/java/connector-node/assembly/target/risingwave-connector-1.0.0.tar.gz -C /risingwave/bin/connector-node FROM base AS risingwave @@ -66,11 +120,11 @@ RUN apt-get -y install gdb \ RUN mkdir -p /risingwave/bin/connector-node && mkdir -p /risingwave/lib -COPY --from=builder /risingwave/bin/risingwave /risingwave/bin/risingwave -COPY --from=builder /risingwave/bin/risingwave.dwp /risingwave/bin/risingwave.dwp -COPY --from=builder /risingwave/bin/connector-node /risingwave/bin/connector-node +COPY --from=rust-builder /risingwave/bin/risingwave /risingwave/bin/risingwave +COPY --from=rust-builder /risingwave/bin/risingwave.dwp /risingwave/bin/risingwave.dwp +COPY --from=java-builder /risingwave/bin/connector-node /risingwave/bin/connector-node COPY --from=dashboard-builder /risingwave/dashboard/out /risingwave/ui -COPY --from=builder /risingwave/bin/jeprof /usr/local/bin/jeprof +COPY --from=rust-builder /risingwave/bin/jeprof /usr/local/bin/jeprof # Set default playground mode to docker-playground profile ENV PLAYGROUND_PROFILE docker-playground @@ -78,6 +132,7 @@ ENV PLAYGROUND_PROFILE docker-playground ENV RW_DASHBOARD_UI_PATH /risingwave/ui # Set default connector libs path ENV CONNECTOR_LIBS_PATH /risingwave/bin/connector-node/libs +ENV IN_CONTAINER=1 ENTRYPOINT [ "/risingwave/bin/risingwave" ] CMD [ "playground" ] diff --git a/e2e_test/background_ddl/basic.slt b/e2e_test/background_ddl/basic.slt index 6a8141697d006..000236ca36a6e 100644 --- a/e2e_test/background_ddl/basic.slt +++ b/e2e_test/background_ddl/basic.slt @@ -8,7 +8,7 @@ statement ok CREATE TABLE t (v1 int); statement ok -INSERT INTO t select * from generate_series(1, 500000); +INSERT INTO t select * from generate_series(1, 300000); statement ok FLUSH; @@ -22,12 +22,11 @@ CREATE MATERIALIZED VIEW m2 as SELECT * FROM t; statement ok CREATE MATERIALIZED VIEW m3 as SELECT * FROM t; -sleep 3s - -query I -select count(*) from rw_catalog.rw_ddl_progress; ----- -3 +# Disable the flaky check: +# query I +# select count(*) from rw_catalog.rw_ddl_progress; +# ---- +# 3 statement error SELECT * FROM m1; @@ -42,17 +41,17 @@ sleep 30s query I select count(*) from m1; ---- -500000 +300000 query I select count(*) from m2; ---- -500000 +300000 query I select count(*) from m3; ---- -500000 +300000 statement ok DROP MATERIALIZED VIEW m1; diff --git a/e2e_test/batch/catalog/metabase.slt.part b/e2e_test/batch/catalog/metabase.slt.part new file mode 100644 index 0000000000000..0172930f6e5f2 --- /dev/null +++ b/e2e_test/batch/catalog/metabase.slt.part @@ -0,0 +1,24 @@ +query +SELECT + NULL AS TABLE_CAT, + n.nspname AS TABLE_SCHEM, + ct.relname AS TABLE_NAME, + a.attname AS COLUMN_NAME, + ( + information_schema._pg_expandarray(i.indkey) + ).n AS KEY_SEQ, + ci.relname AS PK_NAME, + information_schema._pg_expandarray(i.indkey) AS KEYS, + a.attnum AS A_ATTNUM +FROM + pg_catalog.pg_class ct + JOIN pg_catalog.pg_attribute a ON (ct.oid = a.attrelid) + JOIN pg_catalog.pg_namespace n ON (ct.relnamespace = n.oid) + JOIN pg_catalog.pg_index i ON (a.attrelid = i.indrelid) + JOIN pg_catalog.pg_class ci ON (ci.oid = i.indexrelid) +WHERE + true + AND n.nspname = 'public' + AND ct.relname = 'sentences' + AND i.indisprimary +---- diff --git a/e2e_test/ddl/search_path.slt b/e2e_test/ddl/search_path.slt index bbb927bdd8976..06db7f3f45c90 100644 --- a/e2e_test/ddl/search_path.slt +++ 
b/e2e_test/ddl/search_path.slt @@ -101,5 +101,44 @@ drop schema search_path_test1; statement ok drop schema search_path_test2; +# Schema for functions https://github.com/risingwavelabs/risingwave/issues/12422 + +query TI +select * from information_schema._pg_expandarray(Array['a','b','c']) +---- +a 1 +b 2 +c 3 + +# FIXME: This should not be available since information_schema is not in the search path +query TI +select * from _pg_expandarray(Array['a','b','c']) +---- +a 1 +b 2 +c 3 + + +statement ok +set search_path to information_schema; + +query TI +select * from _pg_expandarray(Array['a','b','c']) +---- +a 1 +b 2 +c 3 + +# built-in functions (pg_catalog) are always available +query I +select abs(1) +---- +1 + +query I +select pg_catalog.abs(1) +---- +1 + statement ok set search_path to "$user", public; diff --git a/e2e_test/iceberg/iceberg_sink_v2.slt b/e2e_test/iceberg/iceberg_sink_v2.slt index 126ed4bbf4f4b..5f847eaa30a7e 100644 --- a/e2e_test/iceberg/iceberg_sink_v2.slt +++ b/e2e_test/iceberg/iceberg_sink_v2.slt @@ -25,12 +25,12 @@ CREATE SINK s6 AS select * from mv6 WITH ( force_append_only = 'true', database.name = 'demo', table.name = 'demo_db.demo_table', - catalog.type = 'storage', warehouse.path = 's3://icebergdata/demo', s3.endpoint = 'http://127.0.0.1:9301', - s3.access.key = 'admin', - s3.secret.key = 'password' + s3.region = 'us-east-1', + s3.access.key = 'hummockadmin', + s3.secret.key = 'hummockadmin' ); statement ok diff --git a/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/EsSink.java b/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/EsSink.java index 7c1727f4a82f3..178c76edf3e40 100644 --- a/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/EsSink.java +++ b/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/EsSink.java @@ -14,6 +14,10 @@ package com.risingwave.connector; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.JsonMappingException; +import com.fasterxml.jackson.databind.ObjectMapper; import com.risingwave.connector.api.TableSchema; import com.risingwave.connector.api.sink.SinkRow; import com.risingwave.connector.api.sink.SinkWriterBase; @@ -183,17 +187,35 @@ public void afterBulk(long executionId, BulkRequest request, Throwable failure) * * @param row * @return Map from Field name to Value + * @throws JsonProcessingException + * @throws JsonMappingException */ - private Map buildDoc(SinkRow row) { + private Map buildDoc(SinkRow row) + throws JsonMappingException, JsonProcessingException { Map doc = new HashMap(); - for (int i = 0; i < getTableSchema().getNumColumns(); i++) { + var tableSchema = getTableSchema(); + var columnDescs = tableSchema.getColumnDescs(); + for (int i = 0; i < row.size(); i++) { + var type = columnDescs.get(i).getDataType().getTypeName(); Object col = row.get(i); - if (col instanceof Date) { - // es client doesn't natively support java.sql.Timestamp/Time/Date - // so we need to convert Date type into a string as suggested in - // https://github.com/elastic/elasticsearch/issues/31377#issuecomment-398102292 - col = col.toString(); + switch (type) { + case DATE: + // es client doesn't natively support java.sql.Timestamp/Time/Date + // so we need to convert Date type into a string as suggested in + // https://github.com/elastic/elasticsearch/issues/31377#issuecomment-398102292 + col = col.toString(); + break; + case 
JSONB: + ObjectMapper mapper = new ObjectMapper(); + col = + mapper.readValue( + (String) col, new TypeReference>() {}); + break; + default: + break; } + if (col instanceof Date) {} + doc.put(getTableSchema().getColumnDesc(i).getName(), col); } return doc; @@ -219,7 +241,7 @@ private String buildId(SinkRow row) { return id; } - private void processUpsert(SinkRow row) { + private void processUpsert(SinkRow row) throws JsonMappingException, JsonProcessingException { Map doc = buildDoc(row); final String key = buildId(row); @@ -234,7 +256,7 @@ private void processDelete(SinkRow row) { bulkProcessor.add(deleteRequest); } - private void writeRow(SinkRow row) { + private void writeRow(SinkRow row) throws JsonMappingException, JsonProcessingException { switch (row.getOp()) { case INSERT: case UPDATE_INSERT: diff --git a/proto/buf.yaml b/proto/buf.yaml index a2870930bb4ac..1aa31816ce0af 100644 --- a/proto/buf.yaml +++ b/proto/buf.yaml @@ -9,6 +9,7 @@ lint: # This proto is copied from https://github.com/grpc/grpc/blob/v1.15.0/doc/health-checking.md # It violates some lint rules, so we ignore it. - health.proto + allow_comment_ignores: true enum_zero_value_suffix: UNSPECIFIED except: - ENUM_VALUE_PREFIX # Enum variant doesn't have to prefix with enum name. diff --git a/proto/expr.proto b/proto/expr.proto index 7d24241850918..0c7290705f826 100644 --- a/proto/expr.proto +++ b/proto/expr.proto @@ -247,6 +247,8 @@ message TableFunction { REGEXP_MATCHES = 3; RANGE = 4; GENERATE_SUBSCRIPTS = 5; + // buf:lint:ignore ENUM_VALUE_UPPER_SNAKE_CASE + _PG_EXPANDARRAY = 6; // Jsonb functions JSONB_ARRAY_ELEMENTS = 10; JSONB_ARRAY_ELEMENTS_TEXT = 11; diff --git a/src/batch/src/executor/group_top_n.rs b/src/batch/src/executor/group_top_n.rs index 32f8a8b73c61e..1b76bb6e6e997 100644 --- a/src/batch/src/executor/group_top_n.rs +++ b/src/batch/src/executor/group_top_n.rs @@ -196,14 +196,19 @@ impl GroupTopNExecutor { #[for_await] for chunk in self.child.execute() { - let chunk = Arc::new(chunk?.compact()); + let chunk = Arc::new(chunk?); let keys = K::build(self.group_key.as_slice(), &chunk)?; - for (row_id, (encoded_row, key)) in encode_chunk(&chunk, &self.column_orders)? - .into_iter() - .zip_eq_fast(keys.into_iter()) - .enumerate() + for (row_id, ((encoded_row, key), visible)) in + encode_chunk(&chunk, &self.column_orders)? 
+ .into_iter() + .zip_eq_fast(keys.into_iter()) + .zip_eq_fast(chunk.visibility().iter()) + .enumerate() { + if !visible { + continue; + } let heap = groups.entry(key).or_insert_with(|| { TopNHeap::new( self.limit, diff --git a/src/batch/src/executor/hash_agg.rs b/src/batch/src/executor/hash_agg.rs index 283a4caf4f80c..d4480d9bd4064 100644 --- a/src/batch/src/executor/hash_agg.rs +++ b/src/batch/src/executor/hash_agg.rs @@ -223,10 +223,17 @@ impl HashAggExecutor { // consume all chunks to compute the agg result #[for_await] for chunk in self.child.execute() { - let chunk = StreamChunk::from(chunk?.compact()); + let chunk = StreamChunk::from(chunk?); let keys = K::build(self.group_key_columns.as_slice(), &chunk)?; let mut memory_usage_diff = 0; - for (row_id, key) in keys.into_iter().enumerate() { + for (row_id, (key, visible)) in keys + .into_iter() + .zip_eq_fast(chunk.visibility().iter()) + .enumerate() + { + if !visible { + continue; + } let mut new_group = false; let states = groups.entry(key).or_insert_with(|| { new_group = true; diff --git a/src/batch/src/executor/join/hash_join.rs b/src/batch/src/executor/join/hash_join.rs index 633f9243f0eeb..0f5a0788b23ec 100644 --- a/src/batch/src/executor/join/hash_join.rs +++ b/src/batch/src/executor/join/hash_join.rs @@ -231,7 +231,7 @@ impl HashJoinExecutor { let mut build_row_count = 0; #[for_await] for build_chunk in self.build_side_source.execute() { - let build_chunk = build_chunk?.compact(); + let build_chunk = build_chunk?; if build_chunk.cardinality() > 0 { build_row_count += build_chunk.cardinality(); self.mem_ctx.add(build_chunk.estimated_heap_size() as i64); @@ -252,8 +252,15 @@ impl HashJoinExecutor { for (build_chunk_id, build_chunk) in build_side.iter().enumerate() { let build_keys = K::build(&self.build_key_idxs, build_chunk)?; - for (build_row_id, build_key) in build_keys.into_iter().enumerate() { + for (build_row_id, (build_key, visible)) in build_keys + .into_iter() + .zip_eq_fast(build_chunk.visibility().iter()) + .enumerate() + { self.shutdown_rx.check()?; + if !visible { + continue; + } // Only insert key to hash map if it is consistent with the null safe restriction. 
if build_key.null_bitmap().is_subset(&null_matched) { let row_id = RowId::new(build_chunk_id, build_row_id); @@ -348,7 +355,14 @@ impl HashJoinExecutor { for probe_chunk in probe_side.execute() { let probe_chunk = probe_chunk?; let probe_keys = K::build(&probe_key_idxs, &probe_chunk)?; - for (probe_row_id, probe_key) in probe_keys.iter().enumerate() { + for (probe_row_id, (probe_key, visible)) in probe_keys + .iter() + .zip_eq_fast(probe_chunk.visibility().iter()) + .enumerate() + { + if !visible { + continue; + } for build_row_id in next_build_row_with_same_key.row_id_iter(hash_map.get(probe_key).copied()) { @@ -404,7 +418,14 @@ impl HashJoinExecutor { for probe_chunk in probe_side.execute() { let probe_chunk = probe_chunk?; let probe_keys = K::build(&probe_key_idxs, &probe_chunk)?; - for (probe_row_id, probe_key) in probe_keys.iter().enumerate() { + for (probe_row_id, (probe_key, visible)) in probe_keys + .iter() + .zip_eq_fast(probe_chunk.visibility().iter()) + .enumerate() + { + if !visible { + continue; + } if let Some(first_matched_build_row_id) = hash_map.get(probe_key) { for build_row_id in next_build_row_with_same_key.row_id_iter(Some(*first_matched_build_row_id)) @@ -466,7 +487,14 @@ impl HashJoinExecutor { for probe_chunk in probe_side.execute() { let probe_chunk = probe_chunk?; let probe_keys = K::build(&probe_key_idxs, &probe_chunk)?; - for (probe_row_id, probe_key) in probe_keys.iter().enumerate() { + for (probe_row_id, (probe_key, visible)) in probe_keys + .iter() + .zip_eq_fast(probe_chunk.visibility().iter()) + .enumerate() + { + if !visible { + continue; + } non_equi_state.found_matched = false; non_equi_state .first_output_row_id @@ -537,7 +565,14 @@ impl HashJoinExecutor { for probe_chunk in probe_side.execute() { let probe_chunk = probe_chunk?; let probe_keys = K::build(&probe_key_idxs, &probe_chunk)?; - for (probe_row_id, probe_key) in probe_keys.iter().enumerate() { + for (probe_row_id, (probe_key, visible)) in probe_keys + .iter() + .zip_eq_fast(probe_chunk.visibility().iter()) + .enumerate() + { + if !visible { + continue; + } shutdown_rx.check()?; if !ANTI_JOIN { if hash_map.get(probe_key).is_some() { @@ -594,7 +629,14 @@ impl HashJoinExecutor { for probe_chunk in probe_side.execute() { let probe_chunk = probe_chunk?; let probe_keys = K::build(&probe_key_idxs, &probe_chunk)?; - for (probe_row_id, probe_key) in probe_keys.iter().enumerate() { + for (probe_row_id, (probe_key, visible)) in probe_keys + .iter() + .zip_eq_fast(probe_chunk.visibility().iter()) + .enumerate() + { + if !visible { + continue; + } non_equi_state .first_output_row_id .push(chunk_builder.buffered_count()); @@ -662,7 +704,14 @@ impl HashJoinExecutor { for probe_chunk in probe_side.execute() { let probe_chunk = probe_chunk?; let probe_keys = K::build(&probe_key_idxs, &probe_chunk)?; - for (probe_row_id, probe_key) in probe_keys.iter().enumerate() { + for (probe_row_id, (probe_key, visible)) in probe_keys + .iter() + .zip_eq_fast(probe_chunk.visibility().iter()) + .enumerate() + { + if !visible { + continue; + } non_equi_state.found_matched = false; if let Some(first_matched_build_row_id) = hash_map.get(probe_key) { non_equi_state @@ -737,7 +786,14 @@ impl HashJoinExecutor { for probe_chunk in probe_side.execute() { let probe_chunk = probe_chunk?; let probe_keys = K::build(&probe_key_idxs, &probe_chunk)?; - for (probe_row_id, probe_key) in probe_keys.iter().enumerate() { + for (probe_row_id, (probe_key, visible)) in probe_keys + .iter() + .zip_eq_fast(probe_chunk.visibility().iter()) + 
.enumerate() + { + if !visible { + continue; + } for build_row_id in next_build_row_with_same_key.row_id_iter(hash_map.get(probe_key).copied()) { @@ -795,7 +851,14 @@ impl HashJoinExecutor { for probe_chunk in probe_side.execute() { let probe_chunk = probe_chunk?; let probe_keys = K::build(&probe_key_idxs, &probe_chunk)?; - for (probe_row_id, probe_key) in probe_keys.iter().enumerate() { + for (probe_row_id, (probe_key, visible)) in probe_keys + .iter() + .zip_eq_fast(probe_chunk.visibility().iter()) + .enumerate() + { + if !visible { + continue; + } for build_row_id in next_build_row_with_same_key.row_id_iter(hash_map.get(probe_key).copied()) { @@ -860,7 +923,13 @@ impl HashJoinExecutor { for probe_chunk in probe_side.execute() { let probe_chunk = probe_chunk?; let probe_keys = K::build(&probe_key_idxs, &probe_chunk)?; - for probe_key in &probe_keys { + for (probe_key, visible) in probe_keys + .iter() + .zip_eq_fast(probe_chunk.visibility().iter()) + { + if !visible { + continue; + } for build_row_id in next_build_row_with_same_key.row_id_iter(hash_map.get(probe_key).copied()) { @@ -908,7 +977,14 @@ impl HashJoinExecutor { for probe_chunk in probe_side.execute() { let probe_chunk = probe_chunk?; let probe_keys = K::build(&probe_key_idxs, &probe_chunk)?; - for (probe_row_id, probe_key) in probe_keys.iter().enumerate() { + for (probe_row_id, (probe_key, visible)) in probe_keys + .iter() + .zip_eq_fast(probe_chunk.visibility().iter()) + .enumerate() + { + if !visible { + continue; + } for build_row_id in next_build_row_with_same_key.row_id_iter(hash_map.get(probe_key).copied()) { @@ -974,7 +1050,14 @@ impl HashJoinExecutor { for probe_chunk in probe_side.execute() { let probe_chunk = probe_chunk?; let probe_keys = K::build(&probe_key_idxs, &probe_chunk)?; - for (probe_row_id, probe_key) in probe_keys.iter().enumerate() { + for (probe_row_id, (probe_key, visible)) in probe_keys + .iter() + .zip_eq_fast(probe_chunk.visibility().iter()) + .enumerate() + { + if !visible { + continue; + } if let Some(first_matched_build_row_id) = hash_map.get(probe_key) { for build_row_id in next_build_row_with_same_key.row_id_iter(Some(*first_matched_build_row_id)) @@ -1049,7 +1132,14 @@ impl HashJoinExecutor { for probe_chunk in probe_side.execute() { let probe_chunk = probe_chunk?; let probe_keys = K::build(&probe_key_idxs, &probe_chunk)?; - for (probe_row_id, probe_key) in probe_keys.iter().enumerate() { + for (probe_row_id, (probe_key, visible)) in probe_keys + .iter() + .zip_eq_fast(probe_chunk.visibility().iter()) + .enumerate() + { + if !visible { + continue; + } left_non_equi_state.found_matched = false; if let Some(first_matched_build_row_id) = hash_map.get(probe_key) { left_non_equi_state diff --git a/src/batch/src/executor/join/lookup_join_base.rs b/src/batch/src/executor/join/lookup_join_base.rs index d60c399ef9941..3e754ab9f012f 100644 --- a/src/batch/src/executor/join/lookup_join_base.rs +++ b/src/batch/src/executor/join/lookup_join_base.rs @@ -25,6 +25,7 @@ use risingwave_common::memory::MemoryContext; use risingwave_common::row::Row; use risingwave_common::types::{DataType, ToOwnedDatum}; use risingwave_common::util::chunk_coalesce::DataChunkBuilder; +use risingwave_common::util::iter_util::ZipEqFast; use risingwave_common::util::sort_util::{cmp_datum_iter, OrderType}; use risingwave_expr::expr::BoxedExpression; @@ -128,7 +129,7 @@ impl LookupJoinBase { let mut build_row_count = 0; #[for_await] for build_chunk in hash_join_build_side_input.execute() { - let build_chunk = build_chunk?.compact(); + 
let build_chunk = build_chunk?; if build_chunk.cardinality() > 0 { build_row_count += build_chunk.cardinality(); self.mem_ctx.add(build_chunk.estimated_heap_size() as i64); @@ -147,7 +148,14 @@ impl LookupJoinBase { for (build_chunk_id, build_chunk) in build_side.iter().enumerate() { let build_keys = K::build(&hash_join_build_side_key_idxs, build_chunk)?; - for (build_row_id, build_key) in build_keys.into_iter().enumerate() { + for (build_row_id, (build_key, visible)) in build_keys + .into_iter() + .zip_eq_fast(build_chunk.visibility().iter()) + .enumerate() + { + if !visible { + continue; + } // Only insert key to hash map if it is consistent with the null safe // restriction. if build_key.null_bitmap().is_subset(&null_matched) { diff --git a/src/common/src/array/arrow.rs b/src/common/src/array/arrow.rs index 0f89e6b4f53f4..00d418af0010f 100644 --- a/src/common/src/array/arrow.rs +++ b/src/common/src/array/arrow.rs @@ -26,12 +26,14 @@ use crate::types::{Int256, StructType}; use crate::util::iter_util::ZipEqDebug; // Implement bi-directional `From` between `DataChunk` and `arrow_array::RecordBatch`. - -// note: DataChunk -> arrow RecordBatch will IGNORE the visibilities. impl TryFrom<&DataChunk> for arrow_array::RecordBatch { type Error = ArrayError; fn try_from(chunk: &DataChunk) -> Result { + if !chunk.is_compacted() { + let c = chunk.clone(); + return Self::try_from(&c.compact()); + } let columns: Vec<_> = chunk .columns() .iter() diff --git a/src/common/src/array/stream_chunk.rs b/src/common/src/array/stream_chunk.rs index d072c65ad1826..897bc95ec9992 100644 --- a/src/common/src/array/stream_chunk.rs +++ b/src/common/src/array/stream_chunk.rs @@ -189,6 +189,9 @@ impl StreamChunk { } pub fn to_protobuf(&self) -> PbStreamChunk { + if !self.is_compacted() { + return self.clone().compact().to_protobuf(); + } PbStreamChunk { cardinality: self.cardinality() as u32, ops: self.ops.iter().map(|op| op.to_protobuf() as i32).collect(), diff --git a/src/common/src/array/stream_chunk_iter.rs b/src/common/src/array/stream_chunk_iter.rs index 1e58fe825ffc2..82a8e9997b661 100644 --- a/src/common/src/array/stream_chunk_iter.rs +++ b/src/common/src/array/stream_chunk_iter.rs @@ -52,6 +52,18 @@ impl StreamChunk { let (row, visible) = self.data_chunk().row_at(pos); (op, row, visible) } + + pub fn rows_with_holes(&self) -> impl Iterator)>> { + self.data_chunk().rows_with_holes().map(|row| { + row.map(|row| { + ( + // SAFETY: index is checked since we are in the iterator. 
+ unsafe { *self.ops().get_unchecked(row.index()) }, + row, + ) + }) + }) + } } pub struct StreamChunkRefIter<'a> { diff --git a/src/connector/src/sink/clickhouse.rs b/src/connector/src/sink/clickhouse.rs index 187b87397dbf4..841b7fd0c6340 100644 --- a/src/connector/src/sink/clickhouse.rs +++ b/src/connector/src/sink/clickhouse.rs @@ -301,6 +301,10 @@ impl ClickHouseSinkWriter { )?; for (op, row) in chunk.rows() { if op != Op::Insert { + tracing::warn!( + "append only click house sink receive an {:?} which will be ignored.", + op + ); continue; } let mut clickhouse_filed_vec = vec![]; diff --git a/src/connector/src/sink/iceberg.rs b/src/connector/src/sink/iceberg.rs index 0238c66d43788..db5b899c687c9 100644 --- a/src/connector/src/sink/iceberg.rs +++ b/src/connector/src/sink/iceberg.rs @@ -33,6 +33,7 @@ use risingwave_pb::connector_service::SinkMetadata; use risingwave_rpc_client::ConnectorClient; use serde_derive::Deserialize; use serde_json::Value; +use url::Url; use super::{ Sink, SinkError, SinkWriter, SinkWriterParam, SINK_TYPE_APPEND_ONLY, SINK_TYPE_OPTION, @@ -73,7 +74,7 @@ pub struct IcebergConfig { pub catalog_type: Option, #[serde(rename = "warehouse.path")] - pub path: Option, // Path of iceberg warehouse, only applicable in storage catalog. + pub path: String, // Path of iceberg warehouse, only applicable in storage catalog. #[serde(rename = "catalog.uri")] pub uri: Option, // URI of iceberg catalog, only applicable in rest catalog. @@ -108,57 +109,81 @@ impl IcebergConfig { Ok(config) } - fn build_iceberg_configs(&self) -> HashMap { + fn build_iceberg_configs(&self) -> Result> { let mut iceberg_configs = HashMap::new(); - iceberg_configs.insert( - CATALOG_TYPE.to_string(), - self.catalog_type - .as_deref() - .unwrap_or("storage") - .to_string(), - ); + + let catalog_type = self + .catalog_type + .as_deref() + .unwrap_or("storage") + .to_string(); + + iceberg_configs.insert(CATALOG_TYPE.to_string(), catalog_type.clone()); iceberg_configs.insert( CATALOG_NAME.to_string(), self.database_name.clone().to_string(), ); - if let Some(path) = &self.path { - iceberg_configs.insert( - format!("iceberg.catalog.{}.warehouse", self.database_name), - path.clone().to_string(), - ); - } - if let Some(uri) = &self.uri { - iceberg_configs.insert( - format!("iceberg.catalog.{}.uri", self.database_name), - uri.clone().to_string(), - ); + match catalog_type.as_str() { + "storage" => { + iceberg_configs.insert( + format!("iceberg.catalog.{}.warehouse", self.database_name), + self.path.clone(), + ); + } + "rest" => { + let uri = self.uri.clone().ok_or_else(|| { + SinkError::Iceberg(anyhow!("`catalog.uri` must be set in rest catalog")) + })?; + iceberg_configs.insert(format!("iceberg.catalog.{}.uri", self.database_name), uri); + } + _ => { + return Err(SinkError::Iceberg(anyhow!( + "Unsupported catalog type: {}, only support `storage` and `rest`", + catalog_type + ))) + } } if let Some(region) = &self.region { iceberg_configs.insert( - "iceberg.catalog.table.io.region".to_string(), + "iceberg.table.io.region".to_string(), region.clone().to_string(), ); } if let Some(endpoint) = &self.endpoint { iceberg_configs.insert( - "iceberg.catalog.table.io.endpoint".to_string(), + "iceberg.table.io.endpoint".to_string(), endpoint.clone().to_string(), ); } iceberg_configs.insert( - "iceberg.catalog.table.io.access_key_id".to_string(), + "iceberg.table.io.access_key_id".to_string(), self.access_key.clone().to_string(), ); iceberg_configs.insert( - "iceberg.catalog.table.io.secret_access_key".to_string(), + 
"iceberg.table.io.secret_access_key".to_string(), self.secret_key.clone().to_string(), ); - iceberg_configs + let (bucket, root) = { + let url = Url::parse(&self.path).map_err(|e| SinkError::Iceberg(anyhow!(e)))?; + let bucket = url + .host_str() + .ok_or_else(|| { + SinkError::Iceberg(anyhow!("Invalid s3 path: {}, bucket is missing", self.path)) + })? + .to_string(); + let root = url.path().trim_start_matches('/').to_string(); + (bucket, root) + }; + + iceberg_configs.insert("iceberg.table.io.bucket".to_string(), bucket); + iceberg_configs.insert("iceberg.table.io.root".to_string(), root); + + Ok(iceberg_configs) } } @@ -177,7 +202,7 @@ impl Debug for IcebergSink { impl IcebergSink { async fn create_table(&self) -> Result { - let catalog = load_catalog(&self.config.build_iceberg_configs()) + let catalog = load_catalog(&self.config.build_iceberg_configs()?) .await .map_err(|e| SinkError::Iceberg(anyhow!("Unable to load iceberg catalog: {e}")))?; @@ -414,7 +439,7 @@ impl SinkCommitCoordinator for IcebergSinkCommitter { .collect::>>()?; let mut txn = Transaction::new(&mut self.table); - txn.append_file( + txn.append_data_file( write_results .into_iter() .flat_map(|s| s.data_files.into_iter()), diff --git a/src/connector/src/sink/remote.rs b/src/connector/src/sink/remote.rs index 851c81b8916d0..83ec62e0d7649 100644 --- a/src/connector/src/sink/remote.rs +++ b/src/connector/src/sink/remote.rs @@ -350,7 +350,7 @@ where async fn write_batch(&mut self, chunk: StreamChunk) -> Result<()> { let payload = match self.payload_format { SinkPayloadFormat::Json => { - let mut row_ops = vec![]; + let mut row_ops = Vec::with_capacity(chunk.cardinality()); let enc = JsonEncoder::new(&self.schema, None, TimestampHandlingMode::String); for (op, row_ref) in chunk.rows() { let map = enc.encode(row_ref)?; diff --git a/src/connector/src/sink/utils.rs b/src/connector/src/sink/utils.rs index b68164beea206..fd25b24f8a7da 100644 --- a/src/connector/src/sink/utils.rs +++ b/src/connector/src/sink/utils.rs @@ -21,7 +21,7 @@ use crate::sink::Result; pub fn chunk_to_json(chunk: StreamChunk, schema: &Schema) -> Result> { let encoder = JsonEncoder::new(schema, None, TimestampHandlingMode::Milli); - let mut records: Vec = Vec::with_capacity(chunk.capacity()); + let mut records: Vec = Vec::with_capacity(chunk.cardinality()); for (_, row) in chunk.rows() { let record = Value::Object(encoder.encode(row)?); records.push(record.to_string()); diff --git a/src/expr/macro/src/gen.rs b/src/expr/macro/src/gen.rs index f940a705754a5..14510ff63588b 100644 --- a/src/expr/macro/src/gen.rs +++ b/src/expr/macro/src/gen.rs @@ -226,7 +226,7 @@ impl FunctionAttr { quote! { let mut columns = Vec::with_capacity(self.children.len() - #num_args); for child in &self.children[#num_args..] { - columns.push(child.eval_checked(input).await?); + columns.push(child.eval(input).await?); } let variadic_input = DataChunk::new(columns, input.visibility().clone()); } @@ -438,7 +438,7 @@ impl FunctionAttr { } async fn eval(&self, input: &DataChunk) -> Result { #( - let #array_refs = self.children[#children_indices].eval_checked(input).await?; + let #array_refs = self.children[#children_indices].eval(input).await?; let #arrays: &#arg_arrays = #array_refs.as_ref().into(); )* #eval_variadic @@ -839,15 +839,26 @@ impl FunctionAttr { .map(|i| quote! 
{ self.return_type.as_struct().types().nth(#i).unwrap().clone() }) .collect() }; + #[allow(clippy::disallowed_methods)] + let optioned_outputs = user_fn + .core_return_type + .split(',') + .map(|t| t.contains("Option")) + // example: "(Option<&str>, i32)" => [true, false] + .zip(&outputs) + .map(|(optional, o)| match optional { + false => quote! { Some(#o.as_scalar_ref()) }, + true => quote! { #o.map(|o| o.as_scalar_ref()) }, + }) + .collect_vec(); let build_value_array = if return_types.len() == 1 { quote! { let [value_array] = value_arrays; } } else { quote! { - let bitmap = value_arrays[0].null_bitmap().clone(); let value_array = StructArray::new( self.return_type.as_struct().clone(), value_arrays.to_vec(), - bitmap, + Bitmap::ones(len), ).into_ref(); } }; @@ -925,7 +936,7 @@ impl FunctionAttr { #[try_stream(boxed, ok = DataChunk, error = ExprError)] async fn eval_inner<'a>(&'a self, input: &'a DataChunk) { #( - let #array_refs = self.#child.eval_checked(input).await?; + let #array_refs = self.#child.eval(input).await?; let #arrays: &#arg_arrays = #array_refs.as_ref().into(); )* @@ -938,11 +949,12 @@ impl FunctionAttr { for output in #iter { index_builder.append(Some(i as i32)); match #output { - Some((#(#outputs),*)) => { #(#builders.append(Some(#outputs.as_scalar_ref()));)* } + Some((#(#outputs),*)) => { #(#builders.append(#optioned_outputs);)* } None => { #(#builders.append_null();)* } } if index_builder.len() == self.chunk_size { + let len = index_builder.len(); let index_array = std::mem::replace(&mut index_builder, I32ArrayBuilder::new(self.chunk_size)).finish().into_ref(); let value_arrays = [#(std::mem::replace(&mut #builders, #builder_types::with_type(self.chunk_size, #return_types)).finish().into_ref()),*]; #build_value_array diff --git a/src/expr/macro/src/lib.rs b/src/expr/macro/src/lib.rs index c6ebffdff660f..484f093ff8883 100644 --- a/src/expr/macro/src/lib.rs +++ b/src/expr/macro/src/lib.rs @@ -510,9 +510,9 @@ struct UserFunctionAttr { async_: bool, /// Whether contains argument `&Context`. context: bool, - /// The last argument type is `&mut dyn Write`. + /// Whether contains argument `&mut impl Write`. write: bool, - /// The last argument type is `retract: bool`. + /// Whether the last argument type is `retract: bool`. retract: bool, /// The argument type are `Option`s. arg_option: bool, diff --git a/src/expr/src/expr/build.rs b/src/expr/src/expr/build.rs index b2b6db6eafc3c..8f961ded6bad4 100644 --- a/src/expr/src/expr/build.rs +++ b/src/expr/src/expr/build.rs @@ -27,6 +27,7 @@ use super::expr_in::InExpression; use super::expr_some_all::SomeAllExpression; use super::expr_udf::UdfExpression; use super::expr_vnode::VnodeExpression; +use super::wrapper::Checked; use crate::expr::{ BoxedExpression, Expression, InputRefExpression, LiteralExpression, TryFromExprNodeBoxed, }; @@ -35,7 +36,7 @@ use crate::sig::FuncSigDebug; use crate::{bail, ExprError, Result}; /// Build an expression from protobuf. -pub fn build_from_prost(prost: &ExprNode) -> Result { +fn build_from_prost_inner(prost: &ExprNode) -> Result { use PbType as E; let func_call = match prost.get_rex_node()? { @@ -70,6 +71,15 @@ pub fn build_from_prost(prost: &ExprNode) -> Result { } } +/// Build an expression from protobuf with wrappers. +pub fn build_from_prost(prost: &ExprNode) -> Result { + let expr = build_from_prost_inner(prost)?; + + let checked = Checked(expr); + + Ok(checked.boxed()) +} + /// Build an expression in `FuncCall` variant. 
pub fn build_func( func: PbType, diff --git a/src/expr/src/expr/expr_array_transform.rs b/src/expr/src/expr/expr_array_transform.rs index 4d1afe88bc659..016fad81074dd 100644 --- a/src/expr/src/expr/expr_array_transform.rs +++ b/src/expr/src/expr/expr_array_transform.rs @@ -35,7 +35,7 @@ impl Expression for ArrayTransformExpression { } async fn eval(&self, input: &DataChunk) -> Result { - let lambda_input = self.array.eval_checked(input).await?; + let lambda_input = self.array.eval(input).await?; let lambda_input = Arc::unwrap_or_clone(lambda_input).into_list(); let new_list = lambda_input .map_inner(|flatten_input| async move { diff --git a/src/expr/src/expr/expr_binary_nullable.rs b/src/expr/src/expr/expr_binary_nullable.rs index bc3ccf8f98231..af2db0152a2ef 100644 --- a/src/expr/src/expr/expr_binary_nullable.rs +++ b/src/expr/src/expr/expr_binary_nullable.rs @@ -42,7 +42,7 @@ impl Expression for BinaryShortCircuitExpression { } async fn eval(&self, input: &DataChunk) -> Result { - let left = self.expr_ia1.eval_checked(input).await?; + let left = self.expr_ia1.eval(input).await?; let left = left.as_bool(); let res_vis = match self.expr_type { @@ -58,7 +58,7 @@ impl Expression for BinaryShortCircuitExpression { let mut input1 = input.clone(); input1.set_visibility(new_vis); - let right = self.expr_ia2.eval_checked(&input1).await?; + let right = self.expr_ia2.eval(&input1).await?; let right = right.as_bool(); assert_eq!(left.len(), right.len()); diff --git a/src/expr/src/expr/expr_case.rs b/src/expr/src/expr/expr_case.rs index be4cdb864d520..6e2591ad61f73 100644 --- a/src/expr/src/expr/expr_case.rs +++ b/src/expr/src/expr/expr_case.rs @@ -64,10 +64,10 @@ impl Expression for CaseExpression { let when_len = self.when_clauses.len(); let mut result_array = Vec::with_capacity(when_len + 1); for (when_idx, WhenClause { when, then }) in self.when_clauses.iter().enumerate() { - let calc_then_vis = when.eval_checked(&input).await?.as_bool().to_bitmap(); + let calc_then_vis = when.eval(&input).await?.as_bool().to_bitmap(); let input_vis = input.visibility().clone(); input.set_visibility(calc_then_vis.clone()); - let then_res = then.eval_checked(&input).await?; + let then_res = then.eval(&input).await?; calc_then_vis .iter_ones() .for_each(|pos| selection[pos] = Some(when_idx)); @@ -75,7 +75,7 @@ impl Expression for CaseExpression { result_array.push(then_res); } if let Some(ref else_expr) = self.else_clause { - let else_res = else_expr.eval_checked(&input).await?; + let else_res = else_expr.eval(&input).await?; input .visibility() .iter_ones() diff --git a/src/expr/src/expr/expr_coalesce.rs b/src/expr/src/expr/expr_coalesce.rs index ee47db4db20bc..ca08f18486b66 100644 --- a/src/expr/src/expr/expr_coalesce.rs +++ b/src/expr/src/expr/expr_coalesce.rs @@ -44,7 +44,7 @@ impl Expression for CoalesceExpression { let mut selection: Vec> = vec![None; len]; let mut children_array = Vec::with_capacity(self.children.len()); for (child_idx, child) in self.children.iter().enumerate() { - let res = child.eval_checked(&input).await?; + let res = child.eval(&input).await?; let res_bitmap = res.null_bitmap(); let orig_vis = input.visibility(); for pos in orig_vis.bitand(res_bitmap).iter_ones() { diff --git a/src/expr/src/expr/expr_field.rs b/src/expr/src/expr/expr_field.rs index 3402335549225..b3c5e4f171760 100644 --- a/src/expr/src/expr/expr_field.rs +++ b/src/expr/src/expr/expr_field.rs @@ -40,7 +40,7 @@ impl Expression for FieldExpression { } async fn eval(&self, input: &DataChunk) -> Result { - let array = 
self.input.eval_checked(input).await?; + let array = self.input.eval(input).await?; if let ArrayImpl::Struct(struct_array) = array.as_ref() { Ok(struct_array.field_at(self.index).clone()) } else { diff --git a/src/expr/src/expr/expr_in.rs b/src/expr/src/expr/expr_in.rs index ed7c9cc9e50cb..44ec69d718364 100644 --- a/src/expr/src/expr/expr_in.rs +++ b/src/expr/src/expr/expr_in.rs @@ -74,7 +74,7 @@ impl Expression for InExpression { } async fn eval(&self, input: &DataChunk) -> Result { - let input_array = self.left.eval_checked(input).await?; + let input_array = self.left.eval(input).await?; let mut output_array = BoolArrayBuilder::new(input_array.len()); for (data, vis) in input_array.iter().zip_eq_fast(input.visibility().iter()) { if vis { diff --git a/src/expr/src/expr/expr_some_all.rs b/src/expr/src/expr/expr_some_all.rs index 9ae9146406ada..fc284d4679e72 100644 --- a/src/expr/src/expr/expr_some_all.rs +++ b/src/expr/src/expr/expr_some_all.rs @@ -82,8 +82,8 @@ impl Expression for SomeAllExpression { } async fn eval(&self, data_chunk: &DataChunk) -> Result { - let arr_left = self.left_expr.eval_checked(data_chunk).await?; - let arr_right = self.right_expr.eval_checked(data_chunk).await?; + let arr_left = self.left_expr.eval(data_chunk).await?; + let arr_right = self.right_expr.eval(data_chunk).await?; let mut num_array = Vec::with_capacity(data_chunk.capacity()); let arr_right_inner = arr_right.as_list(); diff --git a/src/expr/src/expr/expr_udf.rs b/src/expr/src/expr/expr_udf.rs index 8f85515b18863..3d70b21f866f1 100644 --- a/src/expr/src/expr/expr_udf.rs +++ b/src/expr/src/expr/expr_udf.rs @@ -51,7 +51,7 @@ impl Expression for UdfExpression { let vis = input.visibility(); let mut columns = Vec::with_capacity(self.children.len()); for child in &self.children { - let array = child.eval_checked(input).await?; + let array = child.eval(input).await?; columns.push(array.as_ref().try_into()?); } self.eval_inner(columns, vis).await diff --git a/src/expr/src/expr/mod.rs b/src/expr/src/expr/mod.rs index 33509506753fa..7f5c800502557 100644 --- a/src/expr/src/expr/mod.rs +++ b/src/expr/src/expr/mod.rs @@ -45,6 +45,7 @@ mod expr_some_all; pub(crate) mod expr_udf; mod expr_unary; mod expr_vnode; +pub(crate) mod wrapper; mod build; pub mod test_utils; @@ -75,16 +76,6 @@ pub trait Expression: std::fmt::Debug + Sync + Send { /// Get the return data type. fn return_type(&self) -> DataType; - /// Eval the result with extra checks. - async fn eval_checked(&self, input: &DataChunk) -> Result { - let res = self.eval(input).await?; - - // TODO: Decide to use assert or debug_assert by benchmarks. - assert_eq!(res.len(), input.capacity()); - - Ok(res) - } - /// Evaluate the expression in vectorized execution. Returns an array. /// /// The default implementation calls `eval_v2` and always converts the result to an array. @@ -175,6 +166,34 @@ impl dyn Expression { /// An owned dynamically typed [`Expression`]. pub type BoxedExpression = Box; +// TODO: avoid the overhead of extra boxing. 
+#[async_trait::async_trait] +impl Expression for BoxedExpression { + fn return_type(&self) -> DataType { + (**self).return_type() + } + + async fn eval(&self, input: &DataChunk) -> Result { + (**self).eval(input).await + } + + async fn eval_v2(&self, input: &DataChunk) -> Result { + (**self).eval_v2(input).await + } + + async fn eval_row(&self, input: &OwnedRow) -> Result { + (**self).eval_row(input).await + } + + fn eval_const(&self) -> Result { + (**self).eval_const() + } + + fn boxed(self) -> BoxedExpression { + self + } +} + /// Controls the behavior when a compute error happens. /// /// - If set to `false`, `NULL` will be inserted. diff --git a/src/expr/src/expr/value.rs b/src/expr/src/expr/value.rs index 4535aa31bcdf4..6321de9d1556b 100644 --- a/src/expr/src/expr/value.rs +++ b/src/expr/src/expr/value.rs @@ -15,7 +15,7 @@ use either::Either; use risingwave_common::array::*; use risingwave_common::for_all_array_variants; -use risingwave_common::types::{Datum, Scalar}; +use risingwave_common::types::{Datum, DatumRef, Scalar, ToDatumRef}; /// The type-erased return value of an expression. /// @@ -26,6 +26,25 @@ pub enum ValueImpl { Scalar { value: Datum, capacity: usize }, } +impl ValueImpl { + /// Number of scalars in this value. + #[inline] + #[expect(clippy::len_without_is_empty)] + pub fn len(&self) -> usize { + self.iter().len() + } + + /// Iterates over all scalars in this value. + pub fn iter(&self) -> impl ExactSizeIterator> + '_ { + match self { + Self::Array(array) => Either::Left(array.iter()), + Self::Scalar { value, capacity } => { + Either::Right(itertools::repeat_n(value.to_datum_ref(), *capacity)) + } + } + } +} + /// The generic reference type of [`ValueImpl`]. Used as the arguments of expressions. #[derive(Debug, Clone, Copy)] pub enum ValueRef<'a, A: Array> { @@ -37,13 +56,18 @@ pub enum ValueRef<'a, A: Array> { } impl<'a, A: Array> ValueRef<'a, A> { + /// Number of scalars in this value. + #[inline] + #[expect(clippy::len_without_is_empty)] + pub fn len(self) -> usize { + self.iter().len() + } + /// Iterates over all scalars in this value. - pub fn iter(self) -> impl Iterator>> + 'a { + pub fn iter(self) -> impl ExactSizeIterator>> + 'a { match self { Self::Array(array) => Either::Left(array.iter()), - Self::Scalar { value, capacity } => { - Either::Right(std::iter::repeat(value).take(capacity)) - } + Self::Scalar { value, capacity } => Either::Right(itertools::repeat_n(value, capacity)), } } } diff --git a/src/expr/src/expr/wrapper/checked.rs b/src/expr/src/expr/wrapper/checked.rs new file mode 100644 index 0000000000000..1e049ad481010 --- /dev/null +++ b/src/expr/src/expr/wrapper/checked.rs @@ -0,0 +1,53 @@ +// Copyright 2023 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +use async_trait::async_trait; +use risingwave_common::array::{ArrayRef, DataChunk}; +use risingwave_common::row::OwnedRow; +use risingwave_common::types::{DataType, Datum}; + +use crate::error::Result; +use crate::expr::{Expression, ValueImpl}; + +/// A wrapper of [`Expression`] that does extra checks after evaluation. +#[derive(Debug)] +pub struct Checked(pub E); + +// TODO: avoid the overhead of extra boxing. +#[async_trait] +impl Expression for Checked { + fn return_type(&self) -> DataType { + self.0.return_type() + } + + async fn eval(&self, input: &DataChunk) -> Result { + let res = self.0.eval(input).await?; + assert_eq!(res.len(), input.capacity()); + Ok(res) + } + + async fn eval_v2(&self, input: &DataChunk) -> Result { + let res = self.0.eval_v2(input).await?; + assert_eq!(res.len(), input.capacity()); + Ok(res) + } + + async fn eval_row(&self, input: &OwnedRow) -> Result { + self.0.eval_row(input).await + } + + fn eval_const(&self) -> Result { + self.0.eval_const() + } +} diff --git a/src/expr/src/expr/wrapper/mod.rs b/src/expr/src/expr/wrapper/mod.rs new file mode 100644 index 0000000000000..111a5bdf0ceca --- /dev/null +++ b/src/expr/src/expr/wrapper/mod.rs @@ -0,0 +1,17 @@ +// Copyright 2023 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +mod checked; + +pub use checked::Checked; diff --git a/src/expr/src/table_function/mod.rs b/src/expr/src/table_function/mod.rs index 245eebd7f3720..2f64f8449a23d 100644 --- a/src/expr/src/table_function/mod.rs +++ b/src/expr/src/table_function/mod.rs @@ -31,6 +31,7 @@ mod empty; mod generate_series; mod generate_subscripts; mod jsonb; +mod pg_expandarray; mod regexp_matches; mod repeat; mod unnest; diff --git a/src/expr/src/table_function/pg_expandarray.rs b/src/expr/src/table_function/pg_expandarray.rs new file mode 100644 index 0000000000000..1e840002be3d1 --- /dev/null +++ b/src/expr/src/table_function/pg_expandarray.rs @@ -0,0 +1,51 @@ +// Copyright 2023 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use risingwave_common::types::{ListRef, ScalarRefImpl, StructType}; +use risingwave_expr_macro::function; + +use super::*; + +/// Returns the input array as a set of rows with an index. 
+/// +/// ```slt +/// query II +/// select * from _pg_expandarray(array[1,2,null]); +/// ---- +/// 1 1 +/// 2 2 +/// NULL 3 +/// +/// query TI +/// select * from _pg_expandarray(array['one', null, 'three']); +/// ---- +/// one 1 +/// NULL 2 +/// three 3 +/// ``` +#[function( + "_pg_expandarray(list) -> setof struct", + type_infer = "infer_type" +)] +fn _pg_expandarray(array: ListRef<'_>) -> impl Iterator>, i32)> { + #[allow(clippy::disallowed_methods)] + array.iter().zip(1..) +} + +fn infer_type(args: &[DataType]) -> Result { + Ok(DataType::Struct(StructType::new(vec![ + ("x", args[0].as_list().clone()), + ("n", DataType::Int32), + ]))) +} diff --git a/src/expr/src/table_function/user_defined.rs b/src/expr/src/table_function/user_defined.rs index aab86bea250a6..0dc1d876c031a 100644 --- a/src/expr/src/table_function/user_defined.rs +++ b/src/expr/src/table_function/user_defined.rs @@ -55,7 +55,7 @@ impl UserDefinedTableFunction { // evaluate children expressions let mut columns = Vec::with_capacity(self.children.len()); for c in &self.children { - let val = c.eval_checked(input).await?; + let val = c.eval(input).await?; columns.push(val); } let direct_input = DataChunk::new(columns, input.visibility().clone()); diff --git a/src/frontend/src/binder/expr/function.rs b/src/frontend/src/binder/expr/function.rs index 1146050e8dde4..904474fe0e5af 100644 --- a/src/frontend/src/binder/expr/function.rs +++ b/src/frontend/src/binder/expr/function.rs @@ -20,7 +20,7 @@ use std::sync::LazyLock; use bk_tree::{metrics, BKTree}; use itertools::Itertools; use risingwave_common::array::ListValue; -use risingwave_common::catalog::PG_CATALOG_SCHEMA_NAME; +use risingwave_common::catalog::{INFORMATION_SCHEMA_SCHEMA_NAME, PG_CATALOG_SCHEMA_NAME}; use risingwave_common::error::{ErrorCode, Result, RwError}; use risingwave_common::session_config::USER_NAME_WILD_CARD; use risingwave_common::types::{DataType, ScalarImpl, Timestamptz}; @@ -59,12 +59,27 @@ impl Binder { [schema, name] => { let schema_name = schema.real_value(); if schema_name == PG_CATALOG_SCHEMA_NAME { + // pg_catalog is always effectively part of the search path, so we can always bind the function. + // Ref: https://www.postgresql.org/docs/current/ddl-schemas.html#DDL-SCHEMAS-CATALOG name.real_value() + } else if schema_name == INFORMATION_SCHEMA_SCHEMA_NAME { + // definition of information_schema: https://github.com/postgres/postgres/blob/e0b2eed047df9045664da6f724cb42c10f8b12f0/src/backend/catalog/information_schema.sql + // + // FIXME: handle schema correctly, so that the functions are hidden if the schema is not in the search path. 
+                    let function_name = name.real_value();
+                    if function_name != "_pg_expandarray" {
+                        return Err(ErrorCode::NotImplemented(
+                            format!("Unsupported function name under schema: {}", schema_name),
+                            12422.into(),
+                        )
+                        .into());
+                    }
+                    function_name
                 } else {
-                    return Err(ErrorCode::BindError(format!(
-                        "Unsupported function name under schema: {}",
-                        schema_name
-                    ))
+                    return Err(ErrorCode::NotImplemented(
+                        format!("Unsupported function name under schema: {}", schema_name),
+                        12422.into(),
+                    )
                     .into());
                 }
             }
@@ -138,7 +153,7 @@ impl Binder {
         }
 
         // user defined function
-        // TODO: resolve schema name
+        // TODO: resolve schema name https://github.com/risingwavelabs/risingwave/issues/12422
         if let Ok(schema) = self.first_valid_schema()
             && let Some(func) = schema.get_function_by_name_args(
                 &function_name,
diff --git a/src/sqlparser/src/ast/statement.rs b/src/sqlparser/src/ast/statement.rs
index f0df9808094b8..d2120c23b07fd 100644
--- a/src/sqlparser/src/ast/statement.rs
+++ b/src/sqlparser/src/ast/statement.rs
@@ -234,129 +234,6 @@ impl fmt::Display for SourceSchema {
     }
 }
 
-/// will be deprecated and be replaced by Format and Encode
-#[derive(Debug, Clone, PartialEq, Eq, Hash)]
-#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
-pub enum RowFormat {
-    Protobuf,          // Keyword::PROTOBUF
-    Json,              // Keyword::JSON
-    DebeziumJson,      // Keyword::DEBEZIUM_JSON
-    DebeziumMongoJson, // Keyword::DEBEZIUM_MONGO_JSON
-    UpsertJson,        // Keyword::UPSERT_JSON
-    Avro,              // Keyword::AVRO
-    UpsertAvro,        // Keyword::UpsertAVRO
-    Maxwell,           // Keyword::MAXWELL
-    CanalJson,         // Keyword::CANAL_JSON
-    Csv,               // Keyword::CSV
-    DebeziumAvro,      // Keyword::DEBEZIUM_AVRO
-    Bytes,             // Keyword::BYTES
-    Native,
-}
-
-impl RowFormat {
-    pub fn from_keyword(s: &str) -> Result<Self, ParserError> {
-        Ok(match s {
-            "JSON" => RowFormat::Json,
-            "UPSERT_JSON" => RowFormat::UpsertJson,
-            "PROTOBUF" => RowFormat::Protobuf,
-            "DEBEZIUM_JSON" => RowFormat::DebeziumJson,
-            "DEBEZIUM_MONGO_JSON" => RowFormat::DebeziumMongoJson,
-            "AVRO" => RowFormat::Avro,
-            "UPSERT_AVRO" => RowFormat::UpsertAvro,
-            "MAXWELL" => RowFormat::Maxwell,
-            "CANAL_JSON" => RowFormat::CanalJson,
-            "CSV" => RowFormat::Csv,
-            "DEBEZIUM_AVRO" => RowFormat::DebeziumAvro,
-            "BYTES" => RowFormat::Bytes,
-            _ => return Err(ParserError::ParserError(
-                "expected JSON | UPSERT_JSON | PROTOBUF | DEBEZIUM_JSON | DEBEZIUM_AVRO | AVRO | UPSERT_AVRO | MAXWELL | CANAL_JSON | BYTES after ROW FORMAT".to_string(),
-            ))
-        })
-    }
-
-    /// a compatibility layer, return (format, row_encode)
-    pub fn to_format_v2(&self) -> (Format, Encode) {
-        let format = match self {
-            RowFormat::Protobuf => Format::Plain,
-            RowFormat::Json => Format::Plain,
-            RowFormat::DebeziumJson => Format::Debezium,
-            RowFormat::DebeziumMongoJson => Format::DebeziumMongo,
-            RowFormat::UpsertJson => Format::Upsert,
-            RowFormat::Avro => Format::Plain,
-            RowFormat::UpsertAvro => Format::Upsert,
-            RowFormat::Maxwell => Format::Maxwell,
-            RowFormat::CanalJson => Format::Canal,
-            RowFormat::Csv => Format::Plain,
-            RowFormat::DebeziumAvro => Format::Debezium,
-            RowFormat::Bytes => Format::Plain,
-            RowFormat::Native => Format::Native,
-        };
-
-        let encode = match self {
-            RowFormat::Protobuf => Encode::Protobuf,
-            RowFormat::Json => Encode::Json,
-            RowFormat::DebeziumJson => Encode::Json,
-            RowFormat::DebeziumMongoJson => Encode::Json,
-            RowFormat::UpsertJson => Encode::Json,
-            RowFormat::Avro => Encode::Avro,
-            RowFormat::UpsertAvro => Encode::Avro,
-            RowFormat::Maxwell => Encode::Json,
-            RowFormat::CanalJson => Encode::Json,
-            RowFormat::Csv => Encode::Csv,
-            RowFormat::DebeziumAvro => Encode::Avro,
-            RowFormat::Bytes => Encode::Bytes,
-            RowFormat::Native => Encode::Native,
-        };
-        (format, encode)
-    }
-
-    /// a compatibility layer
-    pub fn from_format_v2(format: &Format, encode: &Encode) -> Result<Self, ParserError> {
-        Ok(match (format, encode) {
-            (Format::Native, Encode::Native) => RowFormat::Native,
-            (Format::Native, _) => unreachable!(),
-            (_, Encode::Native) => unreachable!(),
-            (Format::Debezium, Encode::Avro) => RowFormat::DebeziumAvro,
-            (Format::Debezium, Encode::Json) => RowFormat::DebeziumJson,
-            (Format::Debezium, _) => {
-                return Err(ParserError::ParserError(
-                    "The DEBEZIUM format only support AVRO and JSON Encoding".to_string(),
-                ))
-            }
-            (Format::DebeziumMongo, Encode::Json) => RowFormat::DebeziumMongoJson,
-            (Format::DebeziumMongo, _) => {
-                return Err(ParserError::ParserError(
-                    "The DEBEZIUM_MONGO format only support JSON Encoding".to_string(),
-                ))
-            }
-            (Format::Maxwell, Encode::Json) => RowFormat::Maxwell,
-            (Format::Maxwell, _) => {
-                return Err(ParserError::ParserError(
-                    "The MAXWELL format only support JSON Encoding".to_string(),
-                ))
-            }
-            (Format::Canal, Encode::Json) => RowFormat::CanalJson,
-            (Format::Canal, _) => {
-                return Err(ParserError::ParserError(
-                    "The CANAL format only support JSON Encoding".to_string(),
-                ))
-            }
-            (Format::Upsert, Encode::Avro) => RowFormat::UpsertAvro,
-            (Format::Upsert, Encode::Json) => RowFormat::UpsertJson,
-            (Format::Upsert, _) => {
-                return Err(ParserError::ParserError(
-                    "The UPSERT format only support AVRO and JSON Encoding".to_string(),
-                ))
-            }
-            (Format::Plain, Encode::Avro) => RowFormat::Avro,
-            (Format::Plain, Encode::Csv) => RowFormat::Csv,
-            (Format::Plain, Encode::Protobuf) => RowFormat::Protobuf,
-            (Format::Plain, Encode::Json) => RowFormat::Json,
-            (Format::Plain, Encode::Bytes) => RowFormat::Bytes,
-        })
-    }
-}
-
 #[derive(Debug, Clone, PartialEq, Eq, Hash)]
 #[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
 pub enum Format {
@@ -485,22 +362,6 @@ impl fmt::Display for CompatibleSourceSchema {
     }
 }
 
 impl CompatibleSourceSchema {
-    #[deprecated]
-    pub fn into_source_schema(
-        self,
-    ) -> Result<(SourceSchema, Vec<SqlOption>, Option<String>), ParserError> {
-        match self {
-            CompatibleSourceSchema::RowFormat(inner) => Ok((
-                inner,
-                vec![],
-                Some("RisingWave will stop supporting the syntax \"ROW FORMAT\" in future versions, which will be changed to \"FORMAT ... ENCODE ...\" syntax.".to_string()),
-            )),
-            CompatibleSourceSchema::V2(inner) => {
-                inner.into_source_schema().map(|(s, ops)| (s, ops, None))
-            }
-        }
-    }
-
     pub fn into_source_schema_v2(self) -> (SourceSchemaV2, Option<String>) {
         match self {
             CompatibleSourceSchema::RowFormat(inner) => (
@@ -673,112 +534,6 @@ impl SourceSchemaV2 {
             .try_collect()
     }
 
-    /// just a temporal compatibility layer will be removed soon(so the implementation is a little
-    /// dirty)
-    #[allow(deprecated)]
-    pub fn into_source_schema(self) -> Result<(SourceSchema, Vec<SqlOption>), ParserError> {
-        let options: BTreeMap<String, String> = self
-            .row_options
-            .iter()
-            .cloned()
-            .map(|x| match x.value {
-                Value::SingleQuotedString(s) => Ok((x.name.real_value(), s)),
-                Value::Number(n) => Ok((x.name.real_value(), n)),
-                Value::Boolean(b) => Ok((x.name.real_value(), b.to_string())),
-                _ => Err(ParserError::ParserError(
-                    "`row format options` only support single quoted string value".to_owned(),
-                )),
-            })
-            .try_collect()?;
-
-        let try_consume_string_from_options =
-            |row_options: &BTreeMap<String, String>, key: &str| -> Option<AstString> {
-                row_options.get(key).cloned().map(AstString)
-            };
-        let consume_string_from_options =
-            |row_options: &BTreeMap<String, String>, key: &str| -> Result<AstString, ParserError> {
-                try_consume_string_from_options(row_options, key).ok_or_else(|| {
-                    ParserError::ParserError(format!("missing field {} in row format options", key))
-                })
-            };
-        let get_schema_location =
-            |row_options: &BTreeMap<String, String>| -> Result<(AstString, bool), ParserError> {
-                let schema_location =
-                    try_consume_string_from_options(row_options, "schema.location");
-                let schema_registry =
-                    try_consume_string_from_options(row_options, "schema.registry");
-                match (schema_location, schema_registry) {
-                    (None, None) => Err(ParserError::ParserError(
-                        "missing either a schema location or a schema registry".to_string(),
-                    )),
-                    (None, Some(schema_registry)) => Ok((schema_registry, true)),
-                    (Some(schema_location), None) => Ok((schema_location, false)),
-                    (Some(_), Some(_)) => Err(ParserError::ParserError(
-                        "only need either the schema location or the schema registry".to_string(),
-                    )),
-                }
-            };
-        let row_format = RowFormat::from_format_v2(&self.format, &self.row_encode)?;
-        Ok((
-            match row_format {
-                RowFormat::Protobuf => {
-                    let (row_schema_location, use_schema_registry) = get_schema_location(&options)?;
-                    SourceSchema::Protobuf(ProtobufSchema {
-                        message_name: consume_string_from_options(&options, "message")?,
-                        row_schema_location,
-                        use_schema_registry,
-                    })
-                }
-                RowFormat::Json => SourceSchema::Json,
-                RowFormat::DebeziumJson => SourceSchema::DebeziumJson,
-                RowFormat::DebeziumMongoJson => SourceSchema::DebeziumMongoJson,
-                RowFormat::UpsertJson => SourceSchema::UpsertJson,
-                RowFormat::Avro => {
-                    let (row_schema_location, use_schema_registry) = get_schema_location(&options)?;
-                    SourceSchema::Avro(AvroSchema {
-                        row_schema_location,
-                        use_schema_registry,
-                    })
-                }
-                RowFormat::UpsertAvro => {
-                    let (row_schema_location, use_schema_registry) = get_schema_location(&options)?;
-                    SourceSchema::UpsertAvro(AvroSchema {
-                        row_schema_location,
-                        use_schema_registry,
-                    })
-                }
-                RowFormat::Maxwell => SourceSchema::Maxwell,
-                RowFormat::CanalJson => SourceSchema::CanalJson,
-                RowFormat::Csv => {
-                    let chars = consume_string_from_options(&options, "delimiter")?.0;
-                    let delimiter = get_delimiter(chars.as_str())?;
-                    let has_header = try_consume_string_from_options(&options, "without_header")
-                        .map(|s| s.0 == "false")
-                        .unwrap_or(true);
-                    SourceSchema::Csv(CsvInfo {
-                        delimiter,
-                        has_header,
-                    })
-                }
-                RowFormat::DebeziumAvro => {
-                    let (row_schema_location, use_schema_registry) = get_schema_location(&options)?;
-                    if !use_schema_registry {
-                        return Err(ParserError::ParserError(
-                            "schema location for DEBEZIUM_AVRO row format is not supported"
-                                .to_string(),
-                        ));
-                    }
-                    SourceSchema::DebeziumAvro(DebeziumAvroSchema {
-                        row_schema_location,
-                    })
-                }
-                RowFormat::Bytes => SourceSchema::Bytes,
-                RowFormat::Native => SourceSchema::Native,
-            },
-            self.row_options,
-        ))
-    }
-
     pub fn row_options(&self) -> &[SqlOption] {
         self.row_options.as_ref()
     }
diff --git a/src/stream/src/executor/hash_join.rs b/src/stream/src/executor/hash_join.rs
index 31705e792909a..a5a39723c9319 100644
--- a/src/stream/src/executor/hash_join.rs
+++ b/src/stream/src/executor/hash_join.rs
@@ -1002,8 +1002,6 @@ impl HashJoinExecutor HashJoinExecutor = if side_update
diff --git a/src/stream/src/executor/temporal_join.rs b/src/stream/src/executor/temporal_join.rs
index 462f3f2fe1af0..7a523dc89540f 100644
--- a/src/stream/src/executor/temporal_join.rs
+++ b/src/stream/src/executor/temporal_join.rs
@@ -209,10 +209,11 @@ impl TemporalSide {
         right_stream_key_indices: &[usize],
     ) -> StreamExecutorResult<()> {
         for chunk in chunks {
-            // Compact chunk, otherwise the following keys and chunk rows might fail to zip.
-            let chunk = chunk.compact();
             let keys = K::build(join_keys, chunk.data_chunk())?;
-            for ((op, row), key) in chunk.rows().zip_eq_debug(keys.into_iter()) {
+            for (r, key) in chunk.rows_with_holes().zip_eq_debug(keys.into_iter()) {
+                let Some((op, row)) = r else {
+                    continue;
+                };
                 if self.cache.contains(&key) {
                     // Update cache
                     let mut entry = self.cache.get_mut(&key).unwrap();
@@ -427,8 +428,6 @@ impl TemporalJoinExecutor
                     yield Message::Watermark(watermark.with_idx(output_watermark_col_idx));
                 }
                 InternalMessage::Chunk(chunk) => {
-                    // Compact chunk, otherwise the following keys and chunk rows might fail to zip.
-                    let chunk = chunk.compact();
                     let mut builder = JoinStreamChunkBuilder::new(
                         self.chunk_size,
                         self.schema.data_types(),
@@ -437,7 +436,10 @@ impl TemporalJoinExecutor
                     );
                     let epoch = prev_epoch.expect("Chunk data should come after some barrier.");
                     let keys = K::build(&self.left_join_keys, chunk.data_chunk())?;
-                    for ((op, left_row), key) in chunk.rows().zip_eq_debug(keys.into_iter()) {
+                    for (r, key) in chunk.rows_with_holes().zip_eq_debug(keys.into_iter()) {
+                        let Some((op, left_row)) = r else {
+                            continue;
+                        };
                         if key.null_bitmap().is_subset(&null_matched)
                             && let join_entry = self.right_table.lookup(&key, epoch).await?
                             && !join_entry.is_empty() {
diff --git a/src/stream/src/executor/top_n/group_top_n.rs b/src/stream/src/executor/top_n/group_top_n.rs
index 7e075002b99cb..78c12ee82f3cd 100644
--- a/src/stream/src/executor/top_n/group_top_n.rs
+++ b/src/stream/src/executor/top_n/group_top_n.rs
@@ -181,11 +181,13 @@ where
     async fn apply_chunk(&mut self, chunk: StreamChunk) -> StreamExecutorResult<StreamChunk> {
         let mut res_ops = Vec::with_capacity(self.limit);
         let mut res_rows = Vec::with_capacity(self.limit);
-        let chunk = chunk.compact();
         let keys = K::build(&self.group_by, chunk.data_chunk())?;
         let table_id_str = self.managed_state.state_table.table_id().to_string();
         let actor_id_str = self.ctx.id.to_string();
-        for ((op, row_ref), group_cache_key) in chunk.rows().zip_eq_debug(keys.iter()) {
+        for (r, group_cache_key) in chunk.rows_with_holes().zip_eq_debug(keys.iter()) {
+            let Some((op, row_ref)) = r else {
+                continue;
+            };
             // The pk without group by
             let pk_row = row_ref.project(&self.storage_key_indices[self.group_by.len()..]);
             let cache_key = serialize_pk_to_cache_key(pk_row, &self.cache_key_serde);
diff --git a/src/stream/src/executor/top_n/group_top_n_appendonly.rs b/src/stream/src/executor/top_n/group_top_n_appendonly.rs
index 8df8ee17768c5..140a06984e586 100644
--- a/src/stream/src/executor/top_n/group_top_n_appendonly.rs
+++ b/src/stream/src/executor/top_n/group_top_n_appendonly.rs
@@ -175,14 +175,16 @@ where
     async fn apply_chunk(&mut self, chunk: StreamChunk) -> StreamExecutorResult<StreamChunk> {
         let mut res_ops = Vec::with_capacity(self.limit);
         let mut res_rows = Vec::with_capacity(self.limit);
-        let chunk = chunk.compact();
         let keys = K::build(&self.group_by, chunk.data_chunk())?;
         let data_types = self.schema().data_types();
         let row_deserializer = RowDeserializer::new(data_types.clone());
         let table_id_str = self.managed_state.state_table.table_id().to_string();
         let actor_id_str = self.ctx.id.to_string();
-        for ((op, row_ref), group_cache_key) in chunk.rows().zip_eq_debug(keys.iter()) {
+        for (r, group_cache_key) in chunk.rows_with_holes().zip_eq_debug(keys.iter()) {
+            let Some((op, row_ref)) = r else {
+                continue;
+            };
             // The pk without group by
             let pk_row = row_ref.project(&self.storage_key_indices[self.group_by.len()..]);
             let cache_key = serialize_pk_to_cache_key(pk_row, &self.cache_key_serde);
diff --git a/src/utils/pgwire/src/pg_message.rs b/src/utils/pgwire/src/pg_message.rs
index 408330a2df6ae..b28d09116f94f 100644
--- a/src/utils/pgwire/src/pg_message.rs
+++ b/src/utils/pgwire/src/pg_message.rs
@@ -451,8 +451,8 @@ impl<'a> BeMessage<'a> {
             // Parameter names and values are passed as null-terminated strings
             let iov = &mut [name, b"\0", value, b"\0"].map(IoSlice::new);
-            let mut buffer = [0u8; 64]; // this should be enough
-            let cnt = buffer.as_mut().write_vectored(iov).unwrap();
+            let mut buffer = vec![];
+            let cnt = buffer.write_vectored(iov).unwrap();
 
             buf.put_u8(b'S');
             write_body(buf, |stream| {
diff --git a/src/workspace-hack/Cargo.toml b/src/workspace-hack/Cargo.toml
index 9a8ddc3de1173..acbc053ff9983 100644
--- a/src/workspace-hack/Cargo.toml
+++ b/src/workspace-hack/Cargo.toml
@@ -112,7 +112,7 @@ subtle = { version = "2" }
 time = { version = "0.3", features = ["local-offset", "macros", "serde-well-known"] }
 tinyvec = { version = "1", features = ["alloc", "grab_spare_slice", "rustc_1_55"] }
 tokio = { version = "1", features = ["full", "stats", "tracing"] }
-tokio-postgres = { git = "https://github.com/madsim-rs/rust-postgres.git", rev = "4538cd6", features = ["with-chrono-0_4"] }
+tokio-postgres = { git = "https://github.com/madsim-rs/rust-postgres.git", rev = "ac00d88", features = ["with-chrono-0_4"] }
"https://github.com/madsim-rs/rust-postgres.git", rev = "ac00d88", features = ["with-chrono-0_4"] } tokio-stream = { git = "https://github.com/madsim-rs/tokio.git", rev = "fe39bb8e", features = ["fs", "net"] } tokio-util = { version = "0.7", features = ["codec", "io"] } toml_datetime = { version = "0.6", default-features = false, features = ["serde"] } @@ -126,6 +126,7 @@ unicode-bidi = { version = "0.3" } unicode-normalization = { version = "0.1" } url = { version = "2", features = ["serde"] } uuid = { version = "1", features = ["fast-rng", "serde", "v4"] } +whoami = { version = "1" } [build-dependencies] ahash = { version = "0.8" } @@ -206,5 +207,6 @@ tracing-core = { version = "0.1" } unicode-bidi = { version = "0.3" } unicode-normalization = { version = "0.1" } url = { version = "2", features = ["serde"] } +whoami = { version = "1" } ### END HAKARI SECTION