Skip to content

Commit

Permalink
feat: support embedded connector (#542)
Browse files Browse the repository at this point in the history
* fix spell check

Signed-off-by: arkbriar <[email protected]>

* feat: support embedded connector

Signed-off-by: arkbriar <[email protected]>

---------

Signed-off-by: arkbriar <[email protected]>
  • Loading branch information
arkbriar authored Nov 7, 2023
1 parent 7b5c9cc commit 0c9f978
Show file tree
Hide file tree
Showing 10 changed files with 96 additions and 16 deletions.
7 changes: 7 additions & 0 deletions apis/risingwave/v1alpha1/risingwave_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,13 @@ type RisingWaveSpec struct {
// +kubebuilder:default=false
EnableStandaloneMode *bool `json:"enableStandaloneMode,omitempty"`

// Flag to control whether to use the embedded connector (recommended). If embedded connector is enabled,
// the dedicated connectors won't be deployed and used anymore and the corresponding fields will be ignored.
// The dedicated connector will be deprecated soon because of its error proneness.
// +optional
// +kubebuilder:default=false
EnableEmbeddedConnector *bool `json:"enableEmbeddedConnector,omitempty"`

// Image for RisingWave component.
Image string `json:"image"`

Expand Down
5 changes: 5 additions & 0 deletions apis/risingwave/v1alpha1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -40324,6 +40324,14 @@ spec:
even if it's set to true, the controller will determine if it can
create the resource by checking if the CRDs are installed.
type: boolean
enableEmbeddedConnector:
default: false
description: Flag to control whether to use the embedded connector
(recommended). If embedded connector is enabled, the dedicated connectors
won't be deployed and used anymore and the corresponding fields
will be ignored. The dedicated connector will be deprecated soon
because of its error proneness.
type: boolean
enableFullKubernetesAddr:
default: false
description: Flag to indicate if full kubernetes address should be
Expand Down
8 changes: 8 additions & 0 deletions config/risingwave-operator-test.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -40341,6 +40341,14 @@ spec:
even if it's set to true, the controller will determine if it can
create the resource by checking if the CRDs are installed.
type: boolean
enableEmbeddedConnector:
default: false
description: Flag to control whether to use the embedded connector
(recommended). If embedded connector is enabled, the dedicated connectors
won't be deployed and used anymore and the corresponding fields
will be ignored. The dedicated connector will be deprecated soon
because of its error proneness.
type: boolean
enableFullKubernetesAddr:
default: false
description: Flag to indicate if full kubernetes address should be
Expand Down
8 changes: 8 additions & 0 deletions config/risingwave-operator.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -40341,6 +40341,14 @@ spec:
even if it's set to true, the controller will determine if it can
create the resource by checking if the CRDs are installed.
type: boolean
enableEmbeddedConnector:
default: false
description: Flag to control whether to use the embedded connector
(recommended). If embedded connector is enabled, the dedicated connectors
won't be deployed and used anymore and the corresponding fields
will be ignored. The dedicated connector will be deprecated soon
because of its error proneness.
type: boolean
enableFullKubernetesAddr:
default: false
description: Flag to indicate if full kubernetes address should be
Expand Down
28 changes: 28 additions & 0 deletions docs/general/api.md
Original file line number Diff line number Diff line change
Expand Up @@ -582,6 +582,20 @@ spec.components will be ignored. Standalone mode can be turned on/off dynamicall
</tr>
<tr>
<td>
<code>enableEmbeddedConnector</code><br/>
<em>
bool
</em>
</td>
<td>
<em>(Optional)</em>
<p>Flag to control whether to use the embedded connector (recommended). If embedded connector is enabled,
the dedicated connectors won&rsquo;t be deployed and used anymore and the corresponding fields will be ignored.
The dedicated connector will be deprecated soon because of its error proneness.</p>
</td>
</tr>
<tr>
<td>
<code>image</code><br/>
<em>
string
Expand Down Expand Up @@ -4107,6 +4121,20 @@ spec.components will be ignored. Standalone mode can be turned on/off dynamicall
</tr>
<tr>
<td>
<code>enableEmbeddedConnector</code><br/>
<em>
bool
</em>
</td>
<td>
<em>(Optional)</em>
<p>Flag to control whether to use the embedded connector (recommended). If embedded connector is enabled,
the dedicated connectors won&rsquo;t be deployed and used anymore and the corresponding fields will be ignored.
The dedicated connector will be deprecated soon because of its error proneness.</p>
</td>
</tr>
<tr>
<td>
<code>image</code><br/>
<em>
string
Expand Down
15 changes: 9 additions & 6 deletions pkg/controller/risingwave_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -313,24 +313,27 @@ func (c *RisingWaveController) reactiveWorkflow(risingwaveManger *object.RisingW
mgr.SyncFrontendDeployments(),
ctrlkit.If(c.openKruiseAvailable, mgr.SyncFrontendCloneSets()),
),
ctrlkit.ParallelJoin(
mgr.SyncConnectorService(),
mgr.SyncConnectorDeployments(),
ctrlkit.If(c.openKruiseAvailable, mgr.SyncConnectorCloneSets()),
// Sync only when embedded connector is disabled.
ctrlkit.If(!risingwaveManger.IsEmbeddedConnectorEnabled(),
ctrlkit.ParallelJoin(
mgr.SyncConnectorService(),
mgr.SyncConnectorDeployments(),
ctrlkit.If(c.openKruiseAvailable, mgr.SyncConnectorCloneSets()),
),
),
)
otherOpenKruiseComponentsReadyBarrier := ctrlkit.ParallelJoin(
mgr.WaitBeforeFrontendCloneSetsReady(),
mgr.WaitBeforeComputeAdvancedStatefulSetsReady(),
mgr.WaitBeforeCompactorCloneSetsReady(),
mgr.WaitBeforeConnectorCloneSetsReady(),
ctrlkit.If(!risingwaveManger.IsEmbeddedConnectorEnabled(), mgr.WaitBeforeConnectorCloneSetsReady()),
)

otherComponentsReadyBarrier := ctrlkit.Join(
mgr.WaitBeforeFrontendDeploymentsReady(),
mgr.WaitBeforeComputeStatefulSetsReady(),
mgr.WaitBeforeCompactorDeploymentsReady(),
mgr.WaitBeforeConnectorDeploymentsReady(),
ctrlkit.If(!risingwaveManger.IsEmbeddedConnectorEnabled(), mgr.WaitBeforeConnectorDeploymentsReady()),
ctrlkit.If(c.openKruiseAvailable, otherOpenKruiseComponentsReadyBarrier),
)

Expand Down
18 changes: 12 additions & 6 deletions pkg/factory/risingwave_object_factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -400,10 +400,13 @@ func (f *RisingWaveObjectFactory) envsForMetaArgs() []corev1.EnvVar {
Name: envs.RWPrometheusHost,
Value: fmt.Sprintf("0.0.0.0:%d", consts.MetaMetricsPort),
},
{
}

if !ptr.Deref(f.risingwave.Spec.EnableEmbeddedConnector, false) {
envVars = append(envVars, corev1.EnvVar{
Name: envs.RWConnectorRPCEndPoint,
Value: fmt.Sprintf("%s:%d", f.componentAddr(consts.ComponentConnector, ""), consts.ConnectorServicePort),
},
})
}

envVars = append(envVars, f.coreEnvsForMeta()...)
Expand Down Expand Up @@ -462,16 +465,19 @@ func (f *RisingWaveObjectFactory) envsForComputeArgs(cpuLimit int64, memLimit in
Name: envs.RWMetaAddrLegacy,
Value: fmt.Sprintf("load-balance+http://%s:%d", f.componentAddr(consts.ComponentMeta, ""), consts.MetaServicePort),
},
{
Name: envs.RWConnectorRPCEndPoint,
Value: fmt.Sprintf("%s:%d", f.componentAddr(consts.ComponentConnector, ""), consts.ConnectorServicePort),
},
{
Name: envs.RWPrometheusListenerAddr,
Value: fmt.Sprintf("0.0.0.0:%d", consts.ComputeMetricsPort),
},
}

