diff --git a/dozer-ingestion/src/connectors/snowflake/connection/client.rs b/dozer-ingestion/src/connectors/snowflake/connection/client.rs index 100c2a03b7..6c0dc0acf6 100644 --- a/dozer-ingestion/src/connectors/snowflake/connection/client.rs +++ b/dozer-ingestion/src/connectors/snowflake/connection/client.rs @@ -237,13 +237,6 @@ impl<'env> Client<'env> { exec_first_exists(&self.pool, &query).map_or_else(Self::parse_not_exist_error, Ok) } - pub fn table_exist(&self, table_name: &String) -> Result { - let query = - format!("SELECT * FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_NAME = '{table_name}';"); - - exec_first_exists(&self.pool, &query).map_or_else(Self::parse_not_exist_error, Ok) - } - pub fn drop_stream(&self, stream_name: &String) -> Result { let query = format!("DROP STREAM IF EXISTS {stream_name}"); diff --git a/dozer-ingestion/src/connectors/snowflake/stream_consumer.rs b/dozer-ingestion/src/connectors/snowflake/stream_consumer.rs index 4ef1c6148c..a0838ab28a 100644 --- a/dozer-ingestion/src/connectors/snowflake/stream_consumer.rs +++ b/dozer-ingestion/src/connectors/snowflake/stream_consumer.rs @@ -102,16 +102,13 @@ impl StreamConsumer { ) -> Result<(), ConnectorError> { let temp_table_name = Self::get_stream_temp_table_name(table_name, &client.get_name()); let stream_name = Self::get_stream_table_name(table_name, &client.get_name()); - let temp_table_exist = client.table_exist(&temp_table_name)?; - if !temp_table_exist { - let query = format!( - "CREATE OR REPLACE TEMP TABLE {temp_table_name} AS + let query = format!( + "CREATE TEMP TABLE IF NOT EXISTS {temp_table_name} AS SELECT * FROM {stream_name} ORDER BY METADATA$ACTION;" - ); + ); - client.exec(&query)?; - } + client.exec(&query)?; let rows = client.fetch(format!("SELECT * FROM {temp_table_name};"))?; if let Some(schema) = rows.schema() {