diff --git a/apis/risingwave/v1alpha1/risingwave_types.go b/apis/risingwave/v1alpha1/risingwave_types.go
index 85074ef3..31f118b8 100644
--- a/apis/risingwave/v1alpha1/risingwave_types.go
+++ b/apis/risingwave/v1alpha1/risingwave_types.go
@@ -112,6 +112,13 @@ type RisingWaveSpec struct {
// +kubebuilder:default=false
EnableStandaloneMode *bool `json:"enableStandaloneMode,omitempty"`
+ // Flag to control whether to use the embedded connector (recommended). If embedded connector is enabled,
+ // the dedicated connectors won't be deployed and used anymore and the corresponding fields will be ignored.
+ // The dedicated connector will be deprecated soon because of its error proneness.
+ // +optional
+ // +kubebuilder:default=false
+ EnableEmbeddedConnector *bool `json:"enableEmbeddedConnector,omitempty"`
+
// Image for RisingWave component.
Image string `json:"image"`
diff --git a/apis/risingwave/v1alpha1/zz_generated.deepcopy.go b/apis/risingwave/v1alpha1/zz_generated.deepcopy.go
index ace67913..cced3a7d 100644
--- a/apis/risingwave/v1alpha1/zz_generated.deepcopy.go
+++ b/apis/risingwave/v1alpha1/zz_generated.deepcopy.go
@@ -1090,6 +1090,11 @@ func (in *RisingWaveSpec) DeepCopyInto(out *RisingWaveSpec) {
*out = new(bool)
**out = **in
}
+ if in.EnableEmbeddedConnector != nil {
+ in, out := &in.EnableEmbeddedConnector, &out.EnableEmbeddedConnector
+ *out = new(bool)
+ **out = **in
+ }
in.AdditionalFrontendServiceMetadata.DeepCopyInto(&out.AdditionalFrontendServiceMetadata)
in.MetaStore.DeepCopyInto(&out.MetaStore)
in.StateStore.DeepCopyInto(&out.StateStore)
diff --git a/config/crd/bases/risingwave.risingwavelabs.com_risingwaves.yaml b/config/crd/bases/risingwave.risingwavelabs.com_risingwaves.yaml
index bd07240e..5bc3fdd9 100644
--- a/config/crd/bases/risingwave.risingwavelabs.com_risingwaves.yaml
+++ b/config/crd/bases/risingwave.risingwavelabs.com_risingwaves.yaml
@@ -40324,6 +40324,14 @@ spec:
even if it's set to true, the controller will determine if it can
create the resource by checking if the CRDs are installed.
type: boolean
+ enableEmbeddedConnector:
+ default: false
+ description: Flag to control whether to use the embedded connector
+ (recommended). If embedded connector is enabled, the dedicated connectors
+ won't be deployed and used anymore and the corresponding fields
+ will be ignored. The dedicated connector will be deprecated soon
+ because of its error proneness.
+ type: boolean
enableFullKubernetesAddr:
default: false
description: Flag to indicate if full kubernetes address should be
diff --git a/config/risingwave-operator-test.yaml b/config/risingwave-operator-test.yaml
index cca575ff..6dac1e85 100644
--- a/config/risingwave-operator-test.yaml
+++ b/config/risingwave-operator-test.yaml
@@ -40341,6 +40341,14 @@ spec:
even if it's set to true, the controller will determine if it can
create the resource by checking if the CRDs are installed.
type: boolean
+ enableEmbeddedConnector:
+ default: false
+ description: Flag to control whether to use the embedded connector
+ (recommended). If embedded connector is enabled, the dedicated connectors
+ won't be deployed and used anymore and the corresponding fields
+ will be ignored. The dedicated connector will be deprecated soon
+ because of its error proneness.
+ type: boolean
enableFullKubernetesAddr:
default: false
description: Flag to indicate if full kubernetes address should be
diff --git a/config/risingwave-operator.yaml b/config/risingwave-operator.yaml
index 93578dbb..83a5c067 100644
--- a/config/risingwave-operator.yaml
+++ b/config/risingwave-operator.yaml
@@ -40341,6 +40341,14 @@ spec:
even if it's set to true, the controller will determine if it can
create the resource by checking if the CRDs are installed.
type: boolean
+ enableEmbeddedConnector:
+ default: false
+ description: Flag to control whether to use the embedded connector
+ (recommended). If embedded connector is enabled, the dedicated connectors
+ won't be deployed and used anymore and the corresponding fields
+ will be ignored. The dedicated connector will be deprecated soon
+ because of its error proneness.
+ type: boolean
enableFullKubernetesAddr:
default: false
description: Flag to indicate if full kubernetes address should be
diff --git a/docs/general/api.md b/docs/general/api.md
index e0d9a03f..47fe927b 100644
--- a/docs/general/api.md
+++ b/docs/general/api.md
@@ -582,6 +582,20 @@ spec.components will be ignored. Standalone mode can be turned on/off dynamicall
+
image
string
diff --git a/pkg/controller/risingwave_controller.go b/pkg/controller/risingwave_controller.go
index 67346d2b..38f7b4bb 100644
--- a/pkg/controller/risingwave_controller.go
+++ b/pkg/controller/risingwave_controller.go
@@ -313,24 +313,27 @@ func (c *RisingWaveController) reactiveWorkflow(risingwaveManger *object.RisingW
mgr.SyncFrontendDeployments(),
ctrlkit.If(c.openKruiseAvailable, mgr.SyncFrontendCloneSets()),
),
- ctrlkit.ParallelJoin(
- mgr.SyncConnectorService(),
- mgr.SyncConnectorDeployments(),
- ctrlkit.If(c.openKruiseAvailable, mgr.SyncConnectorCloneSets()),
+ // Sync only when embedded connector is disabled.
+ ctrlkit.If(!risingwaveManger.IsEmbeddedConnectorEnabled(),
+ ctrlkit.ParallelJoin(
+ mgr.SyncConnectorService(),
+ mgr.SyncConnectorDeployments(),
+ ctrlkit.If(c.openKruiseAvailable, mgr.SyncConnectorCloneSets()),
+ ),
),
)
otherOpenKruiseComponentsReadyBarrier := ctrlkit.ParallelJoin(
mgr.WaitBeforeFrontendCloneSetsReady(),
mgr.WaitBeforeComputeAdvancedStatefulSetsReady(),
mgr.WaitBeforeCompactorCloneSetsReady(),
- mgr.WaitBeforeConnectorCloneSetsReady(),
+ ctrlkit.If(!risingwaveManger.IsEmbeddedConnectorEnabled(), mgr.WaitBeforeConnectorCloneSetsReady()),
)
otherComponentsReadyBarrier := ctrlkit.Join(
mgr.WaitBeforeFrontendDeploymentsReady(),
mgr.WaitBeforeComputeStatefulSetsReady(),
mgr.WaitBeforeCompactorDeploymentsReady(),
- mgr.WaitBeforeConnectorDeploymentsReady(),
+ ctrlkit.If(!risingwaveManger.IsEmbeddedConnectorEnabled(), mgr.WaitBeforeConnectorDeploymentsReady()),
ctrlkit.If(c.openKruiseAvailable, otherOpenKruiseComponentsReadyBarrier),
)
diff --git a/pkg/factory/risingwave_object_factory.go b/pkg/factory/risingwave_object_factory.go
index 0c581198..7128668c 100644
--- a/pkg/factory/risingwave_object_factory.go
+++ b/pkg/factory/risingwave_object_factory.go
@@ -400,10 +400,13 @@ func (f *RisingWaveObjectFactory) envsForMetaArgs() []corev1.EnvVar {
Name: envs.RWPrometheusHost,
Value: fmt.Sprintf("0.0.0.0:%d", consts.MetaMetricsPort),
},
- {
+ }
+
+ if !ptr.Deref(f.risingwave.Spec.EnableEmbeddedConnector, false) {
+ envVars = append(envVars, corev1.EnvVar{
Name: envs.RWConnectorRPCEndPoint,
Value: fmt.Sprintf("%s:%d", f.componentAddr(consts.ComponentConnector, ""), consts.ConnectorServicePort),
- },
+ })
}
envVars = append(envVars, f.coreEnvsForMeta()...)
@@ -462,16 +465,19 @@ func (f *RisingWaveObjectFactory) envsForComputeArgs(cpuLimit int64, memLimit in
Name: envs.RWMetaAddrLegacy,
Value: fmt.Sprintf("load-balance+http://%s:%d", f.componentAddr(consts.ComponentMeta, ""), consts.MetaServicePort),
},
- {
- Name: envs.RWConnectorRPCEndPoint,
- Value: fmt.Sprintf("%s:%d", f.componentAddr(consts.ComponentConnector, ""), consts.ConnectorServicePort),
- },
{
Name: envs.RWPrometheusListenerAddr,
Value: fmt.Sprintf("0.0.0.0:%d", consts.ComputeMetricsPort),
},
}
+ if !ptr.Deref(f.risingwave.Spec.EnableEmbeddedConnector, false) {
+ envVars = append(envVars, corev1.EnvVar{
+ Name: envs.RWConnectorRPCEndPoint,
+ Value: fmt.Sprintf("%s:%d", f.componentAddr(consts.ComponentConnector, ""), consts.ConnectorServicePort),
+ })
+ }
+
if cpuLimit != 0 {
envVars = append(envVars, corev1.EnvVar{
Name: envs.RWParallelism,
diff --git a/pkg/manager/risingwave_controller_manager_impl.go b/pkg/manager/risingwave_controller_manager_impl.go
index addd7113..3126ee38 100644
--- a/pkg/manager/risingwave_controller_manager_impl.go
+++ b/pkg/manager/risingwave_controller_manager_impl.go
@@ -161,6 +161,7 @@ func (mgr *risingWaveControllerManagerImpl) CollectOpenKruiseRunningStatisticsAn
computeStatefulSets []kruiseappsv1beta1.StatefulSet, compactorCloneSets []kruiseappsv1alpha1.CloneSet, connectorCloneSets []kruiseappsv1alpha1.CloneSet,
configConfigMap *corev1.ConfigMap) (reconcile.Result, error) {
risingwave := mgr.risingwaveManager.RisingWave()
+ embeddedConnectorEnabled := mgr.risingwaveManager.IsEmbeddedConnectorEnabled()
componentsSpec := &risingwave.Spec.Components
@@ -223,7 +224,7 @@ func (mgr *risingWaveControllerManagerImpl) CollectOpenKruiseRunningStatisticsAn
component: "Service(compactor)",
},
{
- cond: connectorService == nil,
+ cond: !embeddedConnectorEnabled && connectorService == nil,
component: "Service(connector)",
},
{
@@ -247,7 +248,7 @@ func (mgr *risingWaveControllerManagerImpl) CollectOpenKruiseRunningStatisticsAn
component: "CloneSets(compactor)",
},
{
- cond: lo.ContainsBy(componentReplicas.Connector.Groups, isGroupMissing),
+ cond: !embeddedConnectorEnabled && lo.ContainsBy(componentReplicas.Connector.Groups, isGroupMissing),
component: "CloneSets(connector)",
},
}
@@ -282,6 +283,7 @@ func (mgr *risingWaveControllerManagerImpl) CollectRunningStatisticsAndSyncStatu
computeStatefulSets []appsv1.StatefulSet, compactorDeployments []appsv1.Deployment, connectorDeployments []appsv1.Deployment,
configConfigMap *corev1.ConfigMap) (reconcile.Result, error) {
risingwave := mgr.risingwaveManager.RisingWave()
+ embeddedConnectorEnabled := mgr.risingwaveManager.IsEmbeddedConnectorEnabled()
componentsSpec := &risingwave.Spec.Components
@@ -344,7 +346,7 @@ func (mgr *risingWaveControllerManagerImpl) CollectRunningStatisticsAndSyncStatu
component: "Service(compactor)",
},
{
- cond: connectorService == nil,
+ cond: !embeddedConnectorEnabled && connectorService == nil,
component: "Service(connector)",
},
{
@@ -368,7 +370,7 @@ func (mgr *risingWaveControllerManagerImpl) CollectRunningStatisticsAndSyncStatu
component: "Deployments(compactor)",
},
{
- cond: lo.ContainsBy(componentReplicas.Connector.Groups, isGroupMissing),
+ cond: !embeddedConnectorEnabled && lo.ContainsBy(componentReplicas.Connector.Groups, isGroupMissing),
component: "Deployments(connector)",
},
}
diff --git a/pkg/object/risingwave_manager.go b/pkg/object/risingwave_manager.go
index b1d84bd3..2dc6419b 100644
--- a/pkg/object/risingwave_manager.go
+++ b/pkg/object/risingwave_manager.go
@@ -204,6 +204,11 @@ func (r *RisingWaveReader) IsStandaloneModeEnabled() bool {
return ptr.Deref(r.risingwave.Spec.EnableStandaloneMode, false)
}
+// IsEmbeddedConnectorEnabled returns true when the embedded connector is enabled.
+func (r *RisingWaveReader) IsEmbeddedConnectorEnabled() bool {
+ return ptr.Deref(r.risingwave.Spec.EnableEmbeddedConnector, false)
+}
+
// KeepLock resets the current scale views record in the status with the given array.
func (mgr *RisingWaveManager) KeepLock(aliveScaleView []risingwavev1alpha1.RisingWaveScaleViewLock) {
mgr.mu.Lock()
|