Skip to content

Commit

Permalink
feat(source): support temporary source (#18174)
Browse files Browse the repository at this point in the history
  • Loading branch information
chenzl25 authored Aug 23, 2024
1 parent ec60744 commit 9587945
Show file tree
Hide file tree
Showing 10 changed files with 229 additions and 52 deletions.
82 changes: 82 additions & 0 deletions e2e_test/source/basic/temporary_kafka_batch.slt
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
# End-to-end check for TEMPORARY sources: create one on top of a Kafka topic, read it with
# batch queries (including filters on `_rw_kafka_timestamp`), then drop it again.
# The source reads the topic from the earliest offset as PLAIN/JSON rows.
statement ok
create temporary source s1 (v1 int, v2 varchar) with (
connector = 'kafka',
topic = 'kafka_1_partition_topic',
properties.bootstrap.server = 'message_queue:29092',
scan.startup.mode = 'earliest'
) FORMAT PLAIN ENCODE JSON

# Unfiltered batch scan; the fixture topic is expected to yield exactly these four rows.
query IT rowsort
select * from s1
----
1 1
2 22
3 333
4 4444

# Lower bound given as a timestamp literal with an explicit UTC offset; all rows qualify.
query IT rowsort
select * from s1 where _rw_kafka_timestamp > '1977-01-01 00:00:00+00:00'
----
1 1
2 22
3 333
4 4444

# Same lower bound, written without a time zone offset.
query IT rowsort
select * from s1 where _rw_kafka_timestamp > '1977-01-01 00:00:00'
----
1 1
2 22
3 333
4 4444

# Same lower bound, produced by TO_TIMESTAMP with an explicit format string.
query IT rowsort
select * from s1 where _rw_kafka_timestamp > TO_TIMESTAMP('1977-01-01 00:00:00.000000', 'YYYY-MM-DD HH24:MI:SS.US')
----
1 1
2 22
3 333
4 4444

# A string that is not a timestamp must be rejected with an "expected format" error.
statement error expected format
select * from s1 where _rw_kafka_timestamp > 'abc'

# The integer expression 2147483647 + 1 must fail with an "out of range" error.
statement error out of range
select * from s1 where _rw_kafka_timestamp < TO_TIMESTAMP(2147483647 + 1)

# A lower bound far in the future filters out every row (empty result).
query IT
select * from s1 where _rw_kafka_timestamp > '2045-01-01 0:00:00+00:00'
----

# `_rw_kafka_timestamp` can also be used in the select list: one boolean per row.
query B
select _rw_kafka_timestamp > '1977-01-01 00:00:00+00:00' from s1
----
t
t
t
t

# Every row's timestamp lies before the current time.
query B
select _rw_kafka_timestamp < now() from s1
----
t
t
t
t

# NOTE(review): expects every row's timestamp to be less than one day old, i.e. the topic
# is presumably (re)populated shortly before the test runs — confirm against the test setup.
query B
select _rw_kafka_timestamp < now() - interval '1 day' from s1
----
f
f
f
f

# NOTE(review): LIMIT without ORDER BY — the expected rows rely on the scan returning the
# first two messages of the topic first; confirm this ordering is guaranteed.
query IT rowsort
select * from s1 limit 2
----
1 1
2 22

# Clean up the temporary source.
statement ok
drop source s1
6 changes: 5 additions & 1 deletion src/frontend/src/binder/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ use crate::catalog::schema_catalog::SchemaCatalog;
use crate::catalog::{CatalogResult, TableId, ViewId};
use crate::error::ErrorCode;
use crate::expr::ExprImpl;
use crate::session::{AuthContext, SessionImpl};
use crate::session::{AuthContext, SessionImpl, TemporarySourceManager};

pub type ShareId = usize;

Expand Down Expand Up @@ -127,6 +127,9 @@ pub struct Binder {

/// The sql udf context that will be used during binding phase
udf_context: UdfContext,

/// The temporary sources that will be used during binding phase
temporary_source_manager: TemporarySourceManager,
}

#[derive(Clone, Debug, Default)]
Expand Down Expand Up @@ -360,6 +363,7 @@ impl Binder {
included_relations: HashSet::new(),
param_types: ParameterTypes::new(param_types),
udf_context: UdfContext::new(),
temporary_source_manager: session.temporary_source_manager(),
}
}

