From c8813b122c54bf5f8aec8eb2f7f2b5aafbf08268 Mon Sep 17 00:00:00 2001 From: yufansong Date: Fri, 12 Jan 2024 09:43:56 -0800 Subject: [PATCH] add additional check --- src/connector/src/source/nats/enumerator/mod.rs | 13 ++++++++++++- 1 file changed, 12 insertions(+), 1 deletion(-) diff --git a/src/connector/src/source/nats/enumerator/mod.rs b/src/connector/src/source/nats/enumerator/mod.rs index e1d4f96197716..c5059fdc8186c 100644 --- a/src/connector/src/source/nats/enumerator/mod.rs +++ b/src/connector/src/source/nats/enumerator/mod.rs @@ -21,10 +21,11 @@ use super::source::{NatsOffset, NatsSplit}; use super::NatsProperties; use crate::source::{SourceEnumeratorContextRef, SplitEnumerator, SplitId}; -#[derive(Debug, Clone, Eq, PartialEq)] +#[derive(Debug, Clone)] pub struct NatsSplitEnumerator { subject: String, split_id: SplitId, + client: async_nats::Client, } #[async_trait] @@ -36,13 +37,23 @@ impl SplitEnumerator for NatsSplitEnumerator { properties: Self::Properties, _context: SourceEnumeratorContextRef, ) -> anyhow::Result { + let client = properties.common.build_client().await?; Ok(Self { subject: properties.common.subject, split_id: Arc::from("0"), + client, }) } async fn list_splits(&mut self) -> anyhow::Result> { + // Nats currently does not support list_splits API, if we simple return the default 0 without checking the client status, will result executor crash + let state = self.client.connection_state(); + if state != async_nats::connection::State::Connected { + return Err(anyhow::anyhow!( + "Nats connection status is not connected, current status is {:?}", + state + )); + } // TODO: to simplify the logic, return 1 split for first version let nats_split = NatsSplit { subject: self.subject.clone(),