From 013f0f50b693fcb94d8f5620876b10ab14155b0c Mon Sep 17 00:00:00 2001 From: boreq Date: Tue, 31 Oct 2023 16:45:04 +0900 Subject: [PATCH 01/10] Migrations --- .../di/inject_adapters.go | 2 + cmd/crossposting-service/di/wire_gen.go | 9 +- internal/fixtures/fixtures.go | 4 + migrations/migrations.go | 239 ++++++ migrations/migrations_test.go | 736 ++++++++++++++++++ service/adapters/sqlite/migrations_storage.go | 141 ++++ .../sqlite/migrations_storage_test.go | 98 +++ service/adapters/sqlite/sqlite.go | 1 + 8 files changed, 1228 insertions(+), 2 deletions(-) create mode 100644 migrations/migrations.go create mode 100644 migrations/migrations_test.go create mode 100644 service/adapters/sqlite/migrations_storage.go create mode 100644 service/adapters/sqlite/migrations_storage_test.go diff --git a/cmd/crossposting-service/di/inject_adapters.go b/cmd/crossposting-service/di/inject_adapters.go index ff1a388..d314141 100644 --- a/cmd/crossposting-service/di/inject_adapters.go +++ b/cmd/crossposting-service/di/inject_adapters.go @@ -37,6 +37,8 @@ var sqliteTestAdaptersSet = wire.NewSet( sqlite.NewMigrations, wire.Struct(new(buildTransactionSqliteAdaptersDependencies), "*"), + + sqlite.NewMigrationsStorage, ) var sqliteTxAdaptersSet = wire.NewSet( diff --git a/cmd/crossposting-service/di/wire_gen.go b/cmd/crossposting-service/di/wire_gen.go index 6568547..a194f47 100644 --- a/cmd/crossposting-service/di/wire_gen.go +++ b/cmd/crossposting-service/di/wire_gen.go @@ -9,8 +9,6 @@ package di import ( "context" "database/sql" - "testing" - "github.com/ThreeDotsLabs/watermill" "github.com/google/wire" "github.com/planetary-social/nos-crossposting-service/internal/fixtures" @@ -27,6 +25,7 @@ import ( "github.com/planetary-social/nos-crossposting-service/service/ports/http/frontend" memorypubsub2 "github.com/planetary-social/nos-crossposting-service/service/ports/memorypubsub" "github.com/planetary-social/nos-crossposting-service/service/ports/sqlitepubsub" + "testing" ) // Injectors from wire.go: @@ -136,10 +135,16 @@ func BuildTestAdapters(contextContext context.Context, tb testing.TB) (sqlite.Te return sqlite.TestedItems{}, nil, err } sqliteSubscriber := sqlite.NewSubscriber(subscriber, offsetsAdapter, sqliteSchema, db) + migrationsStorage, err := sqlite.NewMigrationsStorage(db) + if err != nil { + cleanup() + return sqlite.TestedItems{}, nil, err + } testedItems := sqlite.TestedItems{ TransactionProvider: genericTransactionProvider, Migrations: migrations, Subscriber: sqliteSubscriber, + MigrationsStorage: migrationsStorage, } return testedItems, func() { cleanup() diff --git a/internal/fixtures/fixtures.go b/internal/fixtures/fixtures.go index 20face6..deabb4a 100644 --- a/internal/fixtures/fixtures.go +++ b/internal/fixtures/fixtures.go @@ -120,6 +120,10 @@ func TestContext(t testing.TB) context.Context { return ctx } +func SomeError() error { + return fmt.Errorf("some error: %d", rand.Int()) +} + var letters = []rune("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ") func randSeq(n int) string { diff --git a/migrations/migrations.go b/migrations/migrations.go new file mode 100644 index 0000000..ed3db97 --- /dev/null +++ b/migrations/migrations.go @@ -0,0 +1,239 @@ +package migrations + +import ( + "context" + "encoding/json" + "fmt" + "github.com/planetary-social/nos-crossposting-service/internal" + "github.com/planetary-social/nos-crossposting-service/internal/logging" + + "github.com/boreq/errors" + "github.com/hashicorp/go-multierror" +) + +type State map[string]string + +type SaveStateFunc func(state State) error + +// MigrationFunc is executed with the previously saved state. If the migration +// func is executed for the first time then the saved state will be an empty +// map. State is saved by calling the provided function. If a migration function +// returns an error it will be executed again. If a function doesn't return an +// error it should not be executed again. +type MigrationFunc func(ctx context.Context, state State, saveStateFunc SaveStateFunc) error + +type ProgressCallback interface { + // OnRunning is only called when a migration is actually being executed. + OnRunning(migrationIndex int, migrationsCount int) + + // OnError is called when the migration process for one of the migrations + // fails with an error. + OnError(migrationIndex int, migrationsCount int, err error) + + // OnDone is always called once all migrations were successfully executed + // even if no migrations were run. + OnDone(migrationsCount int) +} + +type Migration struct { + name string + fn MigrationFunc +} + +func NewMigration(name string, fn MigrationFunc) (Migration, error) { + if name == "" { + return Migration{}, errors.New("name is an empty string") + } + if fn == nil { + return Migration{}, errors.New("function is nil") + } + return Migration{name: name, fn: fn}, nil +} + +func MustNewMigration(name string, fn MigrationFunc) Migration { + v, err := NewMigration(name, fn) + if err != nil { + panic(err) + } + return v +} + +func (m Migration) Name() string { + return m.name +} + +func (m Migration) Fn() MigrationFunc { + return m.fn +} + +func (m Migration) IsZero() bool { + return m.fn == nil +} + +type Migrations struct { + migrations []Migration +} + +func NewMigrations(migrations []Migration) (Migrations, error) { + names := internal.NewEmptySet[string]() + for _, migration := range migrations { + if migration.IsZero() { + return Migrations{}, errors.New("zero value of migration") + } + + if names.Contains(migration.Name()) { + return Migrations{}, fmt.Errorf("duplicate name '%s'", migration.Name()) + } + names.Put(migration.Name()) + } + + return Migrations{migrations: migrations}, nil +} + +func MustNewMigrations(migrations []Migration) Migrations { + v, err := NewMigrations(migrations) + if err != nil { + panic(err) + } + return v +} + +func (m Migrations) List() []Migration { + return m.migrations +} + +func (m Migrations) Count() int { + return len(m.migrations) +} + +type Status struct { + s string +} + +var ( + StatusFailed = Status{"failed"} + StatusFinished = Status{"finished"} +) + +var ( + ErrStateNotFound = errors.New("state not found") + ErrStatusNotFound = errors.New("status not found") +) + +type Storage interface { + // LoadState returns ErrStateNotFound if state has not been saved yet. + LoadState(name string) (State, error) + SaveState(name string, state State) error + + // LoadStatus returns ErrStatusNotFound if status has not been saved yet. + LoadStatus(name string) (Status, error) + SaveStatus(name string, status Status) error +} + +type Runner struct { + storage Storage + logger logging.Logger +} + +func NewRunner(storage Storage, logger logging.Logger) *Runner { + return &Runner{storage: storage, logger: logger.New("migrations_runner")} +} + +func (r Runner) Run(ctx context.Context, migrations Migrations, callback ProgressCallback) error { + for i, migration := range migrations.List() { + onRunning := func() { + callback.OnRunning(i, migrations.Count()) + } + + if err := r.runMigration(ctx, migration, onRunning); err != nil { + err = errors.Wrapf(err, "error running migration '%s'", migration.Name()) + callback.OnError(i, migrations.Count(), err) + return err + } + } + + callback.OnDone(migrations.Count()) + + return nil +} + +func (r Runner) runMigration(ctx context.Context, migration Migration, onRunning func()) error { + ctx, cancel := context.WithCancel(ctx) + defer cancel() + + logger := r.logger.WithField("migration_name", migration.Name()) + + logger.Trace().Message("considering migration") + + shouldRun, err := r.shouldRun(migration) + if err != nil { + return errors.Wrap(err, "error checking if migration should be run") + } + + if !shouldRun { + logger.Debug().Message("not running this migration") + return nil + } + + onRunning() + + state, err := r.loadState(migration) + if err != nil { + return errors.Wrap(err, "error loading state") + } + + humanReadableState, err := json.Marshal(state) + if err != nil { + return errors.Wrap(err, "state json marshal error") + } + + logger.Debug().WithField("state", string(humanReadableState)).Message("executing migration") + + saveStateFunc := func(state State) error { + return r.storage.SaveState(migration.Name(), state) + } + + migrationErr := migration.Fn()(ctx, state, saveStateFunc) + saveStateErr := r.storage.SaveStatus(migration.Name(), r.statusFromError(migrationErr)) + + var resultErr error + + if migrationErr != nil { + resultErr = multierror.Append(resultErr, errors.Wrap(migrationErr, "migration function returned an error")) + } + + if saveStateErr != nil { + resultErr = multierror.Append(resultErr, errors.Wrap(saveStateErr, "error saving state")) + } + + return resultErr +} + +func (r Runner) shouldRun(migration Migration) (bool, error) { + status, err := r.storage.LoadStatus(migration.Name()) + if err != nil { + if errors.Is(err, ErrStatusNotFound) { + return true, nil + } + return false, errors.Wrap(err, "error loading status") + } + return status != StatusFinished, nil +} + +func (r Runner) loadState(migration Migration) (State, error) { + state, err := r.storage.LoadState(migration.Name()) + if err != nil { + if errors.Is(err, ErrStateNotFound) { + return make(State), nil + } + return nil, errors.Wrap(err, "error loading state") + } + return state, nil +} + +func (r Runner) statusFromError(err error) Status { + if err == nil { + return StatusFinished + } + return StatusFailed +} diff --git a/migrations/migrations_test.go b/migrations/migrations_test.go new file mode 100644 index 0000000..edd637d --- /dev/null +++ b/migrations/migrations_test.go @@ -0,0 +1,736 @@ +package migrations_test + +import ( + "context" + "fmt" + "github.com/planetary-social/nos-crossposting-service/internal" + "github.com/planetary-social/nos-crossposting-service/internal/fixtures" + "github.com/planetary-social/nos-crossposting-service/internal/logging" + "github.com/planetary-social/nos-crossposting-service/migrations" + "testing" + + "github.com/stretchr/testify/require" +) + +func TestRunner_MigrationsCanBeEmpty(t *testing.T) { + r := newTestRunner(t) + + ctx := + fixtures.TestContext(t) + m := migrations.MustNewMigrations(nil) + callback := newProgressCallbackMock() + + err := r.Runner.Run(ctx, m, callback) + require.NoError(t, err) + + require.Empty(t, r.Storage.saveStateCalls) + require.Empty(t, r.Storage.saveStatusCalls) + require.Empty(t, r.Storage.loadStateCalls) + require.Empty(t, r.Storage.loadStatusCalls) +} + +func TestRunner_MigrationIsExecutedWithEmptyInitializedStateIfNoStateIsSaved(t *testing.T) { + r := newTestRunner(t) + + name := fixtures.SomeString() + + var passedState *migrations.State + + ctx := fixtures.TestContext(t) + m := migrations.MustNewMigrations([]migrations.Migration{ + migrations.MustNewMigration( + name, + func(ctx context.Context, state migrations.State, saveStateFunc migrations.SaveStateFunc) error { + passedState = &state + return nil + }, + ), + }) + callback := newProgressCallbackMock() + + err := r.Runner.Run(ctx, m, callback) + require.NoError(t, err) + + require.Equal(t, + []loadStateCall{ + { + name: name, + }, + }, + r.Storage.loadStateCalls, + ) + require.EqualValues(t, internal.Pointer(make(migrations.State)), passedState) +} + +func TestRunner_MigrationIsExecutedWithSavedStateIfStateWasSaved(t *testing.T) { + r := newTestRunner(t) + + name := fixtures.SomeString() + + var passedState *migrations.State + + ctx := fixtures.TestContext(t) + m := migrations.MustNewMigrations([]migrations.Migration{ + migrations.MustNewMigration( + name, + func(ctx context.Context, state migrations.State, saveStateFunc migrations.SaveStateFunc) error { + passedState = &state + return nil + }, + ), + }) + + someState := migrations.State{ + fixtures.SomeString(): fixtures.SomeString(), + } + + r.Storage.MockState(name, someState) + callback := newProgressCallbackMock() + + err := r.Runner.Run(ctx, m, callback) + require.NoError(t, err) + + require.Equal(t, + []loadStateCall{ + { + name: name, + }, + }, + r.Storage.loadStateCalls, + ) + require.EqualValues(t, internal.Pointer(someState), passedState) +} + +func TestRunner_MigrationIsExecutedIfNoStatusIsSaved(t *testing.T) { + r := newTestRunner(t) + + name := fixtures.SomeString() + + ctx := fixtures.TestContext(t) + m := migrations.MustNewMigrations([]migrations.Migration{ + migrations.MustNewMigration( + name, + func(ctx context.Context, state migrations.State, saveStateFunc migrations.SaveStateFunc) error { + return nil + }, + ), + }) + callback := newProgressCallbackMock() + + err := r.Runner.Run(ctx, m, callback) + require.NoError(t, err) + + require.Equal(t, + []loadStateCall{ + { + name: name, + }, + }, + r.Storage.loadStateCalls, + ) + require.Equal(t, + []loadStatusCall{ + { + name: name, + }, + }, + r.Storage.loadStatusCalls, + ) +} + +func TestRunner_StatusIsSavedBasedOnReturnedErrors(t *testing.T) { + testCases := []struct { + Name string + ReturnedError error + ExpectedStatus migrations.Status + }{ + { + Name: "no_error", + ReturnedError: nil, + ExpectedStatus: migrations.StatusFinished, + }, + { + Name: "error", + ReturnedError: fixtures.SomeError(), + ExpectedStatus: migrations.StatusFailed, + }, + } + + for _, testCase := range testCases { + r := newTestRunner(t) + name := fixtures.SomeString() + + ctx := fixtures.TestContext(t) + m := migrations.MustNewMigrations([]migrations.Migration{ + migrations.MustNewMigration( + name, + func(ctx context.Context, state migrations.State, saveStateFunc migrations.SaveStateFunc) error { + return testCase.ReturnedError + }, + ), + }) + callback := newProgressCallbackMock() + + err := r.Runner.Run(ctx, m, callback) + if testCase.ReturnedError == nil { + require.NoError(t, err) + } else { + require.ErrorIs(t, err, testCase.ReturnedError) + } + + require.Equal(t, + []saveStatusCall{ + { + name: name, + status: testCase.ExpectedStatus, + }, + }, + r.Storage.saveStatusCalls, + ) + } +} + +func TestRunner_MigrationIsNotExecutedIfItPreviouslySucceeded(t *testing.T) { + r := newTestRunner(t) + + name := fixtures.SomeString() + + ctx := fixtures.TestContext(t) + m := migrations.MustNewMigrations([]migrations.Migration{ + migrations.MustNewMigration( + name, + func(ctx context.Context, state migrations.State, saveStateFunc migrations.SaveStateFunc) error { + return nil + }, + ), + }) + + r.Storage.MockStatus(name, migrations.StatusFinished) + callback := newProgressCallbackMock() + + err := r.Runner.Run(ctx, m, callback) + require.NoError(t, err) + + require.Empty(t, r.Storage.loadStateCalls) + require.Equal(t, + []loadStatusCall{ + { + name: name, + }, + }, + r.Storage.loadStatusCalls, + ) +} + +func TestRunner_MigrationIsExecutedIfItPreviouslyFailed(t *testing.T) { + r := newTestRunner(t) + + name := fixtures.SomeString() + + ctx := fixtures.TestContext(t) + m := migrations.MustNewMigrations([]migrations.Migration{ + migrations.MustNewMigration( + name, + func(ctx context.Context, state migrations.State, saveStateFunc migrations.SaveStateFunc) error { + return nil + }, + ), + }) + + r.Storage.MockStatus(name, migrations.StatusFailed) + callback := newProgressCallbackMock() + + err := r.Runner.Run(ctx, m, callback) + require.NoError(t, err) + + require.Equal(t, + []loadStateCall{ + { + name: name, + }, + }, + r.Storage.loadStateCalls, + ) + require.Equal(t, + []loadStatusCall{ + { + name: name, + }, + }, + r.Storage.loadStatusCalls, + ) +} + +func TestRunner_MigrationsAreConsideredInOrder(t *testing.T) { + r := newTestRunner(t) + + name1 := fixtures.SomeString() + name2 := fixtures.SomeString() + + ctx := fixtures.TestContext(t) + m := migrations.MustNewMigrations([]migrations.Migration{ + migrations.MustNewMigration( + name1, + func(ctx context.Context, state migrations.State, saveStateFunc migrations.SaveStateFunc) error { + return nil + }, + ), + migrations.MustNewMigration( + name2, + func(ctx context.Context, state migrations.State, saveStateFunc migrations.SaveStateFunc) error { + return nil + }, + ), + }) + callback := newProgressCallbackMock() + + err := r.Runner.Run(ctx, m, callback) + require.NoError(t, err) + + require.Equal(t, + []loadStateCall{ + { + name: name1, + }, + { + name: name2, + }, + }, + r.Storage.loadStateCalls, + ) + require.Equal(t, + []loadStatusCall{ + { + name: name1, + }, + { + name: name2, + }, + }, + r.Storage.loadStatusCalls, + ) +} + +func TestRunner_MigrationsCanSaveState(t *testing.T) { + r := newTestRunner(t) + + name := fixtures.SomeString() + someState := migrations.State{ + fixtures.SomeString(): fixtures.SomeString(), + } + m := migrations.MustNewMigrations([]migrations.Migration{ + migrations.MustNewMigration( + name, + func(ctx context.Context, state migrations.State, saveStateFunc migrations.SaveStateFunc) error { + return saveStateFunc(someState) + }, + ), + }) + + ctx := fixtures.TestContext(t) + callback := newProgressCallbackMock() + err := r.Runner.Run(ctx, m, callback) + require.NoError(t, err) + + require.Equal(t, + []saveStateCall{ + { + name: name, + state: someState, + }, + }, + r.Storage.saveStateCalls, + ) +} + +func TestRunner_IfMigrationsAreEmptyOnlyOnDoneProgressCallbackIsCalled(t *testing.T) { + r := newTestRunner(t) + + ctx := fixtures.TestContext(t) + m := migrations.MustNewMigrations(nil) + callback := newProgressCallbackMock() + + err := r.Runner.Run(ctx, m, callback) + require.NoError(t, err) + + require.Empty(t, callback.OnRunningCalls) + require.Empty(t, callback.OnErrorCalls) + require.Equal(t, + []progressCallbackMockOnDoneCall{ + { + MigrationsCount: 0, + }, + }, + callback.OnDoneCalls, + ) +} + +func TestRunner_OnRunningProgressCallbackIsCalledWhenMigrationsAreRun(t *testing.T) { + r := newTestRunner(t) + + ctx := fixtures.TestContext(t) + + noop := func(ctx context.Context, state migrations.State, saveStateFunc migrations.SaveStateFunc) error { + return nil + } + + m := migrations.MustNewMigrations([]migrations.Migration{ + migrations.MustNewMigration(fixtures.SomeString(), noop), + migrations.MustNewMigration(fixtures.SomeString(), noop), + }) + + callback := newProgressCallbackMock() + + err := r.Runner.Run(ctx, m, callback) + require.NoError(t, err) + + require.Equal(t, + []progressCallbackMockOnRunningCall{ + { + MigrationIndex: 0, + MigrationsCount: 2, + }, + { + MigrationIndex: 1, + MigrationsCount: 2, + }, + }, + callback.OnRunningCalls, + ) + require.Empty(t, callback.OnErrorCalls) + require.Equal(t, + []progressCallbackMockOnDoneCall{ + { + MigrationsCount: 2, + }, + }, + callback.OnDoneCalls, + ) +} + +func TestRunner_OnRunningProgressCallbackIsOnlyCalledIfMigrationNeedsToBeRun(t *testing.T) { + r := newTestRunner(t) + + ctx := fixtures.TestContext(t) + + name1 := fixtures.SomeString() + name2 := fixtures.SomeString() + name3 := fixtures.SomeString() + + noop := func(ctx context.Context, state migrations.State, saveStateFunc migrations.SaveStateFunc) error { + return nil + } + + m := migrations.MustNewMigrations([]migrations.Migration{ + migrations.MustNewMigration(name1, noop), + migrations.MustNewMigration(name2, noop), + migrations.MustNewMigration(name3, noop), + }) + + r.Storage.MockStatus(name1, migrations.StatusFinished) + + callback := newProgressCallbackMock() + + err := r.Runner.Run(ctx, m, callback) + require.NoError(t, err) + + require.Equal(t, + []progressCallbackMockOnRunningCall{ + { + MigrationIndex: 1, + MigrationsCount: 3, + }, + { + MigrationIndex: 2, + MigrationsCount: 3, + }, + }, + callback.OnRunningCalls, + ) + require.Empty(t, callback.OnErrorCalls) + require.Equal(t, + []progressCallbackMockOnDoneCall{ + { + MigrationsCount: 3, + }, + }, + callback.OnDoneCalls, + ) +} + +func TestRunner_BothOnProgressAndOnErrorProgressCallbacksAreCalledIfMigrationFails(t *testing.T) { + r := newTestRunner(t) + + ctx := fixtures.TestContext(t) + + name1 := fixtures.SomeString() + name2 := fixtures.SomeString() + someError := fixtures.SomeError() + + noop := func(ctx context.Context, state migrations.State, saveStateFunc migrations.SaveStateFunc) error { + return nil + } + + returnsError := func(ctx context.Context, state migrations.State, saveStateFunc migrations.SaveStateFunc) error { + return someError + } + + m := migrations.MustNewMigrations([]migrations.Migration{ + migrations.MustNewMigration(name1, noop), + migrations.MustNewMigration(name2, returnsError), + }) + + callback := newProgressCallbackMock() + + err := r.Runner.Run(ctx, m, callback) + require.Error(t, err) + + require.Equal(t, + []progressCallbackMockOnRunningCall{ + { + MigrationIndex: 0, + MigrationsCount: 2, + }, + { + MigrationIndex: 1, + MigrationsCount: 2, + }, + }, + callback.OnRunningCalls, + ) + require.Len(t, callback.OnErrorCalls, 1) + require.Equal(t, 1, callback.OnErrorCalls[0].MigrationIndex) + require.Equal(t, 2, callback.OnErrorCalls[0].MigrationsCount) + require.EqualError(t, + callback.OnErrorCalls[0].Err, + fmt.Sprintf( + "error running migration '%s': 1 error occurred:\n\t* migration function returned an error: %s\n\n", + name2, + someError.Error(), + ), + ) + require.Empty(t, callback.OnDoneCalls) +} + +func TestRunner_OnlyOnErrorProgressCallbackIsCalledIfStatusLoadingFails(t *testing.T) { + r := newTestRunner(t) + + ctx := fixtures.TestContext(t) + + name1 := fixtures.SomeString() + name2 := fixtures.SomeString() + someError := fixtures.SomeError() + + noop := func(ctx context.Context, state migrations.State, saveStateFunc migrations.SaveStateFunc) error { + return nil + } + + m := migrations.MustNewMigrations([]migrations.Migration{ + migrations.MustNewMigration(name1, noop), + migrations.MustNewMigration(name2, noop), + }) + + callback := newProgressCallbackMock() + + r.Storage.MockLoadStatusError(name2, someError) + + err := r.Runner.Run(ctx, m, callback) + require.Error(t, err) + + require.Equal(t, + []progressCallbackMockOnRunningCall{ + { + MigrationIndex: 0, + MigrationsCount: 2, + }, + }, + callback.OnRunningCalls, + ) + require.Len(t, callback.OnErrorCalls, 1) + require.Equal(t, 1, callback.OnErrorCalls[0].MigrationIndex) + require.Equal(t, 2, callback.OnErrorCalls[0].MigrationsCount) + require.EqualError(t, + callback.OnErrorCalls[0].Err, + fmt.Sprintf( + "error running migration '%s': error checking if migration should be run: error loading status: %s", + name2, + someError.Error(), + ), + ) + require.Empty(t, callback.OnDoneCalls) +} + +type testRunner struct { + Runner *migrations.Runner + Storage *storageMock +} + +func newTestRunner(t *testing.T) testRunner { + logger := logging.NewDevNullLogger() + storage := newStorageMock() + runner := migrations.NewRunner(storage, logger) + + return testRunner{ + Runner: runner, + Storage: storage, + } +} + +type storageMock struct { + loadStateCalls []loadStateCall + loadStatusCalls []loadStatusCall + saveStateCalls []saveStateCall + saveStatusCalls []saveStatusCall + + returnedState map[string]migrations.State + returnedStatus map[string]migrations.Status + loadStatusErrors map[string]error +} + +func newStorageMock() *storageMock { + return &storageMock{ + returnedStatus: make(map[string]migrations.Status), + returnedState: make(map[string]migrations.State), + loadStatusErrors: make(map[string]error), + } +} + +func (s *storageMock) MockState(name string, state migrations.State) { + s.returnedState[name] = state +} + +func (s *storageMock) MockStatus(name string, status migrations.Status) { + s.returnedStatus[name] = status +} + +func (s *storageMock) MockLoadStatusError(name string, err error) { + s.loadStatusErrors[name] = err +} + +func (s *storageMock) LoadState(name string) (migrations.State, error) { + s.loadStateCalls = append(s.loadStateCalls, loadStateCall{name: name}) + state, ok := s.returnedState[name] + if !ok { + return nil, migrations.ErrStateNotFound + } + return state, nil +} + +func (s *storageMock) SaveState(name string, state migrations.State) error { + s.saveStateCalls = append(s.saveStateCalls, saveStateCall{name: name, state: state}) + return nil +} + +func (s *storageMock) LoadStatus(name string) (migrations.Status, error) { + s.loadStatusCalls = append(s.loadStatusCalls, loadStatusCall{name: name}) + + if err := s.loadStatusErrors[name]; err != nil { + return migrations.Status{}, err + } + + status, ok := s.returnedStatus[name] + if !ok { + return migrations.Status{}, migrations.ErrStatusNotFound + } + return status, nil +} + +func (s *storageMock) SaveStatus(name string, status migrations.Status) error { + s.saveStatusCalls = append(s.saveStatusCalls, saveStatusCall{name: name, status: status}) + return nil +} + +type loadStateCall struct { + name string +} + +type saveStateCall struct { + name string + state migrations.State +} + +type loadStatusCall struct { + name string +} + +type saveStatusCall struct { + name string + status migrations.Status +} + +func TestNewMigrations_DuplicateNamesAreNotAllowed(t *testing.T) { + name := "some name" + + _, err := migrations.NewMigrations( + []migrations.Migration{ + migrations.MustNewMigration( + name, + func(ctx context.Context, state migrations.State, saveStateFunc migrations.SaveStateFunc) error { + return nil + }, + ), + migrations.MustNewMigration( + name, + func(ctx context.Context, state migrations.State, saveStateFunc migrations.SaveStateFunc) error { + return nil + }, + ), + }, + ) + require.EqualError(t, err, "duplicate name 'some name'") +} + +func TestNewMigrations_ZeroValuesOfMigrationsAreNotAllowed(t *testing.T) { + _, err := migrations.NewMigrations( + []migrations.Migration{ + {}, + }, + ) + require.EqualError(t, err, "zero value of migration") +} + +type progressCallbackMockOnRunningCall struct { + MigrationIndex int + MigrationsCount int +} + +type progressCallbackMockOnErrorCall struct { + MigrationIndex int + MigrationsCount int + Err error +} + +type progressCallbackMockOnDoneCall struct { + MigrationsCount int +} + +type progressCallbackMock struct { + OnRunningCalls []progressCallbackMockOnRunningCall + OnErrorCalls []progressCallbackMockOnErrorCall + OnDoneCalls []progressCallbackMockOnDoneCall +} + +func newProgressCallbackMock() *progressCallbackMock { + return &progressCallbackMock{} +} + +func (p *progressCallbackMock) OnRunning(migrationIndex int, migrationsCount int) { + p.OnRunningCalls = append(p.OnRunningCalls, progressCallbackMockOnRunningCall{ + MigrationIndex: migrationIndex, + MigrationsCount: migrationsCount, + }) +} + +func (p *progressCallbackMock) OnError(migrationIndex int, migrationsCount int, err error) { + p.OnErrorCalls = append(p.OnErrorCalls, progressCallbackMockOnErrorCall{ + MigrationIndex: migrationIndex, + MigrationsCount: migrationsCount, + Err: err, + }) +} + +func (p *progressCallbackMock) OnDone(migrationsCount int) { + p.OnDoneCalls = append(p.OnDoneCalls, progressCallbackMockOnDoneCall{ + MigrationsCount: migrationsCount, + }) +} diff --git a/service/adapters/sqlite/migrations_storage.go b/service/adapters/sqlite/migrations_storage.go new file mode 100644 index 0000000..2e6e832 --- /dev/null +++ b/service/adapters/sqlite/migrations_storage.go @@ -0,0 +1,141 @@ +package sqlite + +import ( + "database/sql" + "encoding/json" + "github.com/boreq/errors" + "github.com/planetary-social/nos-crossposting-service/migrations" +) + +const ( + migrationsBucket = "migrations" + migrationsBucketStatus = "status" + migrationsBucketState = "state" +) + +type MigrationsStorage struct { + db *sql.DB +} + +func NewMigrationsStorage(db *sql.DB) (*MigrationsStorage, error) { + if _, err := db.Exec(` + CREATE TABLE IF NOT EXISTS migrations_state ( + name TEXT PRIMARY KEY, + state TEXT + );`, + ); err != nil { + return nil, errors.Wrap(err, "error creating the state table") + } + + if _, err := db.Exec(` + CREATE TABLE IF NOT EXISTS migrations_status ( + name TEXT PRIMARY KEY, + status TEXT + );`, + ); err != nil { + return nil, errors.Wrap(err, "error creating the status table") + } + + return &MigrationsStorage{db: db}, nil +} + +func (b *MigrationsStorage) LoadState(name string) (migrations.State, error) { + var marshaledState string + + row := b.db.QueryRow("SELECT state FROM migrations_state WHERE name=$1", name) + if err := row.Scan(&marshaledState); err != nil { + if errors.Is(err, sql.ErrNoRows) { + return nil, migrations.ErrStateNotFound + } + return nil, errors.Wrap(err, "error querying the database") + } + + var state migrations.State + if err := json.Unmarshal([]byte(marshaledState), &state); err != nil { + return nil, errors.Wrap(err, "error unmarshaling state") + } + + return state, nil +} + +func (b *MigrationsStorage) SaveState(name string, state migrations.State) error { + marshaledState, err := json.Marshal(state) + if err != nil { + return errors.Wrap(err, "error marshaling state") + } + + if _, err := b.db.Exec(` +INSERT INTO migrations_state(name, state) +VALUES ($1, $2) +ON CONFLICT(name) DO UPDATE SET + state=excluded.state`, + name, marshaledState); err != nil { + return errors.Wrap(err, "error running the query") + } + + return nil +} + +func (b *MigrationsStorage) LoadStatus(name string) (migrations.Status, error) { + var marshaledStatus string + + row := b.db.QueryRow("SELECT status FROM migrations_status WHERE name=$1", name) + if err := row.Scan(&marshaledStatus); err != nil { + if errors.Is(err, sql.ErrNoRows) { + return migrations.Status{}, migrations.ErrStatusNotFound + } + return migrations.Status{}, errors.Wrap(err, "error querying the database") + } + + status, err := unmarshalStatus(marshaledStatus) + if err != nil { + return migrations.Status{}, errors.Wrap(err, "error unmarshaling status") + } + + return status, nil +} + +func (b *MigrationsStorage) SaveStatus(name string, status migrations.Status) error { + marshaledStatus, err := marshalStatus(status) + if err != nil { + return errors.Wrap(err, "error marshaling status") + } + + if _, err := b.db.Exec(` +INSERT INTO migrations_status(name, status) +VALUES ($1, $2) +ON CONFLICT(name) DO UPDATE SET + status=excluded.status`, + name, marshaledStatus); err != nil { + return errors.Wrap(err, "error running the query") + } + + return nil +} + +const ( + statusFailed = "failed" + statusFinished = "finished" +) + +func marshalStatus(status migrations.Status) (string, error) { + switch status { + case migrations.StatusFailed: + return statusFailed, nil + case migrations.StatusFinished: + return statusFinished, nil + default: + return "", errors.New("unknown status") + } +} + +func unmarshalStatus(status string) (migrations.Status, error) { + switch status { + case statusFailed: + return migrations.StatusFailed, nil + case statusFinished: + return migrations.StatusFinished, nil + default: + return migrations.Status{}, errors.New("unknown status") + } +} diff --git a/service/adapters/sqlite/migrations_storage_test.go b/service/adapters/sqlite/migrations_storage_test.go new file mode 100644 index 0000000..068aa67 --- /dev/null +++ b/service/adapters/sqlite/migrations_storage_test.go @@ -0,0 +1,98 @@ +package sqlite_test + +import ( + "github.com/planetary-social/nos-crossposting-service/internal/fixtures" + "github.com/planetary-social/nos-crossposting-service/migrations" + "github.com/stretchr/testify/require" + "testing" +) + +func TestMigrationsStorage_LoadStateReturnsCorrectErrorWhenStateIsNotAvailable(t *testing.T) { + ctx := fixtures.TestContext(t) + adapters := NewTestAdapters(ctx, t) + + _, err := adapters.MigrationsStorage.LoadState(fixtures.SomeString()) + require.ErrorIs(t, err, migrations.ErrStateNotFound) +} + +func TestMigrationsStorage_LoadStateReturnsSavedState(t *testing.T) { + ctx := fixtures.TestContext(t) + adapters := NewTestAdapters(ctx, t) + + name := fixtures.SomeString() + state := migrations.State{ + fixtures.SomeString(): fixtures.SomeString(), + } + + err := adapters.MigrationsStorage.SaveState(name, state) + require.NoError(t, err) + + loadedState, err := adapters.MigrationsStorage.LoadState(name) + require.NoError(t, err) + require.Equal(t, state, loadedState) +} + +func TestMigrationsStorage_SavingStateTwiceOverwritesPreviousState(t *testing.T) { + ctx := fixtures.TestContext(t) + adapters := NewTestAdapters(ctx, t) + + name := fixtures.SomeString() + state1 := migrations.State{ + fixtures.SomeString(): fixtures.SomeString(), + } + state2 := migrations.State{ + fixtures.SomeString(): fixtures.SomeString(), + } + + err := adapters.MigrationsStorage.SaveState(name, state1) + require.NoError(t, err) + + err = adapters.MigrationsStorage.SaveState(name, state2) + require.NoError(t, err) + + loadedState, err := adapters.MigrationsStorage.LoadState(name) + require.NoError(t, err) + require.Equal(t, state2, loadedState) +} + +func TestMigrationsStorage_LoadStatusReturnsCorrectErrorWhenStatusIsNotAvailable(t *testing.T) { + ctx := fixtures.TestContext(t) + adapters := NewTestAdapters(ctx, t) + + _, err := adapters.MigrationsStorage.LoadStatus(fixtures.SomeString()) + require.ErrorIs(t, err, migrations.ErrStatusNotFound) +} + +func TestMigrationsStorage_LoadStatusReturnsSavedStatus(t *testing.T) { + ctx := fixtures.TestContext(t) + adapters := NewTestAdapters(ctx, t) + + name := fixtures.SomeString() + status := migrations.StatusFinished + + err := adapters.MigrationsStorage.SaveStatus(name, status) + require.NoError(t, err) + + loadedStatus, err := adapters.MigrationsStorage.LoadStatus(name) + require.NoError(t, err) + require.Equal(t, status, loadedStatus) +} + +func TestMigrationsStorage_SavingStatusTwiceOverwritesPreviousStatus(t *testing.T) { + ctx := fixtures.TestContext(t) + adapters := NewTestAdapters(ctx, t) + + name := fixtures.SomeString() + status1 := migrations.StatusFinished + status2 := migrations.StatusFinished + + err := adapters.MigrationsStorage.SaveStatus(name, status1) + require.NoError(t, err) + + err = adapters.MigrationsStorage.SaveStatus(name, status2) + require.NoError(t, err) + + loadedStatus, err := adapters.MigrationsStorage.LoadStatus(name) + require.NoError(t, err) + require.Equal(t, status2, loadedStatus) +} diff --git a/service/adapters/sqlite/sqlite.go b/service/adapters/sqlite/sqlite.go index f40885d..db29515 100644 --- a/service/adapters/sqlite/sqlite.go +++ b/service/adapters/sqlite/sqlite.go @@ -24,6 +24,7 @@ type TestedItems struct { TransactionProvider *TestTransactionProvider Migrations *Migrations Subscriber *Subscriber + MigrationsStorage *MigrationsStorage } func Open(conf config.Config) (*sql.DB, error) { From c09bc7f454941c733eab9be740d1b74c101f5780 Mon Sep 17 00:00:00 2001 From: boreq Date: Wed, 1 Nov 2023 05:58:24 +0900 Subject: [PATCH 02/10] Pubsub --- .../di/inject_adapters.go | 2 + cmd/crossposting-service/di/wire_gen.go | 7 +- internal/fixtures/fixtures.go | 12 + service/adapters/sqlite/migrations.go | 9 + service/adapters/sqlite/migrations_storage.go | 6 - service/adapters/sqlite/pubsub.go | 253 ++++++++++++++++++ service/adapters/sqlite/pubsub_test.go | 138 ++++++++++ service/adapters/sqlite/sqlite.go | 1 + 8 files changed, 420 insertions(+), 8 deletions(-) create mode 100644 service/adapters/sqlite/pubsub.go create mode 100644 service/adapters/sqlite/pubsub_test.go diff --git a/cmd/crossposting-service/di/inject_adapters.go b/cmd/crossposting-service/di/inject_adapters.go index d314141..7369ee2 100644 --- a/cmd/crossposting-service/di/inject_adapters.go +++ b/cmd/crossposting-service/di/inject_adapters.go @@ -23,6 +23,7 @@ var sqliteAdaptersSet = wire.NewSet( newAdaptersFactoryFn, sqlite.NewMigrations, + sqlite.NewPubSub, wire.Struct(new(buildTransactionSqliteAdaptersDependencies), "*"), ) @@ -39,6 +40,7 @@ var sqliteTestAdaptersSet = wire.NewSet( wire.Struct(new(buildTransactionSqliteAdaptersDependencies), "*"), sqlite.NewMigrationsStorage, + sqlite.NewPubSub, ) var sqliteTxAdaptersSet = wire.NewSet( diff --git a/cmd/crossposting-service/di/wire_gen.go b/cmd/crossposting-service/di/wire_gen.go index a194f47..307acff 100644 --- a/cmd/crossposting-service/di/wire_gen.go +++ b/cmd/crossposting-service/di/wire_gen.go @@ -100,7 +100,8 @@ func BuildService(contextContext context.Context, configConfig config.Config) (S } sqliteSubscriber := sqlite.NewSubscriber(subscriber, offsetsAdapter, sqliteSchema, db) tweetCreatedEventSubscriber := sqlitepubsub.NewTweetCreatedEventSubscriber(sendTweetHandler, sqliteSubscriber, logger, prometheusPrometheus) - migrations := sqlite.NewMigrations(db, sqliteSchema, offsetsAdapter) + pubSub := sqlite.NewPubSub(db, logger) + migrations := sqlite.NewMigrations(db, sqliteSchema, offsetsAdapter, pubSub) service := NewService(application, server, metricsServer, downloader, receivedEventSubscriber, tweetCreatedEventSubscriber, migrations) return service, func() { cleanup() @@ -128,7 +129,8 @@ func BuildTestAdapters(contextContext context.Context, tb testing.TB) (sqlite.Te genericTransactionProvider := sqlite.NewTestTransactionProvider(db, genericAdaptersFactoryFn) sqliteSchema := sqlite.NewSqliteSchema() offsetsAdapter := sqlite.NewWatermillOffsetsAdapter() - migrations := sqlite.NewMigrations(db, sqliteSchema, offsetsAdapter) + pubSub := sqlite.NewPubSub(db, logger) + migrations := sqlite.NewMigrations(db, sqliteSchema, offsetsAdapter, pubSub) subscriber, err := sqlite.NewWatermillSubscriber(db, watermillAdapter, sqliteSchema, offsetsAdapter) if err != nil { cleanup() @@ -145,6 +147,7 @@ func BuildTestAdapters(contextContext context.Context, tb testing.TB) (sqlite.Te Migrations: migrations, Subscriber: sqliteSubscriber, MigrationsStorage: migrationsStorage, + PubSub: pubSub, } return testedItems, func() { cleanup() diff --git a/internal/fixtures/fixtures.go b/internal/fixtures/fixtures.go index deabb4a..47ccc73 100644 --- a/internal/fixtures/fixtures.go +++ b/internal/fixtures/fixtures.go @@ -97,6 +97,18 @@ func SomeHexBytesOfLen(l int) string { return hex.EncodeToString(b) } +func SomeBytesOfLen(l int) []byte { + b := make([]byte, l) + n, err := cryptorand.Read(b) + if n != len(b) { + panic("short read") + } + if err != nil { + panic(err) + } + return b +} + func SomeFile(t testing.TB) string { file, err := os.CreateTemp("", "nos-crossposting-test") if err != nil { diff --git a/service/adapters/sqlite/migrations.go b/service/adapters/sqlite/migrations.go index a453553..dfbb72b 100644 --- a/service/adapters/sqlite/migrations.go +++ b/service/adapters/sqlite/migrations.go @@ -12,17 +12,20 @@ type Migrations struct { db *sql.DB watermillSchemaAdapter watermillsql.SchemaAdapter watermilOffsetsAdapter watermillsql.OffsetsAdapter + pubsub *PubSub } func NewMigrations( db *sql.DB, watermillSchemaAdapter watermillsql.SchemaAdapter, watermillOffsetsAdapter watermillsql.OffsetsAdapter, + pubsub *PubSub, ) *Migrations { return &Migrations{ db: db, watermillSchemaAdapter: watermillSchemaAdapter, watermilOffsetsAdapter: watermillOffsetsAdapter, + pubsub: pubsub, } } @@ -99,5 +102,11 @@ func (m *Migrations) Execute(ctx context.Context) error { } } + for _, query := range m.pubsub.InitializingQueries() { + if _, err = m.db.Exec(query); err != nil { + return errors.Wrapf(err, "error initializing pubsub") + } + } + return nil } diff --git a/service/adapters/sqlite/migrations_storage.go b/service/adapters/sqlite/migrations_storage.go index 2e6e832..b253828 100644 --- a/service/adapters/sqlite/migrations_storage.go +++ b/service/adapters/sqlite/migrations_storage.go @@ -7,12 +7,6 @@ import ( "github.com/planetary-social/nos-crossposting-service/migrations" ) -const ( - migrationsBucket = "migrations" - migrationsBucketStatus = "status" - migrationsBucketState = "state" -) - type MigrationsStorage struct { db *sql.DB } diff --git a/service/adapters/sqlite/pubsub.go b/service/adapters/sqlite/pubsub.go new file mode 100644 index 0000000..1a49b11 --- /dev/null +++ b/service/adapters/sqlite/pubsub.go @@ -0,0 +1,253 @@ +package sqlite + +import ( + "context" + "database/sql" + "github.com/boreq/errors" + "github.com/planetary-social/nos-crossposting-service/internal/logging" + "math" + "sync" + "time" +) + +type Message struct { + uuid string + payload []byte +} + +func NewMessage(uuid string, payload []byte) (Message, error) { + if uuid == "" { + return Message{}, errors.New("uuid can't be empty") + } + return Message{uuid: uuid, payload: payload}, nil +} + +func (m Message) UUID() string { + return m.uuid +} + +func (m Message) Payload() []byte { + return m.payload +} + +type ReceivedMessage struct { + Message + + lock sync.Mutex + state receivedMessageState + chAck chan struct{} + chNack chan struct{} +} + +func NewReceivedMessage(message Message) *ReceivedMessage { + return &ReceivedMessage{ + Message: message, + state: receivedMessageStateFresh, + chAck: make(chan struct{}), + chNack: make(chan struct{}), + } +} + +func (m *ReceivedMessage) Ack() error { + m.lock.Lock() + defer m.lock.Unlock() + + if m.state != receivedMessageStateFresh { + return errors.New("message was already acked or nacked") + } + + m.state = receivedMessageStateAcked + close(m.chAck) + return nil +} + +func (m *ReceivedMessage) Nack() error { + m.lock.Lock() + defer m.lock.Unlock() + + if m.state != receivedMessageStateFresh { + return errors.New("message was already acked or nacked") + } + + m.state = receivedMessageStateNacked + close(m.chNack) + return nil +} + +type receivedMessageState struct { + s string +} + +var ( + receivedMessageStateFresh = receivedMessageState{"fresh"} + receivedMessageStateAcked = receivedMessageState{"acked"} + receivedMessageStateNacked = receivedMessageState{"nacked"} +) + +type PubSub struct { + db *sql.DB + logger logging.Logger +} + +func NewPubSub(db *sql.DB, logger logging.Logger) *PubSub { + return &PubSub{db: db, logger: logger} +} + +func (p *PubSub) InitializingQueries() []string { + return []string{` + CREATE TABLE IF NOT EXISTS pubsub ( + topic TEXT NOT NULL, + uuid VARCHAR(36) NOT NULL PRIMARY KEY, + payload BLOB, + created_at INTEGER NOT NULL, + nack_count INTEGER NOT NULL, + backoff_until INTEGER + )`, + } +} + +func (p *PubSub) Publish(topic string, msg Message) error { + _, err := p.db.Exec( + "INSERT INTO pubsub VALUES (?, ?, ?, ?, ?, ?)", + topic, + msg.uuid, + msg.payload, + time.Now().Unix(), + 0, + nil, + ) + return err +} + +func (p *PubSub) Subscribe(ctx context.Context, topic string) <-chan *ReceivedMessage { + ch := make(chan *ReceivedMessage) + go p.subscribe(ctx, topic, ch) + return ch +} + +func (p *PubSub) subscribe(ctx context.Context, topic string, ch chan *ReceivedMessage) { + noMessagesCounter := 0 + + for { + p.logger.Debug().Message("reading message") + + msg, err := p.readMsg(topic) + if err != nil { + if errors.Is(err, sql.ErrNoRows) { + p.logger.Debug().Message("no rows") + + noMessagesCounter++ + backoff := getNoMessagesBackoff(noMessagesCounter) + + p.logger.Trace(). + WithField("duration", backoff). + Message("backing off reading messages") + + select { + case <-time.After(backoff): + continue + case <-ctx.Done(): + return + } + } + + noMessagesCounter = 0 + p.logger.Error().WithError(err).Message("error reading message") + continue + } + + noMessagesCounter = 0 + receivedMsg := NewReceivedMessage(msg) + + select { + case ch <- receivedMsg: + case <-ctx.Done(): + return + } + + select { + case <-receivedMsg.chAck: + if err := p.ack(receivedMsg.Message); err != nil { + p.logger.Error().WithError(err).Message("error acking a message") + } + case <-receivedMsg.chNack: + if err := p.nack(receivedMsg.Message); err != nil { + p.logger.Error().WithError(err).Message("error nacking a message") + } + case <-ctx.Done(): + return + } + } +} + +func (p *PubSub) readMsg(topic string) (Message, error) { + row := p.db.QueryRow( + "SELECT uuid, payload FROM pubsub WHERE topic = ? AND (backoff_until IS NULL OR backoff_until <= ?) ORDER BY RANDOM() LIMIT 1", + topic, + time.Now().Unix(), + ) + + var uuid string + var payload []byte + if err := row.Scan(&uuid, &payload); err != nil { + return Message{}, errors.Wrap(err, "row scan error") + } + + return NewMessage(uuid, payload) +} + +func (p *PubSub) ackOrNack(msg Message) *ReceivedMessage { + return NewReceivedMessage(msg) +} + +func (p *PubSub) ack(msg Message) error { + _, err := p.db.Exec( + "DELETE FROM pubsub WHERE uuid = ?", + msg.uuid, + ) + return err +} + +func (p *PubSub) nack(msg Message) error { + row := p.db.QueryRow( + "SELECT nack_count FROM pubsub WHERE uuid = ? LIMIT 1", + msg.uuid, + ) + + var nackCount int + if err := row.Scan(&nackCount); err != nil { + return errors.Wrap(err, "error calling scan") + } + + nackCount = nackCount + 1 + backoffDuration := getMessageErrorBackoff(nackCount) + backoffUntil := time.Now().Add(backoffDuration) + + p.logger.Trace(). + WithField("until", backoffUntil). + WithField("duration", backoffDuration). + Message("backing off a message") + + if _, err := p.db.Exec( + "UPDATE pubsub SET nack_count = ?, backoff_until = ? WHERE uuid = ?", + nackCount, + backoffUntil.Unix(), + msg.uuid, + ); err != nil { + return errors.Wrap(err, "error updating the message") + } + + return nil +} + +func getMessageErrorBackoff(nackCount int) time.Duration { + a := time.Duration(math.Pow(5, float64(nackCount-1))) * time.Second + b := 1 * time.Hour + return min(a, b) +} + +func getNoMessagesBackoff(tick int) time.Duration { + a := time.Duration(math.Pow(2, float64(tick-1))) * time.Second + b := 30 * time.Second + return min(a, b) +} diff --git a/service/adapters/sqlite/pubsub_test.go b/service/adapters/sqlite/pubsub_test.go new file mode 100644 index 0000000..f3f6320 --- /dev/null +++ b/service/adapters/sqlite/pubsub_test.go @@ -0,0 +1,138 @@ +package sqlite_test + +import ( + "github.com/planetary-social/nos-crossposting-service/internal/fixtures" + "github.com/planetary-social/nos-crossposting-service/service/adapters/sqlite" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "sync" + "testing" + "time" +) + +func TestPubSub_PublishDoesNotReturnErrors(t *testing.T) { + t.Parallel() + + ctx := fixtures.TestContext(t) + adapters := NewTestAdapters(ctx, t) + + msg, err := sqlite.NewMessage(fixtures.SomeString(), nil) + require.NoError(t, err) + + err = adapters.PubSub.Publish(fixtures.SomeString(), msg) + require.NoError(t, err) +} + +func TestPubSub_PublishingMessagesWithIdenticalUUIDsReturnsAnError(t *testing.T) { + t.Parallel() + + ctx := fixtures.TestContext(t) + adapters := NewTestAdapters(ctx, t) + + msg, err := sqlite.NewMessage(fixtures.SomeString(), nil) + require.NoError(t, err) + + err = adapters.PubSub.Publish(fixtures.SomeString(), msg) + require.NoError(t, err) + + err = adapters.PubSub.Publish(fixtures.SomeString(), msg) + require.EqualError(t, err, "UNIQUE constraint failed: pubsub.uuid") +} + +func TestPubSub_NackedMessagesAreRetried(t *testing.T) { + t.Parallel() + + ctx := fixtures.TestContext(t) + adapters := NewTestAdapters(ctx, t) + + msg, err := sqlite.NewMessage(fixtures.SomeString(), nil) + require.NoError(t, err) + + topic := fixtures.SomeString() + + err = adapters.PubSub.Publish(topic, msg) + require.NoError(t, err) + + var msgs []*sqlite.ReceivedMessage + var msgsLock sync.Mutex + + go func() { + for msg := range adapters.PubSub.Subscribe(ctx, topic) { + msgsLock.Lock() + msgs = append(msgs, msg) + msgsLock.Unlock() + err := msg.Nack() + require.NoError(t, err) + } + }() + + require.EventuallyWithT(t, func(collect *assert.CollectT) { + msgsLock.Lock() + assert.GreaterOrEqual(collect, len(msgs), 2) + msgsLock.Unlock() + }, 10*time.Second, 100*time.Microsecond) +} + +func TestPubSub_AckedMessagesAreNotRetried(t *testing.T) { + t.Parallel() + + ctx := fixtures.TestContext(t) + adapters := NewTestAdapters(ctx, t) + + msg, err := sqlite.NewMessage(fixtures.SomeString(), nil) + require.NoError(t, err) + + topic := fixtures.SomeString() + + err = adapters.PubSub.Publish(topic, msg) + require.NoError(t, err) + + var msgs []*sqlite.ReceivedMessage + var msgsLock sync.Mutex + + go func() { + for msg := range adapters.PubSub.Subscribe(ctx, topic) { + msgsLock.Lock() + msgs = append(msgs, msg) + msgsLock.Unlock() + err := msg.Ack() + require.NoError(t, err) + } + }() + + <-time.After(10 * time.Second) + msgsLock.Lock() + require.Len(t, msgs, 1) + msgsLock.Unlock() +} + +func TestPubSub_NotAckedOrNackedMessagesBlock(t *testing.T) { + t.Parallel() + + ctx := fixtures.TestContext(t) + adapters := NewTestAdapters(ctx, t) + + msg, err := sqlite.NewMessage(fixtures.SomeString(), nil) + require.NoError(t, err) + + topic := fixtures.SomeString() + + err = adapters.PubSub.Publish(topic, msg) + require.NoError(t, err) + + var msgs []*sqlite.ReceivedMessage + var msgsLock sync.Mutex + + go func() { + for msg := range adapters.PubSub.Subscribe(ctx, topic) { + msgsLock.Lock() + msgs = append(msgs, msg) + msgsLock.Unlock() + } + }() + + <-time.After(10 * time.Second) + msgsLock.Lock() + require.Len(t, msgs, 1) + msgsLock.Unlock() +} diff --git a/service/adapters/sqlite/sqlite.go b/service/adapters/sqlite/sqlite.go index db29515..790f2ad 100644 --- a/service/adapters/sqlite/sqlite.go +++ b/service/adapters/sqlite/sqlite.go @@ -25,6 +25,7 @@ type TestedItems struct { Migrations *Migrations Subscriber *Subscriber MigrationsStorage *MigrationsStorage + PubSub *PubSub } func Open(conf config.Config) (*sql.DB, error) { From f161f9cbe63bcf7e1396d1450ebe0c92da67246f Mon Sep 17 00:00:00 2001 From: boreq Date: Wed, 1 Nov 2023 06:42:03 +0900 Subject: [PATCH 03/10] Glue --- .../di/inject_adapters.go | 8 -- .../di/inject_migrations.go | 20 ++++ cmd/crossposting-service/di/inject_pubsub.go | 10 +- cmd/crossposting-service/di/service.go | 14 ++- cmd/crossposting-service/di/wire.go | 9 +- cmd/crossposting-service/di/wire_gen.go | 74 ++++++------ migrations/migrations.go | 4 +- migrations/migrations_test.go | 4 +- .../adapters/migrations_progress_callback.go | 32 ++++++ service/adapters/sqlite/migrations.go | 51 +++------ service/adapters/sqlite/migrations_storage.go | 1 + .../sqlite/migrations_storage_test.go | 3 +- service/adapters/sqlite/publisher.go | 20 ++-- service/adapters/sqlite/pubsub.go | 33 +++++- service/adapters/sqlite/pubsub_test.go | 106 +++++++++++++++--- service/adapters/sqlite/sqlite.go | 5 +- service/adapters/sqlite/subscriber.go | 40 +------ 17 files changed, 272 insertions(+), 162 deletions(-) create mode 100644 cmd/crossposting-service/di/inject_migrations.go create mode 100644 service/adapters/migrations_progress_callback.go diff --git a/cmd/crossposting-service/di/inject_adapters.go b/cmd/crossposting-service/di/inject_adapters.go index 7369ee2..106650b 100644 --- a/cmd/crossposting-service/di/inject_adapters.go +++ b/cmd/crossposting-service/di/inject_adapters.go @@ -22,9 +22,6 @@ var sqliteAdaptersSet = wire.NewSet( newAdaptersFactoryFn, - sqlite.NewMigrations, - sqlite.NewPubSub, - wire.Struct(new(buildTransactionSqliteAdaptersDependencies), "*"), ) @@ -35,12 +32,7 @@ var sqliteTestAdaptersSet = wire.NewSet( newTestAdaptersFactoryFn, - sqlite.NewMigrations, - wire.Struct(new(buildTransactionSqliteAdaptersDependencies), "*"), - - sqlite.NewMigrationsStorage, - sqlite.NewPubSub, ) var sqliteTxAdaptersSet = wire.NewSet( diff --git a/cmd/crossposting-service/di/inject_migrations.go b/cmd/crossposting-service/di/inject_migrations.go new file mode 100644 index 0000000..1184f46 --- /dev/null +++ b/cmd/crossposting-service/di/inject_migrations.go @@ -0,0 +1,20 @@ +package di + +import ( + "github.com/google/wire" + "github.com/planetary-social/nos-crossposting-service/migrations" + "github.com/planetary-social/nos-crossposting-service/service/adapters" + "github.com/planetary-social/nos-crossposting-service/service/adapters/sqlite" +) + +var migrationsAdaptersSet = wire.NewSet( + sqlite.NewMigrations, + sqlite.NewMigrationFns, + migrations.NewRunner, + + sqlite.NewMigrationsStorage, + wire.Bind(new(migrations.Storage), new(*sqlite.MigrationsStorage)), + + adapters.NewLoggingMigrationsProgressCallback, + wire.Bind(new(migrations.ProgressCallback), new(*adapters.LoggingMigrationsProgressCallback)), +) diff --git a/cmd/crossposting-service/di/inject_pubsub.go b/cmd/crossposting-service/di/inject_pubsub.go index c21369e..bfb99b4 100644 --- a/cmd/crossposting-service/di/inject_pubsub.go +++ b/cmd/crossposting-service/di/inject_pubsub.go @@ -1,7 +1,6 @@ package di import ( - watermillsql "github.com/ThreeDotsLabs/watermill-sql/v2/pkg/sql" "github.com/google/wire" "github.com/planetary-social/nos-crossposting-service/service/adapters/memorypubsub" "github.com/planetary-social/nos-crossposting-service/service/adapters/sqlite" @@ -16,13 +15,14 @@ var memoryPubsubSet = wire.NewSet( ) var sqlitePubsubSet = wire.NewSet( - sqlite.NewSqliteSchema, - wire.Bind(new(watermillsql.SchemaAdapter), new(sqlite.SqliteSchema)), + //sqlite.NewSqliteSchema, + //wire.Bind(new(watermillsql.SchemaAdapter), new(sqlite.SqliteSchema)), - sqlite.NewWatermillOffsetsAdapter, - sqlite.NewWatermillSubscriber, + //sqlite.NewWatermillOffsetsAdapter, + //sqlite.NewWatermillSubscriber, sqlitepubsubport.NewTweetCreatedEventSubscriber, sqlite.NewSubscriber, + sqlite.NewPubSub, ) var sqliteTxPubsubSet = wire.NewSet( diff --git a/cmd/crossposting-service/di/service.go b/cmd/crossposting-service/di/service.go index 7d2cb90..94990f4 100644 --- a/cmd/crossposting-service/di/service.go +++ b/cmd/crossposting-service/di/service.go @@ -5,7 +5,7 @@ import ( "github.com/boreq/errors" "github.com/hashicorp/go-multierror" - "github.com/planetary-social/nos-crossposting-service/service/adapters/sqlite" + "github.com/planetary-social/nos-crossposting-service/migrations" "github.com/planetary-social/nos-crossposting-service/service/app" "github.com/planetary-social/nos-crossposting-service/service/ports/http" "github.com/planetary-social/nos-crossposting-service/service/ports/memorypubsub" @@ -19,7 +19,9 @@ type Service struct { downloader *app.Downloader receivedEventSubscriber *memorypubsub.ReceivedEventSubscriber tweetCreatedEventSubscriber *sqlitepubsub.TweetCreatedEventSubscriber - migrations *sqlite.Migrations + migrationsRunner *migrations.Runner + migrations migrations.Migrations + migrationsProgressCallback migrations.ProgressCallback } func NewService( @@ -29,7 +31,9 @@ func NewService( downloader *app.Downloader, receivedEventSubscriber *memorypubsub.ReceivedEventSubscriber, tweetCreatedEventSubscriber *sqlitepubsub.TweetCreatedEventSubscriber, - migrations *sqlite.Migrations, + migrationsRunner *migrations.Runner, + migrations migrations.Migrations, + migrationsProgressCallback migrations.ProgressCallback, ) Service { return Service{ app: app, @@ -38,7 +42,9 @@ func NewService( downloader: downloader, receivedEventSubscriber: receivedEventSubscriber, tweetCreatedEventSubscriber: tweetCreatedEventSubscriber, + migrationsRunner: migrationsRunner, migrations: migrations, + migrationsProgressCallback: migrationsProgressCallback, } } @@ -47,7 +53,7 @@ func (s Service) App() app.Application { } func (s Service) ExecuteMigrations(ctx context.Context) error { - return s.migrations.Execute(ctx) + return s.migrationsRunner.Run(ctx, s.migrations, s.migrationsProgressCallback) } func (s Service) Run(ctx context.Context) error { diff --git a/cmd/crossposting-service/di/wire.go b/cmd/crossposting-service/di/wire.go index e7870a8..d9d81f9 100644 --- a/cmd/crossposting-service/di/wire.go +++ b/cmd/crossposting-service/di/wire.go @@ -8,7 +8,6 @@ import ( "database/sql" "testing" - "github.com/ThreeDotsLabs/watermill" "github.com/google/wire" "github.com/planetary-social/nos-crossposting-service/internal/fixtures" "github.com/planetary-social/nos-crossposting-service/internal/logging" @@ -31,6 +30,7 @@ func BuildService(context.Context, config.Config) (Service, func(), error) { loggingSet, adaptersSet, tweetGeneratorSet, + migrationsAdaptersSet, ) return Service{}, nil, nil } @@ -43,6 +43,7 @@ func BuildTestAdapters(context.Context, testing.TB) (sqlite.TestedItems, func(), sqlitePubsubSet, loggingSet, newTestAdaptersConfig, + migrationsAdaptersSet, ) return sqlite.TestedItems{}, nil, nil } @@ -61,13 +62,13 @@ func newTestAdaptersConfig(tb testing.TB) (config.Config, error) { } type buildTransactionSqliteAdaptersDependencies struct { - LoggerAdapter watermill.LoggerAdapter + Logger logging.Logger } func buildTransactionSqliteAdapters(*sql.DB, *sql.Tx, buildTransactionSqliteAdaptersDependencies) (app.Adapters, error) { wire.Build( wire.Struct(new(app.Adapters), "*"), - wire.FieldsOf(new(buildTransactionSqliteAdaptersDependencies), "LoggerAdapter"), + wire.FieldsOf(new(buildTransactionSqliteAdaptersDependencies), "Logger"), sqliteTxAdaptersSet, sqliteTxPubsubSet, @@ -79,7 +80,7 @@ func buildTransactionSqliteAdapters(*sql.DB, *sql.Tx, buildTransactionSqliteAdap func buildTestTransactionSqliteAdapters(*sql.DB, *sql.Tx, buildTransactionSqliteAdaptersDependencies) (sqlite.TestAdapters, error) { wire.Build( wire.Struct(new(sqlite.TestAdapters), "*"), - wire.FieldsOf(new(buildTransactionSqliteAdaptersDependencies), "LoggerAdapter"), + wire.FieldsOf(new(buildTransactionSqliteAdaptersDependencies), "Logger"), sqliteTxAdaptersSet, sqliteTxPubsubSet, diff --git a/cmd/crossposting-service/di/wire_gen.go b/cmd/crossposting-service/di/wire_gen.go index 307acff..21b2b7b 100644 --- a/cmd/crossposting-service/di/wire_gen.go +++ b/cmd/crossposting-service/di/wire_gen.go @@ -9,10 +9,12 @@ package di import ( "context" "database/sql" - "github.com/ThreeDotsLabs/watermill" + "testing" + "github.com/google/wire" "github.com/planetary-social/nos-crossposting-service/internal/fixtures" "github.com/planetary-social/nos-crossposting-service/internal/logging" + "github.com/planetary-social/nos-crossposting-service/migrations" "github.com/planetary-social/nos-crossposting-service/service/adapters" "github.com/planetary-social/nos-crossposting-service/service/adapters/memorypubsub" "github.com/planetary-social/nos-crossposting-service/service/adapters/prometheus" @@ -25,7 +27,6 @@ import ( "github.com/planetary-social/nos-crossposting-service/service/ports/http/frontend" memorypubsub2 "github.com/planetary-social/nos-crossposting-service/service/ports/memorypubsub" "github.com/planetary-social/nos-crossposting-service/service/ports/sqlitepubsub" - "testing" ) // Injectors from wire.go: @@ -39,9 +40,8 @@ func BuildService(contextContext context.Context, configConfig config.Config) (S if err != nil { return Service{}, nil, err } - watermillAdapter := logging.NewWatermillAdapter(logger) diBuildTransactionSqliteAdaptersDependencies := buildTransactionSqliteAdaptersDependencies{ - LoggerAdapter: watermillAdapter, + Logger: logger, } genericAdaptersFactoryFn := newAdaptersFactoryFn(diBuildTransactionSqliteAdaptersDependencies) genericTransactionProvider := sqlite.NewTransactionProvider(db, genericAdaptersFactoryFn) @@ -91,18 +91,23 @@ func BuildService(contextContext context.Context, configConfig config.Config) (S processReceivedEventHandler := app.NewProcessReceivedEventHandler(genericTransactionProvider, tweetGenerator, logger, prometheusPrometheus) receivedEventSubscriber := memorypubsub2.NewReceivedEventSubscriber(receivedEventPubSub, processReceivedEventHandler, logger) sendTweetHandler := app.NewSendTweetHandler(genericTransactionProvider, appTwitter, logger, prometheusPrometheus) - sqliteSchema := sqlite.NewSqliteSchema() - offsetsAdapter := sqlite.NewWatermillOffsetsAdapter() - subscriber, err := sqlite.NewWatermillSubscriber(db, watermillAdapter, sqliteSchema, offsetsAdapter) + pubSub := sqlite.NewPubSub(db, logger) + subscriber := sqlite.NewSubscriber(pubSub) + tweetCreatedEventSubscriber := sqlitepubsub.NewTweetCreatedEventSubscriber(sendTweetHandler, subscriber, logger, prometheusPrometheus) + migrationsStorage, err := sqlite.NewMigrationsStorage(db) if err != nil { cleanup() return Service{}, nil, err } - sqliteSubscriber := sqlite.NewSubscriber(subscriber, offsetsAdapter, sqliteSchema, db) - tweetCreatedEventSubscriber := sqlitepubsub.NewTweetCreatedEventSubscriber(sendTweetHandler, sqliteSubscriber, logger, prometheusPrometheus) - pubSub := sqlite.NewPubSub(db, logger) - migrations := sqlite.NewMigrations(db, sqliteSchema, offsetsAdapter, pubSub) - service := NewService(application, server, metricsServer, downloader, receivedEventSubscriber, tweetCreatedEventSubscriber, migrations) + runner := migrations.NewRunner(migrationsStorage, logger) + migrationFns := sqlite.NewMigrationFns(db, pubSub) + migrationsMigrations, err := sqlite.NewMigrations(migrationFns) + if err != nil { + cleanup() + return Service{}, nil, err + } + loggingMigrationsProgressCallback := adapters.NewLoggingMigrationsProgressCallback(logger) + service := NewService(application, server, metricsServer, downloader, receivedEventSubscriber, tweetCreatedEventSubscriber, runner, migrationsMigrations, loggingMigrationsProgressCallback) return service, func() { cleanup() }, nil @@ -121,33 +126,32 @@ func BuildTestAdapters(contextContext context.Context, tb testing.TB) (sqlite.Te if err != nil { return sqlite.TestedItems{}, nil, err } - watermillAdapter := logging.NewWatermillAdapter(logger) diBuildTransactionSqliteAdaptersDependencies := buildTransactionSqliteAdaptersDependencies{ - LoggerAdapter: watermillAdapter, + Logger: logger, } genericAdaptersFactoryFn := newTestAdaptersFactoryFn(diBuildTransactionSqliteAdaptersDependencies) genericTransactionProvider := sqlite.NewTestTransactionProvider(db, genericAdaptersFactoryFn) - sqliteSchema := sqlite.NewSqliteSchema() - offsetsAdapter := sqlite.NewWatermillOffsetsAdapter() pubSub := sqlite.NewPubSub(db, logger) - migrations := sqlite.NewMigrations(db, sqliteSchema, offsetsAdapter, pubSub) - subscriber, err := sqlite.NewWatermillSubscriber(db, watermillAdapter, sqliteSchema, offsetsAdapter) + subscriber := sqlite.NewSubscriber(pubSub) + migrationsStorage, err := sqlite.NewMigrationsStorage(db) if err != nil { cleanup() return sqlite.TestedItems{}, nil, err } - sqliteSubscriber := sqlite.NewSubscriber(subscriber, offsetsAdapter, sqliteSchema, db) - migrationsStorage, err := sqlite.NewMigrationsStorage(db) + runner := migrations.NewRunner(migrationsStorage, logger) + migrationFns := sqlite.NewMigrationFns(db, pubSub) + migrationsMigrations, err := sqlite.NewMigrations(migrationFns) if err != nil { cleanup() return sqlite.TestedItems{}, nil, err } testedItems := sqlite.TestedItems{ TransactionProvider: genericTransactionProvider, - Migrations: migrations, - Subscriber: sqliteSubscriber, + Subscriber: subscriber, MigrationsStorage: migrationsStorage, PubSub: pubSub, + MigrationsRunner: runner, + Migrations: migrationsMigrations, } return testedItems, func() { cleanup() @@ -175,20 +179,16 @@ func buildTransactionSqliteAdapters(db *sql.DB, tx *sql.Tx, diBuildTransactionSq if err != nil { return app.Adapters{}, err } - loggerAdapter := diBuildTransactionSqliteAdaptersDependencies.LoggerAdapter - sqliteSchema := sqlite.NewSqliteSchema() - publisher, err := sqlite.NewWatermillPublisher(tx, loggerAdapter, sqliteSchema) - if err != nil { - return app.Adapters{}, err - } - sqlitePublisher := sqlite.NewPublisher(publisher) + logger := diBuildTransactionSqliteAdaptersDependencies.Logger + pubSub := sqlite.NewPubSub(db, logger) + publisher := sqlite.NewPublisher(pubSub, tx) appAdapters := app.Adapters{ Accounts: accountRepository, Sessions: sessionRepository, PublicKeys: publicKeyRepository, ProcessedEvents: processedEventRepository, UserTokens: userTokensRepository, - Publisher: sqlitePublisher, + Publisher: publisher, } return appAdapters, nil } @@ -214,20 +214,16 @@ func buildTestTransactionSqliteAdapters(db *sql.DB, tx *sql.Tx, diBuildTransacti if err != nil { return sqlite.TestAdapters{}, err } - loggerAdapter := diBuildTransactionSqliteAdaptersDependencies.LoggerAdapter - sqliteSchema := sqlite.NewSqliteSchema() - publisher, err := sqlite.NewWatermillPublisher(tx, loggerAdapter, sqliteSchema) - if err != nil { - return sqlite.TestAdapters{}, err - } - sqlitePublisher := sqlite.NewPublisher(publisher) + logger := diBuildTransactionSqliteAdaptersDependencies.Logger + pubSub := sqlite.NewPubSub(db, logger) + publisher := sqlite.NewPublisher(pubSub, tx) testAdapters := sqlite.TestAdapters{ SessionRepository: sessionRepository, AccountRepository: accountRepository, PublicKeyRepository: publicKeyRepository, ProcessedEventRepository: processedEventRepository, UserTokensRepository: userTokensRepository, - Publisher: sqlitePublisher, + Publisher: publisher, } return testAdapters, nil } @@ -239,7 +235,7 @@ func newTestAdaptersConfig(tb testing.TB) (config.Config, error) { } type buildTransactionSqliteAdaptersDependencies struct { - LoggerAdapter watermill.LoggerAdapter + Logger logging.Logger } var downloaderSet = wire.NewSet(app.NewDownloader) diff --git a/migrations/migrations.go b/migrations/migrations.go index ed3db97..a119471 100644 --- a/migrations/migrations.go +++ b/migrations/migrations.go @@ -4,11 +4,11 @@ import ( "context" "encoding/json" "fmt" - "github.com/planetary-social/nos-crossposting-service/internal" - "github.com/planetary-social/nos-crossposting-service/internal/logging" "github.com/boreq/errors" "github.com/hashicorp/go-multierror" + "github.com/planetary-social/nos-crossposting-service/internal" + "github.com/planetary-social/nos-crossposting-service/internal/logging" ) type State map[string]string diff --git a/migrations/migrations_test.go b/migrations/migrations_test.go index edd637d..4cc6920 100644 --- a/migrations/migrations_test.go +++ b/migrations/migrations_test.go @@ -3,12 +3,12 @@ package migrations_test import ( "context" "fmt" + "testing" + "github.com/planetary-social/nos-crossposting-service/internal" "github.com/planetary-social/nos-crossposting-service/internal/fixtures" "github.com/planetary-social/nos-crossposting-service/internal/logging" "github.com/planetary-social/nos-crossposting-service/migrations" - "testing" - "github.com/stretchr/testify/require" ) diff --git a/service/adapters/migrations_progress_callback.go b/service/adapters/migrations_progress_callback.go new file mode 100644 index 0000000..3d70f3a --- /dev/null +++ b/service/adapters/migrations_progress_callback.go @@ -0,0 +1,32 @@ +package adapters + +import "github.com/planetary-social/nos-crossposting-service/internal/logging" + +type LoggingMigrationsProgressCallback struct { + logger logging.Logger +} + +func NewLoggingMigrationsProgressCallback(logger logging.Logger) *LoggingMigrationsProgressCallback { + return &LoggingMigrationsProgressCallback{logger: logger.New("migrationProgressCallback")} +} + +func (l LoggingMigrationsProgressCallback) OnRunning(migrationIndex int, migrationsCount int) { + l.logger.Debug(). + WithField("index", migrationIndex). + WithField("count", migrationsCount). + Message("running") +} + +func (l LoggingMigrationsProgressCallback) OnError(migrationIndex int, migrationsCount int, err error) { + l.logger.Error(). + WithField("index", migrationIndex). + WithField("count", migrationsCount). + WithError(err). + Message("error") +} + +func (l LoggingMigrationsProgressCallback) OnDone(migrationsCount int) { + l.logger.Debug(). + WithField("count", migrationsCount). + Message("done") +} diff --git a/service/adapters/sqlite/migrations.go b/service/adapters/sqlite/migrations.go index dfbb72b..0d5cc65 100644 --- a/service/adapters/sqlite/migrations.go +++ b/service/adapters/sqlite/migrations.go @@ -4,32 +4,27 @@ import ( "context" "database/sql" - watermillsql "github.com/ThreeDotsLabs/watermill-sql/v2/pkg/sql" "github.com/boreq/errors" + "github.com/planetary-social/nos-crossposting-service/migrations" ) -type Migrations struct { - db *sql.DB - watermillSchemaAdapter watermillsql.SchemaAdapter - watermilOffsetsAdapter watermillsql.OffsetsAdapter - pubsub *PubSub +func NewMigrations(fns *MigrationFns) (migrations.Migrations, error) { + return migrations.NewMigrations([]migrations.Migration{ + migrations.MustNewMigration("initial", fns.Initial), + migrations.MustNewMigration("create_pubsub_tables", fns.CreatePubsubTables), + }) } -func NewMigrations( - db *sql.DB, - watermillSchemaAdapter watermillsql.SchemaAdapter, - watermillOffsetsAdapter watermillsql.OffsetsAdapter, - pubsub *PubSub, -) *Migrations { - return &Migrations{ - db: db, - watermillSchemaAdapter: watermillSchemaAdapter, - watermilOffsetsAdapter: watermillOffsetsAdapter, - pubsub: pubsub, - } +type MigrationFns struct { + db *sql.DB + pubsub *PubSub +} + +func NewMigrationFns(db *sql.DB, pubsub *PubSub) *MigrationFns { + return &MigrationFns{db: db, pubsub: pubsub} } -func (m *Migrations) Execute(ctx context.Context) error { +func (m *MigrationFns) Initial(ctx context.Context, state migrations.State, saveStateFunc migrations.SaveStateFunc) error { _, err := m.db.Exec(` CREATE TABLE IF NOT EXISTS accounts ( account_id TEXT PRIMARY KEY, @@ -88,22 +83,12 @@ func (m *Migrations) Execute(ctx context.Context) error { return errors.Wrap(err, "error creating the user tokens table") } - for _, topic := range []string{TweetCreatedTopic} { - for _, query := range m.watermillSchemaAdapter.SchemaInitializingQueries(topic) { - if _, err = m.db.Exec(query); err != nil { - return errors.Wrapf(err, "error initializing watermill schema for topic '%s'", topic) - } - } - - for _, query := range m.watermilOffsetsAdapter.SchemaInitializingQueries(topic) { - if _, err = m.db.Exec(query); err != nil { - return errors.Wrapf(err, "error initializing watermill offsets for topic '%s'", topic) - } - } - } + return nil +} +func (m *MigrationFns) CreatePubsubTables(ctx context.Context, state migrations.State, saveStateFunc migrations.SaveStateFunc) error { for _, query := range m.pubsub.InitializingQueries() { - if _, err = m.db.Exec(query); err != nil { + if _, err := m.db.Exec(query); err != nil { return errors.Wrapf(err, "error initializing pubsub") } } diff --git a/service/adapters/sqlite/migrations_storage.go b/service/adapters/sqlite/migrations_storage.go index b253828..1d31634 100644 --- a/service/adapters/sqlite/migrations_storage.go +++ b/service/adapters/sqlite/migrations_storage.go @@ -3,6 +3,7 @@ package sqlite import ( "database/sql" "encoding/json" + "github.com/boreq/errors" "github.com/planetary-social/nos-crossposting-service/migrations" ) diff --git a/service/adapters/sqlite/migrations_storage_test.go b/service/adapters/sqlite/migrations_storage_test.go index 068aa67..5ab0d77 100644 --- a/service/adapters/sqlite/migrations_storage_test.go +++ b/service/adapters/sqlite/migrations_storage_test.go @@ -1,10 +1,11 @@ package sqlite_test import ( + "testing" + "github.com/planetary-social/nos-crossposting-service/internal/fixtures" "github.com/planetary-social/nos-crossposting-service/migrations" "github.com/stretchr/testify/require" - "testing" ) func TestMigrationsStorage_LoadStateReturnsCorrectErrorWhenStateIsNotAvailable(t *testing.T) { diff --git a/service/adapters/sqlite/publisher.go b/service/adapters/sqlite/publisher.go index 5b38f45..548d2be 100644 --- a/service/adapters/sqlite/publisher.go +++ b/service/adapters/sqlite/publisher.go @@ -1,12 +1,11 @@ package sqlite import ( + "database/sql" "encoding/json" - "github.com/ThreeDotsLabs/watermill" - watermillsql "github.com/ThreeDotsLabs/watermill-sql/v2/pkg/sql" - "github.com/ThreeDotsLabs/watermill/message" "github.com/boreq/errors" + "github.com/oklog/ulid/v2" "github.com/planetary-social/nos-crossposting-service/service/domain" "github.com/planetary-social/nos-crossposting-service/service/domain/accounts" ) @@ -14,11 +13,12 @@ import ( const TweetCreatedTopic = "tweet_created" type Publisher struct { - watermillPublisher *watermillsql.Publisher + pubsub *PubSub + tx *sql.Tx } -func NewPublisher(watermillPublisher *watermillsql.Publisher) *Publisher { - return &Publisher{watermillPublisher: watermillPublisher} +func NewPublisher(pubsub *PubSub, tx *sql.Tx) *Publisher { + return &Publisher{pubsub: pubsub, tx: tx} } func (p *Publisher) PublishTweetCreated(accountID accounts.AccountID, tweet domain.Tweet) error { @@ -34,8 +34,12 @@ func (p *Publisher) PublishTweetCreated(accountID accounts.AccountID, tweet doma return errors.Wrap(err, "error marshaling the transport type") } - msg := message.NewMessage(watermill.NewULID(), payload) - return p.watermillPublisher.Publish(TweetCreatedTopic, msg) + msg, err := NewMessage(ulid.Make().String(), payload) + if err != nil { + return errors.Wrap(err, "error creating a message") + } + + return p.pubsub.PublishTx(p.tx, TweetCreatedTopic, msg) } type TweetCreatedEventTransport struct { diff --git a/service/adapters/sqlite/pubsub.go b/service/adapters/sqlite/pubsub.go index 1a49b11..fac4db1 100644 --- a/service/adapters/sqlite/pubsub.go +++ b/service/adapters/sqlite/pubsub.go @@ -3,11 +3,12 @@ package sqlite import ( "context" "database/sql" - "github.com/boreq/errors" - "github.com/planetary-social/nos-crossposting-service/internal/logging" "math" "sync" "time" + + "github.com/boreq/errors" + "github.com/planetary-social/nos-crossposting-service/internal/logging" ) type Message struct { @@ -107,7 +108,15 @@ func (p *PubSub) InitializingQueries() []string { } func (p *PubSub) Publish(topic string, msg Message) error { - _, err := p.db.Exec( + return p.publish(p.db, topic, msg) +} + +func (p *PubSub) PublishTx(tx *sql.Tx, topic string, msg Message) error { + return p.publish(tx, topic, msg) +} + +func (p *PubSub) publish(e executor, topic string, msg Message) error { + _, err := e.Exec( "INSERT INTO pubsub VALUES (?, ?, ?, ?, ?, ?)", topic, msg.uuid, @@ -125,6 +134,20 @@ func (p *PubSub) Subscribe(ctx context.Context, topic string) <-chan *ReceivedMe return ch } +func (p *PubSub) QueueLength(topic string) (int, error) { + row := p.db.QueryRow( + "SELECT COUNT(*) FROM pubsub WHERE topic = ?", + topic, + ) + + var count int + if err := row.Scan(&count); err != nil { + return 0, errors.Wrap(err, "row scan error") + } + + return count, nil +} + func (p *PubSub) subscribe(ctx context.Context, topic string, ch chan *ReceivedMessage) { noMessagesCounter := 0 @@ -251,3 +274,7 @@ func getNoMessagesBackoff(tick int) time.Duration { b := 30 * time.Second return min(a, b) } + +type executor interface { + Exec(query string, args ...any) (sql.Result, error) +} diff --git a/service/adapters/sqlite/pubsub_test.go b/service/adapters/sqlite/pubsub_test.go index f3f6320..830e5a1 100644 --- a/service/adapters/sqlite/pubsub_test.go +++ b/service/adapters/sqlite/pubsub_test.go @@ -1,13 +1,15 @@ package sqlite_test import ( + "context" + "sync" + "testing" + "time" + "github.com/planetary-social/nos-crossposting-service/internal/fixtures" "github.com/planetary-social/nos-crossposting-service/service/adapters/sqlite" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - "sync" - "testing" - "time" ) func TestPubSub_PublishDoesNotReturnErrors(t *testing.T) { @@ -73,7 +75,62 @@ func TestPubSub_NackedMessagesAreRetried(t *testing.T) { }, 10*time.Second, 100*time.Microsecond) } -func TestPubSub_AckedMessagesAreNotRetried(t *testing.T) { +func TestPubSub_MessageContainCorrectPayloadAndAckedMessagesAreNotRetried(t *testing.T) { + t.Parallel() + + testCases := []struct { + Name string + Payload []byte + }{ + { + Name: "nil", + Payload: nil, + }, + { + Name: "not_nil", + Payload: fixtures.SomeBytesOfLen(10), + }, + } + + for _, testCase := range testCases { + t.Run(testCase.Name, func(t *testing.T) { + t.Parallel() + + ctx := fixtures.TestContext(t) + adapters := NewTestAdapters(ctx, t) + + msg, err := sqlite.NewMessage(fixtures.SomeString(), testCase.Payload) + require.NoError(t, err) + + topic := fixtures.SomeString() + + err = adapters.PubSub.Publish(topic, msg) + require.NoError(t, err) + + var msgs []*sqlite.ReceivedMessage + var msgsLock sync.Mutex + + go func() { + for msg := range adapters.PubSub.Subscribe(ctx, topic) { + msgsLock.Lock() + msgs = append(msgs, msg) + msgsLock.Unlock() + err := msg.Ack() + require.NoError(t, err) + } + }() + + <-time.After(10 * time.Second) + msgsLock.Lock() + require.Len(t, msgs, 1) + require.Equal(t, msg.UUID(), msgs[0].UUID()) + require.Equal(t, msg.Payload(), msgs[0].Payload()) + msgsLock.Unlock() + }) + } +} + +func TestPubSub_NotAckedOrNackedMessagesBlock(t *testing.T) { t.Parallel() ctx := fixtures.TestContext(t) @@ -95,8 +152,6 @@ func TestPubSub_AckedMessagesAreNotRetried(t *testing.T) { msgsLock.Lock() msgs = append(msgs, msg) msgsLock.Unlock() - err := msg.Ack() - require.NoError(t, err) } }() @@ -106,33 +161,48 @@ func TestPubSub_AckedMessagesAreNotRetried(t *testing.T) { msgsLock.Unlock() } -func TestPubSub_NotAckedOrNackedMessagesBlock(t *testing.T) { +func TestPubSub_QueueLengthReportsNumberOfElementsInQueue(t *testing.T) { t.Parallel() ctx := fixtures.TestContext(t) adapters := NewTestAdapters(ctx, t) - msg, err := sqlite.NewMessage(fixtures.SomeString(), nil) + msg1, err := sqlite.NewMessage(fixtures.SomeString(), nil) + require.NoError(t, err) + + msg2, err := sqlite.NewMessage(fixtures.SomeString(), nil) require.NoError(t, err) topic := fixtures.SomeString() - err = adapters.PubSub.Publish(topic, msg) + err = adapters.PubSub.Publish(topic, msg1) require.NoError(t, err) - var msgs []*sqlite.ReceivedMessage - var msgsLock sync.Mutex + n, err := adapters.PubSub.QueueLength(topic) + require.NoError(t, err) + require.Equal(t, 1, n) + + err = adapters.PubSub.Publish(topic, msg2) + require.NoError(t, err) + + n, err = adapters.PubSub.QueueLength(topic) + require.NoError(t, err) + require.Equal(t, 2, n) go func() { + ctx, cancel := context.WithCancel(ctx) + defer cancel() + for msg := range adapters.PubSub.Subscribe(ctx, topic) { - msgsLock.Lock() - msgs = append(msgs, msg) - msgsLock.Unlock() + err := msg.Ack() + require.NoError(t, err) + return } }() - <-time.After(10 * time.Second) - msgsLock.Lock() - require.Len(t, msgs, 1) - msgsLock.Unlock() + require.EventuallyWithT(t, func(collect *assert.CollectT) { + n, err = adapters.PubSub.QueueLength(topic) + assert.NoError(t, err) + assert.Equal(t, 1, n) + }, 10*time.Second, 100*time.Millisecond) } diff --git a/service/adapters/sqlite/sqlite.go b/service/adapters/sqlite/sqlite.go index 790f2ad..7287a46 100644 --- a/service/adapters/sqlite/sqlite.go +++ b/service/adapters/sqlite/sqlite.go @@ -7,6 +7,7 @@ import ( "github.com/boreq/errors" "github.com/hashicorp/go-multierror" _ "github.com/mattn/go-sqlite3" + "github.com/planetary-social/nos-crossposting-service/migrations" "github.com/planetary-social/nos-crossposting-service/service/app" "github.com/planetary-social/nos-crossposting-service/service/config" ) @@ -22,10 +23,12 @@ type TestAdapters struct { type TestedItems struct { TransactionProvider *TestTransactionProvider - Migrations *Migrations Subscriber *Subscriber MigrationsStorage *MigrationsStorage PubSub *PubSub + + MigrationsRunner *migrations.Runner + Migrations migrations.Migrations } func Open(conf config.Config) (*sql.DB, error) { diff --git a/service/adapters/sqlite/subscriber.go b/service/adapters/sqlite/subscriber.go index 6dbd7da..564bbc0 100644 --- a/service/adapters/sqlite/subscriber.go +++ b/service/adapters/sqlite/subscriber.go @@ -2,52 +2,24 @@ package sqlite import ( "context" - "database/sql" - - watermillsql "github.com/ThreeDotsLabs/watermill-sql/v2/pkg/sql" - "github.com/ThreeDotsLabs/watermill/message" - "github.com/pkg/errors" ) type Subscriber struct { - watermillSubscriber *watermillsql.Subscriber - offsetsAdapter watermillsql.OffsetsAdapter - schema SqliteSchema - db *sql.DB + pubsub *PubSub } func NewSubscriber( - watermillSubscriber *watermillsql.Subscriber, - offsetsAdapter watermillsql.OffsetsAdapter, - schema SqliteSchema, - db *sql.DB, + pubsub *PubSub, ) *Subscriber { return &Subscriber{ - watermillSubscriber: watermillSubscriber, - offsetsAdapter: offsetsAdapter, - schema: schema, - db: db, + pubsub: pubsub, } } -func (s *Subscriber) SubscribeToTweetCreated(ctx context.Context) (<-chan *message.Message, error) { - return s.watermillSubscriber.Subscribe(ctx, TweetCreatedTopic) +func (s *Subscriber) SubscribeToTweetCreated(ctx context.Context) <-chan *ReceivedMessage { + return s.pubsub.Subscribe(ctx, TweetCreatedTopic) } func (s *Subscriber) TweetCreatedQueueLength(ctx context.Context) (int, error) { - offsetsQuery, offsetsQueryArgs := s.offsetsAdapter.NextOffsetQuery(TweetCreatedTopic, consumerGroupName) - - selectQuery := ` - SELECT COUNT(*) - FROM ` + s.schema.MessagesTable(TweetCreatedTopic) + ` - WHERE offset > (` + offsetsQuery + `)` - - row := s.db.QueryRowContext(ctx, selectQuery, offsetsQueryArgs...) - - var count int - if err := row.Scan(&count); err != nil { - return 0, errors.Wrap(err, "error calling row scan") - } - - return count, nil + return s.pubsub.QueueLength(TweetCreatedTopic) } From 7c59447d71baa138961254a1b66f4504a3db2b6e Mon Sep 17 00:00:00 2001 From: boreq Date: Wed, 1 Nov 2023 06:47:27 +0900 Subject: [PATCH 04/10] Fix --- cmd/crossposting-service/di/wire_gen.go | 14 ++--- service/adapters/sqlite/pubsub_test.go | 4 +- service/adapters/sqlite/sqlite.go | 5 +- service/adapters/sqlite/sqlite_test.go | 2 +- service/adapters/sqlite/subscriber_test.go | 57 --------------------- service/ports/sqlitepubsub/tweet_created.go | 20 ++++---- 6 files changed, 24 insertions(+), 78 deletions(-) delete mode 100644 service/adapters/sqlite/subscriber_test.go diff --git a/cmd/crossposting-service/di/wire_gen.go b/cmd/crossposting-service/di/wire_gen.go index 21b2b7b..5906550 100644 --- a/cmd/crossposting-service/di/wire_gen.go +++ b/cmd/crossposting-service/di/wire_gen.go @@ -145,13 +145,15 @@ func BuildTestAdapters(contextContext context.Context, tb testing.TB) (sqlite.Te cleanup() return sqlite.TestedItems{}, nil, err } + loggingMigrationsProgressCallback := adapters.NewLoggingMigrationsProgressCallback(logger) testedItems := sqlite.TestedItems{ - TransactionProvider: genericTransactionProvider, - Subscriber: subscriber, - MigrationsStorage: migrationsStorage, - PubSub: pubSub, - MigrationsRunner: runner, - Migrations: migrationsMigrations, + TransactionProvider: genericTransactionProvider, + Subscriber: subscriber, + MigrationsStorage: migrationsStorage, + PubSub: pubSub, + MigrationsRunner: runner, + Migrations: migrationsMigrations, + MigrationsProgressCallback: loggingMigrationsProgressCallback, } return testedItems, func() { cleanup() diff --git a/service/adapters/sqlite/pubsub_test.go b/service/adapters/sqlite/pubsub_test.go index 830e5a1..d8fb097 100644 --- a/service/adapters/sqlite/pubsub_test.go +++ b/service/adapters/sqlite/pubsub_test.go @@ -92,7 +92,9 @@ func TestPubSub_MessageContainCorrectPayloadAndAckedMessagesAreNotRetried(t *tes }, } - for _, testCase := range testCases { + for i := range testCases { + testCase := testCases[i] + t.Run(testCase.Name, func(t *testing.T) { t.Parallel() diff --git a/service/adapters/sqlite/sqlite.go b/service/adapters/sqlite/sqlite.go index 7287a46..999fd97 100644 --- a/service/adapters/sqlite/sqlite.go +++ b/service/adapters/sqlite/sqlite.go @@ -27,8 +27,9 @@ type TestedItems struct { MigrationsStorage *MigrationsStorage PubSub *PubSub - MigrationsRunner *migrations.Runner - Migrations migrations.Migrations + MigrationsRunner *migrations.Runner + Migrations migrations.Migrations + MigrationsProgressCallback migrations.ProgressCallback } func Open(conf config.Config) (*sql.DB, error) { diff --git a/service/adapters/sqlite/sqlite_test.go b/service/adapters/sqlite/sqlite_test.go index 12417a4..481d2dd 100644 --- a/service/adapters/sqlite/sqlite_test.go +++ b/service/adapters/sqlite/sqlite_test.go @@ -15,7 +15,7 @@ func NewTestAdapters(ctx context.Context, tb testing.TB) sqlite.TestedItems { tb.Cleanup(f) - err = adapters.Migrations.Execute(ctx) + err = adapters.MigrationsRunner.Run(ctx, adapters.Migrations, adapters.MigrationsProgressCallback) require.NoError(tb, err) return adapters diff --git a/service/adapters/sqlite/subscriber_test.go b/service/adapters/sqlite/subscriber_test.go deleted file mode 100644 index 272c04c..0000000 --- a/service/adapters/sqlite/subscriber_test.go +++ /dev/null @@ -1,57 +0,0 @@ -package sqlite_test - -import ( - "context" - "testing" - "time" - - "github.com/planetary-social/nos-crossposting-service/internal/fixtures" - "github.com/planetary-social/nos-crossposting-service/service/adapters/sqlite" - "github.com/planetary-social/nos-crossposting-service/service/domain" - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" -) - -func TestSubscriber_TweetCreatedQueueLength(t *testing.T) { - ctx := fixtures.TestContext(t) - adapters := NewTestAdapters(ctx, t) - - n, err := adapters.Subscriber.TweetCreatedQueueLength(ctx) - require.NoError(t, err) - require.Equal(t, 0, n) - - err = adapters.TransactionProvider.Transact(ctx, func(ctx context.Context, adapters sqlite.TestAdapters) error { - err := adapters.Publisher.PublishTweetCreated(fixtures.SomeAccountID(), domain.NewTweet(fixtures.SomeString())) - require.NoError(t, err) - - return nil - }) - require.NoError(t, err) - - err = adapters.TransactionProvider.Transact(ctx, func(ctx context.Context, adapters sqlite.TestAdapters) error { - err := adapters.Publisher.PublishTweetCreated(fixtures.SomeAccountID(), domain.NewTweet(fixtures.SomeString())) - require.NoError(t, err) - - return nil - }) - require.NoError(t, err) - - n, err = adapters.Subscriber.TweetCreatedQueueLength(ctx) - require.NoError(t, err) - require.Equal(t, 2, n) - - go func() { - ch, err := adapters.Subscriber.SubscribeToTweetCreated(ctx) - require.NoError(t, err) - - for msg := range ch { - msg.Ack() - } - }() - - require.EventuallyWithT(t, func(t *assert.CollectT) { - n, err := adapters.Subscriber.TweetCreatedQueueLength(ctx) - assert.NoError(t, err) - assert.Equal(t, 0, n) - }, 5*time.Second, 100*time.Millisecond) -} diff --git a/service/ports/sqlitepubsub/tweet_created.go b/service/ports/sqlitepubsub/tweet_created.go index 2ff1901..e5afd04 100644 --- a/service/ports/sqlitepubsub/tweet_created.go +++ b/service/ports/sqlitepubsub/tweet_created.go @@ -5,7 +5,6 @@ import ( "encoding/json" "time" - "github.com/ThreeDotsLabs/watermill/message" "github.com/boreq/errors" "github.com/planetary-social/nos-crossposting-service/internal/logging" "github.com/planetary-social/nos-crossposting-service/service/adapters/sqlite" @@ -44,26 +43,25 @@ func NewTweetCreatedEventSubscriber( func (s *TweetCreatedEventSubscriber) Run(ctx context.Context) error { go s.reportMetricsLoop(ctx) - ch, err := s.subscriber.SubscribeToTweetCreated(ctx) - if err != nil { - return errors.Wrap(err, "error calling subscribe") - } - - for msg := range ch { + for msg := range s.subscriber.SubscribeToTweetCreated(ctx) { if err := s.handleMessage(ctx, msg); err != nil { s.logger.Error().WithError(err).Message("error handling a message") - msg.Nack() + if err := msg.Nack(); err != nil { + return errors.Wrap(err, "error nacking a message") + } } else { - msg.Ack() + if err := msg.Ack(); err != nil { + return errors.Wrap(err, "error acking a message") + } } } return errors.New("channel closed") } -func (s *TweetCreatedEventSubscriber) handleMessage(ctx context.Context, msg *message.Message) error { +func (s *TweetCreatedEventSubscriber) handleMessage(ctx context.Context, msg *sqlite.ReceivedMessage) error { var transport sqlite.TweetCreatedEventTransport - if err := json.Unmarshal(msg.Payload, &transport); err != nil { + if err := json.Unmarshal(msg.Payload(), &transport); err != nil { return errors.Wrap(err, "error unmarshaling") } From 82fe8cfc33b92ce9e34afaa6e448667ef81d79e3 Mon Sep 17 00:00:00 2001 From: boreq Date: Wed, 1 Nov 2023 06:48:08 +0900 Subject: [PATCH 05/10] Fix --- service/adapters/sqlite/pubsub.go | 4 ---- 1 file changed, 4 deletions(-) diff --git a/service/adapters/sqlite/pubsub.go b/service/adapters/sqlite/pubsub.go index fac4db1..1435719 100644 --- a/service/adapters/sqlite/pubsub.go +++ b/service/adapters/sqlite/pubsub.go @@ -219,10 +219,6 @@ func (p *PubSub) readMsg(topic string) (Message, error) { return NewMessage(uuid, payload) } -func (p *PubSub) ackOrNack(msg Message) *ReceivedMessage { - return NewReceivedMessage(msg) -} - func (p *PubSub) ack(msg Message) error { _, err := p.db.Exec( "DELETE FROM pubsub WHERE uuid = ?", From 22d4ffaab8d5ae5793fdc4b5d26da3110ccc9d6c Mon Sep 17 00:00:00 2001 From: boreq Date: Wed, 1 Nov 2023 06:48:35 +0900 Subject: [PATCH 06/10] Fix --- go.mod | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/go.mod b/go.mod index 32dab86..4349038 100644 --- a/go.mod +++ b/go.mod @@ -19,7 +19,6 @@ require ( github.com/mattn/go-sqlite3 v1.14.17 github.com/nbd-wtf/go-nostr v0.18.10 github.com/oklog/ulid/v2 v2.1.0 - github.com/pkg/errors v0.9.1 github.com/prometheus/client_golang v1.16.0 github.com/rs/zerolog v1.29.1 github.com/sirupsen/logrus v1.9.3 @@ -53,6 +52,7 @@ require ( github.com/mattn/go-isatty v0.0.14 // indirect github.com/matttproud/golang_protobuf_extensions v1.0.4 // indirect github.com/oklog/ulid v1.3.1 // indirect + github.com/pkg/errors v0.9.1 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect github.com/prometheus/client_model v0.3.0 // indirect github.com/prometheus/common v0.42.0 // indirect From 49a5350bcde105c99bf29547c570f59dd3d76c1b Mon Sep 17 00:00:00 2001 From: boreq Date: Wed, 1 Nov 2023 06:50:09 +0900 Subject: [PATCH 07/10] Fix --- cmd/crossposting-service/di/inject_pubsub.go | 6 - service/adapters/sqlite/watermill.go | 210 ------------------- 2 files changed, 216 deletions(-) delete mode 100644 service/adapters/sqlite/watermill.go diff --git a/cmd/crossposting-service/di/inject_pubsub.go b/cmd/crossposting-service/di/inject_pubsub.go index bfb99b4..1acf527 100644 --- a/cmd/crossposting-service/di/inject_pubsub.go +++ b/cmd/crossposting-service/di/inject_pubsub.go @@ -15,18 +15,12 @@ var memoryPubsubSet = wire.NewSet( ) var sqlitePubsubSet = wire.NewSet( - //sqlite.NewSqliteSchema, - //wire.Bind(new(watermillsql.SchemaAdapter), new(sqlite.SqliteSchema)), - - //sqlite.NewWatermillOffsetsAdapter, - //sqlite.NewWatermillSubscriber, sqlitepubsubport.NewTweetCreatedEventSubscriber, sqlite.NewSubscriber, sqlite.NewPubSub, ) var sqliteTxPubsubSet = wire.NewSet( - sqlite.NewWatermillPublisher, sqlite.NewPublisher, wire.Bind(new(app.Publisher), new(*sqlite.Publisher)), ) diff --git a/service/adapters/sqlite/watermill.go b/service/adapters/sqlite/watermill.go deleted file mode 100644 index 45612a7..0000000 --- a/service/adapters/sqlite/watermill.go +++ /dev/null @@ -1,210 +0,0 @@ -package sqlite - -import ( - "database/sql" - "encoding/json" - "fmt" - "strings" - "time" - - "github.com/ThreeDotsLabs/watermill" - watermillsql "github.com/ThreeDotsLabs/watermill-sql/v2/pkg/sql" - "github.com/ThreeDotsLabs/watermill/message" - "github.com/boreq/errors" -) - -const ( - consumerGroupName = "main" -) - -func NewWatermillPublisher( - tx *sql.Tx, - logger watermill.LoggerAdapter, - schemaAdapter watermillsql.SchemaAdapter, -) (*watermillsql.Publisher, error) { - config := watermillsql.PublisherConfig{ - SchemaAdapter: schemaAdapter, - AutoInitializeSchema: false, - } - - return watermillsql.NewPublisher(tx, config, logger) -} - -func NewWatermillSubscriber( - db *sql.DB, - logger watermill.LoggerAdapter, - schemaAdapter watermillsql.SchemaAdapter, - offsetsAdapter watermillsql.OffsetsAdapter, -) (*watermillsql.Subscriber, error) { - config := watermillsql.SubscriberConfig{ - ConsumerGroup: consumerGroupName, - PollInterval: 30 * time.Second, - ResendInterval: 30 * time.Second, - RetryInterval: 30 * time.Second, - SchemaAdapter: schemaAdapter, - OffsetsAdapter: offsetsAdapter, - InitializeSchema: false, - } - - return watermillsql.NewSubscriber(db, config, logger) -} - -func NewWatermillOffsetsAdapter() watermillsql.OffsetsAdapter { - return SqliteOffsetsAdapter{ - GenerateMessagesOffsetsTableName: func(topic string) string { - return fmt.Sprintf("watermill_offsets_%s", topic) - }, - } -} - -type SqliteSchema struct { - GenerateMessagesTableName func(topic string) string - SubscribeBatchSize int -} - -func NewSqliteSchema() SqliteSchema { - return SqliteSchema{ - GenerateMessagesTableName: func(topic string) string { - return fmt.Sprintf("watermill_%s", topic) - }, - } -} - -func (s SqliteSchema) SchemaInitializingQueries(topic string) []string { - createMessagesTable := strings.Join([]string{ - "CREATE TABLE IF NOT EXISTS " + s.MessagesTable(topic) + " (", - "`offset` INTEGER NOT NULL PRIMARY KEY,", - "`uuid` VARCHAR(36) NOT NULL,", - "`created_at` TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,", - "`payload` JSON DEFAULT NULL,", - "`metadata` JSON DEFAULT NULL", - ");", - }, "\n") - - return []string{createMessagesTable} -} - -func (s SqliteSchema) InsertQuery(topic string, msgs message.Messages) (string, []interface{}, error) { - insertQuery := fmt.Sprintf( - `INSERT INTO %s (uuid, payload, metadata) VALUES %s`, - s.MessagesTable(topic), - strings.TrimRight(strings.Repeat(`(?,?,?),`, len(msgs)), ","), - ) - - args, err := defaultInsertArgs(msgs) - if err != nil { - return "", nil, err - } - - return insertQuery, args, nil -} - -func (s SqliteSchema) batchSize() int { - if s.SubscribeBatchSize == 0 { - return 100 - } - - return s.SubscribeBatchSize -} - -func (s SqliteSchema) SelectQuery(topic string, consumerGroup string, offsetsAdapter watermillsql.OffsetsAdapter) (string, []interface{}) { - nextOffsetQuery, nextOffsetArgs := offsetsAdapter.NextOffsetQuery(topic, consumerGroup) - selectQuery := ` - SELECT offset, uuid, payload, metadata FROM ` + s.MessagesTable(topic) + ` - WHERE - offset > (` + nextOffsetQuery + `) - ORDER BY - offset ASC - LIMIT ` + fmt.Sprintf("%d", s.batchSize()) - - return selectQuery, nextOffsetArgs -} - -func (s SqliteSchema) UnmarshalMessage(row watermillsql.Scanner) (watermillsql.Row, error) { - r := watermillsql.Row{} - err := row.Scan(&r.Offset, &r.UUID, &r.Payload, &r.Metadata) - if err != nil { - return watermillsql.Row{}, errors.Wrap(err, "could not scan message row") - } - - msg := message.NewMessage(string(r.UUID), r.Payload) - - if r.Metadata != nil { - err = json.Unmarshal(r.Metadata, &msg.Metadata) - if err != nil { - return watermillsql.Row{}, errors.Wrap(err, "could not unmarshal metadata as JSON") - } - } - - r.Msg = msg - - return r, nil -} - -func (s SqliteSchema) MessagesTable(topic string) string { - if s.GenerateMessagesTableName != nil { - return s.GenerateMessagesTableName(topic) - } - return fmt.Sprintf("`watermill_%s`", topic) -} - -func (s SqliteSchema) SubscribeIsolationLevel() sql.IsolationLevel { - return sql.LevelSerializable -} - -type SqliteOffsetsAdapter struct { - GenerateMessagesOffsetsTableName func(topic string) string -} - -func (a SqliteOffsetsAdapter) SchemaInitializingQueries(topic string) []string { - return []string{` - CREATE TABLE IF NOT EXISTS ` + a.MessagesOffsetsTable(topic) + ` ( - consumer_group VARCHAR(255) NOT NULL, - offset_acked BIGINT, - offset_consumed BIGINT NOT NULL, - PRIMARY KEY(consumer_group) - )`} -} - -func (a SqliteOffsetsAdapter) AckMessageQuery(topic string, row watermillsql.Row, consumerGroup string) (string, []interface{}) { - ackQuery := `INSERT INTO ` + a.MessagesOffsetsTable(topic) + ` (offset_consumed, offset_acked, consumer_group) - VALUES (?, ?, ?) ON CONFLICT(consumer_group) DO UPDATE SET offset_consumed=excluded.offset_consumed, offset_acked=excluded.offset_acked` - return ackQuery, []interface{}{row.Offset, row.Offset, consumerGroup} -} - -func (a SqliteOffsetsAdapter) NextOffsetQuery(topic, consumerGroup string) (string, []interface{}) { - return `SELECT COALESCE( - (SELECT offset_acked - FROM ` + a.MessagesOffsetsTable(topic) + ` - WHERE consumer_group=? - ), 0)`, - []interface{}{consumerGroup} -} - -func (a SqliteOffsetsAdapter) MessagesOffsetsTable(topic string) string { - if a.GenerateMessagesOffsetsTableName != nil { - return a.GenerateMessagesOffsetsTableName(topic) - } - return fmt.Sprintf("`watermill_offsets_%s`", topic) -} - -func (a SqliteOffsetsAdapter) ConsumedMessageQuery(topic string, row watermillsql.Row, consumerGroup string, consumerULID []byte) (string, []interface{}) { - // offset_consumed is not queried anywhere, it's used only to detect race conditions with NextOffsetQuery. - ackQuery := `INSERT INTO ` + a.MessagesOffsetsTable(topic) + ` (offset_consumed, consumer_group) - VALUES (?, ?) ON CONFLICT(consumer_group) DO UPDATE SET offset_consumed=excluded.offset_consumed` - return ackQuery, []interface{}{row.Offset, consumerGroup} -} - -func defaultInsertArgs(msgs message.Messages) ([]interface{}, error) { - var args []interface{} - for _, msg := range msgs { - metadata, err := json.Marshal(msg.Metadata) - if err != nil { - return nil, errors.Wrapf(err, "could not marshal metadata into JSON for message %s", msg.UUID) - } - - args = append(args, msg.UUID, []byte(msg.Payload), metadata) - } - - return args, nil -} From 8f41018e6892a8ac34fefcedd6affd63a2639951 Mon Sep 17 00:00:00 2001 From: boreq Date: Wed, 1 Nov 2023 06:50:36 +0900 Subject: [PATCH 08/10] Fix --- go.mod | 3 +-- go.sum | 30 ------------------------------ 2 files changed, 1 insertion(+), 32 deletions(-) diff --git a/go.mod b/go.mod index 4349038..7e46e8a 100644 --- a/go.mod +++ b/go.mod @@ -4,7 +4,6 @@ go 1.21 require ( github.com/ThreeDotsLabs/watermill v1.3.1 - github.com/ThreeDotsLabs/watermill-sql/v2 v2.0.0 github.com/boreq/errors v0.1.0 github.com/boreq/rest v0.1.0 github.com/decred/dcrd/dcrec/secp256k1/v4 v4.0.1 @@ -52,12 +51,12 @@ require ( github.com/mattn/go-isatty v0.0.14 // indirect github.com/matttproud/golang_protobuf_extensions v1.0.4 // indirect github.com/oklog/ulid v1.3.1 // indirect - github.com/pkg/errors v0.9.1 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect github.com/prometheus/client_model v0.3.0 // indirect github.com/prometheus/common v0.42.0 // indirect github.com/prometheus/procfs v0.10.1 // indirect github.com/puzpuzpuz/xsync v1.5.2 // indirect + github.com/rogpeppe/go-internal v1.10.0 // indirect github.com/tidwall/gjson v1.14.4 // indirect github.com/tidwall/match v1.1.1 // indirect github.com/tidwall/pretty v1.2.0 // indirect diff --git a/go.sum b/go.sum index 4e644ae..fc67f27 100644 --- a/go.sum +++ b/go.sum @@ -1,7 +1,5 @@ github.com/ThreeDotsLabs/watermill v1.3.1 h1:Fm+K9soLPEO/N2U90OkdwoyrIm4X8YsCDPxmuyniOR8= github.com/ThreeDotsLabs/watermill v1.3.1/go.mod h1:zn/7F0TGOr1K/RX7bFbVxii6p1abOMLllAMpVpKinQg= -github.com/ThreeDotsLabs/watermill-sql/v2 v2.0.0 h1:wswlLYY0Jc0tloj3lty4Y+VTEA8AM1vYfrIDwWtqyJk= -github.com/ThreeDotsLabs/watermill-sql/v2 v2.0.0/go.mod h1:83l/4sKaLHwoHJlrAsDLaXcHN+QOHHntAAyabNmiuO4= github.com/aead/siphash v1.0.1/go.mod h1:Nywa3cDsYNNK3gaciGTWPwHt0wlpNV15vwmswBAUSII= github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw= @@ -59,8 +57,6 @@ github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMo github.com/fsnotify/fsnotify v1.4.9/go.mod h1:znqG4EE+3YCdAaPaxE2ZRY/06pZUdp0tY4IgpuI1SZQ= github.com/g8rswimmer/go-twitter/v2 v2.1.5 h1:Uj9Yuof2UducrP4Xva7irnUJfB9354/VyUXKmc2D5gg= github.com/g8rswimmer/go-twitter/v2 v2.1.5/go.mod h1:/55xWb313KQs25X7oZrNSEwLQNkYHhPsDwFstc45vhc= -github.com/go-sql-driver/mysql v1.4.1 h1:g24URVg0OFbNUTx9qqY1IRZ9D9z3iPyi5zKhQZpNwpA= -github.com/go-sql-driver/mysql v1.4.1/go.mod h1:zAC/RDZ24gD3HViQzih4MyKcchzm+sOG5ZlKdlhCg5w= github.com/gobwas/httphead v0.1.0 h1:exrUm0f4YX0L7EBwZHuCF4GDp8aJfVeBrlLQrs6NqWU= github.com/gobwas/httphead v0.1.0/go.mod h1:O/RXo79gxV8G+RqlR/otEwx4Q36zl9rqC5u12GKvMCM= github.com/gobwas/pool v0.2.1 h1:xfeeEhW7pwmX8nuLVlqbzVc7udMDrwetjEv+TZIz1og= @@ -107,22 +103,6 @@ github.com/hashicorp/errwrap v1.1.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brv github.com/hashicorp/go-multierror v1.1.1 h1:H5DkEtf6CXdFp0N0Em5UCwQpXMWke8IA0+lD48awMYo= github.com/hashicorp/go-multierror v1.1.1/go.mod h1:iw975J/qwKPdAO1clOe2L8331t/9/fmwbPZ6JB6eMoM= github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU= -github.com/jackc/chunkreader/v2 v2.0.1 h1:i+RDz65UE+mmpjTfyz0MoVTnzeYxroil2G82ki7MGG8= -github.com/jackc/chunkreader/v2 v2.0.1/go.mod h1:odVSm741yZoC3dpHEUXIqA9tQRhFrgOHwnPIn9lDKlk= -github.com/jackc/pgconn v1.6.4 h1:S7T6cx5o2OqmxdHaXLH1ZeD1SbI8jBznyYE9Ec0RCQ8= -github.com/jackc/pgconn v1.6.4/go.mod h1:w2pne1C2tZgP+TvjqLpOigGzNqjBgQW9dUw/4Chex78= -github.com/jackc/pgio v1.0.0 h1:g12B9UwVnzGhueNavwioyEEpAmqMe1E/BN9ES+8ovkE= -github.com/jackc/pgio v1.0.0/go.mod h1:oP+2QK2wFfUWgr+gxjoBH9KGBb31Eio69xUb0w5bYf8= -github.com/jackc/pgpassfile v1.0.0 h1:/6Hmqy13Ss2zCq62VdNG8tM1wchn8zjSGOBJ6icpsIM= -github.com/jackc/pgpassfile v1.0.0/go.mod h1:CEx0iS5ambNFdcRtxPj5JhEz+xB6uRky5eyVu/W2HEg= -github.com/jackc/pgproto3/v2 v2.0.2 h1:q1Hsy66zh4vuNsajBUF2PNqfAMMfxU5mk594lPE9vjY= -github.com/jackc/pgproto3/v2 v2.0.2/go.mod h1:WfJCnwN3HIg9Ish/j3sgWXnAfK8A9Y0bwXYU5xKaEdA= -github.com/jackc/pgservicefile v0.0.0-20200714003250-2b9c44734f2b h1:C8S2+VttkHFdOOCXJe+YGfa4vHYwlt4Zx+IVXQ97jYg= -github.com/jackc/pgservicefile v0.0.0-20200714003250-2b9c44734f2b/go.mod h1:vsD4gTJCa9TptPL8sPkXrLZ+hDuNrZCnj29CQpr4X1E= -github.com/jackc/pgtype v1.4.2 h1:t+6LWm5eWPLX1H5Se702JSBcirq6uWa4jiG4wV1rAWY= -github.com/jackc/pgtype v1.4.2/go.mod h1:JCULISAZBFGrHaOXIIFiyfzW5VY0GRitRr8NeJsrdig= -github.com/jackc/pgx/v4 v4.8.1 h1:SUbCLP2pXvf/Sr/25KsuI4aTxiFYIvpfk4l6aTSdyCw= -github.com/jackc/pgx/v4 v4.8.1/go.mod h1:4HOLxrl8wToZJReD04/yB20GDwf4KBYETvlHciCnwW0= github.com/jessevdk/go-flags v0.0.0-20141203071132-1679536dcc89/go.mod h1:4FA24M0QyGHXBuZZK/XkWh8h0e1EYbRYJSGM75WSRxI= github.com/jessevdk/go-flags v1.4.0/go.mod h1:4FA24M0QyGHXBuZZK/XkWh8h0e1EYbRYJSGM75WSRxI= github.com/josharian/intern v1.0.0 h1:vlS4z54oSdjm0bgjRigI+G1HpF+tI+9rE5LLzOg8HmY= @@ -135,8 +115,6 @@ github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk= github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= -github.com/lib/pq v1.3.0 h1:/qkRGz8zljWiDcFvgpwUpwIAPu3r07TDvs3Rws+o/pU= -github.com/lib/pq v1.3.0/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo= github.com/lithammer/shortuuid/v3 v3.0.7 h1:trX0KTHy4Pbwo/6ia8fscyHoGA+mf1jWbPJVuvyJQQ8= github.com/lithammer/shortuuid/v3 v3.0.7/go.mod h1:vMk8ke37EmiewwolSO1NLW8vP4ZaKlRuDIi8tWWmAts= github.com/mailru/easyjson v0.7.7 h1:UGYAvKxe3sBsEDzO8ZeWOSlIQfWFlxbzLZe7hwFURr0= @@ -165,7 +143,6 @@ github.com/onsi/gomega v1.4.3/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1Cpa github.com/onsi/gomega v1.7.1/go.mod h1:XdKZgCCFLUoM/7CFJVPcG8C1xQ1AJ0vpAezJrB7JYyY= github.com/onsi/gomega v1.10.1/go.mod h1:iN09h71vgCQne3DLsj+A5owkum+a2tYe+TOCB1ybHNo= github.com/pborman/getopt v0.0.0-20170112200414-7148bc3a4c30/go.mod h1:85jBQOZwpVEaDAr341tbn15RS4fCAsIst0qp7i8ex1o= -github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= @@ -201,8 +178,6 @@ github.com/tidwall/pretty v1.2.0/go.mod h1:ITEVvHYasfjBbM0u2Pg8T2nJnzm8xPwvNhhso golang.org/x/crypto v0.0.0-20170930174604-9419663f5a44/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= -golang.org/x/crypto v0.0.0-20210817164053-32db794688a5 h1:HWj/xjIHfjYU5nVXpTM0s39J9CbLn7Cc5a7IC5rwsMQ= -golang.org/x/crypto v0.0.0-20210817164053-32db794688a5/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= golang.org/x/exp v0.0.0-20221106115401-f9659909a136 h1:Fq7F/w7MAa1KJ5bt2aJ62ihqp9HDcRuyILskkpIAurw= golang.org/x/exp v0.0.0-20221106115401-f9659909a136/go.mod h1:CxIveKay+FTh1D0yPZemJVgC/95VzuuOLq5Qi4xnoYc= golang.org/x/mod v0.8.0 h1:LUYupSeNrTNCGzR/hVBk2NHZO4hXcVaW1k4Qx7rjPx8= @@ -237,17 +212,12 @@ golang.org/x/sys v0.8.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk= golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= -golang.org/x/text v0.9.0 h1:2sjJmO8cDvYveuX97RDLsxlyUxLl+GHoLxBiRdHllBE= -golang.org/x/text v0.9.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20190422233926-fe54fb35175b/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs= golang.org/x/tools v0.6.0 h1:BOw41kyTf3PuCW1pVQf8+Cyg8pMlkYB1oo9iJ6D/lKM= golang.org/x/tools v0.6.0/go.mod h1:Xwgl3UAJ/d3gWutnCtw505GrjyAbvKui8lOU390QaIU= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= -golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 h1:go1bK/D/BFZV2I8cIQd1NKEZ+0owSTG1fDTci4IqFcE= golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= -google.golang.org/appengine v1.6.7 h1:FZR1q0exgwxzPzp/aF+VccGrSfxfPpkBqjIIEq3ru6c= -google.golang.org/appengine v1.6.7/go.mod h1:8WjMMxjGQR8xUklV/ARdw2HLXBOI7O7uCIDZVag1xfc= google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8= google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0= google.golang.org/protobuf v0.0.0-20200228230310-ab0ca4ff8a60/go.mod h1:cfTl7dwQJ+fmap5saPgwCLgHXTUD7jkjRqWcaiX5VyM= From 527828c05ce77a6ce59e44821979ab1e80de8c8c Mon Sep 17 00:00:00 2001 From: boreq Date: Wed, 1 Nov 2023 07:12:28 +0900 Subject: [PATCH 09/10] Fix error handling --- service/adapters/purple_pages.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/service/adapters/purple_pages.go b/service/adapters/purple_pages.go index a7ab8f1..488b098 100644 --- a/service/adapters/purple_pages.go +++ b/service/adapters/purple_pages.go @@ -80,7 +80,7 @@ func (p *PurplePages) GetRelays(ctx context.Context, publicKey domain.PublicKey) for i := 0; i < 2; i++ { result := <-ch - if result.Err != nil { + if err := result.Err; err != nil { if errors.Is(err, ErrPurplePagesTimeout) { return nil, errors.Wrap(err, "one of the lookups timed out") } From 2cb397bf4c2ae7b5f7711725818b1a0b50d95d7c Mon Sep 17 00:00:00 2001 From: boreq Date: Wed, 1 Nov 2023 07:13:01 +0900 Subject: [PATCH 10/10] No debug --- service/adapters/sqlite/pubsub.go | 4 ---- 1 file changed, 4 deletions(-) diff --git a/service/adapters/sqlite/pubsub.go b/service/adapters/sqlite/pubsub.go index 1435719..efa1adb 100644 --- a/service/adapters/sqlite/pubsub.go +++ b/service/adapters/sqlite/pubsub.go @@ -152,13 +152,9 @@ func (p *PubSub) subscribe(ctx context.Context, topic string, ch chan *ReceivedM noMessagesCounter := 0 for { - p.logger.Debug().Message("reading message") - msg, err := p.readMsg(topic) if err != nil { if errors.Is(err, sql.ErrNoRows) { - p.logger.Debug().Message("no rows") - noMessagesCounter++ backoff := getNoMessagesBackoff(noMessagesCounter)