Expand Down
13 changes: 12 additions & 1 deletion src/frontend/src/binder/relation/table_or_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,11 @@ impl Binder {
table_name
);
}
} else if let Some(source_catalog) =
self.temporary_source_manager.get_source(table_name)
// don't care about the database and schema
{
self.resolve_source_relation(&source_catalog.clone(), as_of)
} else if let Ok((table_catalog, schema_name)) = self
.catalog
.get_created_table_by_name(&self.db_name, schema_path, table_name)
Expand Down Expand Up @@ -163,7 +168,13 @@ impl Binder {
if let Ok(schema) =
self.catalog.get_schema_by_name(&self.db_name, schema_name)
{
if let Some(table_catalog) =
if let Some(source_catalog) =
self.temporary_source_manager.get_source(table_name)
// don't care about the database and schema
{
return Ok(self
.resolve_source_relation(&source_catalog.clone(), as_of));
} else if let Some(table_catalog) =
schema.get_created_table_by_name(table_name)
{
return self.resolve_table_relation(
Expand Down
11 changes: 10 additions & 1 deletion src/frontend/src/handler/create_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ use thiserror_ext::AsReport;
use super::RwPgResponse;
use crate::binder::Binder;
use crate::catalog::source_catalog::SourceCatalog;
use crate::catalog::{DatabaseId, SchemaId};
use crate::catalog::{CatalogError, DatabaseId, SchemaId};
use crate::error::ErrorCode::{self, Deprecated, InvalidInputSyntax, NotSupported, ProtocolError};
use crate::error::{Result, RwError};
use crate::expr::Expr;
Expand Down Expand Up @@ -1657,6 +1657,15 @@ pub async fn handle_create_source(
)
.await?;

// If it is a temporary source, put it into SessionImpl.
if stmt.temporary {
if session.get_temporary_source(&source_catalog.name).is_some() {
return Err(CatalogError::Duplicated("source", source_catalog.name.clone()).into());
}
session.create_temporary_source(source_catalog);
return Ok(PgResponse::empty_result(StatementType::CREATE_SOURCE));
}

let source = source_catalog.to_prost(schema_id, database_id);

let catalog_writer = session.catalog_writer()?;
Expand Down
6 changes: 6 additions & 0 deletions src/frontend/src/handler/drop_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,12 @@ pub async fn handle_drop_source(
let search_path = session.config().search_path();
let user_name = &session.auth_context().user_name;

// Check if temporary source exists, if yes, drop it.
if let Some(_source) = session.get_temporary_source(&source_name) {
session.drop_temporary_source(&source_name);
return Ok(PgResponse::empty_result(StatementType::DROP_SOURCE));
}

let schema_path = SchemaPath::new(schema_name.as_deref(), &search_path, user_name);

let (source, schema_name) = {
Expand Down
1 change: 1 addition & 0 deletions src/frontend/src/handler/show.rs
Original file line number Diff line number Diff line change
Expand Up @@ -319,6 +319,7 @@ pub async fn handle_show_object(
.get_schema_by_name(session.database(), &schema_or_default(&schema))?
.iter_source()
.map(|t| t.name.clone())
.chain(session.temporary_source_manager().keys())
.collect(),
ShowObject::Sink { schema } => catalog_reader
.read_guard()
Expand Down
64 changes: 64 additions & 0 deletions src/frontend/src/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ use crate::catalog::catalog_service::{CatalogReader, CatalogWriter, CatalogWrite
use crate::catalog::connection_catalog::ConnectionCatalog;
use crate::catalog::root_catalog::Catalog;
use crate::catalog::secret_catalog::SecretCatalog;
use crate::catalog::source_catalog::SourceCatalog;
use crate::catalog::subscription_catalog::SubscriptionCatalog;
use crate::catalog::{
check_schema_writable, CatalogError, DatabaseId, OwnedByUserCatalog, SchemaId, TableId,
Expand Down Expand Up @@ -652,6 +653,46 @@ pub struct SessionImpl {
last_idle_instant: Arc<Mutex<Option<Instant>>>,

cursor_manager: Arc<CursorManager>,

/// temporary sources for the current session
temporary_source_manager: Arc<Mutex<TemporarySourceManager>>,
}

/// Registry of the sources created with `TEMPORARY` / `TEMP` in the current session.
///
/// * Temporary sources are automatically dropped at the end of a session.
/// * They are expected to be selected by batch queries, not streaming queries.
/// * They are currently only used by the cloud portal to preview data during table and
///   source creation, so this is an internal feature that is not exposed to users.
/// * The feature is implemented with minimum effort: the database name and the schema name
///   are ignored, only the source name is used to identify a temporary source.
/// * Temporary sources can only be shown via the `show sources` command, not through other
///   system tables.
#[derive(Default, Clone)]
pub struct TemporarySourceManager {
    /// Catalogs of the temporary sources, keyed by source name.
    sources: HashMap<String, SourceCatalog>,
}

impl TemporarySourceManager {
    /// Creates a manager that holds no temporary sources.
    pub fn new() -> Self {
        Self::default()
    }

    /// Stores `source` under `name`; an entry already stored under `name` is replaced.
    pub fn create_source(&mut self, name: String, source: SourceCatalog) {
        // The previously stored catalog (if any) is intentionally discarded.
        let _replaced = self.sources.insert(name, source);
    }

    /// Removes the source stored under `name`; does nothing if there is no such entry.
    pub fn drop_source(&mut self, name: &str) {
        let _removed = self.sources.remove(name);
    }

    /// Returns the source stored under `name`, or `None` if there is no such entry.
    pub fn get_source(&self, name: &str) -> Option<&SourceCatalog> {
        self.sources.get(name)
    }

    /// Returns the names of all stored sources, in the map's iteration order.
    pub fn keys(&self) -> Vec<String> {
        let mut names = Vec::with_capacity(self.sources.len());
        names.extend(self.sources.keys().cloned());
        names
    }
}

#[derive(Error, Debug)]
Expand Down Expand Up @@ -693,6 +734,7 @@ impl SessionImpl {
exec_context: Mutex::new(None),
last_idle_instant: Default::default(),
cursor_manager: Arc::new(CursorManager::default()),
temporary_source_manager: Default::default(),
}
}

Expand Down Expand Up @@ -720,6 +762,7 @@ impl SessionImpl {
.into(),
last_idle_instant: Default::default(),
cursor_manager: Arc::new(CursorManager::default()),
temporary_source_manager: Default::default(),
}
}

Expand Down Expand Up @@ -1130,6 +1173,27 @@ impl SessionImpl {
Duration::from_secs(self.config().statement_timeout() as u64)
}
}

/// Registers `source` as a temporary source of this session, keyed by `source.name`.
///
/// The insertion itself does not check for an existing entry with the same name.
pub fn create_temporary_source(&self, source: SourceCatalog) {
    // Copy the name first: `source` itself is moved into the manager below.
    let source_name = source.name.to_string();
    let mut manager = self.temporary_source_manager.lock();
    manager.create_source(source_name, source);
}

/// Returns a copy of the temporary source registered under `name` in this session,
/// or `None` if there is no such source.
pub fn get_temporary_source(&self, name: &str) -> Option<SourceCatalog> {
    let manager = self.temporary_source_manager.lock();
    // Clone the catalog so the result does not borrow from the lock guard.
    manager.get_source(name).cloned()
}

/// Removes the temporary source registered under `name` from this session's manager.
pub fn drop_temporary_source(&self, name: &str) {
    let mut manager = self.temporary_source_manager.lock();
    manager.drop_source(name);
}

/// Returns a clone of this session's temporary source manager.
///
/// The result is a snapshot: later changes to the session's temporary sources are not
/// reflected in it.
pub fn temporary_source_manager(&self) -> TemporarySourceManager {
    let manager = self.temporary_source_manager.lock();
    (*manager).clone()
}
}

pub static SESSION_MANAGER: std::sync::OnceLock<Arc<SessionManagerImpl>> =
Expand Down
41 changes: 2 additions & 39 deletions src/sqlparser/src/ast/statement.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ use crate::ast::{
display_comma_separated, display_separated, ColumnDef, ObjectName, SqlOption, TableConstraint,
};
use crate::keywords::Keyword;
use crate::parser::{IncludeOption, IsOptional, Parser, UPSTREAM_SOURCE_KEY};
use crate::parser::{IncludeOption, IsOptional, Parser};
use crate::parser_err;
use crate::parser_v2::literal_u32;
use crate::tokenizer::Token;
Expand Down Expand Up @@ -82,6 +82,7 @@ macro_rules! impl_fmt_display {
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub struct CreateSourceStatement {
pub temporary: bool,
pub if_not_exists: bool,
pub columns: Vec<ColumnDef>,
// The wildchar position in columns defined in sql. Only exist when using external schema.
Expand Down Expand Up @@ -399,44 +400,6 @@ impl fmt::Display for ConnectorSchema {
}
}

// NOTE(review): the `CreateSourceStatement` definition shown in this change declares a
// `temporary` field that the initializer below does not set — this impl is presumably the
// removed side of the change; confirm before reusing it.
/// Parses the body of a `CREATE SOURCE` statement into a [`CreateSourceStatement`].
impl ParseTo for CreateSourceStatement {
/// Consumes, in order: the `IF NOT EXISTS` clause, the source name, the column list
/// (with constraints and watermarks), the include options, the `WITH` properties and
/// the source schema.
///
/// Fails when a `-cdc` connector is combined with explicit columns or constraints, or
/// when any of the sub-parsers invoked with `?` fails.
fn parse_to(p: &mut Parser<'_>) -> PResult<Self> {
// presumably binds `if_not_exists` / `source_name` as locals (macro is defined elsewhere
// — TODO confirm); both names are used in the struct initializer below
impl_parse_to!(if_not_exists => [Keyword::IF, Keyword::NOT, Keyword::EXISTS], p);
impl_parse_to!(source_name: ObjectName, p);

// parse columns
let (columns, constraints, source_watermarks, wildcard_idx) =
p.parse_columns_with_watermark()?;
let include_options = p.parse_include_options()?;

let with_options = p.parse_with_properties()?;
// Look up the connector among the WITH options; a missing option yields an empty string.
let option = with_options
.iter()
.find(|&opt| opt.name.real_value() == UPSTREAM_SOURCE_KEY);
let connector: String = option.map(|opt| opt.value.to_string()).unwrap_or_default();
// Any connector whose name contains "-cdc" is treated as a CDC source job.
let cdc_source_job = connector.contains("-cdc");
if cdc_source_job && (!columns.is_empty() || !constraints.is_empty()) {
// presumably returns early with a parser error — confirm against the macro definition
parser_err!("CDC source cannot define columns and constraints");
}

// row format for nexmark source must be native
// default row format for datagen source is native
let source_schema = p.parse_source_schema_with_connector(&connector, cdc_source_job)?;

Ok(Self {
if_not_exists,
columns,
wildcard_idx,
constraints,
source_name,
with_properties: WithProperties(with_options),
source_schema,
source_watermarks,
include_column_options: include_options,
})
}
}

pub(super) fn fmt_create_items(
columns: &[ColumnDef],
constraints: &[TableConstraint],
Expand Down
Loading

0 comments on commit 9587945

Please sign in to comment.