Skip to content

Commit

Permalink
Create UserTask (discover-rds) when RDS is missing IAM Authentication
Browse files Browse the repository at this point in the history
This PR changes the Discovery Service to start creating UserTasks (of
discover-rds type) when the discovered RDS database does not have IAM DB
Authentication enabled.
  • Loading branch information
marcoandredinis committed Jan 20, 2025
1 parent d1f2ee0 commit ad03e34
Show file tree
Hide file tree
Showing 4 changed files with 266 additions and 1 deletion.
45 changes: 45 additions & 0 deletions lib/srv/discovery/database_watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,12 @@ import (
"sync"

"github.com/gravitational/trace"
"google.golang.org/protobuf/types/known/timestamppb"

usertasksv1 "github.com/gravitational/teleport/api/gen/proto/go/teleport/usertasks/v1"
usageeventsv1 "github.com/gravitational/teleport/api/gen/proto/go/usageevents/v1"
"github.com/gravitational/teleport/api/types"
"github.com/gravitational/teleport/api/types/usertasks"
"github.com/gravitational/teleport/lib/services"
"github.com/gravitational/teleport/lib/srv/discovery/common"
"github.com/gravitational/teleport/lib/utils"
Expand Down Expand Up @@ -99,6 +102,8 @@ func (s *Server) startDatabaseWatchers() error {
discoveryConfigsChanged[resourceGroup.discoveryConfigName] = struct{}{}

dbs = append(dbs, db)

s.collectRDSIssuesAsUserTasks(db, resourceGroup.integration, resourceGroup.discoveryConfigName)
}
mu.Lock()
newDatabases = dbs
Expand Down Expand Up @@ -134,11 +139,50 @@ func (s *Server) startDatabaseWatchers() error {
for dc := range discoveryConfigsChanged {
s.updateDiscoveryConfigStatus(dc)
}
s.upsertTasksForAWSRDSFailedEnrollments()
}
}()
return nil
}

// collectRDSIssuesAsUserTasks receives a discovered database converted into a Teleport Database resource and creates
// an User Task (discover-rds) if that Database is not properly configured to be accessed from Teleport.
// Eg, an UserTask is created if the IAM DB Authentication is not enabled
func (s *Server) collectRDSIssuesAsUserTasks(db types.Database, integration, discoveryConfigName string) {
if integration == "" || discoveryConfigName == "" || !db.IsRDS() {
return
}

if db.GetAWS().RDS.IAMAuth {
return
}

isCluster := db.GetAWS().RDS.ClusterID != ""
databaseIdentifier := db.GetAWS().RDS.InstanceID
if isCluster {
databaseIdentifier = db.GetAWS().RDS.ClusterID
}

engine := db.GetStaticLabels()[types.DiscoveryLabelEngine]

s.awsRDSTasks.addFailedEnrollment(
awsRDSTaskKey{
integration: integration,
issueType: usertasks.AutoDiscoverRDSIssueIAMAuthenticationDisabled,
accountID: db.GetAWS().AccountID,
region: db.GetAWS().Region,
},
&usertasksv1.DiscoverRDSDatabase{
DiscoveryConfig: discoveryConfigName,
DiscoveryGroup: s.DiscoveryGroup,
SyncTime: timestamppb.New(s.clock.Now()),
Name: databaseIdentifier,
IsCluster: isCluster,
Engine: engine,
},
)
}

