Skip to content

Commit

Permalink
chore: Remove TableInfo::table_name and TableInfo::id. Remove par…
Browse files Browse the repository at this point in the history
…ameter of `Connector::get_tables`. (#1145)

* chore: Remove `TableInfo::name`

* chore: Remove unused parameter of `Connector::get_tables`

* chore: Remove `TableInfo::id`

* chore: Rename `TableInfo::table_name` to `TableInfo::name`

* chore: Fix snowflake compilation
  • Loading branch information
chubei authored Mar 6, 2023
1 parent d5be059 commit 2403b55
Show file tree
Hide file tree
Showing 49 changed files with 329 additions and 404 deletions.
2 changes: 1 addition & 1 deletion dozer-admin/src/services/connection_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ impl ConnectionService {
) -> Result<Vec<dozer_orchestrator::TableInfo>, ErrorResponse> {
let res = thread::spawn(|| {
let connector = get_connector(connection).map_err(|err| err.to_string())?;
connector.get_tables(None).map_err(|err| err.to_string())
connector.get_tables().map_err(|err| err.to_string())
})
.join()
.unwrap();
Expand Down
2 changes: 1 addition & 1 deletion dozer-core/src/dag_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -392,7 +392,7 @@ fn contains_port<T>(
}
NodeKind::Source(s) => {
if direction == PortDirection::Output {
s.get_output_ports()?.iter().any(|e| e.handle == port)
s.get_output_ports().iter().any(|e| e.handle == port)
} else {
false
}
Expand Down
2 changes: 1 addition & 1 deletion dozer-core/src/dag_schemas.rs
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,7 @@ fn populate_schemas<T: Clone>(

match &node.kind {
NodeKind::Source(source) => {
let ports = source.get_output_ports()?;
let ports = source.get_output_ports();

for edge in dag.graph().edges(node_index) {
let port = find_output_port_def(&ports, edge);
Expand Down
4 changes: 0 additions & 4 deletions dozer-core/src/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,6 @@ pub enum ExecutionError {
InvalidDatabase,
#[error("Field not found at position {0}")]
FieldNotFound(String),
#[error("Port not found in source for schema_id: {0}.")]
PortNotFound(String),
#[error("Replication type not found")]
ReplicationTypeNotFound,
#[error("Record not found")]
RecordNotFound(),
#[error("Node {node} has incompatible {typ:?} schemas: {source}")]
Expand Down
2 changes: 1 addition & 1 deletion dozer-core/src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ impl OutputPortDef {

pub trait SourceFactory<T>: Send + Sync + Debug {
fn get_output_schema(&self, port: &PortHandle) -> Result<(Schema, T), ExecutionError>;
fn get_output_ports(&self) -> Result<Vec<OutputPortDef>, ExecutionError>;
fn get_output_ports(&self) -> Vec<OutputPortDef>;
fn build(
&self,
output_schemas: HashMap<PortHandle, Schema>,
Expand Down
2 changes: 1 addition & 1 deletion dozer-core/src/tests/app.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ impl SourceFactory<NoneContext> for NoneSourceFactory {
todo!()
}

fn get_output_ports(&self) -> Result<Vec<OutputPortDef>, ExecutionError> {
fn get_output_ports(&self) -> Vec<OutputPortDef> {
todo!()
}

Expand Down
6 changes: 3 additions & 3 deletions dozer-core/src/tests/dag_base_create_errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,11 +54,11 @@ impl SourceFactory<NoneContext> for CreateErrSourceFactory {
))
}

fn get_output_ports(&self) -> Result<Vec<OutputPortDef>, ExecutionError> {
Ok(vec![OutputPortDef::new(
fn get_output_ports(&self) -> Vec<OutputPortDef> {
vec![OutputPortDef::new(
DEFAULT_PORT_HANDLE,
OutputPortType::Stateless,
)])
)]
}

fn build(
Expand Down
6 changes: 3 additions & 3 deletions dozer-core/src/tests/dag_base_errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -320,11 +320,11 @@ impl SourceFactory<NoneContext> for ErrGeneratorSourceFactory {
))
}

fn get_output_ports(&self) -> Result<Vec<OutputPortDef>, ExecutionError> {
Ok(vec![OutputPortDef::new(
fn get_output_ports(&self) -> Vec<OutputPortDef> {
vec![OutputPortDef::new(
GENERATOR_SOURCE_OUTPUT_PORT,
OutputPortType::Stateless,
)])
)]
}

fn build(
Expand Down
7 changes: 3 additions & 4 deletions dozer-core/src/tests/dag_ports.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,11 @@ impl SourceFactory<NoneContext> for DynPortsSourceFactory {
todo!()
}

fn get_output_ports(&self) -> Result<Vec<OutputPortDef>, ExecutionError> {
Ok(self
.output_ports
fn get_output_ports(&self) -> Vec<OutputPortDef> {
self.output_ports
.iter()
.map(|p| OutputPortDef::new(*p, OutputPortType::Stateless))
.collect())
.collect()
}

fn build(
Expand Down
6 changes: 3 additions & 3 deletions dozer-core/src/tests/dag_recordreader_update.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,8 @@ impl SourceFactory<NoneContext> for GeneratorSourceFactory {
))
}

fn get_output_ports(&self) -> Result<Vec<OutputPortDef>, ExecutionError> {
Ok(vec![OutputPortDef::new(
fn get_output_ports(&self) -> Vec<OutputPortDef> {
vec![OutputPortDef::new(
GENERATOR_SOURCE_OUTPUT_PORT,
if self.stateful {
OutputPortType::StatefulWithPrimaryKeyLookup {
Expand All @@ -83,7 +83,7 @@ impl SourceFactory<NoneContext> for GeneratorSourceFactory {
} else {
OutputPortType::Stateless
},
)])
)]
}

fn build(
Expand Down
12 changes: 6 additions & 6 deletions dozer-core/src/tests/dag_schemas.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,11 +62,11 @@ impl SourceFactory<NoneContext> for TestUsersSourceFactory {
))
}

fn get_output_ports(&self) -> Result<Vec<OutputPortDef>, ExecutionError> {
Ok(vec![OutputPortDef::new(
fn get_output_ports(&self) -> Vec<OutputPortDef> {
vec![OutputPortDef::new(
DEFAULT_PORT_HANDLE,
OutputPortType::Stateless,
)])
)]
}

fn build(
Expand Down Expand Up @@ -110,11 +110,11 @@ impl SourceFactory<NoneContext> for TestCountriesSourceFactory {
))
}

fn get_output_ports(&self) -> Result<Vec<OutputPortDef>, ExecutionError> {
Ok(vec![OutputPortDef::new(
fn get_output_ports(&self) -> Vec<OutputPortDef> {
vec![OutputPortDef::new(
DEFAULT_PORT_HANDLE,
OutputPortType::Stateless,
)])
)]
}

fn build(
Expand Down
24 changes: 12 additions & 12 deletions dozer-core/src/tests/sources.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,8 @@ impl SourceFactory<NoneContext> for GeneratorSourceFactory {
))
}

fn get_output_ports(&self) -> Result<Vec<OutputPortDef>, ExecutionError> {
Ok(vec![OutputPortDef::new(
fn get_output_ports(&self) -> Vec<OutputPortDef> {
vec![OutputPortDef::new(
GENERATOR_SOURCE_OUTPUT_PORT,
if self.stateful {
OutputPortType::StatefulWithPrimaryKeyLookup {
Expand All @@ -75,7 +75,7 @@ impl SourceFactory<NoneContext> for GeneratorSourceFactory {
} else {
OutputPortType::Stateless
},
)])
)]
}

fn build(
Expand Down Expand Up @@ -188,8 +188,8 @@ impl SourceFactory<NoneContext> for DualPortGeneratorSourceFactory {
))
}

fn get_output_ports(&self) -> Result<Vec<OutputPortDef>, ExecutionError> {
Ok(vec![
fn get_output_ports(&self) -> Vec<OutputPortDef> {
vec![
OutputPortDef::new(
DUAL_PORT_GENERATOR_SOURCE_OUTPUT_PORT_1,
if self.stateful {
Expand All @@ -212,7 +212,7 @@ impl SourceFactory<NoneContext> for DualPortGeneratorSourceFactory {
OutputPortType::Stateless
},
),
])
]
}

fn build(
Expand Down Expand Up @@ -335,15 +335,15 @@ impl SourceFactory<NoneContext> for NoPkGeneratorSourceFactory {
))
}

fn get_output_ports(&self) -> Result<Vec<OutputPortDef>, ExecutionError> {
Ok(vec![OutputPortDef::new(
fn get_output_ports(&self) -> Vec<OutputPortDef> {
vec![OutputPortDef::new(
GENERATOR_SOURCE_OUTPUT_PORT,
if self.stateful {
OutputPortType::AutogenRowKeyLookup
} else {
OutputPortType::Stateless
},
)])
)]
}

fn build(
Expand Down Expand Up @@ -417,11 +417,11 @@ impl SourceFactory<NoneContext> for ConnectivityTestSourceFactory {
unimplemented!("This struct is for connectivity test, only output ports are defined")
}

fn get_output_ports(&self) -> Result<Vec<OutputPortDef>, ExecutionError> {
Ok(vec![OutputPortDef::new(
fn get_output_ports(&self) -> Vec<OutputPortDef> {
vec![OutputPortDef::new(
DEFAULT_PORT_HANDLE,
OutputPortType::Stateless,
)])
)]
}

fn build(
Expand Down
2 changes: 1 addition & 1 deletion dozer-ingestion/benches/helper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ pub async fn get_connection_iterator(config: TestConfig) -> IngestionIterator {
std::thread::spawn(move || {
let grpc_connector = dozer_ingestion::connectors::get_connector(config.connection).unwrap();

let mut tables = grpc_connector.get_tables(None).unwrap();
let mut tables = grpc_connector.get_tables().unwrap();
if let Some(tables_filter) = config.tables_filter {
tables.retain(|t| tables_filter.contains(&t.name));
}
Expand Down
2 changes: 0 additions & 2 deletions dozer-ingestion/examples/postgres/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,6 @@ fn main() {
let (ingestor, mut iterator) = Ingestor::initialize_channel(IngestionConfig::default());
let tables = vec![TableInfo {
name: "users".to_string(),
table_name: "users".to_string(),
id: 0,
columns: None,
}];
let postgres_config = PostgresConfig {
Expand Down
6 changes: 3 additions & 3 deletions dozer-ingestion/src/connectors/ethereum/log/connector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ impl Connector for EthLogConnector {
let schemas = if let Some(tables) = tables {
schemas
.iter()
.filter(|s| tables.iter().any(|t| t.table_name == s.name))
.filter(|s| tables.iter().any(|t| t.name == s.name))
.cloned()
.collect()
} else {
Expand Down Expand Up @@ -191,8 +191,8 @@ impl Connector for EthLogConnector {
Ok(HashMap::new())
}

fn get_tables(&self, tables: Option<&[TableInfo]>) -> Result<Vec<TableInfo>, ConnectorError> {
self.get_tables_default(tables)
fn get_tables(&self) -> Result<Vec<TableInfo>, ConnectorError> {
self.get_tables_default()
}

fn can_start_from(&self, _last_checkpoint: (u64, u64)) -> Result<bool, ConnectorError> {
Expand Down
7 changes: 2 additions & 5 deletions dozer-ingestion/src/connectors/ethereum/log/helper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ pub fn decode_event(
.to_owned();

let table_name = get_table_name(contract_tuple, &event.name);
let is_table_required = tables.iter().any(|t| t.table_name == table_name);
let is_table_required = tables.iter().any(|t| t.name == table_name);
if is_table_required {
let parsed_event = event.parse_log(RawLog {
topics: log.topics,
Expand Down Expand Up @@ -172,10 +172,7 @@ pub fn map_abitype_to_field(f: web3::ethabi::Token) -> Field {
}
pub fn map_log_to_event(log: Log, details: Arc<EthDetails>) -> Option<Operation> {
// Check if table is requested
let is_table_required = details
.tables
.iter()
.any(|t| t.table_name == ETH_LOGS_TABLE);
let is_table_required = details.tables.iter().any(|t| t.name == ETH_LOGS_TABLE);

if !is_table_required {
None
Expand Down
4 changes: 2 additions & 2 deletions dozer-ingestion/src/connectors/ethereum/trace/connector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,8 @@ impl Connector for EthTraceConnector {
Ok(HashMap::new())
}

fn get_tables(&self, tables: Option<&[TableInfo]>) -> Result<Vec<TableInfo>, ConnectorError> {
self.get_tables_default(tables)
fn get_tables(&self) -> Result<Vec<TableInfo>, ConnectorError> {
self.get_tables_default()
}

fn can_start_from(&self, _last_checkpoint: (u64, u64)) -> Result<bool, ConnectorError> {
Expand Down
4 changes: 2 additions & 2 deletions dozer-ingestion/src/connectors/grpc/adapter/arrow.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ impl IngestAdapter for ArrowAdapter {
serde_json::from_str(schemas_str).map_err(ConnectorError::map_serialization_error)?;
let mut schemas = vec![];

for (id, grpc_schema) in grpc_schemas.iter().enumerate() {
for (id, grpc_schema) in grpc_schemas.into_iter().enumerate() {
let mut schema = arrow_types::from_arrow::map_schema_to_dozer(&grpc_schema.schema)
.map_err(|e| ConnectorError::InternalError(Box::new(e)))?;
schema.identifier = Some(SchemaIdentifier {
Expand All @@ -53,7 +53,7 @@ impl IngestAdapter for ArrowAdapter {
});

schemas.push(SourceSchema {
name: grpc_schema.name.clone(),
name: grpc_schema.name,
schema,
replication_type: grpc_schema.replication_type.clone(),
});
Expand Down
4 changes: 2 additions & 2 deletions dozer-ingestion/src/connectors/grpc/connector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -174,8 +174,8 @@ where
Ok(results)
}

fn get_tables(&self, tables: Option<&[TableInfo]>) -> Result<Vec<TableInfo>, ConnectorError> {
self.get_tables_default(tables)
fn get_tables(&self) -> Result<Vec<TableInfo>, ConnectorError> {
self.get_tables_default()
}

fn can_start_from(&self, _last_checkpoint: (u64, u64)) -> Result<bool, ConnectorError> {
Expand Down
2 changes: 1 addition & 1 deletion dozer-ingestion/src/connectors/grpc/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ async fn ingest_grpc(
})
.unwrap();

let tables = grpc_connector.get_tables(None).unwrap();
let tables = grpc_connector.get_tables().unwrap();
grpc_connector.start(None, &ingestor, tables).unwrap();
});

Expand Down
6 changes: 3 additions & 3 deletions dozer-ingestion/src/connectors/kafka/connector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ impl Connector for KafkaConnector {
) -> Result<(), ConnectorError> {
let topic = tables
.get(0)
.map_or(Err(TopicNotDefined), |table| Ok(&table.table_name))?;
.map_or(Err(TopicNotDefined), |table| Ok(&table.name))?;

let broker = self.config.broker.to_owned();
Runtime::new()
Expand All @@ -60,8 +60,8 @@ impl Connector for KafkaConnector {
todo!()
}

fn get_tables(&self, tables: Option<&[TableInfo]>) -> Result<Vec<TableInfo>, ConnectorError> {
self.get_tables_default(tables)
fn get_tables(&self) -> Result<Vec<TableInfo>, ConnectorError> {
self.get_tables_default()
}

fn can_start_from(&self, _last_checkpoint: (u64, u64)) -> Result<bool, ConnectorError> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ impl NoSchemaRegistry {
table_names.map_or(Ok(vec![]), |tables| {
tables.get(0).map_or(Ok(vec![]), |table| {
let mut con = Consumer::from_hosts(vec![config.broker.clone()])
.with_topic(table.table_name.clone())
.with_topic(table.name.clone())
.with_fallback_offset(FetchOffset::Earliest)
.with_offset_storage(GroupOffsetStorage::Kafka)
.create()
Expand Down Expand Up @@ -52,7 +52,7 @@ impl NoSchemaRegistry {
})?;

schemas.push(SourceSchema::new(
table.table_name.clone(),
table.name.clone(),
mapped_schema,
ReplicationChangesTrackingType::FullChanges,
));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,10 +86,8 @@ impl SchemaRegistry {
let sr_settings = SrSettings::new(config.schema_registry_url.unwrap());
table_names.map_or(Ok(vec![]), |tables| {
tables.get(0).map_or(Ok(vec![]), |table| {
let key_result =
SchemaRegistry::fetch_struct(&sr_settings, &table.table_name, true)?;
let schema_result =
SchemaRegistry::fetch_struct(&sr_settings, &table.table_name, false)?;
let key_result = SchemaRegistry::fetch_struct(&sr_settings, &table.name, true)?;
let schema_result = SchemaRegistry::fetch_struct(&sr_settings, &table.name, false)?;

let pk_fields = key_result.fields.map_or(vec![], |fields| {
fields
Expand Down Expand Up @@ -142,7 +140,7 @@ impl SchemaRegistry {
};

schema_data = Some(Ok(vec![SourceSchema::new(
table.table_name.clone(),
table.name.clone(),
schema,
ReplicationChangesTrackingType::FullChanges,
)]));
Expand Down
Loading

0 comments on commit 2403b55

Please sign in to comment.