Skip to content

Commit

Permalink
Merge branch 'main' into dylan/fix_temporal_join_shuffle
Browse files Browse the repository at this point in the history
  • Loading branch information
chenzl25 authored Jan 29, 2024
2 parents d722b80 + b471a9b commit fa6a311
Show file tree
Hide file tree
Showing 56 changed files with 1,001 additions and 447 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions docker/docker-compose-distributed.yml
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,8 @@ services:
- "0.0.0.0:5691"
- "--prometheus-host"
- "0.0.0.0:1250"
- "--prometheus-endpoint"
- "http://prometheus-0:9500"
- "--backend"
- etcd
- "--etcd-endpoints"
Expand Down
1 change: 1 addition & 0 deletions docker/docker-compose-with-azblob.yml
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ services:
--advertise-addr 0.0.0.0:5690 \
--dashboard-host 0.0.0.0:5691 \
--prometheus-host 0.0.0.0:1250 \
--prometheus-endpoint http://prometheus-0:9500 \
--connector-rpc-endpoint 0.0.0.0:50051 \
--backend etcd \
--etcd-endpoints etcd-0:2388 \
Expand Down
1 change: 1 addition & 0 deletions docker/docker-compose-with-gcs.yml
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ services:
--advertise-addr 0.0.0.0:5690 \
--dashboard-host 0.0.0.0:5691 \
--prometheus-host 0.0.0.0:1250 \
--prometheus-endpoint http://prometheus-0:9500 \
--connector-rpc-endpoint 0.0.0.0:50051 \
--backend etcd \
--etcd-endpoints etcd-0:2388 \
Expand Down
2 changes: 2 additions & 0 deletions docker/docker-compose-with-hdfs.yml
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,8 @@ services:
- "0.0.0.0:5691"
- "--prometheus-host"
- "0.0.0.0:1250"
- "--prometheus-endpoint"
- "http://prometheus-0:9500"
- "--backend"
- etcd
- "--etcd-endpoints"
Expand Down
1 change: 1 addition & 0 deletions docker/docker-compose-with-local-fs.yml
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ services:
--advertise-addr 0.0.0.0:5690 \
--dashboard-host 0.0.0.0:5691 \
--prometheus-host 0.0.0.0:1250 \
--prometheus-endpoint http://prometheus-0:9500 \
--connector-rpc-endpoint 0.0.0.0:50051 \
--backend etcd \
--etcd-endpoints etcd-0:2388 \
Expand Down
1 change: 1 addition & 0 deletions docker/docker-compose-with-obs.yml
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ services:
--advertise-addr 0.0.0.0:5690 \
--dashboard-host 0.0.0.0:5691 \
--prometheus-host 0.0.0.0:1250 \
--prometheus-endpoint http://prometheus-0:9500 \
--connector-rpc-endpoint 0.0.0.0:50051 \
--backend etcd \
--etcd-endpoints etcd-0:2388 \
Expand Down
1 change: 1 addition & 0 deletions docker/docker-compose-with-oss.yml
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ services:
--advertise-addr 0.0.0.0:5690 \
--dashboard-host 0.0.0.0:5691 \
--prometheus-host 0.0.0.0:1250 \
--prometheus-endpoint http://prometheus-0:9500 \
--connector-rpc-endpoint 0.0.0.0:50051 \
--backend etcd \
--etcd-endpoints etcd-0:2388 \
Expand Down
1 change: 1 addition & 0 deletions docker/docker-compose-with-s3.yml
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ services:
--advertise-addr 0.0.0.0:5690 \
--dashboard-host 0.0.0.0:5691 \
--prometheus-host 0.0.0.0:1250 \
--prometheus-endpoint http://prometheus-0:9500 \
--connector-rpc-endpoint 0.0.0.0:50051 \
--backend etcd \
--etcd-endpoints etcd-0:2388 \
Expand Down
1 change: 1 addition & 0 deletions docker/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ services:
--advertise-addr 0.0.0.0:5690 \
--dashboard-host 0.0.0.0:5691 \
--prometheus-host 0.0.0.0:1250 \
--prometheus-endpoint http://prometheus-0:9500 \
--connector-rpc-endpoint 0.0.0.0:50051 \
--backend etcd \
--etcd-endpoints etcd-0:2388 \
Expand Down
22 changes: 22 additions & 0 deletions risedev.yml
Original file line number Diff line number Diff line change
Expand Up @@ -809,6 +809,28 @@ profile:
- use: frontend
- use: compactor

