Skip to content

Commit

Permalink
Add kafka support (#248)
Browse files Browse the repository at this point in the history
- Add a new resource selectel_dbaas_kafka_acl_v1 to manage Kafka's ACLs
- Add a new resource selectel_dbaas_kafka_topic_v1 to manage Kafka's
Topics
- Add a new resource selectel_dbaas_kafka_datastore_v1 to manage Kafka
datastores
- Update dbaas-go version to v0.10.0
  • Loading branch information
Gogen120 authored Feb 6, 2024
1 parent 7d1587b commit 2da2d9b
Show file tree
Hide file tree
Showing 15 changed files with 1,625 additions and 11 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ require (
github.com/hashicorp/go-retryablehttp v0.6.6
github.com/hashicorp/terraform-plugin-sdk/v2 v2.24.1
github.com/selectel/craas-go v0.3.0
github.com/selectel/dbaas-go v0.9.0
github.com/selectel/dbaas-go v0.10.0
github.com/selectel/domains-go v0.5.0
github.com/selectel/go-selvpcclient/v3 v3.1.1
github.com/selectel/mks-go v0.12.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -151,8 +151,8 @@ github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZN
github.com/sebdah/goldie v1.0.0/go.mod h1:jXP4hmWywNEwZzhMuv2ccnqTSFpuq8iyQhtQdkkZBH4=
github.com/selectel/craas-go v0.3.0 h1:tXiw3LNN+ZVV0wZdeBBXX6u8kMuA5PV/5W1uYqV0yXg=
github.com/selectel/craas-go v0.3.0/go.mod h1:9RAUn9PdMITP4I3GAade6v2hjB2j3lo3J2dDlG5SLYE=
github.com/selectel/dbaas-go v0.9.0 h1:IAmiyxkRtfLZg1JUdIhcsE5jpdBvsZibPCqyhB+yV30=
github.com/selectel/dbaas-go v0.9.0/go.mod h1:8D945oFzpx94v08zIb4s1bRTPCdPoNVnBu4umMYFJrQ=
github.com/selectel/dbaas-go v0.10.0 h1:iY2Q7PY9ICoWBDtni+6oWGR2uAWodER0K2zchNLIOl4=
github.com/selectel/dbaas-go v0.10.0/go.mod h1:uyPhqmcvdmKBt9yWhogoSQgWkcZ9QgVlbgaERdSdAfk=
github.com/selectel/domains-go v0.5.0 h1:RCrWY/9KHVtfdA+X8M+DDzsjILxFChhY70HnJEu1Y2U=
github.com/selectel/domains-go v0.5.0/go.mod h1:AhXhwyMSTkpEWFiBLUvzFP76W+WN+ZblwmjLJLt7y58=
github.com/selectel/go-selvpcclient/v3 v3.1.1 h1:C1q2LqqosiapoLpnGITGmysg0YCSQYDo2Gh69CioevM=
Expand Down
121 changes: 120 additions & 1 deletion selectel/dbaas.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ const (
mySQLDatastoreType = "mysql"
mySQLNativeDatastoreType = "mysql_native"
redisDatastoreType = "redis"
kafkaDatastoreType = "kafka"
)

func getDBaaSClient(d *schema.ResourceData, meta interface{}) (*dbaas.API, diag.Diagnostics) {
Expand Down Expand Up @@ -122,7 +123,7 @@ func waitForDBaaSDatastoreV1ActiveState(
Refresh: dbaasDatastoreV1StateRefreshFunc(ctx, client, datastoreID),
Timeout: timeout,
Delay: 10 * time.Second,
MinTimeout: 3 * time.Second,
MinTimeout: 20 * time.Second,
}

_, err := stateConf.WaitForState()
Expand Down Expand Up @@ -610,3 +611,121 @@ func dbaasLogicalReplicationSlotV1DeleteStateRefreshFunc(ctx context.Context, cl
return d, strconv.Itoa(http.StatusOK), err
}
}

// Topics

func waitForDBaaSTopicV1ActiveState(
ctx context.Context, client *dbaas.API, topicID string, timeout time.Duration,
) error {
pending := []string{
string(dbaas.StatusPendingCreate),
string(dbaas.StatusPendingUpdate),
}
target := []string{
string(dbaas.StatusActive),
}

stateConf := &resource.StateChangeConf{
Pending: pending,
Target: target,
Refresh: dbaasTopicV1StateRefreshFunc(ctx, client, topicID),
Timeout: timeout,
Delay: 10 * time.Second,
MinTimeout: 20 * time.Second,
}

_, err := stateConf.WaitForState()
if err != nil {
return fmt.Errorf(
"error waiting for the topic %s to become 'ACTIVE': %s",
topicID, err)
}

return nil
}

func dbaasTopicV1StateRefreshFunc(ctx context.Context, client *dbaas.API, topicID string) resource.StateRefreshFunc {
return func() (interface{}, string, error) {
d, err := client.Topic(ctx, topicID)
if err != nil {
return nil, "", err
}

return d, string(d.Status), nil
}
}

func dbaasTopicV1DeleteStateRefreshFunc(ctx context.Context, client *dbaas.API, topicID string) resource.StateRefreshFunc {
return func() (interface{}, string, error) {
d, err := client.Topic(ctx, topicID)
if err != nil {
var dbaasError *dbaas.DBaaSAPIError
if errors.As(err, &dbaasError) {
return d, strconv.Itoa(dbaasError.StatusCode()), nil
}

return nil, "", err
}

return d, strconv.Itoa(http.StatusOK), err
}
}

// ACLs

func waitForDBaaSACLV1ActiveState(
ctx context.Context, client *dbaas.API, aclID string, timeout time.Duration,
) error {
pending := []string{
string(dbaas.StatusPendingCreate),
string(dbaas.StatusPendingUpdate),
}
target := []string{
string(dbaas.StatusActive),
}

stateConf := &resource.StateChangeConf{
Pending: pending,
Target: target,
Refresh: dbaasACLV1StateRefreshFunc(ctx, client, aclID),
Timeout: timeout,
Delay: 10 * time.Second,
MinTimeout: 15 * time.Second,
}

_, err := stateConf.WaitForState()
if err != nil {
return fmt.Errorf(
"error waiting for the acl %s to become 'ACTIVE': %s",
aclID, err)
}

return nil
}

func dbaasACLV1StateRefreshFunc(ctx context.Context, client *dbaas.API, aclID string) resource.StateRefreshFunc {
return func() (interface{}, string, error) {
d, err := client.ACL(ctx, aclID)
if err != nil {
return nil, "", err
}

return d, string(d.Status), nil
}
}

func dbaasACLV1DeleteStateRefreshFunc(ctx context.Context, client *dbaas.API, aclID string) resource.StateRefreshFunc {
return func() (interface{}, string, error) {
d, err := client.ACL(ctx, aclID)
if err != nil {
var dbaasError *dbaas.DBaaSAPIError
if errors.As(err, &dbaasError) {
return d, strconv.Itoa(dbaasError.StatusCode()), nil
}

return nil, "", err
}

return d, strconv.Itoa(http.StatusOK), err
}
}
5 changes: 5 additions & 0 deletions selectel/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
)

const (
objectACL = "acl"
objectFloatingIP = "floating IP"
objectKeypair = "keypair"
objectLicense = "license"
Expand All @@ -17,6 +18,7 @@ const (
objectRole = "role"
objectSubnet = "subnet"
objectToken = "token"
objectTopic = "topic"
objectUser = "user"
objectCluster = "cluster"
objectKubeConfig = "kubeconfig"
Expand Down Expand Up @@ -136,6 +138,9 @@ func Provider() *schema.Provider {
"selectel_dbaas_postgresql_extension_v1": resourceDBaaSPostgreSQLExtensionV1(),
"selectel_dbaas_prometheus_metric_token_v1": resourceDBaaSPrometheusMetricTokenV1(),
"selectel_dbaas_postgresql_logical_replication_slot_v1": resourceDBaaSPostgreSQLLogicalReplicationSlotV1(),
"selectel_dbaas_kafka_acl_v1": resourceDBaaSKafkaACLV1(),
"selectel_dbaas_kafka_datastore_v1": resourceDBaaSKafkaDatastoreV1(),
"selectel_dbaas_kafka_topic_v1": resourceDBaaSKafkaTopicV1(),
"selectel_craas_registry_v1": resourceCRaaSRegistryV1(),
"selectel_craas_token_v1": resourceCRaaSTokenV1(),
},
Expand Down
Loading

0 comments on commit 2da2d9b

Please sign in to comment.