Skip to content

Commit

Permalink
chore: Disable checkpointing for snowflake (#2110)
Browse files Browse the repository at this point in the history
* chore: Disable checkpointing for snowflake

* Fix clippy warnings
  • Loading branch information
karolisg authored Sep 29, 2023
1 parent 2b783b9 commit 496d4c6
Showing 1 changed file with 3 additions and 4 deletions.
7 changes: 3 additions & 4 deletions dozer-ingestion/src/connectors/snowflake/stream_consumer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ use crate::connectors::snowflake::connection::client::Client;
use crate::errors::{ConnectorError, SnowflakeError};
use crate::ingestion::Ingestor;
use dozer_types::ingestion_types::IngestionMessage;
use dozer_types::node::OpIdentifier;

use crate::errors::SnowflakeStreamError::{CannotDetermineAction, UnsupportedActionInStream};
use dozer_types::types::{Field, Operation, Record};
Expand Down Expand Up @@ -98,7 +97,7 @@ impl StreamConsumer {
table_name: &str,
ingestor: &Ingestor,
table_index: usize,
iteration: u64,
_iteration: u64,
) -> Result<(), ConnectorError> {
let temp_table_name = Self::get_stream_temp_table_name(table_name, &client.get_name());
let stream_name = Self::get_stream_table_name(table_name, &client.get_name());
Expand All @@ -120,14 +119,14 @@ impl StreamConsumer {
let used_columns_for_schema = columns_length - 3;
let action_idx = used_columns_for_schema;

for (idx, result) in rows.enumerate() {
for (_idx, result) in rows.enumerate() {
let row = result?;
let op = Self::get_operation(row, action_idx, used_columns_for_schema)?;
ingestor
.blocking_handle_message(IngestionMessage::OperationEvent {
table_index,
op,
id: Some(OpIdentifier::new(iteration, idx as u64)),
id: None,
})
.map_err(|_| ConnectorError::IngestorError)?;
}
Expand Down

0 comments on commit 496d4c6

Please sign in to comment.