ci-3cn-1fe-with-recovery:
config-path: src/config/ci-recovery.toml
steps:
- use: minio
- use: etcd
unsafe-no-fsync: true
- use: meta-node
- use: compute-node
port: 5687
exporter-port: 1222
enable-tiered-cache: true
- use: compute-node
port: 5688
exporter-port: 1223
enable-tiered-cache: true
- use: compute-node
port: 5689
exporter-port: 1224
enable-tiered-cache: true
- use: frontend
- use: compactor

ci-1cn-1fe-kafka-with-recovery:
config-path: src/config/ci-recovery.toml
steps:
Expand Down
14 changes: 14 additions & 0 deletions src/common/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -406,6 +406,12 @@ pub struct MetaDeveloperConfig {
/// in the meta node.
#[serde(default = "default::developer::meta_cached_traces_memory_limit_bytes")]
pub cached_traces_memory_limit_bytes: usize,

/// Compaction picker config
#[serde(default = "default::developer::enable_trivial_move")]
pub enable_trivial_move: bool,
#[serde(default = "default::developer::enable_check_task_level_overlap")]
pub enable_check_task_level_overlap: bool,
}

/// The section `[server]` in `risingwave.toml`.
Expand Down Expand Up @@ -1422,6 +1428,14 @@ pub mod default {
pub fn stream_hash_agg_max_dirty_groups_heap_size() -> usize {
64 << 20 // 64MB
}

pub fn enable_trivial_move() -> bool {
true
}

pub fn enable_check_task_level_overlap() -> bool {
false
}
}

pub use crate::system_param::default as system;
Expand Down
6 changes: 6 additions & 0 deletions src/common/src/types/datetime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -536,6 +536,12 @@ impl Timestamp {
self.0.timestamp_nanos_opt().unwrap()
}

pub fn with_millis(timestamp_millis: i64) -> Result<Self> {
let secs = timestamp_millis.div_euclid(1_000);
let nsecs = timestamp_millis.rem_euclid(1_000) * 1_000_000;
Self::with_secs_nsecs(secs, nsecs as u32)
}

