Skip to content

Commit

Permalink
fix(batch): use streaming api for postgres_query when fetching rows…
Browse files Browse the repository at this point in the history
… from upstream (#19064)
  • Loading branch information
kwannoel authored Oct 23, 2024
1 parent db24561 commit 3a4c4de
Showing 1 changed file with 6 additions and 4 deletions.
10 changes: 6 additions & 4 deletions src/batch/src/executor/postgres_query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -146,15 +146,17 @@ impl PostgresQueryExecutor {
}
});

// TODO(kwannoel): Use pagination using CURSOR.
let rows = client
.query(&self.query, &[])
let params: &[&str] = &[];
let row_stream = client
.query_raw(&self.query, params)
.await
.context("postgres_query received error from remote server")?;
let mut builder = DataChunkBuilder::new(self.schema.data_types(), 1024);
tracing::debug!("postgres_query_executor: query executed, start deserializing rows");
// deserialize the rows
for row in rows {
#[for_await]
for row in row_stream {
let row = row?;
let owned_row = postgres_row_to_owned_row(row, &self.schema)?;
if let Some(chunk) = builder.append_one_row(owned_row) {
yield chunk;
Expand Down

0 comments on commit 3a4c4de

Please sign in to comment.