diff --git a/Cargo.lock b/Cargo.lock index 6be570b2eabd9..7bf8491ee5bbe 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -8502,6 +8502,7 @@ dependencies = [ "auto_enums", "await-tree", "bytes", + "chrono", "criterion", "educe", "either", diff --git a/src/frontend/src/optimizer/plan_node/generic/source.rs b/src/frontend/src/optimizer/plan_node/generic/source.rs index 4d508cc37894e..2e2c565680a0b 100644 --- a/src/frontend/src/optimizer/plan_node/generic/source.rs +++ b/src/frontend/src/optimizer/plan_node/generic/source.rs @@ -117,9 +117,16 @@ impl Source { sub_fields: vec![], type_name: "".to_string(), }; + let update_time = Field { + data_type: DataType::Timestamptz, + name: "update_time".to_string(), + sub_fields: vec![], + type_name: "".to_string(), + }; let ordered_col_idx = builder.add_column(&key); builder.add_column(&value); + builder.add_column(&update_time); builder.add_order_column(ordered_col_idx, OrderType::ascending()); builder.build(vec![], 1) diff --git a/src/stream/Cargo.toml b/src/stream/Cargo.toml index d85914a87b0df..1426bbd1ec485 100644 --- a/src/stream/Cargo.toml +++ b/src/stream/Cargo.toml @@ -22,6 +22,10 @@ async-trait = "0.1" auto_enums = "0.8" await-tree = { workspace = true } bytes = "1" +chrono = { version = "0.4", default-features = false, features = [ + "clock", + "std", +] } educe = "0.4" either = "1" enum-as-inner = "0.6" diff --git a/src/stream/src/executor/source/state_table_handler.rs b/src/stream/src/executor/source/state_table_handler.rs index d51d62ebfef06..f16bc0e4f6b01 100644 --- a/src/stream/src/executor/source/state_table_handler.rs +++ b/src/stream/src/executor/source/state_table_handler.rs @@ -16,13 +16,14 @@ use std::collections::HashSet; use std::ops::{Bound, Deref}; use std::sync::Arc; +use chrono::Utc; use futures::{pin_mut, StreamExt}; use risingwave_common::buffer::Bitmap; use risingwave_common::catalog::{DatabaseId, SchemaId}; use risingwave_common::constants::hummock::PROPERTIES_RETENTION_SECOND_KEY; use risingwave_common::hash::VirtualNode; use risingwave_common::row::{OwnedRow, Row}; -use risingwave_common::types::{JsonbVal, ScalarImpl, ScalarRef, ScalarRefImpl}; +use risingwave_common::types::{JsonbVal, ScalarImpl, ScalarRef, ScalarRefImpl, Timestamptz}; use risingwave_common::util::epoch::EpochPair; use risingwave_common::{bail, row}; use risingwave_connector::source::{SplitId, SplitImpl, SplitMetaData}; @@ -158,9 +159,11 @@ impl SourceStateTableHandler { } pub async fn set(&mut self, key: SplitId, value: JsonbVal) -> StreamExecutorResult<()> { + let update_time = Timestamptz::from_micros(Utc::now().timestamp_micros()); let row = [ Some(Self::string_to_scalar(key.deref())), Some(ScalarImpl::Jsonb(value)), + Some(update_time.into()), ]; match self.get(key).await? { Some(prev_row) => { @@ -277,6 +280,7 @@ pub fn default_source_internal_table(id: u32) -> PbTable { let columns = vec![ make_column(TypeName::Varchar, 0), make_column(TypeName::Jsonb, 1), + make_column(TypeName::Timestamptz, 2), ]; PbTable { id,