Skip to content

Commit

Permalink
refactor: flow replace check&better error msg (GreptimeTeam#5277)
Browse files Browse the repository at this point in the history
* chore: better error msg

* chore: eof newline

* refactor: move replace check to flow worker

* chore: add ctx to insert flow failure

* chore: Update src/flow/src/adapter/flownode_impl.rs

* test: add ORDER BY for deterministic results

---------

Co-authored-by: Yingwen <realevenyag@gmail.com>
  • Loading branch information
discord9 and evenyag authored Jan 3, 2025
1 parent 577d81f commit 353c823
Showing 12 changed files with 221 additions and 82 deletions.
44 changes: 3 additions & 41 deletions src/flow/src/adapter.rs
Original file line number Diff line number Diff line change
@@ -50,10 +50,7 @@ use crate::adapter::util::relation_desc_to_column_schemas_with_fallback;
use crate::adapter::worker::{create_worker, Worker, WorkerHandle};
use crate::compute::ErrCollector;
use crate::df_optimizer::sql_to_flow_plan;
use crate::error::{
EvalSnafu, ExternalSnafu, FlowAlreadyExistSnafu, InternalSnafu, InvalidQuerySnafu,
UnexpectedSnafu,
};
use crate::error::{EvalSnafu, ExternalSnafu, InternalSnafu, InvalidQuerySnafu, UnexpectedSnafu};
use crate::expr::Batch;
use crate::metrics::{METRIC_FLOW_INSERT_ELAPSED, METRIC_FLOW_ROWS, METRIC_FLOW_RUN_INTERVAL_MS};
use crate::repr::{self, DiffRow, RelationDesc, Row, BATCH_SIZE};
@@ -727,43 +724,6 @@ impl FlowWorkerManager {
query_ctx,
} = args;

let already_exist = {
let mut flag = false;

// check if the task already exists
for handle in self.worker_handles.iter() {
if handle.lock().await.contains_flow(flow_id).await? {
flag = true;
break;
}
}
flag
};
match (create_if_not_exists, or_replace, already_exist) {
// do replace
(_, true, true) => {
info!("Replacing flow with id={}", flow_id);
self.remove_flow(flow_id).await?;
}
(false, false, true) => FlowAlreadyExistSnafu { id: flow_id }.fail()?,
// do nothing if exists
(true, false, true) => {
info!("Flow with id={} already exists, do nothing", flow_id);
return Ok(None);
}
// create if not exists
(_, _, false) => (),
}

if create_if_not_exists {
// check if the task already exists
for handle in self.worker_handles.iter() {
if handle.lock().await.contains_flow(flow_id).await? {
return Ok(None);
}
}
}

let mut node_ctx = self.node_context.write().await;
// assign global id to source and sink table
for source in &source_table_ids {
@@ -877,9 +837,11 @@ impl FlowWorkerManager {
source_ids,
src_recvs: source_receivers,
expire_after,
or_replace,
create_if_not_exists,
err_collector,
};

handle.create_flow(create_request).await?;
info!("Successfully create flow with id={}", flow_id);
Ok(Some(flow_id))
36 changes: 27 additions & 9 deletions src/flow/src/adapter/flownode_impl.rs
Original file line number Diff line number Diff line change
@@ -25,11 +25,11 @@ use common_meta::error::{ExternalSnafu, Result, UnexpectedSnafu};
use common_meta::node_manager::Flownode;
use common_telemetry::{debug, trace};
use itertools::Itertools;
use snafu::{OptionExt, ResultExt};
use snafu::{IntoError, OptionExt, ResultExt};
use store_api::storage::RegionId;

use crate::adapter::{CreateFlowArgs, FlowWorkerManager};
use crate::error::InternalSnafu;
use crate::error::{CreateFlowSnafu, InsertIntoFlowSnafu, InternalSnafu};
use crate::metrics::METRIC_FLOW_TASK_COUNT;
use crate::repr::{self, DiffRow};

@@ -79,13 +79,15 @@ impl Flownode for FlowWorkerManager {
or_replace,
expire_after,
comment: Some(comment),
sql,
sql: sql.clone(),
flow_options,
query_ctx,
};
let ret = self
.create_flow(args)
.await
.map_err(BoxedError::new)
.with_context(|_| CreateFlowSnafu { sql: sql.clone() })
.map_err(to_meta_err(snafu::location!()))?;
METRIC_FLOW_TASK_COUNT.inc();
Ok(FlowResponse {
@@ -229,13 +231,29 @@ impl Flownode for FlowWorkerManager {
})
.map(|r| (r, now, 1))
.collect_vec();

self.handle_write_request(region_id.into(), rows, &table_types)
if let Err(err) = self
.handle_write_request(region_id.into(), rows, &table_types)
.await
.map_err(|err| {
common_telemetry::error!(err;"Failed to handle write request");
to_meta_err(snafu::location!())(err)
})?;
{
let err = BoxedError::new(err);
let flow_ids = self
.node_context
.read()
.await
.get_flow_ids(table_id)
.into_iter()
.flatten()
.cloned()
.collect_vec();
let err = InsertIntoFlowSnafu {
region_id,
flow_ids,
}
.into_error(err);
common_telemetry::error!(err; "Failed to handle write request");
let err = to_meta_err(snafu::location!())(err);
return Err(err);
}
}
Ok(Default::default())
}
4 changes: 4 additions & 0 deletions src/flow/src/adapter/node_context.rs
Original file line number Diff line number Diff line change
@@ -71,6 +71,10 @@ impl FlownodeContext {
query_context: Default::default(),
}
}

