Skip to content

Commit

Permalink
[BUG] Fix memberlist manager to only list memberlist with the provide… (
Browse files Browse the repository at this point in the history
#1956)

…d memberlist name

## Description of changes

*Summarize the changes made by this PR.*
 - Improvements & Bug fixes
- This PR fixes memberlist manager to only list memberlist with the
provided memberlist name.
 - New functionality
	 - ...

## Test plan
*How are these changes tested?*

- [ ] Tests pass locally with `pytest` for python, `yarn test` for js,
`cargo test` for rust

## Documentation Changes
*Are all docstrings for user-facing APIs updated if required? Do we need
to make documentation changes in the [docs
repository](https://github.com/chroma-core/docs)?*
  • Loading branch information
Ishiihara authored Apr 1, 2024
1 parent ab05830 commit 978547c
Showing 1 changed file with 16 additions and 16 deletions.
32 changes: 16 additions & 16 deletions rust/worker/src/memberlist/memberlist_provider.rs
Original file line number Diff line number Diff line change
@@ -1,26 +1,23 @@
use std::sync::Arc;
use std::{fmt::Debug, sync::RwLock};

use super::config::{CustomResourceMemberlistProviderConfig, MemberlistProviderConfig};
use crate::system::{Receiver, Sender};
use super::config::MemberlistProviderConfig;
use crate::system::Receiver;
use crate::{
config::{Configurable, WorkerConfig},
errors::{ChromaError, ErrorCodes},
system::{Component, ComponentContext, Handler, StreamHandler},
};
use async_trait::async_trait;
use futures::{StreamExt, TryStreamExt};
use k8s_openapi::api::events::v1::Event;
use futures::StreamExt;
use kube::runtime::watcher::Config;
use kube::{
api::Api,
config,
runtime::{watcher, watcher::Error as WatchError, WatchStreamExt},
runtime::{watcher, WatchStreamExt},
Client, CustomResource,
};
use schemars::JsonSchema;
use serde::{Deserialize, Serialize};
use thiserror::Error;
use tokio_util::sync::CancellationToken;

/* =========== Basic Types ============== */
pub(crate) type Memberlist = Vec<String>;
Expand Down Expand Up @@ -108,8 +105,8 @@ impl Configurable for CustomResourceMemberlistProvider {
let c: CustomResourceMemberlistProvider = CustomResourceMemberlistProvider {
memberlist_name: my_config.memberlist_name.clone(),
kube_ns: worker_config.kube_namespace.clone(),
kube_client: kube_client,
memberlist_cr_client: memberlist_cr_client,
kube_client,
memberlist_cr_client,
queue_size: my_config.queue_size,
current_memberlist: RwLock::new(vec![]),
subscribers: vec![],
Expand All @@ -128,11 +125,11 @@ impl CustomResourceMemberlistProvider {
let memberlist_cr_client =
Api::<MemberListKubeResource>::namespaced(kube_client.clone(), &kube_ns);
CustomResourceMemberlistProvider {
memberlist_name: memberlist_name,
kube_ns: kube_ns,
kube_client: kube_client,
memberlist_cr_client: memberlist_cr_client,
queue_size: queue_size,
memberlist_name,
kube_ns,
kube_client,
memberlist_cr_client,
queue_size,
current_memberlist: RwLock::new(vec![]),
subscribers: vec![],
}
Expand All @@ -142,7 +139,10 @@ impl CustomResourceMemberlistProvider {
let memberlist_cr_client =
Api::<MemberListKubeResource>::namespaced(self.kube_client.clone(), &self.kube_ns);

let stream = watcher(memberlist_cr_client, watcher::Config::default())
let field_selector = format!("metadata.name={}", self.memberlist_name);
let conifg = Config::default().fields(&field_selector);

let stream = watcher(memberlist_cr_client, conifg)
.default_backoff()
.applied_objects();
let stream = stream.then(|event| async move {
Expand Down

0 comments on commit 978547c

Please sign in to comment.