diff --git a/REDSHIFTSINK.md b/REDSHIFTSINK.md
index 53281156e..5565f3d6a 100644
--- a/REDSHIFTSINK.md
+++ b/REDSHIFTSINK.md
@@ -239,6 +239,7 @@ CREATE SCHEMA redshiftsink_operator;
 Please change the below two in the SQL below, before running it to create the view. View [source](https://github.com/awslabs/amazon-redshift-utils/blob/184c2ba7fd9d497027a831ca72e08fe09e79fd0b/src/AdminViews/v_get_tbl_scan_frequency.sql)
 1. `AND s.userid != 100` with the user id(s) of the redshiftsink user
 2. `AND s.starttime > GETDATE() - interval '3 day'` with the time window you want to consider a table is in use or not.
+3. Create the view below in every Redshift database you need. Then pass the list of these databases to the operator flag `--databases=`, so that the operator queries the view in all of those databases.
 
 ```sql
 CREATE OR REPLACE VIEW redshiftsink_operator.scan_query_total AS
diff --git a/cmd/redshiftsink/main.go b/cmd/redshiftsink/main.go
index 36856adc0..b18422208 100644
--- a/cmd/redshiftsink/main.go
+++ b/cmd/redshiftsink/main.go
@@ -54,11 +54,27 @@ func init() {
 	// +kubebuilder:scaffold:scheme
 }
 
+func parseDatabase(databases string) []*string {
+	var dbs []*string
+
+	if databases != "" {
+		supplied := strings.Split(databases, ",")
+		for i := range supplied {
+			dbs = append(dbs, &supplied[i]) // use supplied dbs
+		}
+	} else {
+		dbs = append(dbs, nil) // use default db from config
+	}
+
+	return dbs
+}
+
 func main() {
 	rand.Seed(time.Now().UnixNano())
 
 	var enableLeaderElection, collectRedshiftMetrics bool
-	var batcherImage, loaderImage, secretRefName, secretRefNamespace, kafkaVersion, metricsAddr, allowedRsks, prometheusURL string
+	var batcherImage, loaderImage, secretRefName, secretRefNamespace string
+	var kafkaVersion, metricsAddr, allowedRsks, prometheusURL, databases string
 	var redshiftMaxOpenConns, redshiftMaxIdleConns int
 	flag.StringVar(&batcherImage, "default-batcher-image", "practodev/redshiftbatcher:v1.0.0-beta.1", "image to use for the redshiftbatcher")
 	flag.StringVar(&loaderImage, "default-loader-image", "practodev/redshiftloader:v1.0.0-beta.1", "image to use for the redshiftloader")
@@ -72,6 +88,7 @@ func main() {
 	flag.IntVar(&redshiftMaxIdleConns, "default-redshift-max-idle-conns", 2, "the maximum number of idle connections allowed to redshift per redshiftsink resource")
 	flag.StringVar(&allowedRsks, "allowed-rsks", "", "comma separated list of names of rsk resources to allow, if empty all rsk resources are allowed")
 	flag.StringVar(&prometheusURL, "prometheus-url", "", "optional, giving prometheus makes the operator enable new features using time series data. Features: loader throttling, resetting offsets of 0 throughput topics.")
+	flag.StringVar(&databases, "databases", "", "comma separated list of all the redshift databases to query for the redshiftsink_operator.scan_query_total view. This is required for throttling support. Please note: the view should be created manually beforehand in every specified database.")
 	flag.Parse()
 
 	ctrl.SetLogger(klogr.New())
@@ -148,17 +165,29 @@ func main() {
 	ctx, cancel := context.WithCancel(ctrl.SetupSignalHandler())
 	defer cancel()
 
-	setupLog.Info("Configuring Redshift exporter...")
-	redshiftClient, err := controllers.NewRedshiftConn(uncachedClient, secretRefName, secretRefNamespace)
-	if err != nil {
-		setupLog.Error(err, "problem initializing redshift connection")
-		os.Exit(1)
-	}
-	redshiftCollector := redshift.NewRedshiftCollector(redshiftClient)
+
+	// collect redshift metrics for all databases
 	wg := &sync.WaitGroup{}
+	dbs := parseDatabase(databases)
+	redshiftClients := []*redshift.Redshift{}
+	for _, database := range dbs {
+		client, err := controllers.NewRedshiftConn(uncachedClient,
+			secretRefName,
+			secretRefNamespace,
+			database,
+		)
+		if err != nil {
+			setupLog.Error(err, "problem initializing redshift connection")
+			os.Exit(1)
+		}
+		redshiftClients = append(redshiftClients, client)
+	}
+
+	redshiftCollector := redshift.NewRedshiftCollector(redshiftClients)
 	wg.Add(1)
 	go redshiftCollector.Fetch(ctx, wg)
+
 	metrics.Registry.MustRegister(redshiftCollector)
 
 	setupLog.Info("Starting Operator...")
diff --git a/config/operator/redshiftsink_operator.yaml b/config/operator/redshiftsink_operator.yaml
index 1121eeec4..5443b17b9 100644
--- a/config/operator/redshiftsink_operator.yaml
+++ b/config/operator/redshiftsink_operator.yaml
@@ -51,6 +51,7 @@ spec:
         - --allowed-rsks=
         - --promethus-url=
         - --collect-redshift-metrics=false
+        - --databases=
         resources:
           limits:
             cpu: 300m
diff --git a/controllers/redshift_connection.go b/controllers/redshift_connection.go
index 467454326..8f24ff28c 100644
--- a/controllers/redshift_connection.go
+++ b/controllers/redshift_connection.go
@@ -11,6 +11,7 @@ func NewRedshiftConn(
 	client client.Client,
 	secretName,
 	secretNamespace string,
+	database *string,
 ) (
 	*redshift.Redshift,
 	error,
@@ -23,6 +24,9 @@ func NewRedshiftConn(
 	for key, value := range k8sSecret.Data {
 		secret[key] = string(value)
 	}
+	if database != nil {
+		secret["redshiftDatabase"] = *database
+	}
 
 	return NewRedshiftConnection(secret, "")
 }
diff --git a/pkg/redshift/redshift_exporter.go b/pkg/redshift/redshift_exporter.go
index 9859e8860..06abc0fbf 100644
--- a/pkg/redshift/redshift_exporter.go
+++ b/pkg/redshift/redshift_exporter.go
@@ -15,16 +15,16 @@ var (
 )
 
 type RedshiftCollector struct {
-	client           *Redshift
+	clients          []*Redshift
 	queryTotalMetric *prometheus.Desc
 	ready            bool
 	queryTotal       sync.Map
 }
 
-func NewRedshiftCollector(client *Redshift) *RedshiftCollector {
+func NewRedshiftCollector(clients []*Redshift) *RedshiftCollector {
 	return &RedshiftCollector{
-		client: client,
+		clients: clients,
 		queryTotalMetric: prometheus.NewDesc(
 			prometheus.BuildFQName(Namespace, SubSystemScan, "query_total"),
 			"Total number of redshift queries executed",
@@ -35,12 +35,17 @@ func NewRedshiftCollector(client *Redshift) *RedshiftCollector {
 }
 
 func (c *RedshiftCollector) updateQueryTotal(ctx context.Context) {
-	queryTotalRows, err := c.client.ScanQueryTotal(ctx)
-	if err != nil {
-		klog.Fatalf("Redshift Collector shutdown due to error: %v", err)
+	var rows []QueryTotalRow
+	for i, client := range c.clients {
+		klog.V(3).Infof("fetching query_total for database:%v", i)
+		dbRows, err := client.ScanQueryTotal(ctx)
+		if err != nil {
+			klog.Fatalf("Redshift Collector shutdown due to error: %v", err)
+		}
+		rows = append(rows, dbRows...)
 	}
-	c.queryTotal.Store("", queryTotalRows)
+	c.queryTotal.Store("", rows)
 }
 
 func (c *RedshiftCollector) Fetch(ctx context.Context, wg *sync.WaitGroup) {
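
The new `parseDatabase` helper returns `[]*string` so that an unset `--databases` flag still yields a single `nil` entry, which `NewRedshiftConn` treats as "keep the database already present in the secret". Below is a minimal table-driven sketch of that behaviour; the file name `main_test.go` and the database names `db1`/`db2` are placeholders, not part of the change.

```go
package main

import "testing"

// Sketch only: exercises parseDatabase from cmd/redshiftsink/main.go.
func TestParseDatabase(t *testing.T) {
	// unset flag: a single nil entry, meaning "use the database from the secret"
	dbs := parseDatabase("")
	if len(dbs) != 1 || dbs[0] != nil {
		t.Fatalf("expected [nil], got %v", dbs)
	}

	// comma separated flag: one pointer per supplied database name
	dbs = parseDatabase("db1,db2")
	if len(dbs) != 2 || *dbs[0] != "db1" || *dbs[1] != "db2" {
		t.Fatalf("expected pointers to db1 and db2, got %v", dbs)
	}
}
```

With `--databases=db1,db2` the operator builds one Redshift connection per listed database and the exporter aggregates the `scan_query_total` rows across all of them; leaving the flag empty keeps the previous single-database behaviour.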
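On the connection side, the new `database *string` parameter of `NewRedshiftConn` overrides only the `redshiftDatabase` key of the copied secret; every other connection setting still comes from the secret. A standalone sketch of that override rule, where the `pickDatabase` helper and the secret values are hypothetical and shown only to illustrate the nil/non-nil semantics:

```go
package main

import "fmt"

// pickDatabase mirrors the override rule added to NewRedshiftConn:
// a nil database keeps whatever the secret already contains,
// a non-nil one replaces it. Illustrative helper, not part of the change.
func pickDatabase(secret map[string]string, database *string) string {
	if database != nil {
		secret["redshiftDatabase"] = *database
	}
	return secret["redshiftDatabase"]
}

func main() {
	secret := map[string]string{"redshiftDatabase": "defaultdb"} // hypothetical secret value
	fmt.Println(pickDatabase(secret, nil)) // defaultdb (flag unset, secret wins)

	db := "analytics"
	fmt.Println(pickDatabase(secret, &db)) // analytics (one entry from --databases)
}
```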