From 496d4c6cf7a2820a0f2191d6138160233bc4585e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Karolis=20Gudi=C5=A1kis?= Date: Fri, 29 Sep 2023 09:21:51 +0300 Subject: [PATCH] chore: Disable checkpointing for snowflake (#2110) * chore: Disable checkpointing for snowflake * Fix clippy warnings --- .../src/connectors/snowflake/stream_consumer.rs | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/dozer-ingestion/src/connectors/snowflake/stream_consumer.rs b/dozer-ingestion/src/connectors/snowflake/stream_consumer.rs index a0838ab28a..4d9ba4676f 100644 --- a/dozer-ingestion/src/connectors/snowflake/stream_consumer.rs +++ b/dozer-ingestion/src/connectors/snowflake/stream_consumer.rs @@ -3,7 +3,6 @@ use crate::connectors::snowflake::connection::client::Client; use crate::errors::{ConnectorError, SnowflakeError}; use crate::ingestion::Ingestor; use dozer_types::ingestion_types::IngestionMessage; -use dozer_types::node::OpIdentifier; use crate::errors::SnowflakeStreamError::{CannotDetermineAction, UnsupportedActionInStream}; use dozer_types::types::{Field, Operation, Record}; @@ -98,7 +97,7 @@ impl StreamConsumer { table_name: &str, ingestor: &Ingestor, table_index: usize, - iteration: u64, + _iteration: u64, ) -> Result<(), ConnectorError> { let temp_table_name = Self::get_stream_temp_table_name(table_name, &client.get_name()); let stream_name = Self::get_stream_table_name(table_name, &client.get_name()); @@ -120,14 +119,14 @@ impl StreamConsumer { let used_columns_for_schema = columns_length - 3; let action_idx = used_columns_for_schema; - for (idx, result) in rows.enumerate() { + for (_idx, result) in rows.enumerate() { let row = result?; let op = Self::get_operation(row, action_idx, used_columns_for_schema)?; ingestor .blocking_handle_message(IngestionMessage::OperationEvent { table_index, op, - id: Some(OpIdentifier::new(iteration, idx as u64)), + id: None, }) .map_err(|_| ConnectorError::IngestorError)?; }