Skip to content

Commit

Permalink
Merge branch 'main' into feat-Add-SQL-CHR-function
Browse files Browse the repository at this point in the history
  • Loading branch information
mediuminvader committed Feb 28, 2024
2 parents caaa216 + 1b88cb8 commit 63bc485
Show file tree
Hide file tree
Showing 46 changed files with 450 additions and 588 deletions.
6 changes: 0 additions & 6 deletions .github/workflows/integration/test-dozer.sh
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,3 @@ set -e

# Check if dozer version matches `DOZER_VERSION`
dozer -V | grep "$DOZER_VERSION"

# Install Rust
curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs | sh -s -- -y

# Run grpc ingest test because it doesn't need docker or ETH secrets.
CARGO_TARGET_DIR=../ DOZER_BIN=dozer RUST_LOG=info "$HOME/.cargo/bin/cargo" run -p dozer-tests --bin dozer-tests -- grpc_ingest
7 changes: 5 additions & 2 deletions .github/workflows/unit.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -80,8 +80,11 @@ jobs:
SN_DRIVER: ${{ secrets.SN_DRIVER }}
shell: bash
run: |
cargo test -p 'dozer-ingestion-*' --lib --no-fail-fast -- --ignored
cargo test \
-p dozer-ingestion-postgres \
-p dozer-ingestion-kafka \
-p dozer-ingestion-mysql \
--lib --no-fail-fast -- --ignored
- name: Run tests
shell: bash
run: |
Expand Down
46 changes: 23 additions & 23 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion dozer-cli/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "dozer-cli"
version = "0.3.0"
version = "0.4.0"
edition = "2021"
default-run = "dozer"
authors = ["getdozer/dozer-dev"]
Expand Down
21 changes: 11 additions & 10 deletions dozer-cli/src/pipeline/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -365,23 +365,24 @@ fn add_sink_to_pipeline(
id: &str,
table_infos: Vec<(&OutputTableInfo, PortHandle)>,
) {
let mut connections = vec![];
let mut entry_points = vec![];
pipeline.add_sink(sink, id.to_string());

for (table_info, port) in table_infos {
match table_info {
OutputTableInfo::Original(table_info) => {
entry_points.push(PipelineEntryPoint::new(table_info.table_name.clone(), port))
pipeline.add_entry_point(
id.to_string(),
PipelineEntryPoint::new(table_info.table_name.clone(), port),
);
}
OutputTableInfo::Transformed(table_info) => {
connections.push((table_info, port));
pipeline.connect_nodes(
table_info.node.clone(),
table_info.port,
id.to_string(),
port,
);
}
}
}

pipeline.add_sink(sink, id, entry_points);

for (table_info, port) in connections {
pipeline.connect_nodes(&table_info.node, table_info.port, id, port);
}
}
2 changes: 1 addition & 1 deletion dozer-core/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "dozer-core"
version = "0.3.0"
version = "0.4.0"
edition = "2021"
authors = ["getdozer/dozer-dev"]

Expand Down
41 changes: 15 additions & 26 deletions dozer-core/src/app.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,44 +33,33 @@ pub struct AppPipeline {
}

impl AppPipeline {
pub fn add_processor(
&mut self,
proc: Box<dyn ProcessorFactory>,
id: &str,
entry_points: Vec<PipelineEntryPoint>,
) {
let handle = NodeHandle::new(None, id.to_string());
self.processors.push((handle.clone(), proc));
fn create_handle(id: String) -> NodeHandle {
NodeHandle::new(None, id)
}

for entry_point in entry_points {
self.entry_points.push((handle.clone(), entry_point));
}
pub fn add_processor(&mut self, proc: Box<dyn ProcessorFactory>, id: String) {
self.processors.push((Self::create_handle(id), proc));
}

pub fn add_sink(
&mut self,
sink: Box<dyn SinkFactory>,
id: &str,
entry_points: Vec<PipelineEntryPoint>,
) {
let handle = NodeHandle::new(None, id.to_string());
self.sinks.push((handle.clone(), sink));
pub fn add_sink(&mut self, sink: Box<dyn SinkFactory>, id: String) {
self.sinks.push((Self::create_handle(id), sink));
}

for entry_point in entry_points {
self.entry_points.push((handle.clone(), entry_point));
}
pub fn add_entry_point(&mut self, id: String, entry_point: PipelineEntryPoint) {
self.entry_points
.push((Self::create_handle(id), entry_point));
}

pub fn connect_nodes(
&mut self,
from: &str,
from: String,
from_port: PortHandle,
to: &str,
to: String,
to_port: PortHandle,
) {
let edge = Edge::new(
Endpoint::new(NodeHandle::new(None, from.to_string()), from_port),
Endpoint::new(NodeHandle::new(None, to.to_string()), to_port),
Endpoint::new(NodeHandle::new(None, from), from_port),
Endpoint::new(NodeHandle::new(None, to), to_port),
);
self.edges.push(edge);
}
Expand Down
Loading

0 comments on commit 63bc485

Please sign in to comment.