if !ptr.Deref(f.risingwave.Spec.EnableEmbeddedConnector, false) {
envVars = append(envVars, corev1.EnvVar{
Name: envs.RWConnectorRPCEndPoint,
Value: fmt.Sprintf("%s:%d", f.componentAddr(consts.ComponentConnector, ""), consts.ConnectorServicePort),
})
}

if cpuLimit != 0 {
envVars = append(envVars, corev1.EnvVar{
Name: envs.RWParallelism,
Expand Down
10 changes: 6 additions & 4 deletions pkg/manager/risingwave_controller_manager_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,7 @@ func (mgr *risingWaveControllerManagerImpl) CollectOpenKruiseRunningStatisticsAn
computeStatefulSets []kruiseappsv1beta1.StatefulSet, compactorCloneSets []kruiseappsv1alpha1.CloneSet, connectorCloneSets []kruiseappsv1alpha1.CloneSet,
configConfigMap *corev1.ConfigMap) (reconcile.Result, error) {
risingwave := mgr.risingwaveManager.RisingWave()
embeddedConnectorEnabled := mgr.risingwaveManager.IsEmbeddedConnectorEnabled()

componentsSpec := &risingwave.Spec.Components

Expand Down Expand Up @@ -223,7 +224,7 @@ func (mgr *risingWaveControllerManagerImpl) CollectOpenKruiseRunningStatisticsAn
component: "Service(compactor)",
},
{
cond: connectorService == nil,
cond: !embeddedConnectorEnabled && connectorService == nil,
component: "Service(connector)",
},
{
Expand All @@ -247,7 +248,7 @@ func (mgr *risingWaveControllerManagerImpl) CollectOpenKruiseRunningStatisticsAn
component: "CloneSets(compactor)",
},
{
cond: lo.ContainsBy(componentReplicas.Connector.Groups, isGroupMissing),
cond: !embeddedConnectorEnabled && lo.ContainsBy(componentReplicas.Connector.Groups, isGroupMissing),
component: "CloneSets(connector)",
},
}
Expand Down Expand Up @@ -282,6 +283,7 @@ func (mgr *risingWaveControllerManagerImpl) CollectRunningStatisticsAndSyncStatu
computeStatefulSets []appsv1.StatefulSet, compactorDeployments []appsv1.Deployment, connectorDeployments []appsv1.Deployment,
configConfigMap *corev1.ConfigMap) (reconcile.Result, error) {
risingwave := mgr.risingwaveManager.RisingWave()
embeddedConnectorEnabled := mgr.risingwaveManager.IsEmbeddedConnectorEnabled()

componentsSpec := &risingwave.Spec.Components

Expand Down Expand Up @@ -344,7 +346,7 @@ func (mgr *risingWaveControllerManagerImpl) CollectRunningStatisticsAndSyncStatu
component: "Service(compactor)",
},
{
cond: connectorService == nil,
cond: !embeddedConnectorEnabled && connectorService == nil,
component: "Service(connector)",
},
{
Expand All @@ -368,7 +370,7 @@ func (mgr *risingWaveControllerManagerImpl) CollectRunningStatisticsAndSyncStatu
component: "Deployments(compactor)",
},
{
cond: lo.ContainsBy(componentReplicas.Connector.Groups, isGroupMissing),
cond: !embeddedConnectorEnabled && lo.ContainsBy(componentReplicas.Connector.Groups, isGroupMissing),
component: "Deployments(connector)",
},
}
Expand Down
5 changes: 5 additions & 0 deletions pkg/object/risingwave_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,11 @@ func (r *RisingWaveReader) IsStandaloneModeEnabled() bool {
return ptr.Deref(r.risingwave.Spec.EnableStandaloneMode, false)
}

// IsEmbeddedConnectorEnabled returns true when the embedded connector is enabled.
func (r *RisingWaveReader) IsEmbeddedConnectorEnabled() bool {
return ptr.Deref(r.risingwave.Spec.EnableEmbeddedConnector, false)
}

// KeepLock resets the current scale views record in the status with the given array.
func (mgr *RisingWaveManager) KeepLock(aliveScaleView []risingwavev1alpha1.RisingWaveScaleViewLock) {
mgr.mu.Lock()
Expand Down

0 comments on commit 0c9f978

Please sign in to comment.