From dc363932e5614e085d5f2f9889dfe6c71c46400f Mon Sep 17 00:00:00 2001 From: Luis Moreno Date: Fri, 13 Dec 2024 13:37:27 -0400 Subject: [PATCH] chore: use sql function --- dataflows/bank-processing/dataflow.yaml | 4 ---- dataflows/car-processing/dataflow.yaml | 3 +-- dataflows/hackernews-notify/dataflow.yaml | 7 +++---- dataflows/helsinki-transit/dataflow.yaml | 3 +-- dataflows/word-counter/dataflow.yaml | 3 +-- dataflows/word-probe/dataflow.yaml | 3 +-- 6 files changed, 7 insertions(+), 16 deletions(-) diff --git a/dataflows/bank-processing/dataflow.yaml b/dataflows/bank-processing/dataflow.yaml index 7cae455..3cfacfe 100644 --- a/dataflows/bank-processing/dataflow.yaml +++ b/dataflows/bank-processing/dataflow.yaml @@ -407,16 +407,12 @@ services: - operator: filter-map run: | fn generate_overdraft_event_due_transfer(ev: DataEvent) -> Result> { - let withdrawal = match ev.operation { DataOperation::CreditAccount(_) => return Ok(None), DataOperation::DebitAccount(debit) => debit, }; - let accounts = account_balance(); - let account = accounts.sql(&format!("select * from `account-balance` where name = '{}'", ev.name))?; - let rows = account.rows()?; if !rows.next() { diff --git a/dataflows/car-processing/dataflow.yaml b/dataflows/car-processing/dataflow.yaml index 7cbcdbe..cf18014 100644 --- a/dataflows/car-processing/dataflow.yaml +++ b/dataflows/car-processing/dataflow.yaml @@ -315,8 +315,7 @@ services: - operator: filter-map run: | fn check_license_plate(car: Car) -> Result> { - let plates = licence_plates(); - let lp = plates.sql(&format!("select * from `licence-plates` where _key = '{}'", car.license))?; + let lp = sql(&format!("select * from licence_plates where _key = '{}'", car.license))?; let rows = lp.rows()?; let maker_col = lp.col("maker")?; diff --git a/dataflows/hackernews-notify/dataflow.yaml b/dataflows/hackernews-notify/dataflow.yaml index a1ab889..c2e970a 100644 --- a/dataflows/hackernews-notify/dataflow.yaml +++ b/dataflows/hackernews-notify/dataflow.yaml @@ -141,12 +141,11 @@ services: run: | fn match_words(article_data: ArticleData) -> Result, ArticleData)>> { use format_sql_query::{Table, Column, QuotedData}; - let table = notify_table(); let query = format!("SELECT * FROM `{}` WHERE {} = {}", - Table("notify-table".into()), + Table("notify_table".into()), Column("_key".into()), QuotedData(&article_data.word.to_lowercase()) ); - let emails = match table.sql(&query) { + let emails = match sql(&query) { Ok(emails) => emails, Err(e) => { println!("Error: {}", e); @@ -184,4 +183,4 @@ services: sinks: - type: topic - id: notify-event \ No newline at end of file + id: notify-event diff --git a/dataflows/helsinki-transit/dataflow.yaml b/dataflows/helsinki-transit/dataflow.yaml index 617aa8a..df6df49 100644 --- a/dataflows/helsinki-transit/dataflow.yaml +++ b/dataflows/helsinki-transit/dataflow.yaml @@ -145,8 +145,7 @@ services: flush: run: | fn compute_top_vehicle() -> Result { - let mut stat = vehicle_stat(); - let top5 = stat.sql("select * from vehicle_stat order by speed desc limit 5")?; + let top5 = sql("select * from vehicle_stat order by speed desc limit 5")?; let rows = top5.rows()?; let mut top_vehicles = vec![]; let key = top5.key()?; diff --git a/dataflows/word-counter/dataflow.yaml b/dataflows/word-counter/dataflow.yaml index e8c0f02..495a0f1 100644 --- a/dataflows/word-counter/dataflow.yaml +++ b/dataflows/word-counter/dataflow.yaml @@ -111,9 +111,8 @@ services: # Read the full state and compute the top 3 words sorted by count. run: | fn compute_most_used_words() -> Result { - let word_counts = count_per_word(); - let top3 = word_counts.sql("select * from count_per_word order by count desc limit 3")?; + let top3 = sql("select * from count_per_word order by count desc limit 3")?; let rows = top3.rows()?; let columns = top3.schema(["_key","count"])?; diff --git a/dataflows/word-probe/dataflow.yaml b/dataflows/word-probe/dataflow.yaml index cee4bcc..ac5f061 100644 --- a/dataflows/word-probe/dataflow.yaml +++ b/dataflows/word-probe/dataflow.yaml @@ -84,8 +84,7 @@ services: - operator: map run: | fn query_word_count(word: String) -> Result { - let df = count_per_word(); - let count = df.sql(&format!("select * from `count-per-word` where _key = '{}'", word))?; + let count = sql(&format!("select * from count_per_word where _key = '{}'", word))?; let rows = count.rows()?; let columns = count.schema(["_key","count"])?; match &columns[..] {