/// Looks up `table_id` in `source_to_tasks` and returns the associated set of flow ids.
///
/// Returns `None` when `source_to_tasks` has no entry for `table_id`. The returned set is
/// borrowed from `self`; callers that need an owned copy must clone it themselves.
// NOTE(review): presumably these are the flows that read from `table_id` as a source
// table — confirm against where `source_to_tasks` is populated.
pub fn get_flow_ids(&self, table_id: TableId) -> Option<&BTreeSet<FlowId>> {
    let flows_by_source = &self.source_to_tasks;
    flows_by_source.get(&table_id)
}
}

/// A simple broadcast sender with backpressure and bounded capacity that blocks on send when the send buffer is full.
26 changes: 20 additions & 6 deletions src/flow/src/adapter/worker.rs
Original file line number Diff line number Diff line change
@@ -247,15 +247,25 @@ impl<'s> Worker<'s> {
src_recvs: Vec<broadcast::Receiver<Batch>>,
// TODO(discord9): set expire duration for all arrangement and compare to sys timestamp instead
expire_after: Option<repr::Duration>,
or_replace: bool,
create_if_not_exists: bool,
err_collector: ErrCollector,
) -> Result<Option<FlowId>, Error> {
let already_exists = self.task_states.contains_key(&flow_id);
match (already_exists, create_if_not_exists) {
(true, true) => return Ok(None),
(true, false) => FlowAlreadyExistSnafu { id: flow_id }.fail()?,
(false, _) => (),
};
let already_exist = self.task_states.contains_key(&flow_id);
match (create_if_not_exists, or_replace, already_exist) {
// if replace, ignore that old flow exists
(_, true, true) => {
info!("Replacing flow with id={}", flow_id);
}
(false, false, true) => FlowAlreadyExistSnafu { id: flow_id }.fail()?,
// already exists, and not replace, return None
(true, false, true) => {
info!("Flow with id={} already exists, do nothing", flow_id);
return Ok(None);
}
// continue as normal
(_, _, false) => (),
}

let mut cur_task_state = ActiveDataflowState::<'s> {
err_collector,
@@ -341,6 +351,7 @@ impl<'s> Worker<'s> {
source_ids,
src_recvs,
expire_after,
or_replace,
create_if_not_exists,
err_collector,
} => {
@@ -352,6 +363,7 @@ impl<'s> Worker<'s> {
&source_ids,
src_recvs,
expire_after,
or_replace,
create_if_not_exists,
err_collector,
);
@@ -398,6 +410,7 @@ pub enum Request {
source_ids: Vec<GlobalId>,
src_recvs: Vec<broadcast::Receiver<Batch>>,
expire_after: Option<repr::Duration>,
or_replace: bool,
create_if_not_exists: bool,
err_collector: ErrCollector,
},
@@ -547,6 +560,7 @@ mod test {
source_ids: src_ids,
src_recvs: vec![rx],
expire_after: None,
or_replace: false,
create_if_not_exists: true,
err_collector: ErrCollector::default(),
};
30 changes: 26 additions & 4 deletions src/flow/src/error.rs
Original file line number Diff line number Diff line change
@@ -32,6 +32,27 @@ use crate::expr::EvalError;
#[snafu(visibility(pub))]
#[stack_trace_debug]
pub enum Error {
#[snafu(display(
"Failed to insert into flow: region_id={}, flow_ids={:?}",
region_id,
flow_ids
))]
InsertIntoFlow {
region_id: u64,
flow_ids: Vec<u64>,
source: BoxedError,
#[snafu(implicit)]
location: Location,
},

#[snafu(display("Error encountered while creating flow: {sql}"))]
CreateFlow {
sql: String,
source: BoxedError,
#[snafu(implicit)]
location: Location,
},

