-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmigrate_subcriptions.go
110 lines (97 loc) · 3.41 KB
/
migrate_subcriptions.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
package main
import (
"context"
"github.com/frain-dev/newcloud-migrator/convoy-23.9.2/database/postgres"
"github.com/frain-dev/newcloud-migrator/convoy-23.9.2/datastore"
"github.com/frain-dev/newcloud-migrator/convoy-23.9.2/util"
)
func (m *Migrator) RunSubscriptionMigration() error {
subRepo := postgres.NewSubscriptionRepo(m)
for _, p := range m.projects {
err := m.loadProjectSubscriptions(subRepo, p.UID, defaultPageable)
if err != nil {
return err
}
}
return nil
}
const (
createSubscription = `
INSERT INTO convoy.subscriptions (
id,name,type,
project_id,endpoint_id,device_id,
source_id,alert_config_count,alert_config_threshold,
retry_config_type,retry_config_duration,
retry_config_retry_count,filter_config_event_types,
filter_config_filter_headers,filter_config_filter_body,
rate_limit_config_count,rate_limit_config_duration,
created_at, updated_at, deleted_at,function
)
VALUES (
:id, :name, :type,
:project_id, :endpoint_id, :device_id,
:source_id, :alert_config_count, :alert_config_threshold,
:retry_config_type, :retry_config_duration,
:retry_config_retry_count, :filter_config_event_types,
:filter_config_filter_headers, :filter_config_filter_body,
:rate_limit_config_count, :rate_limit_config_duration,
:created_at, :updated_at, :deleted_at, :function
);
`
)
func (m *Migrator) SaveSubscriptions(ctx context.Context, subscriptions []datastore.Subscription) error {
values := make([]map[string]interface{}, 0)
for i := range subscriptions {
subscription := &subscriptions[i]
ac := subscription.GetAlertConfig()
rc := subscription.GetRetryConfig()
fc := subscription.GetFilterConfig()
rlc := subscription.GetRateLimitConfig()
var endpointID, sourceID, deviceID *string
if !util.IsStringEmpty(subscription.EndpointID) {
if _, ok := m.endpointIDs[subscription.EndpointID]; !ok {
continue
}
endpointID = &subscription.EndpointID
}
if !util.IsStringEmpty(subscription.SourceID) {
if _, ok := m.sourceIDs[subscription.SourceID]; !ok {
continue
}
sourceID = &subscription.SourceID
}
if !util.IsStringEmpty(subscription.DeviceID) {
continue // ignore cli subscriptions
}
values = append(values, map[string]interface{}{
"id": subscription.UID,
"name": subscription.Name,
"type": subscription.Type,
"project_id": subscription.ProjectID,
"endpoint_id": endpointID,
"device_id": deviceID,
"source_id": sourceID,
"alert_config_count": ac.Count,
"alert_config_threshold": ac.Threshold,
"retry_config_type": rc.Type,
"retry_config_duration": rc.Duration,
"retry_config_retry_count": rc.RetryCount,
"filter_config_event_types": fc.EventTypes,
"filter_config_filter_headers": fc.Filter.Headers,
"filter_config_filter_body": fc.Filter.Body,
"rate_limit_config_count": rlc.Count,
"rate_limit_config_duration": rlc.Duration,
"created_at": subscription.CreatedAt,
"updated_at": subscription.UpdatedAt,
"deleted_at": subscription.DeletedAt,
"function": subscription.Function,
})
}
if len(values) > 0 {
_, err := m.newDB.NamedExecContext(ctx, createSubscription, values)
if err != nil {
return err
}
}
return nil
}