merge main

wcy-fdu committed Aug 29, 2024
2 parents dec3a05 + ca99aee commit 3d68bd9
Showing 54 changed files with 336 additions and 1,039 deletions.
11 changes: 11 additions & 0 deletions e2e_test/batch/transaction/read_only_multi_conn.slt
@@ -7,9 +7,15 @@ insert into t values (1), (2);
statement ok
flush;

connection txn
statement ok
SET VISIBILITY_MODE TO checkpoint;

connection txn
statement ok
start transaction read only;

connection txn
query I
select count(*) from t;
----
@@ -42,24 +48,29 @@ select count(*) from t;
3

# ...but not in the read-only transaction
connection txn
query I
select count(*) from t;
----
2

connection txn
statement ok
flush;

# still invisible even after flush
connection txn
query I
select count(*) from t;
----
2

connection txn
statement ok
commit;

# now visible outside the transaction
connection txn
query I
select count(*) from t;
----
3 changes: 2 additions & 1 deletion proto/hummock.proto
@@ -239,7 +239,8 @@ message HummockSnapshot {
// Epoch with checkpoint, we will read durable data with it.
uint64 committed_epoch = 1;
// Epoch without checkpoint, we will read real-time data with it. But it may be rolled back.
uint64 current_epoch = 2;
reserved 2;
reserved "current_epoch";
}