#[snafu(display("External error"))]
External {
source: BoxedError,
@@ -207,16 +228,17 @@ pub type Result<T> = std::result::Result<T, Error>;
impl ErrorExt for Error {
fn status_code(&self) -> StatusCode {
match self {
Self::Eval { .. } | Self::JoinTask { .. } | Self::Datafusion { .. } => {
StatusCode::Internal
}
Self::Eval { .. }
| Self::JoinTask { .. }
| Self::Datafusion { .. }
| Self::InsertIntoFlow { .. } => StatusCode::Internal,
Self::FlowAlreadyExist { .. } => StatusCode::TableAlreadyExists,
Self::TableNotFound { .. }
| Self::TableNotFoundMeta { .. }
| Self::FlowNotFound { .. }
| Self::ListFlows { .. } => StatusCode::TableNotFound,
Self::Plan { .. } | Self::Datatypes { .. } => StatusCode::PlanQuery,
Self::InvalidQuery { .. } => StatusCode::EngineExecuteQuery,
Self::InvalidQuery { .. } | Self::CreateFlow { .. } => StatusCode::EngineExecuteQuery,
Self::Unexpected { .. } => StatusCode::Unexpected,
Self::NotImplemented { .. } | Self::UnsupportedTemporalFilter { .. } => {
StatusCode::Unsupported
12 changes: 9 additions & 3 deletions src/flow/src/server.rs
Original file line number Diff line number Diff line change
@@ -50,8 +50,8 @@ use tonic::{Request, Response, Status};

use crate::adapter::{CreateFlowArgs, FlowWorkerManagerRef};
use crate::error::{
to_status_with_last_err, CacheRequiredSnafu, ExternalSnafu, FlowNotFoundSnafu, ListFlowsSnafu,
ParseAddrSnafu, ShutdownServerSnafu, StartServerSnafu, UnexpectedSnafu,
to_status_with_last_err, CacheRequiredSnafu, CreateFlowSnafu, ExternalSnafu, FlowNotFoundSnafu,
ListFlowsSnafu, ParseAddrSnafu, ShutdownServerSnafu, StartServerSnafu, UnexpectedSnafu,
};
use crate::heartbeat::HeartbeatTask;
use crate::metrics::{METRIC_FLOW_PROCESSING_TIME, METRIC_FLOW_ROWS};
@@ -392,7 +392,13 @@ impl FlownodeBuilder {
.build(),
),
};
manager.create_flow(args).await?;
manager
.create_flow(args)
.await
.map_err(BoxedError::new)
.with_context(|_| CreateFlowSnafu {
sql: info.raw_sql().clone(),
})?;
}

Ok(cnt)
Original file line number Diff line number Diff line change
@@ -32,7 +32,7 @@ INSERT INTO test VALUES ('hello', '2020-01-01 00:00:00'),

Affected Rows: 4

SELECT * FROM test WHERE MATCHES(message, 'hello');
SELECT * FROM test WHERE MATCHES(message, 'hello') ORDER BY message;

+-------------+---------------------+
| message | time |
@@ -46,7 +46,7 @@ ALTER TABLE test MODIFY COLUMN message SET FULLTEXT WITH(analyzer = 'Chinese', c

Affected Rows: 0

SELECT * FROM test WHERE MATCHES(message, 'hello');
SELECT * FROM test WHERE MATCHES(message, 'hello') ORDER BY message;

+-------------+---------------------+
| message | time |
@@ -63,15 +63,15 @@ INSERT INTO test VALUES ('hello NiKo', '2020-01-03 00:00:00'),

Affected Rows: 4

SELECT * FROM test WHERE MATCHES(message, 'hello');
SELECT * FROM test WHERE MATCHES(message, 'hello') ORDER BY message;

+-------------+---------------------+
| message | time |
+-------------+---------------------+
| hello NiKo | 2020-01-03T00:00:00 |
| NiKo hello | 2020-01-03T00:00:01 |
| hello hello | 2020-01-04T00:00:00 |
| hello | 2020-01-01T00:00:00 |
| hello NiKo | 2020-01-03T00:00:00 |
| hello hello | 2020-01-04T00:00:00 |
| hello world | 2020-01-02T00:00:00 |
| world hello | 2020-01-02T00:00:01 |
+-------------+---------------------+
Original file line number Diff line number Diff line change
@@ -13,18 +13,18 @@ INSERT INTO test VALUES ('hello', '2020-01-01 00:00:00'),
('hello world', '2020-01-02 00:00:00'),
('world hello', '2020-01-02 00:00:01');

SELECT * FROM test WHERE MATCHES(message, 'hello');
SELECT * FROM test WHERE MATCHES(message, 'hello') ORDER BY message;

ALTER TABLE test MODIFY COLUMN message SET FULLTEXT WITH(analyzer = 'Chinese', case_sensitive = 'true');

SELECT * FROM test WHERE MATCHES(message, 'hello');
SELECT * FROM test WHERE MATCHES(message, 'hello') ORDER BY message;

INSERT INTO test VALUES ('hello NiKo', '2020-01-03 00:00:00'),
('NiKo hello', '2020-01-03 00:00:01'),
('hello hello', '2020-01-04 00:00:00'),
('NiKo, NiKo', '2020-01-04 00:00:01');

SELECT * FROM test WHERE MATCHES(message, 'hello');
SELECT * FROM test WHERE MATCHES(message, 'hello') ORDER BY message;

-- SQLNESS ARG restart=true
SHOW CREATE TABLE test;
Loading

0 comments on commit 353c823

Please sign in to comment.