func (s *Server) databaseWatcherIterationStarted() {
allFetchers := s.getAllDatabaseFetchers()
if len(allFetchers) == 0 {
Expand Down Expand Up @@ -169,6 +213,7 @@ func (s *Server) databaseWatcherIterationStarted() {
s.updateDiscoveryConfigStatus(discoveryConfigs...)

s.awsRDSResourcesStatus.reset()
s.awsRDSTasks.reset()
}

func (s *Server) getAllDatabaseFetchers() []common.Fetcher {
Expand Down
1 change: 1 addition & 0 deletions lib/srv/discovery/discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -439,6 +439,7 @@ type Server struct {
awsEKSResourcesStatus awsResourcesStatus
awsEC2Tasks awsEC2Tasks
awsEKSTasks awsEKSTasks
awsRDSTasks awsRDSTasks

// caRotationCh receives nodes that need to have their CAs rotated.
caRotationCh chan []types.Server
Expand Down
57 changes: 57 additions & 0 deletions lib/srv/discovery/discovery_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2140,6 +2140,9 @@ func TestDiscoveryDatabase(t *testing.T) {
awsRDSDBWithRole.SetAWSAssumeRole("arn:aws:iam::123456789012:role/test-role")
awsRDSDBWithRole.SetAWSExternalID("test123")

awsRDSDBWithIntegration := awsRDSDB.Copy()
rewriteCloudResource(t, awsRDSDBWithIntegration, rewriteDiscoveryLabelsParams{discoveryGroup: mainDiscoveryGroup, integration: integrationName, discoveryConfigName: discoveryConfigName})

eksAWSResource, _ := makeEKSCluster(t, "aws-eks", "us-east-1", rewriteDiscoveryLabelsParams{discoveryGroup: mainDiscoveryGroup, integration: integrationName, discoveryConfigName: discoveryConfigName})

matcherForDiscoveryConfigFn := func(t *testing.T, discoveryGroup string, m Matchers) *discoveryconfig.DiscoveryConfig {
Expand Down Expand Up @@ -2179,6 +2182,7 @@ func TestDiscoveryDatabase(t *testing.T) {
expectDatabases []types.Database
discoveryConfigs func(*testing.T) []*discoveryconfig.DiscoveryConfig
discoveryConfigStatusCheck func(*testing.T, discoveryconfig.Status)
userTasksCheck func(*testing.T, []*usertasksv1.UserTask)
wantEvents int
}{
{
Expand Down Expand Up @@ -2438,6 +2442,43 @@ func TestDiscoveryDatabase(t *testing.T) {
require.Equal(t, "DISCOVERY_CONFIG_STATE_SYNCING", s.State)
},
},
{
name: "discover-rds user task must be created when database is not configured to allow IAM DB Authentication",
discoveryConfigs: func(t *testing.T) []*discoveryconfig.DiscoveryConfig {
dc1 := matcherForDiscoveryConfigFn(t, mainDiscoveryGroup, Matchers{
AWS: []types.AWSMatcher{{
Types: []string{types.AWSMatcherRDS},
Tags: map[string]utils.Strings{types.Wildcard: {types.Wildcard}},
Regions: []string{"us-west-1"},
Integration: integrationName,
}},
})
return []*discoveryconfig.DiscoveryConfig{dc1}
},
expectDatabases: []types.Database{awsRDSDBWithIntegration},
wantEvents: 1,
userTasksCheck: func(t *testing.T, uts []*usertasksv1.UserTask) {
require.Len(t, uts, 1)
gotUserTask := uts[0]
require.Equal(t, "3ae76664-b54d-5b74-b59a-bd7bff3be053", gotUserTask.GetMetadata().GetName())
require.Equal(t, "OPEN", gotUserTask.GetSpec().GetState())
require.Equal(t, "discover-rds", gotUserTask.GetSpec().GetTaskType())
require.Equal(t, "rds-iam-auth-disabled", gotUserTask.GetSpec().GetIssueType())
require.Equal(t, "my-integration", gotUserTask.GetSpec().GetIntegration())

require.NotNil(t, gotUserTask.GetSpec().GetDiscoverRds())
require.Equal(t, "123456789012", gotUserTask.GetSpec().GetDiscoverRds().GetAccountId())
require.Equal(t, "us-west-1", gotUserTask.GetSpec().GetDiscoverRds().GetRegion())

require.Contains(t, gotUserTask.GetSpec().GetDiscoverRds().GetDatabases(), "aws-rds")
gotDatabase := gotUserTask.GetSpec().GetDiscoverRds().GetDatabases()["aws-rds"]
require.Equal(t, "my-discovery-config", gotDatabase.DiscoveryConfig)
require.Equal(t, "main", gotDatabase.DiscoveryGroup)
require.Equal(t, "postgres", gotDatabase.Engine)
require.Equal(t, "aws-rds", gotDatabase.Name)
require.False(t, gotDatabase.IsCluster)
},
},
}

for _, tc := range tcs {
Expand Down Expand Up @@ -2588,6 +2629,22 @@ func TestDiscoveryDatabase(t *testing.T) {

tc.discoveryConfigStatusCheck(t, dc.Status)
}
if tc.userTasksCheck != nil {
var userTasks []*usertasksv1.UserTask
var nextPage string
for {
userTasksResp, nextPageResp, err := tlsServer.Auth().ListUserTasksByIntegration(ctx, 0, nextPage, integrationName)
require.NoError(t, err)

userTasks = append(userTasks, userTasksResp...)

if nextPageResp == "" {
break
}
nextPage = nextPageResp
}
tc.userTasksCheck(t, userTasks)
}
})
}
}
Expand Down
164 changes: 163 additions & 1 deletion lib/srv/discovery/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -498,13 +498,75 @@ func (d *awsEKSTasks) addFailedEnrollment(g awsEKSTaskKey, cluster *usertasksv1.
d.issuesSyncQueue[g] = struct{}{}
}

// awsRDSTasks contains the Discover RDS User Tasks that must be reported to the user.
type awsRDSTasks struct {
mu sync.RWMutex
// databaseIssues maps the RDS Task Key to a set of databases.
// Each Task Key represents a single User Task that is going to be created for a set of RDS Databases that suffer from the same issue.
databaseIssues map[awsRDSTaskKey]*usertasksv1.DiscoverRDS
// issuesSyncQueue is used to register which groups were changed in memory but were not yet sent to the database.
// When upserting User Tasks, if the group is not in issuesSyncQueue,
// then the database already has the latest version of this particular group.
issuesSyncQueue map[awsRDSTaskKey]struct{}
}

// awsRDSTaskKey identifies a UserTask group.
type awsRDSTaskKey struct {
integration string
issueType string
accountID string
region string
}

// reset clears out any in memory issues that were recorded.
// This is used when starting a new Auto Discover RDS watcher iteration.
func (d *awsRDSTasks) reset() {
d.mu.Lock()
defer d.mu.Unlock()

d.databaseIssues = make(map[awsRDSTaskKey]*usertasksv1.DiscoverRDS)
d.issuesSyncQueue = make(map[awsRDSTaskKey]struct{})
}

// addFailedEnrollment adds an enrollment failure of a given database.
func (d *awsRDSTasks) addFailedEnrollment(g awsRDSTaskKey, database *usertasksv1.DiscoverRDSDatabase) {
// Only failures associated with an Integration are reported.
// There's no major blocking for showing non-integration User Tasks, but this keeps scope smaller.
if g.integration == "" {
return
}

if g.issueType == "" {
return
}

d.mu.Lock()
defer d.mu.Unlock()
if d.databaseIssues == nil {
d.databaseIssues = make(map[awsRDSTaskKey]*usertasksv1.DiscoverRDS)
}
if _, ok := d.databaseIssues[g]; !ok {
d.databaseIssues[g] = &usertasksv1.DiscoverRDS{
Databases: make(map[string]*usertasksv1.DiscoverRDSDatabase),
AccountId: g.accountID,
Region: g.region,
}
}
d.databaseIssues[g].Databases[database.Name] = database

if d.issuesSyncQueue == nil {
d.issuesSyncQueue = make(map[awsRDSTaskKey]struct{})
}
d.issuesSyncQueue[g] = struct{}{}
}

// acquireSemaphoreForUserTask tries to acquire a semaphore lock for this user task.
// It returns a func which must be called to release the lock.
// It also returns a context which is tied to the lease and will be canceled if the lease ends.
func (s *Server) acquireSemaphoreForUserTask(userTaskName string) (releaseFn func(), ctx context.Context, err error) {
// Use the deterministic task name as semaphore name.
semaphoreName := userTaskName
semaphoreExpiration := 5 * time.Second
semaphoreExpiration := 10 * time.Second

// AcquireSemaphoreLock will retry until the semaphore is acquired.
// This prevents multiple discovery services to write AWS resources in parallel.
Expand Down Expand Up @@ -752,3 +814,103 @@ func (s *Server) discoverEKSUserTaskAddExistingClusters(currentUserTask *usertas
}
return failedClusters
}

func (s *Server) upsertTasksForAWSRDSFailedEnrollments() {
s.awsRDSTasks.mu.Lock()
defer s.awsRDSTasks.mu.Unlock()
for g := range s.awsRDSTasks.issuesSyncQueue {
if err := s.mergeUpsertDiscoverRDSTask(g, s.awsRDSTasks.databaseIssues[g]); err != nil {
s.Log.WarnContext(s.ctx, "Failed to create discover rds user task",
"integration", g.integration,
"issue_type", g.issueType,
"aws_account_id", g.accountID,
"aws_region", g.region,
"error", err,
)
continue
}

delete(s.awsRDSTasks.issuesSyncQueue, g)
}
}

// mergeUpsertDiscoverRDSTask takes the current DiscoverRDS User Task issues stored in memory and
// merges them against the ones that exist in the cluster.
//
// All of this flow is protected by a lock to ensure there's no race between this and other DiscoveryServices.
func (s *Server) mergeUpsertDiscoverRDSTask(taskGroup awsRDSTaskKey, failedDatabases *usertasksv1.DiscoverRDS) error {
if len(failedDatabases.Databases) == 0 {
return nil
}

userTaskName := usertasks.TaskNameForDiscoverRDS(usertasks.TaskNameForDiscoverRDSParts{
Integration: taskGroup.integration,
IssueType: taskGroup.issueType,
AccountID: taskGroup.accountID,
Region: taskGroup.region,
})

releaseFn, ctxWithLease, err := s.acquireSemaphoreForUserTask(userTaskName)
if err != nil {
return trace.Wrap(err)
}
defer releaseFn()

// Fetch the current task because it might have instances discovered by another group of DiscoveryServices.
currentUserTask, err := s.AccessPoint.GetUserTask(ctxWithLease, userTaskName)
switch {
case trace.IsNotFound(err):
case err != nil:
return trace.Wrap(err)
default:
failedDatabases = s.discoverRDSUserTaskAddExistingDatabases(currentUserTask, failedDatabases)
}

// If the DiscoveryService is stopped, or the issue does not happen again
// the task is removed to prevent users from working on issues that are no longer happening.
taskExpiration := s.clock.Now().Add(2 * s.PollInterval)

task, err := usertasks.NewDiscoverRDSUserTask(
&usertasksv1.UserTaskSpec{
Integration: taskGroup.integration,
TaskType: usertasks.TaskTypeDiscoverRDS,
IssueType: taskGroup.issueType,
State: usertasks.TaskStateOpen,
DiscoverRds: failedDatabases,
},
usertasks.WithExpiration(taskExpiration),
)
if err != nil {
return trace.Wrap(err)
}

if _, err := s.AccessPoint.UpsertUserTask(ctxWithLease, task); err != nil {
return trace.Wrap(err)
}

return nil
}

// discoverRDSUserTaskAddExistingDatabases takes the UserTask stored in the cluster and merges it into the existing map of failed databases.
func (s *Server) discoverRDSUserTaskAddExistingDatabases(currentUserTask *usertasksv1.UserTask, failedDatabases *usertasksv1.DiscoverRDS) *usertasksv1.DiscoverRDS {
for existingDatabaseName, existingDatabase := range currentUserTask.Spec.DiscoverRds.Databases {
// Each DiscoveryService works on all the DiscoveryConfigs assigned to a given DiscoveryGroup.
// So, it's safe to say that current DiscoveryService has the last state for a given DiscoveryGroup.
// If other databases exist for this DiscoveryGroup, they can be discarded because, as said before, the current DiscoveryService has the last state for a given DiscoveryGroup.
if existingDatabase.DiscoveryGroup == s.DiscoveryGroup {
continue
}

// For existing clusters whose sync time is too far in the past, just drop them.
// This ensures that if a cluster is removed from AWS, it will eventually disappear from the User Tasks' cluster list.
// It might also be the case that the DiscoveryConfig was changed and the cluster is no longer matched (because of labels/regions or other matchers).
clusterIssueExpiration := s.clock.Now().Add(-2 * s.PollInterval)
if existingDatabase.SyncTime.AsTime().Before(clusterIssueExpiration) {
continue
}

// Merge existing cluster state into in-memory object.
failedDatabases.Databases[existingDatabaseName] = existingDatabase
}
return failedDatabases
}

0 comments on commit ad03e34

Please sign in to comment.