Skip to content

Commit

Permalink
refactor: Remove duplicated code in object store connector (#2197)
Browse files: browse the repository at this point in the history
  • Loading branch information
chubei authored Oct 30, 2023
1 parent e00c51e commit adff53a
Show file tree
Hide file tree
Showing 6 changed files with 213 additions and 566 deletions.
97 changes: 37 additions & 60 deletions dozer-ingestion/object-store/src/connector.rs
Original file line number Diff line number Diff line change
@@ -1,26 +1,20 @@
use std::collections::HashMap;

use dozer_ingestion_connector::dozer_types::errors::internal::BoxedError;
use dozer_ingestion_connector::dozer_types::models::ingestion_types::{
IngestionMessage, TableConfig,
};
use dozer_ingestion_connector::dozer_types::models::ingestion_types::IngestionMessage;
use dozer_ingestion_connector::dozer_types::types::FieldType;
use dozer_ingestion_connector::futures::future::join_all;
use dozer_ingestion_connector::futures::future::try_join_all;
use dozer_ingestion_connector::tokio::sync::mpsc::channel;
use dozer_ingestion_connector::tokio::task::JoinSet;
use dozer_ingestion_connector::utils::ListOrFilterColumns;
use dozer_ingestion_connector::utils::{ListOrFilterColumns, TableNotFound};
use dozer_ingestion_connector::{
async_trait, tokio, Connector, Ingestor, SourceSchemaResult, TableIdentifier, TableInfo,
TableToIngest,
};

use crate::adapters::DozerObjectStore;
use crate::table::ObjectStoreTable;
use crate::{schema_mapper, ObjectStoreConnectorError};

use super::connection::validator::validate_connection;
use super::csv::csv_table::CsvTable;
use super::parquet::parquet_table::ParquetTable;
use super::table_watcher::TableWatcher;

#[derive(Debug)]
pub struct ObjectStoreConnector<T: Clone> {
Expand Down Expand Up @@ -131,7 +125,6 @@ impl<T: DozerObjectStore> Connector for ObjectStoreConnector<T> {
.unwrap();

let mut handles = vec![];
// let mut csv_tables: HashMap<usize, HashMap<Path, DateTime<Utc>>> = vec![];

for (table_index, table_info) in tables.iter().enumerate() {
assert!(table_info.checkpoint.is_none());
Expand All @@ -141,37 +134,38 @@ impl<T: DozerObjectStore> Connector for ObjectStoreConnector<T> {
column_names: table_info.column_names.clone(),
};

let mut found = false;
for table_config in self.config.tables() {
if table_info.name == table_config.name {
match &table_config.config {
TableConfig::CSV(config) => {
let table = CsvTable::new(config.clone(), self.config.clone());
handles.push(
table
.snapshot(table_index, &table_info, sender.clone())
.await
.unwrap(),
);
}
TableConfig::Parquet(config) => {
let table = ParquetTable::new(config.clone(), self.config.clone());
handles.push(
table
.snapshot(table_index, &table_info, sender.clone())
.await?,
);
}
}
let table = ObjectStoreTable::new(
table_config.config.clone(),
self.config.clone(),
Default::default(),
);
let table_info = table_info.clone();
let sender = sender.clone();
handles.push(tokio::spawn(async move {
table
.snapshot(table_index, &table_info, sender)
.await
.unwrap()
}));
found = true;
break;
}
}
}

let updated_state = join_all(handles).await;
let mut state_hash = HashMap::new();
for (id, state) in updated_state.into_iter().flatten() {
state_hash.insert(id, state);
if !found {
return Err(TableNotFound {
schema: table_info.schema,
name: table_info.name,
}
.into());
}
}

let updated_state = try_join_all(handles).await.unwrap();

sender
.send(Ok(Some(IngestionMessage::SnapshottingDone)))
.await
Expand All @@ -187,32 +181,15 @@ impl<T: DozerObjectStore> Connector for ObjectStoreConnector<T> {

for table in self.config.tables() {
if table_info.name == table.name {
let config = self.config.clone();
let table_info = table_info.clone();
let table = ObjectStoreTable::new(
table.config.clone(),
self.config.clone(),
updated_state[table_index].clone(),
);
let sender = sender.clone();
match table.config.clone() {
TableConfig::CSV(csv_config) => {
let state = state_hash.get(&table_index).unwrap().clone();

joinset.spawn(async move {
let mut csv_table = CsvTable::new(csv_config, config);
csv_table.update_state = state;
csv_table.watch(table_index, &table_info, sender).await?;
Ok::<_, ObjectStoreConnectorError>(())
});
}
TableConfig::Parquet(parquet_config) => {
let state = state_hash.get(&table_index).unwrap().clone();
joinset.spawn(async move {
let mut table = ParquetTable::new(parquet_config, config);
table.update_state = state;
table
.watch(table_index, &table_info, sender.clone())
.await?;
Ok::<_, ObjectStoreConnectorError>(())
});
}
}
joinset
.spawn(async move { table.watch(table_index, &table_info, sender).await });
break;
}
}
}
Expand Down
Loading

0 comments on commit adff53a

Please sign in to comment.