message VersionUpdatePayload {
36 changes: 3 additions & 33 deletions proto/stream_service.proto
@@ -17,31 +17,6 @@ message BuildActorInfo {
map<uint32, SubscriptionIds> related_subscriptions = 2;
}

// Describe the fragments which will be running on this node
message UpdateActorsRequest {
string request_id = 1;
repeated BuildActorInfo actors = 2;
}

message UpdateActorsResponse {
common.Status status = 1;
}

message BroadcastActorInfoTableRequest {
repeated common.ActorInfo info = 1;
}

// Create channels and gRPC connections for a fragment
message BuildActorsRequest {
string request_id = 1;
repeated uint32 actor_id = 2;
}

message BuildActorsResponse {
string request_id = 1;
common.Status status = 2;
}

message DropActorsRequest {
string request_id = 1;
repeated uint32 actor_ids = 2;
@@ -68,6 +43,9 @@ message InjectBarrierRequest {
// we specify the set of snapshot backfill actors that needs to be pre-synced with the upstream barrier mutation,
// so that the input executor won't be blocked at waiting for the mutation of upstream barriers.
repeated uint32 actor_ids_to_pre_sync_barrier_mutation = 7;

repeated common.ActorInfo broadcast_info = 8;
repeated BuildActorInfo actors_to_build = 9;
}

message BarrierCompleteResponse {
@@ -95,11 +73,6 @@ message BarrierCompleteResponse {
uint64 epoch = 9;
}

// Before starting streaming, the leader node broadcast the actor-host table to needed workers.
message BroadcastActorInfoTableResponse {
common.Status status = 1;
}

message WaitEpochCommitRequest {
uint64 epoch = 1;
}
@@ -136,9 +109,6 @@ message StreamingControlStreamResponse {
}

service StreamService {
rpc UpdateActors(UpdateActorsRequest) returns (UpdateActorsResponse);
rpc BuildActors(BuildActorsRequest) returns (BuildActorsResponse);
rpc BroadcastActorInfoTable(BroadcastActorInfoTableRequest) returns (BroadcastActorInfoTableResponse);
rpc DropActors(DropActorsRequest) returns (DropActorsResponse);
rpc WaitEpochCommit(WaitEpochCommitRequest) returns (WaitEpochCommitResponse);
rpc StreamingControlStream(stream StreamingControlStreamRequest) returns (stream StreamingControlStreamResponse);
57 changes: 0 additions & 57 deletions src/compute/src/rpc/service/stream_service.rs
@@ -19,7 +19,6 @@ use risingwave_pb::stream_service::*;
use risingwave_storage::dispatch_state_store;
use risingwave_stream::error::StreamError;
use risingwave_stream::task::{LocalStreamManager, StreamEnvironment};
use thiserror_ext::AsReport;
use tokio::sync::mpsc::unbounded_channel;
use tokio_stream::wrappers::UnboundedReceiverStream;
use tonic::{Request, Response, Status, Streaming};
@@ -41,62 +40,6 @@ impl StreamService for StreamServiceImpl {
type StreamingControlStreamStream =
impl Stream<Item = std::result::Result<StreamingControlStreamResponse, tonic::Status>>;

#[cfg_attr(coverage, coverage(off))]
async fn update_actors(
&self,
request: Request<UpdateActorsRequest>,
) -> std::result::Result<Response<UpdateActorsResponse>, Status> {
let req = request.into_inner();
let res = self.mgr.update_actors(req.actors).await;
match res {
Err(e) => {
error!(error = %e.as_report(), "failed to update stream actor");
Err(e.into())
}
Ok(()) => Ok(Response::new(UpdateActorsResponse { status: None })),
}
}

#[cfg_attr(coverage, coverage(off))]
async fn build_actors(
&self,
request: Request<BuildActorsRequest>,
) -> std::result::Result<Response<BuildActorsResponse>, Status> {
let req = request.into_inner();

let actor_id = req.actor_id;
let res = self.mgr.build_actors(actor_id).await;
match res {
Err(e) => {
error!(error = %e.as_report(), "failed to build actors");
Err(e.into())
}
Ok(()) => Ok(Response::new(BuildActorsResponse {
request_id: req.request_id,
status: None,
})),
}
}

#[cfg_attr(coverage, coverage(off))]
async fn broadcast_actor_info_table(
&self,
request: Request<BroadcastActorInfoTableRequest>,
) -> std::result::Result<Response<BroadcastActorInfoTableResponse>, Status> {
let req = request.into_inner();

let res = self.mgr.update_actor_info(req.info).await;
match res {
Err(e) => {
error!(error = %e.as_report(), "failed to update actor info table actor");
Err(e.into())
}
Ok(()) => Ok(Response::new(BroadcastActorInfoTableResponse {
status: None,
})),
}
}

#[cfg_attr(coverage, coverage(off))]
async fn drop_actors(
&self,
22 changes: 10 additions & 12 deletions src/connector/src/sink/kafka.rs
@@ -61,10 +61,6 @@ const fn _default_retry_backoff() -> Duration {
Duration::from_millis(100)
}

const fn _default_message_timeout_ms() -> usize {
5000
}

const fn _default_max_in_flight_requests_per_connection() -> usize {
5
}
@@ -150,12 +146,9 @@ pub struct RdKafkaPropertiesProducer {
/// Produce message timeout.
/// This value is used to limits the time a produced message waits for
/// successful delivery (including retries).
#[serde(
rename = "properties.message.timeout.ms",
default = "_default_message_timeout_ms"
)]
#[serde_as(as = "DisplayFromStr")]
message_timeout_ms: usize,
#[serde(rename = "properties.message.timeout.ms")]
#[serde_as(as = "Option<DisplayFromStr>")]
message_timeout_ms: Option<usize>,

/// The maximum number of unacknowledged requests the client will send on a single connection before blocking.
#[serde(
@@ -205,7 +198,9 @@ impl RdKafkaPropertiesProducer {
if let Some(v) = self.request_required_acks {
c.set("request.required.acks", v.to_string());
}
c.set("message.timeout.ms", self.message_timeout_ms.to_string());
if let Some(v) = self.message_timeout_ms {
c.set("message.timeout.ms", v.to_string());
}
c.set(
"max.in.flight.requests.per.connection",
self.max_in_flight_requests_per_connection.to_string(),
@@ -626,7 +621,10 @@ mod test {
c.rdkafka_properties_producer.compression_codec,
Some(CompressionCodec::Zstd)
);
assert_eq!(c.rdkafka_properties_producer.message_timeout_ms, 114514);
assert_eq!(
c.rdkafka_properties_producer.message_timeout_ms,
Some(114514)
);
assert_eq!(
c.rdkafka_properties_producer
.max_in_flight_requests_per_connection,
1 change: 0 additions & 1 deletion src/connector/with_options_sink.yaml
@@ -531,7 +531,6 @@ KafkaConfig:
This value is used to limits the time a produced message waits for
successful delivery (including retries).
required: false
default: '5000'
- name: properties.max.in.flight.requests.per.connection
field_type: usize
comments: The maximum number of unacknowledged requests the client will send on a single connection before blocking.
9 changes: 5 additions & 4 deletions src/frontend/src/handler/create_table.rs
@@ -56,7 +56,7 @@ use crate::catalog::source_catalog::SourceCatalog;
use crate::catalog::table_catalog::TableVersion;
use crate::catalog::{check_valid_column_name, ColumnId, DatabaseId, SchemaId};
use crate::error::{ErrorCode, Result, RwError};
use crate::expr::{Expr, ExprImpl, ExprRewriter, InlineNowProcTime};
use crate::expr::{Expr, ExprImpl, ExprRewriter};
use crate::handler::create_source::{
bind_columns_from_source, bind_connector_props, bind_create_source_or_table_with_connector,
bind_source_watermark, handle_addition_columns, UPSTREAM_SOURCE_KEY,
@@ -329,9 +329,10 @@ pub fn bind_sql_column_constraints(
// so the rewritten expression should almost always be pure and we directly call `fold_const`
// here. Actually we do not require purity of the expression here since we're only to get a
// snapshot value.
let rewritten_expr_impl =
InlineNowProcTime::new(session.pinned_snapshot().epoch())
.rewrite_expr(expr_impl.clone());
let rewritten_expr_impl = session
.pinned_snapshot()
.inline_now_proc_time()
.rewrite_expr(expr_impl.clone());

if let Some(snapshot_value) = rewritten_expr_impl.try_fold_const() {
let snapshot_value = snapshot_value?;
5 changes: 2 additions & 3 deletions src/frontend/src/optimizer/logical_optimization.rs
@@ -18,7 +18,7 @@ use risingwave_common::bail;
use super::plan_node::RewriteExprsRecursive;
use super::plan_visitor::has_logical_max_one_row;
use crate::error::Result;
use crate::expr::{InlineNowProcTime, NowProcTimeFinder};
use crate::expr::NowProcTimeFinder;
use crate::optimizer::heuristic_optimizer::{ApplyOrder, HeuristicOptimizer};
use crate::optimizer::plan_node::{
ColumnPruningContext, PredicatePushdownContext, VisitExprsRecursive,
@@ -540,8 +540,7 @@ impl LogicalOptimizer {
return plan;
}

let epoch = ctx.session_ctx().pinned_snapshot().epoch();
let mut v = InlineNowProcTime::new(epoch);
let mut v = ctx.session_ctx().pinned_snapshot().inline_now_proc_time();

let plan = plan.rewrite_exprs_recursive(&mut v);

40 changes: 14 additions & 26 deletions src/frontend/src/scheduler/snapshot.rs
@@ -25,6 +25,7 @@ use thiserror_ext::AsReport;
use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender};
use tokio::sync::watch;

use crate::expr::InlineNowProcTime;
use crate::meta_client::FrontendMetaClient;

/// The interval between two unpin batches.
@@ -60,23 +61,12 @@ impl ReadSnapshot {
}
}

/// Get the [`Option<Epoch>`] value for this snapshot, only `FrontendPinned`.
pub fn epoch_with_frontend_pinned(&self) -> Option<Epoch> {
match self.batch_query_epoch().epoch.unwrap() {
batch_query_epoch::Epoch::Committed(epoch)
| batch_query_epoch::Epoch::Current(epoch) => Some(epoch.into()),
batch_query_epoch::Epoch::Backup(_) | batch_query_epoch::Epoch::TimeTravel(_) => None,
}
}

/// Get the [`Epoch`] value for this snapshot.
pub fn epoch(&self) -> Epoch {
match self.batch_query_epoch().epoch.unwrap() {
batch_query_epoch::Epoch::Committed(epoch)
| batch_query_epoch::Epoch::Current(epoch)
| batch_query_epoch::Epoch::Backup(epoch)
| batch_query_epoch::Epoch::TimeTravel(epoch) => epoch.into(),
}
pub fn inline_now_proc_time(&self) -> InlineNowProcTime {
let epoch = match self {
ReadSnapshot::FrontendPinned { snapshot, .. } => Epoch(snapshot.committed_epoch()),
ReadSnapshot::Other(epoch) => *epoch,
};
InlineNowProcTime::new(epoch)
}

/// Returns true if this snapshot is a barrier read.
@@ -111,12 +101,16 @@ pub type PinnedSnapshotRef = Arc<PinnedSnapshot>;
impl PinnedSnapshot {
fn batch_query_epoch(&self, is_barrier_read: bool) -> BatchQueryEpoch {
let epoch = if is_barrier_read {
batch_query_epoch::Epoch::Current(self.value.current_epoch)
batch_query_epoch::Epoch::Current(u64::MAX)
} else {
batch_query_epoch::Epoch::Committed(self.value.committed_epoch)
};
BatchQueryEpoch { epoch: Some(epoch) }
}

pub fn committed_epoch(&self) -> u64 {
self.value.committed_epoch
}
}

impl Drop for PinnedSnapshot {
@@ -129,7 +123,6 @@ impl Drop for PinnedSnapshot {
fn invalid_snapshot() -> PbHummockSnapshot {
PbHummockSnapshot {
committed_epoch: INVALID_EPOCH,
current_epoch: INVALID_EPOCH,
}
}

@@ -191,10 +184,6 @@ impl HummockSnapshotManager {
old_snapshot.value.committed_epoch,
snapshot.committed_epoch,
),
current_epoch: std::cmp::max(
old_snapshot.value.current_epoch,
snapshot.current_epoch,
),
};

if old_snapshot.value == snapshot {
Expand Down Expand Up @@ -251,7 +240,7 @@ impl Operation {
match self {
Operation::Pin(s) | Operation::Unpin(s) => s,
}
.current_epoch
.committed_epoch
== INVALID_EPOCH
}
}
@@ -266,8 +255,7 @@ impl Eq for SnapshotKey {}

impl Ord for SnapshotKey {
fn cmp(&self, other: &Self) -> std::cmp::Ordering {
(self.0.committed_epoch, self.0.current_epoch)
.cmp(&(other.0.committed_epoch, other.0.current_epoch))
self.0.committed_epoch.cmp(&other.0.committed_epoch)
}
}

