Skip to content

Commit

Permalink
On premises flow was added for all cluster resources
Browse files Browse the repository at this point in the history
  • Loading branch information
testisnullus committed Nov 28, 2023
1 parent a15041a commit 9292b20
Show file tree
Hide file tree
Showing 53 changed files with 4,380 additions and 1,023 deletions.
66 changes: 65 additions & 1 deletion apis/clusters/v1beta1/cadence_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,11 @@ import (
"fmt"
"regexp"

k8scorev1 "k8s.io/api/core/v1"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/intstr"
"sigs.k8s.io/controller-runtime/pkg/client"

"github.com/instaclustr/operator/pkg/models"
Expand Down Expand Up @@ -62,7 +64,8 @@ type BundledOpenSearchSpec struct {

// CadenceSpec defines the desired state of Cadence
type CadenceSpec struct {
Cluster `json:",inline"`
Cluster `json:",inline"`
OnPremisesSpec *OnPremisesSpec `json:"onPremisesSpec,omitempty"`
//+kubebuilder:validation:MinItems:=1
//+kubebuilder:validation:MaxItems:=1
DataCentres []*CadenceDataCentre `json:"dataCentres"`
Expand Down Expand Up @@ -793,3 +796,64 @@ func (o *BundledOpenSearchSpec) validate() error {

return nil
}

func (c *Cadence) GetExposePorts() []k8scorev1.ServicePort {
var exposePorts []k8scorev1.ServicePort
if !c.Spec.PrivateNetworkCluster {
exposePorts = []k8scorev1.ServicePort{
{
Name: models.CadenceTChannel,
Port: models.Port7933,
TargetPort: intstr.IntOrString{
Type: intstr.Int,
IntVal: models.Port7933,
},
},
{
Name: models.CadenceWeb,
Port: models.Port8088,
TargetPort: intstr.IntOrString{
Type: intstr.Int,
IntVal: models.Port8088,
},
},
}
if c.Spec.DataCentres[0].ClientEncryption {
sslPort := k8scorev1.ServicePort{
Name: models.CadenceGRPC,
Port: models.Port7833,
TargetPort: intstr.IntOrString{
Type: intstr.Int,
IntVal: models.Port7833,
},
}
exposePorts = append(exposePorts, sslPort)
}
}
return exposePorts
}

func (c *Cadence) GetHeadlessPorts() []k8scorev1.ServicePort {
headlessPorts := []k8scorev1.ServicePort{
{
Name: models.CadenceTChannel,
Port: models.Port7933,
TargetPort: intstr.IntOrString{
Type: intstr.Int,
IntVal: models.Port7933,
},
},
}
if c.Spec.DataCentres[0].ClientEncryption {
sslPort := k8scorev1.ServicePort{
Name: models.CadenceGRPC,
Port: models.Port7833,
TargetPort: intstr.IntOrString{
Type: intstr.Int,
IntVal: models.Port7833,
},
}
headlessPorts = append(headlessPorts, sslPort)
}
return headlessPorts
}
31 changes: 28 additions & 3 deletions apis/clusters/v1beta1/cadence_webhook.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,19 @@ func (cv *cadenceValidator) ValidateCreate(ctx context.Context, obj runtime.Obje
return err
}

if c.Spec.OnPremisesSpec != nil {
err = c.Spec.OnPremisesSpec.ValidateCreation()
if err != nil {
return err
}
if c.Spec.PrivateNetworkCluster {
err = c.Spec.OnPremisesSpec.ValidateSSHGatewayCreation()
if err != nil {
return err
}
}
}

appVersions, err := cv.API.ListAppVersions(models.CadenceAppKind)
if err != nil {
return fmt.Errorf("cannot list versions for kind: %v, err: %w",
Expand Down Expand Up @@ -169,10 +182,22 @@ func (cv *cadenceValidator) ValidateCreate(ctx context.Context, obj runtime.Obje
return fmt.Errorf("data centres field is empty")
}

//TODO: add support of multiple DCs for OnPrem clusters
if len(c.Spec.DataCentres) > 1 && c.Spec.OnPremisesSpec != nil {
return fmt.Errorf("on-premises cluster can be provisioned with only one data centre")
}

for _, dc := range c.Spec.DataCentres {
err := dc.DataCentre.ValidateCreation()
if err != nil {
return err
if c.Spec.OnPremisesSpec != nil {
err := dc.DataCentre.ValidateOnPremisesCreation()
if err != nil {
return err
}
} else {
err := dc.DataCentre.ValidateCreation()
if err != nil {
return err
}
}

if !c.Spec.PrivateNetworkCluster && dc.PrivateLink != nil {
Expand Down
80 changes: 80 additions & 0 deletions apis/clusters/v1beta1/cassandra_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@ import (
"fmt"
"strconv"

k8scorev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/intstr"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"

Expand Down Expand Up @@ -56,6 +58,7 @@ type CassandraRestoreFrom struct {
// CassandraSpec defines the desired state of Cassandra
type CassandraSpec struct {
RestoreFrom *CassandraRestoreFrom `json:"restoreFrom,omitempty"`
OnPremisesSpec *OnPremisesSpec `json:"onPremisesSpec,omitempty"`
Cluster `json:",inline"`
DataCentres []*CassandraDataCentre `json:"dataCentres,omitempty"`
LuceneEnabled bool `json:"luceneEnabled,omitempty"`
Expand Down Expand Up @@ -522,3 +525,80 @@ func (c *Cassandra) SetClusterID(id string) {
func init() {
SchemeBuilder.Register(&Cassandra{}, &CassandraList{})
}

func (c *Cassandra) GetExposePorts() []k8scorev1.ServicePort {
var exposePorts []k8scorev1.ServicePort
if !c.Spec.PrivateNetworkCluster {
exposePorts = []k8scorev1.ServicePort{
{
Name: models.CassandraInterNode,
Port: models.Port7000,
TargetPort: intstr.IntOrString{
Type: intstr.Int,
IntVal: models.Port7000,
},
},
{
Name: models.CassandraCQL,
Port: models.Port9042,
TargetPort: intstr.IntOrString{
Type: intstr.Int,
IntVal: models.Port9042,
},
},
{
Name: models.CassandraJMX,
Port: models.Port7199,
TargetPort: intstr.IntOrString{
Type: intstr.Int,
IntVal: models.Port7199,
},
},
}
if c.Spec.DataCentres[0].ClientToClusterEncryption {
sslPort := k8scorev1.ServicePort{
Name: models.CassandraSSL,
Port: models.Port7001,
TargetPort: intstr.IntOrString{
Type: intstr.Int,
IntVal: models.Port7001,
},
}
exposePorts = append(exposePorts, sslPort)
}
}
return exposePorts
}

func (c *Cassandra) GetHeadlessPorts() []k8scorev1.ServicePort {
headlessPorts := []k8scorev1.ServicePort{
{
Name: models.CassandraInterNode,
Port: models.Port7000,
TargetPort: intstr.IntOrString{
Type: intstr.Int,
IntVal: models.Port7000,
},
},
{
Name: models.CassandraCQL,
Port: models.Port9042,
TargetPort: intstr.IntOrString{
Type: intstr.Int,
IntVal: models.Port9042,
},
},
}
if c.Spec.DataCentres[0].ClientToClusterEncryption {
sslPort := k8scorev1.ServicePort{
Name: models.CassandraSSL,
Port: models.Port7001,
TargetPort: intstr.IntOrString{
Type: intstr.Int,
IntVal: models.Port7001,
},
}
headlessPorts = append(headlessPorts, sslPort)
}
return headlessPorts
}
31 changes: 28 additions & 3 deletions apis/clusters/v1beta1/cassandra_webhook.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,19 @@ func (cv *cassandraValidator) ValidateCreate(ctx context.Context, obj runtime.Ob
return fmt.Errorf("spark should not have more than 1 item")
}

if c.Spec.OnPremisesSpec != nil {
err = c.Spec.OnPremisesSpec.ValidateCreation()
if err != nil {
return err
}
if c.Spec.PrivateNetworkCluster {
err = c.Spec.OnPremisesSpec.ValidateSSHGatewayCreation()
if err != nil {
return err
}
}
}

appVersions, err := cv.API.ListAppVersions(models.CassandraAppKind)
if err != nil {
return fmt.Errorf("cannot list versions for kind: %v, err: %w",
Expand All @@ -113,10 +126,22 @@ func (cv *cassandraValidator) ValidateCreate(ctx context.Context, obj runtime.Ob
return fmt.Errorf("data centres field is empty")
}

//TODO: add support of multiple DCs for OnPrem clusters
if len(c.Spec.DataCentres) > 1 && c.Spec.OnPremisesSpec != nil {
return fmt.Errorf("on-premises cluster can be provisioned with only one data centre")
}

for _, dc := range c.Spec.DataCentres {
err := dc.DataCentre.ValidateCreation()
if err != nil {
return err
if c.Spec.OnPremisesSpec != nil {
err := dc.DataCentre.ValidateOnPremisesCreation()
if err != nil {
return err
}
} else {
err := dc.DataCentre.ValidateCreation()
if err != nil {
return err
}
}

if !c.Spec.PrivateNetworkCluster && dc.PrivateIPBroadcastForDiscovery {
Expand Down
72 changes: 72 additions & 0 deletions apis/clusters/v1beta1/kafka_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@ package v1beta1
import (
"encoding/json"

k8scorev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/intstr"
"sigs.k8s.io/controller-runtime/pkg/client"

"github.com/instaclustr/operator/pkg/models"
Expand Down Expand Up @@ -63,6 +65,7 @@ type KarapaceSchemaRegistry struct {
// KafkaSpec defines the desired state of Kafka
type KafkaSpec struct {
Cluster `json:",inline"`
OnPremisesSpec *OnPremisesSpec `json:"onPremisesSpec,omitempty"`
SchemaRegistry []*SchemaRegistry `json:"schemaRegistry,omitempty"`

// ReplicationFactor to use for new topic.
Expand Down Expand Up @@ -484,3 +487,72 @@ func (k *Kafka) GetClusterID() string {
func (k *Kafka) SetClusterID(id string) {
k.Status.ID = id
}

func (k *Kafka) GetExposePorts() []k8scorev1.ServicePort {
var exposePorts []k8scorev1.ServicePort
if !k.Spec.PrivateNetworkCluster {
exposePorts = []k8scorev1.ServicePort{
{
Name: models.KafkaClient,
Port: models.Port9092,
TargetPort: intstr.IntOrString{
Type: intstr.Int,
IntVal: models.Port9092,
},
},
{
Name: models.KafkaControlPlane,
Port: models.Port9093,
TargetPort: intstr.IntOrString{
Type: intstr.Int,
IntVal: models.Port9093,
},
},
}
if k.Spec.ClientToClusterEncryption {
sslPort := k8scorev1.ServicePort{
Name: models.KafkaBroker,
Port: models.Port9094,
TargetPort: intstr.IntOrString{
Type: intstr.Int,
IntVal: models.Port9094,
},
}
exposePorts = append(exposePorts, sslPort)
}
}
return exposePorts
}

func (k *Kafka) GetHeadlessPorts() []k8scorev1.ServicePort {
headlessPorts := []k8scorev1.ServicePort{
{
Name: models.KafkaClient,
Port: models.Port9092,
TargetPort: intstr.IntOrString{
Type: intstr.Int,
IntVal: models.Port9092,
},
},
{
Name: models.KafkaControlPlane,
Port: models.Port9093,
TargetPort: intstr.IntOrString{
Type: intstr.Int,
IntVal: models.Port9093,
},
},
}
if k.Spec.ClientToClusterEncryption {
kafkaBrokerPort := k8scorev1.ServicePort{
Name: models.KafkaBroker,
Port: models.Port9094,
TargetPort: intstr.IntOrString{
Type: intstr.Int,
IntVal: models.Port9094,
},
}
headlessPorts = append(headlessPorts, kafkaBrokerPort)
}
return headlessPorts
}
Loading

0 comments on commit 9292b20

Please sign in to comment.