-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmigrator.go
157 lines (131 loc) · 3.8 KB
/
migrator.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
package main
import (
"context"
"database/sql"
"errors"
"fmt"
"github.com/frain-dev/newcloud-migrator/convoy-23.9.2/database/hooks"
"github.com/frain-dev/newcloud-migrator/convoy-23.9.2/datastore"
"github.com/jmoiron/sqlx"
log "github.com/sirupsen/logrus"
)
// Migrator holds the configuration, database handles and bookkeeping state
// used while copying data from the old database into the new one.
type Migrator struct {
	// Settings supplied by the caller through NewMigrator.
	OldBaseURL     string
	OldPostgresDSN string
	NewPostgresDSN string
	PAT            string
	// MigrateEvents gates the event and event-delivery steps in Run.
	MigrateEvents bool

	// State carried between migration steps; these are not set by
	// NewMigrator, so presumably the individual steps fill them in.
	user     *datastore.User
	userOrgs []datastore.Organisation
	projects []*datastore.Project

	// ID sets, initialised empty by NewMigrator.
	// NOTE(review): presumably they record which rows were migrated so later
	// steps can filter on them — confirm against the step implementations.
	endpointIDs map[string]struct{}
	eventIDs    map[string]struct{}
	deliveryIDs map[string]struct{}
	sourceIDs   map[string]struct{}
	subIDs      map[string]struct{}

	// Connections opened from NewPostgresDSN and OldPostgresDSN respectively.
	newDB *sqlx.DB
	oldDB *sqlx.DB
}
// BeginTx starts a transaction on the new database using the driver's
// default transaction options (nil options are passed through).
func (m *Migrator) BeginTx(ctx context.Context) (*sqlx.Tx, error) {
	tx, err := m.newDB.BeginTxx(ctx, nil)
	return tx, err
}
// Rollback finalises tx according to err.
//
// When err is non-nil the transaction is rolled back and nothing else is
// attempted. When err is nil the transaction is committed; if that commit
// fails, a rollback is attempted as a last resort.
//
// sql.ErrTxDone is never reported: it only means the transaction was already
// committed or rolled back, which is expected when the caller finished the
// transaction itself before this method ran. Failures are logged, not
// returned.
func (m *Migrator) Rollback(tx *sqlx.Tx, err error) {
	if err != nil {
		rbErr := tx.Rollback()
		if rbErr != nil && !errors.Is(rbErr, sql.ErrTxDone) {
			log.WithError(rbErr).Error("failed to roll back transaction")
		}
		// The transaction is finished; committing it now could only yield
		// sql.ErrTxDone.
		return
	}

	cmErr := tx.Commit()
	if cmErr == nil || errors.Is(cmErr, sql.ErrTxDone) {
		return
	}
	log.WithError(cmErr).Error("failed to commit tx rolling back transaction")

	// Only log the rollback when it genuinely failed; a nil result or
	// sql.ErrTxDone is not an error worth reporting.
	if rbErr := tx.Rollback(); rbErr != nil && !errors.Is(rbErr, sql.ErrTxDone) {
		log.WithError(rbErr).Error("failed to roll back transaction")
	}
}
// defaultPageable is the paging configuration shared by the migration steps:
// pages of 500 rows, walked in the datastore.Next direction.
//
// NOTE(review): the all-F NextCursor looks like a "maximum ID" sentinel so
// that paging starts from the very first page — confirm against the
// datastore pagination queries.
var defaultPageable = datastore.Pageable{
	PerPage:    500,
	Direction:  datastore.Next,
	NextCursor: "FFFFFFFF-FFFF-FFFF-FFFF-FFFFFFFFFFFF",
}
// NewMigrator connects to the old and the new Postgres databases and returns
// a Migrator ready to Run.
//
// The string and bool arguments are stored unchanged on the returned
// Migrator, and the ID sets are initialised empty. If either connection
// fails, a nil Migrator is returned together with an error wrapping the
// underlying cause; a connection that was already opened is closed first so
// it does not leak.
func NewMigrator(oldBaseURL string, oldPostgresDSN string, newPostgresDSN string, PAT string, migrateEvents bool) (*Migrator, error) {
	oldDB, err := sqlx.Connect("postgres", oldPostgresDSN)
	if err != nil {
		return nil, fmt.Errorf("failed to open oldPostgresDSN: %w", err)
	}

	newDB, err := sqlx.Connect("postgres", newPostgresDSN)
	if err != nil {
		// Release the old-database connection opened above; the caller
		// never receives a Migrator it could close.
		if closeErr := oldDB.Close(); closeErr != nil {
			log.WithError(closeErr).Error("failed to close old database connection")
		}
		return nil, fmt.Errorf("failed to open newPostgresDSN: %w", err)
	}

	return &Migrator{
		OldBaseURL:     oldBaseURL,
		OldPostgresDSN: oldPostgresDSN,
		NewPostgresDSN: newPostgresDSN,
		PAT:            PAT,
		MigrateEvents:  migrateEvents,
		oldDB:          oldDB,
		newDB:          newDB,
		endpointIDs:    map[string]struct{}{},
		sourceIDs:      map[string]struct{}{},
		subIDs:         map[string]struct{}{},
		eventIDs:       map[string]struct{}{},
		deliveryIDs:    map[string]struct{}{},
	}, nil
}
// Run executes the migration steps in a fixed order: users, organisations,
// projects, API keys, endpoints, sources and subscriptions, followed by
// events and event deliveries when MigrateEvents is set.
//
// It stops at the first failing step and returns that step's error wrapped
// with the step name, so callers can still inspect the cause with errors.Is
// or errors.As. A progress line is printed to stdout after each step that
// succeeds.
func (m *Migrator) Run() error {
	type step struct {
		name string // inserted into the error message
		done string // progress line printed on success
		run  func() error
	}

	steps := []step{
		{"user", "Finished user migration", m.RunUserMigration},
		{"org", "Finished org migration", m.RunOrgMigration},
		{"project", "Finished project migration", m.RunProjectMigration},
		{"api key", "Finished api key migration", m.RunAPIKeyMigration},
		{"endpoint", "Finished endpoint migration", m.RunEndpointMigration},
		{"source", "Finished source migration", m.RunSourceMigration},
		{"subscription", "Finished subscription migration", m.RunSubscriptionMigration},
	}

	// Events and deliveries are optional and always run last.
	if m.MigrateEvents {
		steps = append(steps,
			step{"event", "Finished event migration", m.RunEventMigration},
			step{"event deliveries", "Finished event delivery migration", m.RunEventDeliveriesMigration},
		)
	}

	for _, s := range steps {
		if err := s.run(); err != nil {
			return fmt.Errorf("failed to run %s migration: %w", s.name, err)
		}
		fmt.Println(s.done)
	}

	return nil
}
// GetDB returns the handle to the old database.
//
// NOTE(review): BeginTx opens its transactions on the new database, so the
// two methods deliberately or accidentally target different databases —
// confirm this is intended.
func (m *Migrator) GetDB() *sqlx.DB {
	return m.oldDB
}
// Close closes the old and the new database connections held by the
// Migrator. Both connections are always attempted; handles that were never
// opened (nil) are skipped.
//
// The first close failure is returned, wrapped with the name of the database
// it belongs to. If both fail, the second failure is logged so it is not
// lost.
func (m *Migrator) Close() error {
	var closeErr error

	if m.oldDB != nil {
		if err := m.oldDB.Close(); err != nil {
			closeErr = fmt.Errorf("failed to close old database: %w", err)
		}
	}

	if m.newDB != nil {
		if err := m.newDB.Close(); err != nil {
			if closeErr == nil {
				closeErr = fmt.Errorf("failed to close new database: %w", err)
			} else {
				log.WithError(err).Error("failed to close new database")
			}
		}
	}

	return closeErr
}
// GetHook always returns nil: the Migrator carries no hooks.
// NOTE(review): presumably this exists only to satisfy an interface that
// expects a database-like value — confirm against the callers.
func (m *Migrator) GetHook() *hooks.Hook {
	return nil
}