pub fn with_micros(timestamp_micros: i64) -> Result<Self> {
let secs = timestamp_micros.div_euclid(1_000_000);
let nsecs = timestamp_micros.rem_euclid(1_000_000) * 1000;
Expand Down
2 changes: 2 additions & 0 deletions src/config/example.toml
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,8 @@ enable_emergency_picker = true
[meta.developer]
meta_cached_traces_num = 256
meta_cached_traces_memory_limit_bytes = 134217728
meta_enable_trivial_move = true
meta_enable_check_task_level_overlap = false

[batch]
enable_barrier_read = false
Expand Down
2 changes: 2 additions & 0 deletions src/connector/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,8 @@ pulsar = { version = "6.1", default-features = false, features = [
"tokio-runtime",
"telemetry",
"auth-oauth2",
"lz4",
"zstd",
] }
rdkafka = { workspace = true, features = [
"cmake-build",
Expand Down
2 changes: 2 additions & 0 deletions src/connector/src/parser/avro/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,8 @@ fn avro_type_mapping(schema: &Schema) -> anyhow::Result<DataType> {
DataType::Decimal
}
Schema::Date => DataType::Date,
Schema::LocalTimestampMillis => DataType::Timestamp,
Schema::LocalTimestampMicros => DataType::Timestamp,
Schema::TimestampMillis => DataType::Timestamptz,
Schema::TimestampMicros => DataType::Timestamptz,
Schema::Duration => DataType::Interval,
Expand Down
53 changes: 31 additions & 22 deletions src/connector/src/parser/unified/avro.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,11 @@ use chrono::Datelike;
use itertools::Itertools;
use num_bigint::{BigInt, Sign};
use risingwave_common::array::{ListValue, StructValue};
use risingwave_common::cast::{i64_to_timestamp, i64_to_timestamptz};
use risingwave_common::error::Result as RwResult;
use risingwave_common::log::LogSuppresser;
use risingwave_common::types::{DataType, Date, Datum, Interval, JsonbVal, ScalarImpl, Time};
use risingwave_common::types::{
DataType, Date, Datum, Interval, JsonbVal, ScalarImpl, Time, Timestamp, Timestamptz,
};
use risingwave_common::util::iter_util::ZipEqFast;

use super::{Access, AccessError, AccessResult};
Expand Down Expand Up @@ -181,19 +182,27 @@ impl<'a> AvroParseOptions<'a> {
}
(Some(DataType::Varchar) | None, Value::String(s)) => s.clone().into_boxed_str().into(),
// ---- Timestamp -----
(Some(DataType::Timestamp) | None, Value::TimestampMillis(ms)) => {
i64_to_timestamp(*ms).map_err(|_| create_error())?.into()
(Some(DataType::Timestamp) | None, Value::LocalTimestampMillis(ms)) => {
Timestamp::with_millis(*ms)
.map_err(|_| create_error())?
.into()
}
(Some(DataType::Timestamp) | None, Value::TimestampMicros(us)) => {
i64_to_timestamp(*us).map_err(|_| create_error())?.into()
(Some(DataType::Timestamp) | None, Value::LocalTimestampMicros(us)) => {
Timestamp::with_micros(*us)
.map_err(|_| create_error())?
.into()
}

// ---- TimestampTz -----
(Some(DataType::Timestamptz), Value::TimestampMillis(ms)) => {
i64_to_timestamptz(*ms).map_err(|_| create_error())?.into()
(Some(DataType::Timestamptz) | None, Value::TimestampMillis(ms)) => {
Timestamptz::from_millis(*ms)
.ok_or(AccessError::Other(anyhow!(
"timestamptz with milliseconds {ms} * 1000 is out of range",
)))?
.into()
}
(Some(DataType::Timestamptz), Value::TimestampMicros(us)) => {
i64_to_timestamptz(*us).map_err(|_| create_error())?.into()
(Some(DataType::Timestamptz) | None, Value::TimestampMicros(us)) => {
Timestamptz::from_micros(*us).into()
}

// ---- Interval -----
Expand Down Expand Up @@ -424,7 +433,7 @@ pub(crate) fn unix_epoch_days() -> i32 {
mod tests {
use apache_avro::Decimal as AvroDecimal;
use risingwave_common::error::{ErrorCode, RwError};
use risingwave_common::types::{Decimal, Timestamp};
use risingwave_common::types::{Decimal, Timestamptz};

use super::*;

Expand Down Expand Up @@ -486,24 +495,24 @@ mod tests {
}

#[test]
fn test_avro_timestamp_micros() {
let v1 = Value::TimestampMicros(1620000000000);
let v2 = Value::TimestampMillis(1620000000);
fn test_avro_timestamptz_micros() {
let v1 = Value::TimestampMicros(1620000000000000);
let v2 = Value::TimestampMillis(1620000000000);
let value_schema1 = Schema::TimestampMicros;
let value_schema2 = Schema::TimestampMillis;
let datum1 = from_avro_value(v1, &value_schema1, &DataType::Timestamp).unwrap();
let datum2 = from_avro_value(v2, &value_schema2, &DataType::Timestamp).unwrap();
let datum1 = from_avro_value(v1, &value_schema1, &DataType::Timestamptz).unwrap();
let datum2 = from_avro_value(v2, &value_schema2, &DataType::Timestamptz).unwrap();
assert_eq!(
datum1,
Some(ScalarImpl::Timestamp(Timestamp::new(
"2021-05-03T00:00:00".parse().unwrap()
)))
Some(ScalarImpl::Timestamptz(
Timestamptz::from_str("2021-05-03T00:00:00Z").unwrap()
))
);
assert_eq!(
datum2,
Some(ScalarImpl::Timestamp(Timestamp::new(
"2021-05-03T00:00:00".parse().unwrap()
)))
Some(ScalarImpl::Timestamptz(
Timestamptz::from_str("2021-05-03T00:00:00Z").unwrap()
))
);
}

Expand Down
5 changes: 5 additions & 0 deletions src/meta/node/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -345,6 +345,11 @@ pub fn start(opts: MetaNodeOpts) -> Pin<Box<dyn Future<Output = ()> + Send>> {
.meta
.developer
.cached_traces_memory_limit_bytes,
enable_trivial_move: config.meta.developer.enable_trivial_move,
enable_check_task_level_overlap: config
.meta
.developer
.enable_check_task_level_overlap,
},
config.system.into_init_system_params(),
)
Expand Down
Loading

0 comments on commit fa6a311

Please sign in to comment.