diff --git a/go.mod b/go.mod index 6710d33bb..bb4f0bfd7 100644 --- a/go.mod +++ b/go.mod @@ -14,9 +14,9 @@ require ( github.com/coreos/go-oidc v2.2.1+incompatible github.com/evanphx/json-patch v4.12.0+incompatible github.com/flyteorg/flyteidl v1.3.14 - github.com/flyteorg/flyteplugins v1.0.40 - github.com/flyteorg/flytepropeller v1.1.70 - github.com/flyteorg/flytestdlib v1.0.15 + github.com/flyteorg/flyteplugins v1.0.49 + github.com/flyteorg/flytepropeller v1.1.83 + github.com/flyteorg/flytestdlib v1.0.17-0.20230320195919-90331d171e2a github.com/flyteorg/stow v0.3.6 github.com/ghodss/yaml v1.0.0 github.com/go-gormigrate/gormigrate/v2 v2.0.0 @@ -209,3 +209,5 @@ require ( ) replace github.com/robfig/cron/v3 => github.com/unionai/cron/v3 v3.0.2-0.20210825070134-bfc34418fe84 + +//replace github.com/flyteorg/flytestdlib => github.com/flyteorg/flytestdlib@mysql-support diff --git a/go.sum b/go.sum index f2fe7c70b..1b7d72076 100644 --- a/go.sum +++ b/go.sum @@ -314,12 +314,12 @@ github.com/felixge/httpsnoop v1.0.1 h1:lvB5Jl89CsZtGIWuTcDM1E/vkVs49/Ml7JJe07l8S github.com/felixge/httpsnoop v1.0.1/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U= github.com/flyteorg/flyteidl v1.3.14 h1:o5M0g/r6pXTPu5PEurbYxbQmuOu3hqqsaI2M6uvK0N8= github.com/flyteorg/flyteidl v1.3.14/go.mod h1:Pkt2skI1LiHs/2ZoekBnyPhuGOFMiuul6HHcKGZBsbM= -github.com/flyteorg/flyteplugins v1.0.40 h1:RTsYingqmqr13qBbi4CB2ArXDHNHUOkAF+HTLJQiQ/s= -github.com/flyteorg/flyteplugins v1.0.40/go.mod h1:qyUPqVspLcLGJpKxVwHDWf+kBpOGuItOxCaF6zAmDio= -github.com/flyteorg/flytepropeller v1.1.70 h1:/d1qqz13rdVADM85ST70eerAdBstJJz9UUB/mNSZi0w= -github.com/flyteorg/flytepropeller v1.1.70/go.mod h1:MezHUJmgPzm4Pu8nIy6LLiEkxNA6buTQ7hInSqCViTY= -github.com/flyteorg/flytestdlib v1.0.15 h1:kv9jDQmytbE84caY+pkZN8trJU2ouSAmESzpTEhfTt0= -github.com/flyteorg/flytestdlib v1.0.15/go.mod h1:ghw/cjY0sEWIIbyCtcJnL/Gt7ZS7gf9SUi0CCPhbz3s= +github.com/flyteorg/flyteplugins v1.0.49 h1:lUmT4kqYamkJY2tO6nCWRCnVv2M2QNLIap5bFYAol7s= +github.com/flyteorg/flyteplugins v1.0.49/go.mod h1:ztsonku5fKwyxcIg1k69PTiBVjRI6d3nK5DnC+iwx08= +github.com/flyteorg/flytepropeller v1.1.83 h1:p8pGSPSgjoU1UV23qXtJdB/cmFS92ATHTg1qx3FDkeI= +github.com/flyteorg/flytepropeller v1.1.83/go.mod h1:WFIzp4MhTLpco4YLXBqcaz0qeEchnstwOWiXN15AwkI= +github.com/flyteorg/flytestdlib v1.0.17-0.20230320195919-90331d171e2a h1:WQTudPI+ysEkMp648YDm8lbrVFh0sE5K5zuwjFNB37Y= +github.com/flyteorg/flytestdlib v1.0.17-0.20230320195919-90331d171e2a/go.mod h1:TcKdywJD/EokpPaTeRyn0rY0ErXKPzSsg0JKd4U0xDg= github.com/flyteorg/stow v0.3.6 h1:jt50ciM14qhKBaIrB+ppXXY+SXB59FNREFgTJqCyqIk= github.com/flyteorg/stow v0.3.6/go.mod h1:5dfBitPM004dwaZdoVylVjxFT4GWAgI0ghAndhNUzCo= github.com/fogleman/gg v1.2.1-0.20190220221249-0403632d5b90/go.mod h1:R/bRT+9gY/C5z7JzPU0zXsXHKM4/ayA+zqcVNZzPa1k= diff --git a/pkg/repositories/config/migrations.go b/pkg/repositories/config/migrations.go index a79a58b25..14636be67 100644 --- a/pkg/repositories/config/migrations.go +++ b/pkg/repositories/config/migrations.go @@ -324,7 +324,6 @@ var LegacyMigrations = []*gormigrate.Migration{ return tx.Model(&models.NodeExecution{}).Migrator().DropColumn(&models.NodeExecution{}, "dynamic_workflow_remote_closure_reference") }, }, - { ID: "2021-07-22-schedulable_entities", Migrate: func(tx *gorm.DB) error { @@ -334,7 +333,6 @@ var LegacyMigrations = []*gormigrate.Migration{ return tx.Migrator().DropTable(&schedulerModels.SchedulableEntity{}, "schedulable_entities") }, }, - { ID: "2021-08-05-schedulable_entities_snapshot", Migrate: func(tx *gorm.DB) error { @@ -344,7 +342,6 @@ var LegacyMigrations = []*gormigrate.Migration{ return tx.Migrator().DropTable(&schedulerModels.ScheduleEntitiesSnapshot{}, "schedulable_entities_snapshot") }, }, - // For any new table, Please use the following pattern due to a bug // in the postgres gorm layer https://github.com/go-gorm/postgres/issues/65 { @@ -851,6 +848,12 @@ var NoopMigrations = []*gormigrate.Migration{ { ID: "pg-noop-2023-03-31-noop-signal", Migrate: func(tx *gorm.DB) error { + type ExecutionKey struct { + Project string `gorm:"primary_key;column:execution_project"` + Domain string `gorm:"primary_key;column:execution_domain"` + Name string `gorm:"primary_key;column:execution_name"` + } + type SignalKey struct { ExecutionKey SignalID string `gorm:"primary_key;index" valid:"length(0|255)"` @@ -949,7 +952,934 @@ var NoopMigrations = []*gormigrate.Migration{ }, } -var Migrations = append(LegacyMigrations, NoopMigrations...) +// These migrations modify the column types from `text` to a bounded string type (i.e. varchar). +// The idea is to tighten the column types to avoid the performance penalty of using `text`, which in the case of +// MySQL is a `longtext` (4GB) column type. +var FixupMigrations = []*gormigrate.Migration{ + { + ID: "2023-03-31-fixup-project", + Migrate: func(tx *gorm.DB) error { + type Project struct { + ID uint `gorm:"index;autoIncrement;not null"` + CreatedAt time.Time `gorm:"type:time"` + UpdatedAt time.Time `gorm:"type:time"` + DeletedAt *time.Time `gorm:"index"` + Identifier string `gorm:"size:64;primary_key"` + Name string `gorm:"size:64"` // Human-readable name, not a unique identifier. + Description string `gorm:"type:varchar(300)"` + Labels []byte + // GORM doesn't save the zero value for ints, so we use a pointer for the State field + State *int32 `gorm:"default:0;index"` + } + err := tx.AutoMigrate(&Project{}) + if err != nil { + return err + } + // Run manual migrations for the primary key columns in the case of postgres + if tx.Dialector.Name() == "postgres" { + err = tx.Exec("ALTER TABLE projects ALTER COLUMN identifier TYPE varchar(64)").Error + if err != nil { + return err + } + } + return nil + }, + Rollback: func(tx *gorm.DB) error { + return nil + }, + }, + { + ID: "2023-03-31-fixup-task", + Migrate: func(tx *gorm.DB) error { + type Task struct { + ID uint `gorm:"index;autoIncrement;not null"` + CreatedAt time.Time `gorm:"type:time"` + UpdatedAt time.Time `gorm:"type:time"` + DeletedAt *time.Time `gorm:"index"` + Project string `gorm:"size:64;primary_key;index:task_project_domain_name_idx;index:task_project_domain_idx" ` + Domain string `gorm:"size:64;primary_key;index:task_project_domain_name_idx;index:task_project_domain_idx"` + Name string `gorm:"size:511;primary_key;index:task_project_domain_name_idx"` + Version string `gorm:"size:128;primary_key"` + Closure []byte `gorm:"not null"` + // Hash of the compiled task closure + Digest []byte + // Task type (also stored in the closure put promoted as a column for filtering). + Type string `gorm:"size:255"` + // ShortDescription for the task. + ShortDescription string `gorm:"size:255"` + } + + err := tx.AutoMigrate(&Task{}) + if err != nil { + return err + } + // Run manual migrations for the primary key columns in the case of postgres + if tx.Dialector.Name() == "postgres" { + err = tx.Exec("ALTER TABLE tasks ALTER COLUMN project TYPE varchar(64)").Error + if err != nil { + return err + } + err = tx.Exec("ALTER TABLE tasks ALTER COLUMN domain TYPE varchar(64)").Error + if err != nil { + return err + } + err = tx.Exec("ALTER TABLE tasks ALTER COLUMN name TYPE varchar(511)").Error + if err != nil { + return err + } + err = tx.Exec("ALTER TABLE tasks ALTER COLUMN version TYPE varchar(128)").Error + if err != nil { + return err + } + } + return nil + }, + Rollback: func(tx *gorm.DB) error { + return nil + }, + }, + { + ID: "2023-03-31-fixup-workflow", + Migrate: func(tx *gorm.DB) error { + if tx.Dialector.Name() == "mysql" { + type Workflow struct { + ID uint `gorm:"primary_key;index;autoIncrement;not null"` + CreatedAt time.Time `gorm:"type:time"` + UpdatedAt time.Time `gorm:"type:time"` + DeletedAt *time.Time `gorm:"index"` + Project string `gorm:"size:64;uniqueIndex:workflow_pdnv;index:workflow_project_domain_name_idx;index:workflow_project_domain_idx"` + Domain string `gorm:"size:64;uniqueIndex:workflow_pdnv;index:workflow_project_domain_name_idx;index:workflow_project_domain_idx;not null"` + Name string `gorm:"size:511;uniqueIndex:workflow_pdnv;index:workflow_project_domain_name_idx"` + Version string `gorm:"size:128;uniqueIndex:workflow_pdnv"` + TypedInterface []byte + RemoteClosureIdentifier string `gorm:"size:2048;not null"` + // Hash of the compiled workflow closure + Digest []byte + // ShortDescription for the workflow. + ShortDescription string `gorm:"size:2048"` + } + return tx.AutoMigrate(&Workflow{}) + + } else { + type Workflow struct { + ID uint `gorm:"index;autoIncrement;not null"` + CreatedAt time.Time `gorm:"type:time"` + UpdatedAt time.Time `gorm:"type:time"` + DeletedAt *time.Time `gorm:"index"` + Project string `gorm:"size:64;primary_key;index:workflow_project_domain_name_idx;index:workflow_project_domain_idx"` + Domain string `gorm:"size:64;primary_key:workflow_project_domain_name_idx;index:workflow_project_domain_idx;not null"` + Name string `gorm:"size:511;primary_key;index:workflow_project_domain_name_idx"` + Version string `gorm:"size:128;primary_key"` + TypedInterface []byte + RemoteClosureIdentifier string `gorm:"size:2048;not null"` + // Hash of the compiled workflow closure + Digest []byte + // ShortDescription for the workflow. + ShortDescription string `gorm:"size:2048"` + } + + err := tx.AutoMigrate(&Workflow{}) + if err != nil { + return err + } + // Run manual migrations for the primary key columns in the case of postgres + if tx.Dialector.Name() == "postgres" { + err = tx.Exec("ALTER TABLE workflows ALTER COLUMN project TYPE varchar(64)").Error + if err != nil { + return err + } + err = tx.Exec("ALTER TABLE workflows ALTER COLUMN domain TYPE varchar(64)").Error + if err != nil { + return err + } + err = tx.Exec("ALTER TABLE workflows ALTER COLUMN name TYPE varchar(511)").Error + if err != nil { + return err + } + err = tx.Exec("ALTER TABLE workflows ALTER COLUMN version TYPE varchar(128)").Error + if err != nil { + return err + } + } + return nil + } + }, + Rollback: func(tx *gorm.DB) error { + return nil + }, + }, + { + ID: "2023-03-31-fixup-launchplan", + Migrate: func(tx *gorm.DB) error { + type LaunchPlanScheduleType string + + type LaunchPlan struct { + ID uint `gorm:"index;autoIncrement;not null"` + CreatedAt time.Time `gorm:"type:time"` + UpdatedAt time.Time `gorm:"type:time"` + DeletedAt *time.Time `gorm:"index"` + Project string `gorm:"size:64;primary_key;index:lp_project_domain_name_idx,lp_project_domain_idx"` + Domain string `gorm:"size:64;primary_key;index:lp_project_domain_name_idx,lp_project_domain_idx"` + Name string `gorm:"size:511;primary_key;index:lp_project_domain_name_idx"` + Version string `gorm:"size:128;primary_key"` + Spec []byte `gorm:"not null"` + WorkflowID uint `gorm:"index"` + Closure []byte `gorm:"not null"` + // GORM doesn't save the zero value for ints, so we use a pointer for the State field + State *int32 `gorm:"default:0"` + // Hash of the launch plan + Digest []byte + ScheduleType LaunchPlanScheduleType `gorm:"size:255"` + } + err := tx.AutoMigrate(&LaunchPlan{}) + if err != nil { + return err + } + // Run manual migrations for the primary key columns in the case of postgres + if tx.Dialector.Name() == "postgres" { + err = tx.Exec("ALTER TABLE launch_plans ALTER COLUMN project TYPE varchar(64)").Error + if err != nil { + return err + } + err = tx.Exec("ALTER TABLE launch_plans ALTER COLUMN domain TYPE varchar(64)").Error + if err != nil { + return err + } + err = tx.Exec("ALTER TABLE launch_plans ALTER COLUMN name TYPE varchar(511)").Error + if err != nil { + return err + } + err = tx.Exec("ALTER TABLE launch_plans ALTER COLUMN version TYPE varchar(128)").Error + if err != nil { + return err + } + } + return nil + }, + Rollback: func(tx *gorm.DB) error { + return nil + }, + }, + { + ID: "2023-03-31-fixup-namedentitymetadata", + Migrate: func(tx *gorm.DB) error { + type NamedEntityMetadata struct { + ID uint `gorm:"index;autoIncrement;not null"` + CreatedAt time.Time `gorm:"type:time"` + UpdatedAt time.Time `gorm:"type:time"` + DeletedAt *time.Time `gorm:"index"` + ResourceType core.ResourceType `gorm:"primary_key;index:named_entity_metadata_type_project_domain_name_idx"` + Project string `gorm:"size:64;primary_key;index:named_entity_metadata_type_project_domain_name_idx"` + Domain string `gorm:"size:64;primary_key;index:named_entity_metadata_type_project_domain_name_idx"` + Name string `gorm:"size:511;primary_key;index:named_entity_metadata_type_project_domain_name_idx"` + Description string `gorm:"type:varchar(300)"` + // GORM doesn't save the zero value for ints, so we use a pointer for the State field + State *int32 `gorm:"default:0"` + } + + err := tx.AutoMigrate(&NamedEntityMetadata{}) + if err != nil { + return err + } + // Run manual migrations for the primary key columns in the case of postgres + if tx.Dialector.Name() == "postgres" { + err = tx.Exec("ALTER TABLE named_entity_metadata ALTER COLUMN project TYPE varchar(64)").Error + if err != nil { + return err + } + err = tx.Exec("ALTER TABLE named_entity_metadata ALTER COLUMN domain TYPE varchar(64)").Error + if err != nil { + return err + } + err = tx.Exec("ALTER TABLE named_entity_metadata ALTER COLUMN name TYPE varchar(511)").Error + if err != nil { + return err + } + } + return nil + }, + Rollback: func(tx *gorm.DB) error { + return nil + }, + }, + { + ID: "2023-03-31-fixup-execution", + Migrate: func(tx *gorm.DB) error { + type ExecutionKey struct { + Project string `gorm:"size:64;primary_key;column:execution_project"` + Domain string `gorm:"size:64;primary_key;column:execution_domain"` + Name string `gorm:"size:511;primary_key;column:execution_name"` + } + + type Execution struct { + ID uint `gorm:"index;autoIncrement;not null"` + CreatedAt time.Time `gorm:"type:time"` + UpdatedAt time.Time `gorm:"type:time"` + DeletedAt *time.Time `gorm:"index"` + ExecutionKey + LaunchPlanID uint `gorm:"index"` + WorkflowID uint `gorm:"index"` + TaskID uint `gorm:"index"` + Phase string `gorm:"size:50"` + Closure []byte + Spec []byte `gorm:"not null"` + StartedAt *time.Time + // Corresponds to the CreatedAt field in the Execution closure. + // Prefixed with Execution to avoid clashes with gorm.Model CreatedAt + ExecutionCreatedAt *time.Time `gorm:"index:idx_executions_created_at"` + // Corresponds to the UpdatedAt field in the Execution closure + // Prefixed with Execution to avoid clashes with gorm.Model UpdatedAt + ExecutionUpdatedAt *time.Time + Duration time.Duration + // In the case of an aborted execution this string may be non-empty. + // It should be ignored for any other value of phase other than aborted. + AbortCause string `gorm:"size:2048"` + // Corresponds to the execution mode used to trigger this execution + Mode int32 + // The "parent" execution (if there is one) that is related to this execution. + SourceExecutionID uint + // The parent node execution if this was launched by a node + ParentNodeExecutionID uint + // Cluster where execution was triggered + Cluster string `gorm:"size:512"` + // Offloaded location of inputs LiteralMap. These are the inputs evaluated and contain applied defaults. + InputsURI storage.DataReference + // User specified inputs. This map might be incomplete and not include defaults applied + UserInputsURI storage.DataReference + // Execution Error Kind. nullable + ErrorKind *string `gorm:"size:100;index"` + // Execution Error Code nullable + ErrorCode *string `gorm:"size:2048"` + // The user responsible for launching this execution. + // This is also stored in the spec but promoted as a column for filtering. + User string `gorm:"size:128;index"` + // GORM doesn't save the zero value for ints, so we use a pointer for the State field + State *int32 `gorm:"index;default:0"` + // The resource type of the entity used to launch the execution, one of 'launch_plan' or 'task' + LaunchEntity string `gorm:"size:128"` + } + + err := tx.AutoMigrate(&Execution{}) + if err != nil { + return err + } + + // Run manual migrations for the primary key columns in the case of postgres + if tx.Dialector.Name() == "postgres" { + err = tx.Exec("ALTER TABLE executions ALTER COLUMN execution_project TYPE varchar(64)").Error + if err != nil { + return err + } + err = tx.Exec("ALTER TABLE executions ALTER COLUMN execution_domain TYPE varchar(64)").Error + if err != nil { + return err + } + err = tx.Exec("ALTER TABLE executions ALTER COLUMN execution_name TYPE varchar(511)").Error + if err != nil { + return err + } + } + return nil + }, + Rollback: func(tx *gorm.DB) error { + return nil + }, + }, + { + ID: "2023-03-31-fixup-taskexecution", + Migrate: func(tx *gorm.DB) error { + // The `task_executions` table is a special case because of the number of fields present in its primary key. + // Postgres does not have a restriction on the total length of its primary keys, however, the same is not true + // for MySQL. MySQL has a limit of 3072 bytes for the total length of the primary key. So, in that case we rely + // on secondary indexes instead. + if tx.Dialector.Name() == "mysql" { + type TaskKey struct { + Project string `gorm:"size:64"` + Domain string `gorm:"size:64"` + Name string `gorm:"size:511"` + Version string `gorm:"size:128"` + } + type TaskExecutionKey struct { + TaskKey + Project string `gorm:"size:64;index:idx_taskexecutionkey_project_domain_name_nodeid;column:execution_project"` + Domain string `gorm:"size:64;index:idx_taskexecutionkey_project_domain_name_nodeid;column:execution_domain"` + Name string `gorm:"size:511;index:idx_taskexecutionkey_project_domain_name_nodeid;column:execution_name"` + NodeID string `gorm:"size:30;index:idx_taskexecutionkey_project_domain_name_nodeid;"` + // *IMPORTANT* This is a pointer to an int in order to allow setting an empty ("0") value according to gorm convention. + // Because RetryAttempt is part of the TaskExecution primary key is should *never* be null. + RetryAttempt *uint32 `gorm:"AUTO_INCREMENT:FALSE"` + } + type TaskExecution struct { + ID uint `gorm:"index;autoIncrement;not null"` // maybe need to add primary key, if not remove from workflows + CreatedAt time.Time `gorm:"type:time"` + UpdatedAt time.Time `gorm:"type:time"` + DeletedAt *time.Time + TaskExecutionKey + Phase string `gorm:"size:50"` + PhaseVersion uint32 + InputURI string `gorm:"size:2048"` + Closure []byte + StartedAt *time.Time + // Corresponds to the CreatedAt field in the TaskExecution closure + // This field is prefixed with TaskExecution because it signifies when + // the execution was createdAt, not to be confused with gorm.Model.CreatedAt + TaskExecutionCreatedAt *time.Time + // Corresponds to the UpdatedAt field in the TaskExecution closure + // This field is prefixed with TaskExecution because it signifies when + // the execution was UpdatedAt, not to be confused with gorm.Model.UpdatedAt + TaskExecutionUpdatedAt *time.Time + Duration time.Duration + // The child node executions (if any) launched by this task execution. + // TODO: this refers to `NodeExecution` defined at the top of this file. Should this also be defined inline? + ChildNodeExecution []NodeExecution `gorm:"foreignkey:ParentTaskExecutionID;references:ID"` + } + + return tx.AutoMigrate(&TaskExecution{}) + } else { + // For all other databases, we can use the primary key as defined in the model. + // ** Please, keep the model definitions in sync with the mysql ones defined above. ** + type TaskKey struct { + Project string `gorm:"size:64;primary_key"` + Domain string `gorm:"size:64;primary_key"` + Name string `gorm:"size:511;primary_key"` + Version string `gorm:"size:128;primary_key"` + } + type TaskExecutionKey struct { + TaskKey + Project string `gorm:"size:64;primary_key;column:execution_project;index:idx_task_executions_exec"` + Domain string `gorm:"size:64;primary_key;column:execution_domain;index:idx_task_executions_exec"` + Name string `gorm:"size:511;primary_key;column:execution_name;index:idx_task_executions_exec"` + NodeID string `gorm:"size:30;primary_key;index:idx_task_executions_exec;index"` + // *IMPORTANT* This is a pointer to an int in order to allow setting an empty ("0") value according to gorm convention. + // Because RetryAttempt is part of the TaskExecution primary key is should *never* be null. + RetryAttempt *uint32 `gorm:"primary_key;AUTO_INCREMENT:FALSE"` + } + type TaskExecution struct { + ID uint `gorm:"index;autoIncrement;not null"` + CreatedAt time.Time `gorm:"type:time"` + UpdatedAt time.Time `gorm:"type:time"` + DeletedAt *time.Time `gorm:"index"` + TaskExecutionKey + Phase string `gorm:"size:50"` + PhaseVersion uint32 + InputURI string `gorm:"size:2048"` + Closure []byte + StartedAt *time.Time + // Corresponds to the CreatedAt field in the TaskExecution closure + // This field is prefixed with TaskExecution because it signifies when + // the execution was createdAt, not to be confused with gorm.Model.CreatedAt + TaskExecutionCreatedAt *time.Time + // Corresponds to the UpdatedAt field in the TaskExecution closure + // This field is prefixed with TaskExecution because it signifies when + // the execution was UpdatedAt, not to be confused with gorm.Model.UpdatedAt + TaskExecutionUpdatedAt *time.Time + Duration time.Duration + // The child node executions (if any) launched by this task execution. + // TODO: this refers to `NodeExecution` defined at the top of this file. Should this also be defined inline? + ChildNodeExecution []NodeExecution `gorm:"foreignkey:ParentTaskExecutionID;references:ID"` + } + + err := tx.AutoMigrate(&TaskExecution{}) + if err != nil { + return err + } + + // Run manual migrations for the primary key columns in the case of postgres + if tx.Dialector.Name() == "postgres" { + err = tx.Exec("ALTER TABLE task_executions ALTER COLUMN project TYPE varchar(64)").Error + if err != nil { + return err + } + err = tx.Exec("ALTER TABLE task_executions ALTER COLUMN domain TYPE varchar(64)").Error + if err != nil { + return err + } + err = tx.Exec("ALTER TABLE task_executions ALTER COLUMN name TYPE varchar(511)").Error + if err != nil { + return err + } + err = tx.Exec("ALTER TABLE task_executions ALTER COLUMN version TYPE varchar(128)").Error + if err != nil { + return err + } + err = tx.Exec("ALTER TABLE task_executions ALTER COLUMN execution_project TYPE varchar(64)").Error + if err != nil { + return err + } + err = tx.Exec("ALTER TABLE task_executions ALTER COLUMN execution_domain TYPE varchar(64)").Error + if err != nil { + return err + } + err = tx.Exec("ALTER TABLE task_executions ALTER COLUMN execution_name TYPE varchar(511)").Error + if err != nil { + return err + } + err = tx.Exec("ALTER TABLE task_executions ALTER COLUMN node_id TYPE varchar(30)").Error + if err != nil { + return err + } + } + return nil + } + }, + Rollback: func(tx *gorm.DB) error { + return nil + }, + }, + { + ID: "2023-03-31-fixup-task_executions_create_primary_alt_index", + Migrate: func(tx *gorm.DB) error { + // This migration only applies to mysql as it has a limit on the size of the index. + // For other databases, we can use the primary key as defined in the model. + // ** Please, keep the model definitions in sync with the mysql ones defined above. ** + if tx.Dialector.Name() == "mysql" { + return tx.Exec("CREATE INDEX primary_alt ON task_executions(project, domain, name(200), version, execution_project, execution_domain, execution_name(128), node_id, retry_attempt);").Error + } + return nil + }, + Rollback: func(tx *gorm.DB) error { + return nil + }, + }, + { + ID: "2023-03-31-fixup-nodeexecution", + Migrate: func(tx *gorm.DB) error { + type ExecutionKey struct { + Project string `gorm:"size:64;primary_key;column:execution_project"` + Domain string `gorm:"size:64;primary_key;column:execution_domain"` + Name string `gorm:"size:511;primary_key;column:execution_name"` + } + + type NodeExecutionKey struct { + ExecutionKey + NodeID string `gorm:"size:30;primary_key;index"` + } + type NodeExecution struct { + ID uint `gorm:"index;autoIncrement;not null"` + CreatedAt time.Time `gorm:"type:time"` + UpdatedAt time.Time `gorm:"type:time"` + DeletedAt *time.Time `gorm:"index"` + NodeExecutionKey + // Also stored in the closure, but defined as a separate column because it's useful for filtering and sorting. + Phase string `gorm:"size:50"` + InputURI string `gorm:"size:2048"` + Closure []byte + StartedAt *time.Time + // Corresponds to the CreatedAt field in the NodeExecution closure + // Prefixed with NodeExecution to avoid clashes with gorm.Model CreatedAt + NodeExecutionCreatedAt *time.Time + // Corresponds to the UpdatedAt field in the NodeExecution closure + // Prefixed with NodeExecution to avoid clashes with gorm.Model UpdatedAt + NodeExecutionUpdatedAt *time.Time + Duration time.Duration + // The task execution (if any) which launched this node execution. + // TO BE DEPRECATED - as we have now introduced ParentID + ParentTaskExecutionID uint `sql:"default:null" gorm:"index"` + // The workflow execution (if any) which this node execution launched + LaunchedExecution models.Execution `gorm:"foreignKey:ParentNodeExecutionID;references:ID"` + // In the case of dynamic workflow nodes, the remote closure is uploaded to the path specified here. + DynamicWorkflowRemoteClosureReference string `gorm:"size:2048"` + // Metadata that is only relevant to the flyteadmin service that is used to parse the model and track additional attributes. + InternalData []byte + NodeExecutionMetadata []byte + // Parent that spawned this node execution - value is empty for executions at level 0 + ParentID *uint `sql:"default:null" gorm:"index"` + // List of child node executions - for cases like Dynamic task, sub workflow, etc + ChildNodeExecutions []NodeExecution `gorm:"foreignKey:ParentID;references:ID"` + // Execution Error Kind. nullable, can be one of core.ExecutionError_ErrorKind + ErrorKind *string `gorm:"size:50;index"` + // Execution Error Code nullable. string value, but finite set determined by the execution engine and plugins + ErrorCode *string `gorm:"size:255"` + // If the node is of Type Task, this should always exist for a successful execution, indicating the cache status for the execution + CacheStatus *string `gorm:"size:255"` + } + + err := tx.AutoMigrate(&NodeExecution{}) + if err != nil { + return err + } + // Run manual migrations for the primary key columns in the case of postgres + if tx.Dialector.Name() == "postgres" { + err = tx.Exec("ALTER TABLE node_executions ALTER COLUMN execution_project TYPE varchar(64)").Error + if err != nil { + return err + } + err = tx.Exec("ALTER TABLE node_executions ALTER COLUMN execution_domain TYPE varchar(64)").Error + if err != nil { + return err + } + err = tx.Exec("ALTER TABLE node_executions ALTER COLUMN execution_name TYPE varchar(511)").Error + if err != nil { + return err + } + err = tx.Exec("ALTER TABLE node_executions ALTER COLUMN node_id TYPE varchar(30)").Error + if err != nil { + return err + } + } + return nil + }, + Rollback: func(tx *gorm.DB) error { + return nil + }, + }, + { + ID: "2023-03-31-fixup-execution-event", + Migrate: func(tx *gorm.DB) error { + type ExecutionKey struct { + Project string `gorm:"size:64;primary_key;column:execution_project"` + Domain string `gorm:"size:64;primary_key;column:execution_domain"` + Name string `gorm:"size:511;primary_key;column:execution_name"` + } + type ExecutionEvent struct { + ID uint `gorm:"index;autoIncrement;not null"` + CreatedAt time.Time `gorm:"type:time"` + UpdatedAt time.Time `gorm:"type:time"` + DeletedAt *time.Time `gorm:"index"` + ExecutionKey + RequestID string `gorm:"size:255"` + OccurredAt time.Time `gorm:"type:time"` + Phase string `gorm:"size:50;primary_key"` + } + + err := tx.AutoMigrate(&ExecutionEvent{}) + if err != nil { + return err + } + // Run manual migrations for the primary key columns in the case of postgres + if tx.Dialector.Name() == "postgres" { + err = tx.Exec("ALTER TABLE execution_events ALTER COLUMN execution_project TYPE varchar(64)").Error + if err != nil { + return err + } + err = tx.Exec("ALTER TABLE execution_events ALTER COLUMN execution_domain TYPE varchar(64)").Error + if err != nil { + return err + } + err = tx.Exec("ALTER TABLE execution_events ALTER COLUMN execution_name TYPE varchar(511)").Error + if err != nil { + return err + } + err = tx.Exec("ALTER TABLE execution_events ALTER COLUMN phase TYPE varchar(50)").Error + if err != nil { + return err + } + } + return nil + }, + Rollback: func(tx *gorm.DB) error { + return nil + }, + }, + { + ID: "2023-03-31-fixup-node-execution-event", + Migrate: func(tx *gorm.DB) error { + type ExecutionKey struct { + Project string `gorm:"size:64;primary_key;column:execution_project"` + Domain string `gorm:"size:64;primary_key;column:execution_domain"` + Name string `gorm:"size:511;primary_key;column:execution_name"` + } + type NodeExecutionKey struct { + ExecutionKey + NodeID string `gorm:"size:30;primary_key;index"` + } + type NodeExecutionEvent struct { + ID uint `gorm:"index;autoIncrement;not null"` + CreatedAt time.Time `gorm:"type:time"` + UpdatedAt time.Time `gorm:"type:time"` + DeletedAt *time.Time `gorm:"index"` + NodeExecutionKey + RequestID string `gorm:"size:255"` + OccurredAt time.Time + Phase string `gorm:"size:50;primary_key"` + } + + err := tx.AutoMigrate(&NodeExecutionEvent{}) + if err != nil { + return err + } + // Run manual migrations for the primary key columns in the case of postgres + if tx.Dialector.Name() == "postgres" { + err = tx.Exec("ALTER TABLE node_execution_events ALTER COLUMN execution_project TYPE varchar(64)").Error + if err != nil { + return err + } + err = tx.Exec("ALTER TABLE node_execution_events ALTER COLUMN execution_domain TYPE varchar(64)").Error + if err != nil { + return err + } + err = tx.Exec("ALTER TABLE node_execution_events ALTER COLUMN execution_name TYPE varchar(511)").Error + if err != nil { + return err + } + err = tx.Exec("ALTER TABLE node_execution_events ALTER COLUMN node_id TYPE varchar(30)").Error + if err != nil { + return err + } + err = tx.Exec("ALTER TABLE node_execution_events ALTER COLUMN phase TYPE varchar(50)").Error + if err != nil { + return err + } + } + return nil + }, + Rollback: func(tx *gorm.DB) error { + return nil + }, + }, + { + ID: "2023-03-31-fixup-description-entity", + Migrate: func(tx *gorm.DB) error { + type DescriptionEntityKey struct { + // ResourceType is an enum that indicates the type of the resource. We represent it as an uint32. + ResourceType core.ResourceType `gorm:"primary_key;index:description_entity_project_domain_name_version_idx"` + Project string `gorm:"size:64;primary_key;index:description_entity_project_domain_name_version_idx"` + Domain string `gorm:"size:64;primary_key;index:description_entity_project_domain_name_version_idx"` + Name string `gorm:"size:511;primary_key;index:description_entity_project_domain_name_version_idx"` + Version string `gorm:"size:128;primary_key;index:description_entity_project_domain_name_version_idx"` + } + + // SourceCode Database model to encapsulate a SourceCode. + type SourceCode struct { + Link string `gorm:"size:2048"` + } + + // DescriptionEntity Database model to encapsulate a DescriptionEntity. + type DescriptionEntity struct { + DescriptionEntityKey + ID uint `gorm:"index;autoIncrement;not null"` + CreatedAt time.Time `gorm:"type:time"` + UpdatedAt time.Time `gorm:"type:time"` + DeletedAt *time.Time `gorm:"index"` + SourceCode + ShortDescription string `gorm:"size:2048"` + LongDescription []byte + } + + err := tx.AutoMigrate(&DescriptionEntity{}) + if err != nil { + return err + } + // Run manual migrations for the primary key columns in the case of postgres + if tx.Dialector.Name() == "postgres" { + err = tx.Exec("ALTER TABLE description_entities ALTER COLUMN project TYPE varchar(64)").Error + if err != nil { + return err + } + err = tx.Exec("ALTER TABLE description_entities ALTER COLUMN domain TYPE varchar(64)").Error + if err != nil { + return err + } + err = tx.Exec("ALTER TABLE description_entities ALTER COLUMN name TYPE varchar(511)").Error + if err != nil { + return err + } + err = tx.Exec("ALTER TABLE description_entities ALTER COLUMN version TYPE varchar(128)").Error + if err != nil { + return err + } + } + return nil + }, + Rollback: func(tx *gorm.DB) error { + return nil + }, + }, + { + ID: "2023-03-31-fixup-signal", + Migrate: func(tx *gorm.DB) error { + type ExecutionKey struct { + Project string `gorm:"size:64;primary_key;column:execution_project"` + Domain string `gorm:"size:64;primary_key;column:execution_domain"` + Name string `gorm:"size:511;primary_key;column:execution_name"` + } + type SignalKey struct { + ExecutionKey + SignalID string `gorm:"size:128;primary_key"` + } + + type Signal struct { + ID uint `gorm:"index;autoIncrement;not null"` + CreatedAt time.Time `gorm:"type:time"` + UpdatedAt time.Time `gorm:"type:time"` + DeletedAt *time.Time `gorm:"index"` + SignalKey + Type []byte `gorm:"not null"` + Value []byte + } + + err := tx.AutoMigrate(&Signal{}) + if err != nil { + return err + } + // Run manual migrations for the primary key columns in the case of postgres + if tx.Dialector.Name() == "postgres" { + err = tx.Exec("ALTER TABLE signals ALTER COLUMN execution_project TYPE varchar(64)").Error + if err != nil { + return err + } + err = tx.Exec("ALTER TABLE signals ALTER COLUMN execution_domain TYPE varchar(64)").Error + if err != nil { + return err + } + err = tx.Exec("ALTER TABLE signals ALTER COLUMN execution_name TYPE varchar(511)").Error + if err != nil { + return err + } + err = tx.Exec("ALTER TABLE signals ALTER COLUMN signal_id TYPE varchar(128)").Error + if err != nil { + return err + } + } + return nil + }, + Rollback: func(tx *gorm.DB) error { + return nil + }, + }, + { + ID: "2023-03-31-fixup-resource", + Migrate: func(tx *gorm.DB) error { + if tx.Dialector.Name() == "mysql" { + type ResourcePriority int32 + + // In this model, the combination of (Project, Domain, Workflow, LaunchPlan, ResourceType) is unique + type Resource struct { + ID int64 `gorm:"AUTO_INCREMENT;column:id;primary_key;not null"` + CreatedAt time.Time + UpdatedAt time.Time + DeletedAt *time.Time `sql:"index"` + Project string `gorm:"size:64"` + Domain string `gorm:"size:64"` + Workflow string `gorm:"size:511"` + LaunchPlan string `gorm:"size:511"` + ResourceType string `gorm:"size:50"` + Priority ResourcePriority + // Serialized flyteidl.admin.MatchingAttributes. + Attributes []byte + } + + return tx.AutoMigrate(&Resource{}) + } else { + type ResourcePriority int32 + + // In this model, the combination of (Project, Domain, Workflow, LaunchPlan, ResourceType) is unique + type Resource struct { + ID int64 `gorm:"AUTO_INCREMENT;column:id;primary_key;not null"` + CreatedAt time.Time + UpdatedAt time.Time + DeletedAt *time.Time `sql:"index"` + Project string `gorm:"size:64;uniqueIndex:idx_project_domain_workflow_resource_type"` + Domain string `gorm:"size:64;uniqueIndex:idx_project_domain_workflow_resource_type"` + Workflow string `gorm:"size:511;uniqueIndex:idx_project_domain_workflow_resource_type"` + LaunchPlan string `gorm:"size:511;uniqueIndex:idx_project_domain_workflow_resource_type"` + ResourceType string `gorm:"size:50;uniqueIndex:idx_project_domain_workflow_resource_type"` + Priority ResourcePriority + // Serialized flyteidl.admin.MatchingAttributes. + Attributes []byte + } + + return tx.AutoMigrate(&Resource{}) + } + }, + Rollback: func(tx *gorm.DB) error { + return nil + }, + }, + { + ID: "2023-03-31-fixup-resources_create_primary_alt_index", + Migrate: func(tx *gorm.DB) error { + // This migration only applies to mysql as it has a limit on the size of the index. + // For other databases, we can use the primary key as defined in the model. + // ** Please, keep the model definitions in sync with the mysql ones defined above. ** + if tx.Dialector.Name() == "mysql" { + return tx.Exec("CREATE INDEX primary_alt ON resources(project, domain, workflow(200), launch_plan(200), resource_type);").Error + } + return nil + }, + Rollback: func(tx *gorm.DB) error { + return nil + }, + }, + { + ID: "2023-03-31-fixup-schedulable_entities", + Migrate: func(tx *gorm.DB) error { + type SchedulableEntityKey struct { + Project string `gorm:"size:64;primary_key"` + Domain string `gorm:"size:64;primary_key"` + Name string `gorm:"size:511;primary_key"` + Version string `gorm:"size:128;primary_key"` + } + type SchedulableEntity struct { + ID uint `gorm:"index;autoIncrement;not null"` + CreatedAt time.Time + UpdatedAt time.Time + DeletedAt *time.Time `gorm:"index"` + SchedulableEntityKey + CronExpression string `gorm:"size:100"` + FixedRateValue uint32 + Unit admin.FixedRateUnit + KickoffTimeInputArg string `gorm:"size:100"` + Active *bool + } + + err := tx.AutoMigrate(&SchedulableEntity{}) + if err != nil { + return err + } + // Run manual migrations for the primary key columns in the case of postgres + if tx.Dialector.Name() == "postgres" { + err = tx.Exec("ALTER TABLE schedulable_entities ALTER COLUMN project TYPE varchar(64)").Error + if err != nil { + return err + } + err = tx.Exec("ALTER TABLE schedulable_entities ALTER COLUMN domain TYPE varchar(64)").Error + if err != nil { + return err + } + err = tx.Exec("ALTER TABLE schedulable_entities ALTER COLUMN name TYPE varchar(511)").Error + if err != nil { + return err + } + err = tx.Exec("ALTER TABLE schedulable_entities ALTER COLUMN version TYPE varchar(128)").Error + if err != nil { + return err + } + } + return nil + }, + Rollback: func(tx *gorm.DB) error { + return nil + }, + }, + { + ID: "2023-03-31-fixup-schedulable_entities-snapshot", + Migrate: func(tx *gorm.DB) error { + type ScheduleEntitiesSnapshot struct { + ID uint `gorm:"index;autoIncrement;not null"` + CreatedAt time.Time + UpdatedAt time.Time + DeletedAt *time.Time `gorm:"index"` + Snapshot []byte `gorm:"column:snapshot" schema:"-"` + } + + return tx.AutoMigrate(&ScheduleEntitiesSnapshot{}) + }, + Rollback: func(tx *gorm.DB) error { + return nil + }, + }, +} + +func ListMigrations(db *gorm.DB) []*gormigrate.Migration { + var Migrations []*gormigrate.Migration + // Only run legacy migrations for postgres + if db.Dialector.Name() == "postgres" { + Migrations = append(LegacyMigrations, NoopMigrations...) + } + Migrations = append(Migrations, FixupMigrations...) + return Migrations +} func alterTableColumnType(db *sql.DB, columnName, columnType string) error { diff --git a/pkg/repositories/config/migrations_test.go b/pkg/repositories/config/migrations_test.go index 29f798051..6d43420a7 100644 --- a/pkg/repositories/config/migrations_test.go +++ b/pkg/repositories/config/migrations_test.go @@ -1,10 +1,17 @@ package config import ( - "testing" - + "context" + "fmt" mocket "github.com/Selvatico/go-mocket" + "github.com/flyteorg/flytestdlib/database" + "github.com/go-gormigrate/gormigrate/v2" "github.com/stretchr/testify/assert" + gormLogger "gorm.io/gorm/logger" + "log" + "os" + "testing" + "time" "gorm.io/driver/postgres" "gorm.io/gorm" @@ -32,3 +39,41 @@ func GetDbForTest(t *testing.T) *gorm.DB { } return db } + +func TestMigrations(t *testing.T) { + gLogger := gormLogger.New(log.New(os.Stdout, "\r\n", log.LstdFlags), gormLogger.Config{ + SlowThreshold: 200 * time.Millisecond, + LogLevel: gormLogger.Info, + IgnoreRecordNotFoundError: false, + Colorful: true, + }) + + gormConfig := &gorm.Config{ + Logger: gLogger, + DisableForeignKeyConstraintWhenMigrating: false, + } + + var gormDb *gorm.DB + pgConfig := database.PostgresConfig{ + Host: "localhost", + Port: 30001, + DbName: "flyte", + User: "postgres", + Password: "postgres", + ExtraOptions: "", + Debug: false, + } + ctx := context.Background() + postgresDsn := database.PostgresDsn(ctx, pgConfig) + dialector := postgres.Open(postgresDsn) + gormDb, err := gorm.Open(dialector, gormConfig) + assert.NoError(t, err) + + fmt.Println(gormDb) + + m := gormigrate.New(gormDb, gormigrate.DefaultOptions, Migrations) + if err := m.Migrate(); err != nil { + fmt.Errorf("database migration failed: %v", err) + } + fmt.Println(ctx, "Migration ran successfully") +} diff --git a/pkg/repositories/database.go b/pkg/repositories/database.go index fdc663ac6..903cf7bc1 100644 --- a/pkg/repositories/database.go +++ b/pkg/repositories/database.go @@ -2,58 +2,15 @@ package repositories import ( "context" - "errors" "fmt" - "io/ioutil" - "os" - "strings" - "github.com/flyteorg/flytestdlib/database" "gorm.io/driver/sqlite" "github.com/flyteorg/flytestdlib/logger" - "github.com/jackc/pgconn" - "gorm.io/driver/postgres" "gorm.io/gorm" ) -const pqInvalidDBCode = "3D000" -const pqDbAlreadyExistsCode = "42P04" -const defaultDB = "postgres" - -// Resolves a password value from either a user-provided inline value or a filepath whose contents contain a password. -func resolvePassword(ctx context.Context, passwordVal, passwordPath string) string { - password := passwordVal - if len(passwordPath) > 0 { - if _, err := os.Stat(passwordPath); os.IsNotExist(err) { - logger.Fatalf(ctx, - "missing database password at specified path [%s]", passwordPath) - } - passwordVal, err := ioutil.ReadFile(passwordPath) - if err != nil { - logger.Fatalf(ctx, "failed to read database password from path [%s] with err: %v", - passwordPath, err) - } - // Passwords can contain special characters as long as they are percent encoded - // https://www.postgresql.org/docs/current/libpq-connect.html - password = strings.TrimSpace(string(passwordVal)) - } - return password -} - -// Produces the DSN (data source name) for opening a postgres db connection. -func getPostgresDsn(ctx context.Context, pgConfig database.PostgresConfig) string { - password := resolvePassword(ctx, pgConfig.Password, pgConfig.PasswordPath) - if len(password) == 0 { - // The password-less case is included for development environments. - return fmt.Sprintf("host=%s port=%d dbname=%s user=%s sslmode=disable", - pgConfig.Host, pgConfig.Port, pgConfig.DbName, pgConfig.User) - } - return fmt.Sprintf("host=%s port=%d dbname=%s user=%s password=%s %s", - pgConfig.Host, pgConfig.Port, pgConfig.DbName, pgConfig.User, password, pgConfig.ExtraOptions) -} - // GetDB uses the dbConfig to create gorm DB object. If the db doesn't exist for the dbConfig then a new one is created // using the default db for the provider. eg : postgres has default dbName as postgres func GetDB(ctx context.Context, dbConfig *database.DbConfig, logConfig *logger.Config) ( @@ -78,12 +35,16 @@ func GetDB(ctx context.Context, dbConfig *database.DbConfig, logConfig *logger.C if err != nil { return nil, err } + case !(dbConfig.Mysql.IsEmpty()): + gormDb, err = database.CreateMysqlDbIfNotExists(ctx, gormConfig, dbConfig.Mysql) + if err != nil { + return nil, err + } case !(dbConfig.Postgres.IsEmpty()): - gormDb, err = createPostgresDbIfNotExists(ctx, gormConfig, dbConfig.Postgres) + gormDb, err = database.CreatePostgresDbIfNotExists(ctx, gormConfig, dbConfig.Postgres) if err != nil { return nil, err } - case len(dbConfig.DeprecatedHost) > 0 || len(dbConfig.DeprecatedUser) > 0 || len(dbConfig.DeprecatedDbName) > 0: pgConfig := database.PostgresConfig{ Host: dbConfig.DeprecatedHost, @@ -95,7 +56,7 @@ func GetDB(ctx context.Context, dbConfig *database.DbConfig, logConfig *logger.C ExtraOptions: dbConfig.DeprecatedExtraOptions, Debug: dbConfig.DeprecatedDebug, } - gormDb, err = createPostgresDbIfNotExists(ctx, gormConfig, pgConfig) + gormDb, err = database.CreatePostgresDbIfNotExists(ctx, gormConfig, pgConfig) if err != nil { return nil, err } @@ -107,59 +68,6 @@ func GetDB(ctx context.Context, dbConfig *database.DbConfig, logConfig *logger.C return gormDb, setupDbConnectionPool(ctx, gormDb, dbConfig) } -// Creates DB if it doesn't exist for the passed in config -func createPostgresDbIfNotExists(ctx context.Context, gormConfig *gorm.Config, pgConfig database.PostgresConfig) (*gorm.DB, error) { - - dialector := postgres.Open(getPostgresDsn(ctx, pgConfig)) - gormDb, err := gorm.Open(dialector, gormConfig) - if err == nil { - return gormDb, nil - } - - if !isPgErrorWithCode(err, pqInvalidDBCode) { - return nil, err - } - - logger.Warningf(ctx, "Database [%v] does not exist", pgConfig.DbName) - - // Every postgres installation includes a 'postgres' database by default. We connect to that now in order to - // initialize the user-specified database. - defaultDbPgConfig := pgConfig - defaultDbPgConfig.DbName = defaultDB - defaultDBDialector := postgres.Open(getPostgresDsn(ctx, defaultDbPgConfig)) - gormDb, err = gorm.Open(defaultDBDialector, gormConfig) - if err != nil { - return nil, err - } - - // Because we asserted earlier that the db does not exist, we create it now. - logger.Infof(ctx, "Creating database %v", pgConfig.DbName) - - // NOTE: golang sql drivers do not support parameter injection for CREATE calls - createDBStatement := fmt.Sprintf("CREATE DATABASE %s", pgConfig.DbName) - result := gormDb.Exec(createDBStatement) - - if result.Error != nil { - if !isPgErrorWithCode(result.Error, pqDbAlreadyExistsCode) { - return nil, result.Error - } - logger.Warningf(ctx, "Got DB already exists error for [%s], skipping...", pgConfig.DbName) - } - // Now try connecting to the db again - return gorm.Open(dialector, gormConfig) -} - -func isPgErrorWithCode(err error, code string) bool { - pgErr := &pgconn.PgError{} - if !errors.As(err, &pgErr) { - // err chain does not contain a pgconn.PgError - return false - } - - // pgconn.PgError found in chain and set to code specified - return pgErr.Code == code -} - func setupDbConnectionPool(ctx context.Context, gormDb *gorm.DB, dbConfig *database.DbConfig) error { genericDb, err := gormDb.DB() if err != nil { diff --git a/pkg/repositories/database_test.go b/pkg/repositories/database_test.go index 4dea5585e..5c22402c3 100644 --- a/pkg/repositories/database_test.go +++ b/pkg/repositories/database_test.go @@ -2,19 +2,14 @@ package repositories import ( "context" - "errors" - "io/ioutil" - "net" "os" "path" "path/filepath" "testing" "time" - "github.com/flyteorg/flytestdlib/database" - "github.com/jackc/pgconn" - "github.com/flyteorg/flytestdlib/config" + "github.com/flyteorg/flytestdlib/database" "github.com/flyteorg/flytestdlib/logger" "github.com/stretchr/testify/assert" @@ -22,111 +17,6 @@ import ( "gorm.io/gorm" ) -func TestResolvePassword(t *testing.T) { - password := "123abc" - tmpFile, err := ioutil.TempFile("", "prefix") - if err != nil { - t.Errorf("Couldn't open temp file: %v", err) - } - defer tmpFile.Close() - if _, err = tmpFile.WriteString(password); err != nil { - t.Errorf("Couldn't write to temp file: %v", err) - } - resolvedPassword := resolvePassword(context.TODO(), "", tmpFile.Name()) - assert.Equal(t, resolvedPassword, password) -} - -func TestGetPostgresDsn(t *testing.T) { - pgConfig := database.PostgresConfig{ - Host: "localhost", - Port: 5432, - DbName: "postgres", - User: "postgres", - ExtraOptions: "sslmode=disable", - } - t.Run("no password", func(t *testing.T) { - dsn := getPostgresDsn(context.TODO(), pgConfig) - assert.Equal(t, "host=localhost port=5432 dbname=postgres user=postgres sslmode=disable", dsn) - }) - t.Run("with password", func(t *testing.T) { - pgConfig.Password = "pass" - dsn := getPostgresDsn(context.TODO(), pgConfig) - assert.Equal(t, "host=localhost port=5432 dbname=postgres user=postgres password=pass sslmode=disable", dsn) - - }) - t.Run("with password, no extra", func(t *testing.T) { - pgConfig.Password = "pass" - pgConfig.ExtraOptions = "" - dsn := getPostgresDsn(context.TODO(), pgConfig) - assert.Equal(t, "host=localhost port=5432 dbname=postgres user=postgres password=pass ", dsn) - }) - t.Run("with password path", func(t *testing.T) { - password := "123abc" - tmpFile, err := ioutil.TempFile("", "prefix") - if err != nil { - t.Errorf("Couldn't open temp file: %v", err) - } - defer tmpFile.Close() - if _, err = tmpFile.WriteString(password); err != nil { - t.Errorf("Couldn't write to temp file: %v", err) - } - pgConfig.PasswordPath = tmpFile.Name() - dsn := getPostgresDsn(context.TODO(), pgConfig) - assert.Equal(t, "host=localhost port=5432 dbname=postgres user=postgres password=123abc ", dsn) - }) -} - -type wrappedError struct { - err error -} - -func (e *wrappedError) Error() string { - return e.err.Error() -} - -func (e *wrappedError) Unwrap() error { - return e.err -} - -func TestIsInvalidDBPgError(t *testing.T) { - // wrap error with wrappedError when testing to ensure the function checks the whole error chain - - testCases := []struct { - Name string - Err error - ExpectedResult bool - }{ - { - Name: "nil error", - Err: nil, - ExpectedResult: false, - }, - { - Name: "not a PgError", - Err: &wrappedError{err: &net.OpError{Op: "connect", Err: errors.New("connection refused")}}, - ExpectedResult: false, - }, - { - Name: "PgError but not invalid DB", - Err: &wrappedError{&pgconn.PgError{Severity: "FATAL", Message: "out of memory", Code: "53200"}}, - ExpectedResult: false, - }, - { - Name: "PgError and is invalid DB", - Err: &wrappedError{&pgconn.PgError{Severity: "FATAL", Message: "database \"flyte\" does not exist", Code: "3D000"}}, - ExpectedResult: true, - }, - } - - for _, tc := range testCases { - tc := tc - - t.Run(tc.Name, func(t *testing.T) { - assert.Equal(t, tc.ExpectedResult, isPgErrorWithCode(tc.Err, pqInvalidDBCode)) - }) - } -} - func TestSetupDbConnectionPool(t *testing.T) { ctx := context.TODO() t.Run("successful", func(t *testing.T) { @@ -196,41 +86,3 @@ func TestGetDB(t *testing.T) { assert.Equal(t, "sqlite", db.Name()) }) } - -func TestIsPgDbAlreadyExistsError(t *testing.T) { - // wrap error with wrappedError when testing to ensure the function checks the whole error chain - - testCases := []struct { - Name string - Err error - ExpectedResult bool - }{ - { - Name: "nil error", - Err: nil, - ExpectedResult: false, - }, - { - Name: "not a PgError", - Err: &wrappedError{err: &net.OpError{Op: "connect", Err: errors.New("connection refused")}}, - ExpectedResult: false, - }, - { - Name: "PgError but not already exists", - Err: &wrappedError{&pgconn.PgError{Severity: "FATAL", Message: "out of memory", Code: "53200"}}, - ExpectedResult: false, - }, - { - Name: "PgError and is already exists", - Err: &wrappedError{&pgconn.PgError{Severity: "FATAL", Message: "database \"flyte\" does not exist", Code: "42P04"}}, - ExpectedResult: true, - }, - } - - for _, tc := range testCases { - tc := tc - t.Run(tc.Name, func(t *testing.T) { - assert.Equal(t, tc.ExpectedResult, isPgErrorWithCode(tc.Err, pqDbAlreadyExistsCode)) - }) - } -} diff --git a/pkg/repositories/models/description_entity.go b/pkg/repositories/models/description_entity.go index 3a5c71625..d1f30373f 100644 --- a/pkg/repositories/models/description_entity.go +++ b/pkg/repositories/models/description_entity.go @@ -22,9 +22,9 @@ type DescriptionEntity struct { BaseModel + SourceCode + ShortDescription string LongDescription []byte - - SourceCode } diff --git a/pkg/server/initialize.go b/pkg/server/initialize.go index 69c384676..02367f4dd 100644 --- a/pkg/server/initialize.go +++ b/pkg/server/initialize.go @@ -43,7 +43,7 @@ func withDB(ctx context.Context, do func(db *gorm.DB) error) error { // Migrate runs all configured migrations func Migrate(ctx context.Context) error { return withDB(ctx, func(db *gorm.DB) error { - m := gormigrate.New(db, gormigrate.DefaultOptions, config.Migrations) + m := gormigrate.New(db, gormigrate.DefaultOptions, config.ListMigrations(db)) if err := m.Migrate(); err != nil { return fmt.Errorf("database migration failed: %v", err) } @@ -55,7 +55,7 @@ func Migrate(ctx context.Context) error { // Rollback rolls back the last migration func Rollback(ctx context.Context) error { return withDB(ctx, func(db *gorm.DB) error { - m := gormigrate.New(db, gormigrate.DefaultOptions, config.Migrations) + m := gormigrate.New(db, gormigrate.DefaultOptions, config.ListMigrations(db)) err := m.RollbackLast() if err != nil { return fmt.Errorf("could not rollback latest migration: %v", err)