From 0a62ac3e2311755dbc4349a776307727e419019b Mon Sep 17 00:00:00 2001 From: yufansong Date: Tue, 5 Sep 2023 16:17:08 -0700 Subject: [PATCH] remove useless stream in sink --- src/connector/src/sink/nats.rs | 8 -------- 1 file changed, 8 deletions(-) diff --git a/src/connector/src/sink/nats.rs b/src/connector/src/sink/nats.rs index 6aa4bef9893f2..76df9486bc39a 100644 --- a/src/connector/src/sink/nats.rs +++ b/src/connector/src/sink/nats.rs @@ -16,7 +16,6 @@ use std::collections::HashMap; use anyhow::anyhow; use async_nats::jetstream::context::Context; -use async_nats::jetstream::stream::Stream; use risingwave_common::array::StreamChunk; use risingwave_common::buffer::Bitmap; use risingwave_common::catalog::Schema; @@ -54,7 +53,6 @@ pub struct NatsSink { pub struct NatsSinkWriter { pub config: NatsConfig, context: Context, - stream: Stream, schema: Schema, } @@ -118,15 +116,9 @@ impl NatsSinkWriter { .build_context() .await .map_err(|e| SinkError::Nats(anyhow_error!("nats sink error: {:?}", e)))?; - let stream = config - .common - .build_or_get_stream(context.clone()) - .await - .map_err(|e| SinkError::Nats(anyhow_error!("nats sink error: {:?}", e)))?; Ok::<_, SinkError>(Self { config: config.clone(), context, - stream, schema: schema.clone(), }) }