diff --git a/cmd/cleanup-index/main.go b/cmd/cleanup-index/main.go index 4a78e0522..143aeafde 100644 --- a/cmd/cleanup-index/main.go +++ b/cmd/cleanup-index/main.go @@ -63,8 +63,8 @@ var ( redisEnableTLS = flag.Bool("redis-enable-tls", false, "Enable TLS for Redis client") redisInsecureSkipVerify = flag.Bool("redis-insecure-skip-verify", false, "Whether to skip TLS verification for Redis client or not") mysqlDSN = flag.String("mysql-dsn", "", "MySQL Data Source Name") - versionFlag = flag.Bool("version", false, "Print the current version of Backfill MySQL") - dryRun = flag.Bool("dry-run", false, "Dry run - don't actually insert into MySQL") + versionFlag = flag.Bool("version", false, "Print the current version of cleanup-index") + dryRun = flag.Bool("dry-run", false, "Dry run - don't actually delete from Redis") ) func main() { @@ -97,13 +97,9 @@ func main() { ctx, _ := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM) - keys, err := getKeysToDelete(ctx, mysqlClient) + err = batchQueryAndDelete(ctx, mysqlClient, redisClient) if err != nil { - log.Fatalf("getting keys from mysql: %v", err) - } - err = removeFromRedis(ctx, redisClient, keys) - if err != nil { - log.Fatalf("deleting keys from redis: %v", err) + log.Fatal(err) } } @@ -136,16 +132,42 @@ func getMySQLClient() (*sqlx.DB, error) { return dbClient, nil } -// getKeysToDelete looks up entries in the EntryIndex table in MySQL. -func getKeysToDelete(ctx context.Context, dbClient *sqlx.DB) ([]string, error) { - keys := []string{} - err := dbClient.SelectContext(ctx, &keys, mysqlSelectStmt) - return keys, err +// batchQueryAndDelete looks up entries in the EntryIndex table in MySQL and removes them from Redis. +// The SQL query is processed in batches because the entire data set is too large to store in memory at once. +func batchQueryAndDelete(ctx context.Context, dbClient *sqlx.DB, redisClient *redis.Client) error { + const batchSize = 10000 + keys := [batchSize]string{} + rows, err := dbClient.QueryContext(ctx, mysqlSelectStmt) + if err != nil { + return err + } + defer rows.Close() + index := 0 + batchIndex := 1 + for rows.Next() { + if err := rows.Scan(&keys[index]); err != nil { + return err + } + index++ + if index == batchSize { + if err := removeFromRedis(ctx, redisClient, keys[:], batchIndex); err != nil { + return err + } + index = 0 + batchIndex++ + } + } + if index != 0 { + if err := removeFromRedis(ctx, redisClient, keys[:index], batchIndex); err != nil { + return err + } + } + return nil } // removeFromRedis delete the given keys from Redis. -func removeFromRedis(ctx context.Context, redisClient *redis.Client, keys []string) error { - fmt.Printf("attempting to remove %d keys from redis\n", len(keys)) +func removeFromRedis(ctx context.Context, redisClient *redis.Client, keys []string, batchIndex int) error { + fmt.Printf("attempting to remove %d keys from redis (batch %d)\n", len(keys), batchIndex) if *dryRun { return nil }