diff --git a/e2e_test/source/tvf/postgres_query.slt b/e2e_test/source/tvf/postgres_query.slt index 0ee76fb3db490..1405f69c778c0 100644 --- a/e2e_test/source/tvf/postgres_query.slt +++ b/e2e_test/source/tvf/postgres_query.slt @@ -22,7 +22,7 @@ CREATE TABLE test ( system ok PGHOST=db PGPORT=5432 PGUSER=postgres PGPASSWORD=postgres PGDATABASE=cdc_test psql -c " -INSERT INTO test SELECT generate_series(1, 100), true, 1, 1, 1, 1.0, 1.0, 1.0, '2021-01-01', '00:00:00', '2021-01-01 00:00:00', '2021-01-01 00:00:00+02', 'text', 'varchar', '1 day', '{}', '\\x01'; +INSERT INTO test SELECT generate_series(1, 100), true, 1, 1, 1, 1.0, 1.0, 1.0, '2021-01-01', '00:00:00', '2021-01-01 00:00:00', '2021-01-01 00:00:00+02', 'text', 'varchar', '1 day', '{}', '\x01'; " query II diff --git a/src/batch/src/executor/postgres_query.rs b/src/batch/src/executor/postgres_query.rs index 5a8f9467cc16e..59a9e56d441bc 100644 --- a/src/batch/src/executor/postgres_query.rs +++ b/src/batch/src/executor/postgres_query.rs @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +use anyhow::Context; use futures_async_stream::try_stream; use futures_util::stream::StreamExt; use risingwave_common::catalog::{Field, Schema}; @@ -146,7 +147,10 @@ impl PostgresQueryExecutor { }); // TODO(kwannoel): Use pagination using CURSOR. - let rows = client.query(&self.query, &[]).await.context("postgres_query received error from remote server")?; + let rows = client + .query(&self.query, &[]) + .await + .context("postgres_query received error from remote server")?; let mut builder = DataChunkBuilder::new(self.schema.data_types(), 1024); tracing::debug!("postgres_query_executor: query executed, start deserializing rows"); // deserialize the rows