Merge branch 'main' into yiming/refine-sink-trait
wenym1 committed Sep 26, 2023
2 parents fd685d8 + 5eeef12 commit c855fbf
Showing 130 changed files with 3,210 additions and 1,564 deletions.
3 changes: 2 additions & 1 deletion .github/workflows/hakari_fix.yml
@@ -15,7 +15,8 @@ jobs:
steps:
- uses: actions/checkout@v3
with:
ref: ${{ github.head_ref }}
ref: ${{ github.event.pull_request.head.ref }}
repository: ${{ github.event.pull_request.head.repo.full_name }}

- name: Install cargo-hakari
uses: taiki-e/install-action@v2
6 changes: 6 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default.

3 changes: 1 addition & 2 deletions ci/scripts/run-backfill-tests.sh
@@ -7,9 +7,8 @@
# Hence keeping it in case we ever need to debug backfill again.

# USAGE:
# Start a rw cluster then run this script.
# ```sh
# ./risedev d
# cargo make ci-start ci-backfill
# ./ci/scripts/run-backfill-tests.sh
# ```

2 changes: 1 addition & 1 deletion ci/scripts/sql/backfill/insert.sql
@@ -2,5 +2,5 @@ insert into t1
SELECT
generate_series,
'{"orders": {"id": 1, "price": "2.30", "customer_id": 2}}'::jsonb
FROM generate_series(1, 200000);
FROM generate_series(1, 100000);
FLUSH;
4 changes: 2 additions & 2 deletions ci/workflows/pull-request.yml
@@ -40,7 +40,7 @@ steps:
mount-buildkite-agent: true
environment:
- GITHUB_TOKEN
timeout_in_minutes: 12
timeout_in_minutes: 14
retry: *auto-retry

- label: "build (deterministic simulation)"
@@ -273,7 +273,7 @@ steps:
config: ci/docker-compose.yml
environment:
- CODECOV_TOKEN
timeout_in_minutes: 18
timeout_in_minutes: 20
retry: *auto-retry

- label: "check"
8 changes: 4 additions & 4 deletions docker/docker-compose.yml
@@ -39,7 +39,7 @@ services:
timeout: 5s
retries: 5
compute-node-0:
image: "ghcr.io/risingwavelabs/risingwave:${RW_IMAGE_VERSION:-v1.0.0}"
image: "ghcr.io/risingwavelabs/risingwave:${RW_IMAGE_VERSION:-v1.2.0}"
command:
- compute-node
- "--listen-addr"
@@ -126,7 +126,7 @@ services:
timeout: 5s
retries: 5
frontend-node-0:
image: "ghcr.io/risingwavelabs/risingwave:${RW_IMAGE_VERSION:-v1.0.0}"
image: "ghcr.io/risingwavelabs/risingwave:${RW_IMAGE_VERSION:-v1.2.0}"
command:
- frontend-node
- "--listen-addr"
@@ -185,7 +185,7 @@ services:
timeout: 5s
retries: 5
meta-node-0:
image: "ghcr.io/risingwavelabs/risingwave:${RW_IMAGE_VERSION:-v1.0.0}"
image: "ghcr.io/risingwavelabs/risingwave:${RW_IMAGE_VERSION:-v1.2.0}"
command:
- meta-node
- "--listen-addr"
@@ -301,7 +301,7 @@ services:
timeout: 5s
retries: 5
connector-node:
image: ghcr.io/risingwavelabs/risingwave:${RW_IMAGE_VERSION:-v1.0.0}
image: ghcr.io/risingwavelabs/risingwave:${RW_IMAGE_VERSION:-v1.2.0}
entrypoint: "/risingwave/bin/connector-node/start-service.sh"
ports:
- 50051
2 changes: 1 addition & 1 deletion docs/memory-profiling.md
@@ -158,7 +158,7 @@ cp ./target/release/examples/addr2line <your-path>
Find a Linux machine and use the `docker` command to start an environment with the specific RisingWave version. Here, `-v $(pwd):/dumps` mounts the current directory to the `/dumps` folder inside the container, so that you don't need to copy files in and out.

```bash
docker run -it --rm --entrypoint /bin/bash -v $(pwd):/dumps ghcr.io/risingwavelabs/risingwave:v1.0.0
docker run -it --rm --entrypoint /bin/bash -v $(pwd):/dumps ghcr.io/risingwavelabs/risingwave:latest
```

</details>
@@ -38,6 +38,8 @@ public class SinkWriterStreamObserver

private TableSchema tableSchema;

private boolean finished = false;

private boolean epochStarted;
private long currentEpoch;
private Long currentBatchId;
@@ -58,6 +60,9 @@ public SinkWriterStreamObserver(

@Override
public void onNext(ConnectorServiceProto.SinkWriterStreamRequest sinkTask) {
if (finished) {
throw new RuntimeException("unexpected onNext call on a finished writer stream");
}
try {
if (sinkTask.hasStart()) {
if (isInitialized()) {
@@ -169,26 +174,27 @@ public void onNext(ConnectorServiceProto.SinkWriterStreamRequest sinkTask) {
throw INVALID_ARGUMENT.withDescription("invalid sink task").asRuntimeException();
}
} catch (Exception e) {
LOG.error("sink task error: ", e);
LOG.error("sink writer error: ", e);
cleanup();
responseObserver.onError(e);
}
}

@Override
public void onError(Throwable throwable) {
LOG.error("sink task error: ", throwable);
LOG.error("sink writer finishes with error: ", throwable);
cleanup();
responseObserver.onError(throwable);
}

@Override
public void onCompleted() {
LOG.debug("sink task completed");
LOG.info("sink writer completed");
cleanup();
responseObserver.onCompleted();
}

private void cleanup() {
finished = true;
if (sink != null) {
sink.drop();
}
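A note on the `finished` flag introduced above: once `cleanup()` has run, the sink has been dropped, so any further `onNext` call indicates a broken stream protocol and should fail fast rather than touch released resources. Below is a minimal sketch of that guard pattern for a gRPC `StreamObserver`; `Request` and `Response` are hypothetical placeholders, not the actual generated `ConnectorServiceProto` types.

```java
import io.grpc.stub.StreamObserver;

// Sketch of the finished-flag guard used above. Request/Response are
// hypothetical stand-ins for the generated protobuf message types.
public class GuardedObserver<Request, Response> implements StreamObserver<Request> {
    private final StreamObserver<Response> responseObserver;
    private boolean finished = false;

    public GuardedObserver(StreamObserver<Response> responseObserver) {
        this.responseObserver = responseObserver;
    }

    @Override
    public void onNext(Request request) {
        if (finished) {
            // cleanup() already ran; a late onNext means the client
            // violated the stream protocol.
            throw new RuntimeException("unexpected onNext call on a finished stream");
        }
        try {
            // ... handle the request ...
        } catch (Exception e) {
            cleanup();
            responseObserver.onError(e);
        }
    }

    @Override
    public void onError(Throwable t) {
        cleanup();
        responseObserver.onError(t);
    }

    @Override
    public void onCompleted() {
        cleanup();
        responseObserver.onCompleted();
    }

    private void cleanup() {
        // Mark first, so any request arriving after teardown is rejected.
        finished = true;
        // ... release sink resources ...
    }
}
```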
@@ -20,8 +20,8 @@
import java.util.concurrent.TimeUnit;
import org.openjdk.jmh.annotations.*;

@Warmup(iterations = 10, time = 1, timeUnit = TimeUnit.MILLISECONDS)
@Measurement(iterations = 20, time = 1, timeUnit = TimeUnit.MILLISECONDS)
@Warmup(iterations = 2, time = 1, timeUnit = TimeUnit.MILLISECONDS, batchSize = 10)
@Measurement(iterations = 5, time = 1, timeUnit = TimeUnit.MILLISECONDS, batchSize = 10)
@Fork(value = 1)
@BenchmarkMode(org.openjdk.jmh.annotations.Mode.AverageTime)
@OutputTimeUnit(TimeUnit.MICROSECONDS)
@@ -30,8 +30,6 @@ public class ArrayListBenchmark {
@Param({"100", "1000", "10000"})
static int loopTime;

ArrayList<ArrayList<Object>> data = new ArrayList<>();

public ArrayList<Object> getRow(int index) {
short v1 = (short) index;
int v2 = (int) index;
@@ -61,17 +59,10 @@ public void getValue(ArrayList<Object> rowData) {
Integer mayNull = (Integer) rowData.get(6);
}

@Setup
public void setup() {
for (int i = 0; i < loopTime; i++) {
data.add(getRow(i));
}
}

@Benchmark
public void arrayListTest() {
for (int i = 0; i < loopTime; i++) {
getValue(data.get(i));
getValue(getRow(i));
}
}
}
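Two things change in this benchmark: the JMH rounds are trimmed (`batchSize = 10` counts ten benchmark-method calls as one measured operation), and the `@Setup`-built `data` list is gone, so `getRow(i)` now runs inside the measured loop and row construction is included in the timing. As a hedged sketch (the class name is assumed from the hunk header), such a benchmark can be launched programmatically with the standard JMH runner:

```java
import org.openjdk.jmh.runner.Runner;
import org.openjdk.jmh.runner.RunnerException;
import org.openjdk.jmh.runner.options.Options;
import org.openjdk.jmh.runner.options.OptionsBuilder;

public class BenchmarkLauncher {
    public static void main(String[] args) throws RunnerException {
        // Select benchmarks by a regex on the class name; the annotations
        // on the class (warmup, measurement, batch size) still apply.
        Options opts = new OptionsBuilder()
                .include("ArrayListBenchmark") // assumed class name from the diff
                .build();
        new Runner(opts).run();
    }
}
```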
@@ -16,31 +16,37 @@

package com.risingwave.java.binding;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.concurrent.TimeUnit;
import org.openjdk.jmh.annotations.*;

@Warmup(iterations = 10, time = 1, timeUnit = TimeUnit.MILLISECONDS)
@Measurement(iterations = 20, time = 1, timeUnit = TimeUnit.MILLISECONDS)
@Warmup(iterations = 2, time = 1, timeUnit = TimeUnit.MILLISECONDS, batchSize = 10)
@Measurement(iterations = 5, time = 1, timeUnit = TimeUnit.MILLISECONDS, batchSize = 10)
@Fork(value = 1)
@BenchmarkMode(org.openjdk.jmh.annotations.Mode.AverageTime)
@OutputTimeUnit(TimeUnit.MICROSECONDS)
@OutputTimeUnit(TimeUnit.MILLISECONDS)
@State(org.openjdk.jmh.annotations.Scope.Benchmark)
public class StreamchunkBenchmark {
@Param({"100", "1000", "10000"})
static int loopTime;
int loopTime;

String str;
StreamChunkIterator iter;
Iterator<StreamChunkIterator> iterOfIter;

@Setup(Level.Invocation)
@Setup(Level.Iteration)
public void setup() {
str = "i i I f F B i";
for (int i = 0; i < loopTime; i++) {
String b = i % 2 == 0 ? "f" : "t";
String n = i % 2 == 0 ? "." : "1";
str += String.format("\n + %d %d %d %d.0 %d.0 %s %s", i, i, i, i, i, b, n);
var iterList = new ArrayList<StreamChunkIterator>();
for (int iterI = 0; iterI < 10; iterI++) {
String str = "i i I f F B i";
for (int i = 0; i < loopTime; i++) {
String b = i % 2 == 0 ? "f" : "t";
String n = i % 2 == 0 ? "." : "1";
str += String.format("\n + %d %d %d %d.0 %d.0 %s %s", i, i, i, i, i, b, n);
}
var iter = new StreamChunkIterator(str);
iterList.add(iter);
}
iter = new StreamChunkIterator(str);
iterOfIter = iterList.iterator();
}

public void getValue(StreamChunkRow row) {
@@ -55,6 +61,10 @@ public void getValue(StreamChunkRow row) {

@Benchmark
public void streamchunkTest() {
if (!iterOfIter.hasNext()) {
throw new RuntimeException("too few prepared iter");
}
var iter = iterOfIter.next();
int count = 0;
while (true) {
try (StreamChunkRow row = iter.next()) {
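The rewrite above also moves setup from `Level.Invocation` to `Level.Iteration`: building the stream-chunk string and its `StreamChunkIterator` is expensive, so ten iterators — one per batched call, matching `batchSize = 10` — are prepared up front and each benchmark invocation consumes exactly one. A minimal sketch of the same pattern under those assumptions, with a hypothetical `ExpensiveInput` standing in for `StreamChunkIterator`:

```java
import java.util.ArrayDeque;
import java.util.concurrent.TimeUnit;
import org.openjdk.jmh.annotations.*;

// Sketch of the "prepare per iteration, consume per invocation" pattern.
// ExpensiveInput is a hypothetical stand-in for StreamChunkIterator.
@State(Scope.Benchmark)
@Warmup(iterations = 2, time = 1, timeUnit = TimeUnit.MILLISECONDS, batchSize = 10)
@Measurement(iterations = 5, time = 1, timeUnit = TimeUnit.MILLISECONDS, batchSize = 10)
@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.MILLISECONDS)
@Fork(1)
public class ConsumeOncePattern {
    static class ExpensiveInput {
        // Construction cost we want to keep out of the measured region.
    }

    private ArrayDeque<ExpensiveInput> prepared;

    // Runs once per iteration. The short 1 ms iteration time is assumed to
    // allow only a single batch of 10 calls, hence 10 prepared inputs;
    // consumeOne() fails loudly if that assumption breaks.
    @Setup(Level.Iteration)
    public void setup() {
        prepared = new ArrayDeque<>();
        for (int i = 0; i < 10; i++) {
            prepared.add(new ExpensiveInput());
        }
    }

    @Benchmark
    public void consumeOne() {
        ExpensiveInput input = prepared.poll();
        if (input == null) {
            throw new RuntimeException("too few prepared inputs");
        }
        // ... measured work consumes the input exactly once ...
    }
}
```

If an iteration ever runs more than one batch of ten calls, the queue drains and the benchmark fails loudly; that is the same trade-off the diff makes with its "too few prepared iter" check.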
2 changes: 2 additions & 0 deletions src/connector/Cargo.toml
@@ -102,6 +102,8 @@ serde_derive = "1"
serde_json = "1"
serde_with = { version = "3", features = ["json"] }
simd-json = "0.10.6"
strum = "0.25"
strum_macros = "0.25"
tempfile = "3"
thiserror = "1"
time = "0.3.28"
5 changes: 3 additions & 2 deletions src/connector/src/macros.rs
@@ -31,7 +31,8 @@ macro_rules! for_all_classified_sources {
{ Datagen, $crate::source::datagen::DatagenProperties, $crate::source::datagen::DatagenSplit },
{ GooglePubsub, $crate::source::google_pubsub::PubsubProperties, $crate::source::google_pubsub::PubsubSplit },
{ Nats, $crate::source::nats::NatsProperties, $crate::source::nats::split::NatsSplit },
{ S3, $crate::source::filesystem::S3Properties, $crate::source::filesystem::FsSplit }
{ S3, $crate::source::filesystem::S3Properties, $crate::source::filesystem::FsSplit },
{ Test, $crate::source::test_source::TestSourceProperties, $crate::source::test_source::TestSourceSplit}
}
$(
,$extra_args
@@ -152,7 +153,7 @@
macro_rules! impl_split {
({$({ $variant_name:ident, $prop_name:ty, $split:ty}),*}) => {

#[derive(Debug, Clone, EnumAsInner, PartialEq, Hash)]
#[derive(Debug, Clone, EnumAsInner, PartialEq)]
pub enum SplitImpl {
$(
$variant_name($split),