diff --git a/src/batch/src/executor/postgres_query.rs b/src/batch/src/executor/postgres_query.rs index 59a9e56d441bc..2b6524a2e45e7 100644 --- a/src/batch/src/executor/postgres_query.rs +++ b/src/batch/src/executor/postgres_query.rs @@ -146,15 +146,17 @@ impl PostgresQueryExecutor { } }); - // TODO(kwannoel): Use pagination using CURSOR. - let rows = client - .query(&self.query, &[]) + let params: &[&str] = &[]; + let row_stream = client + .query_raw(&self.query, params) .await .context("postgres_query received error from remote server")?; let mut builder = DataChunkBuilder::new(self.schema.data_types(), 1024); tracing::debug!("postgres_query_executor: query executed, start deserializing rows"); // deserialize the rows - for row in rows { + #[for_await] + for row in row_stream { + let row = row?; let owned_row = postgres_row_to_owned_row(row, &self.schema)?; if let Some(chunk) = builder.append_one_row(owned_row) { yield chunk;