From 37b3bbbefbd61610c01c3dd28be7a05ea6509074 Mon Sep 17 00:00:00 2001 From: Daniel Cadenas Date: Wed, 16 Oct 2024 18:08:46 -0300 Subject: [PATCH] Add vanish subscriber --- cmd/crossposting-service/di/service.go | 8 ++ cmd/crossposting-service/di/wire.go | 5 + cmd/crossposting-service/di/wire_gen.go | 5 +- compose.yml | 36 ++++++ go.mod | 2 + go.sum | 8 ++ .../adapters/mocks/public_key_repository.go | 4 + .../adapters/sqlite/public_key_repository.go | 45 +++++++ .../sqlite/public_key_repository_test.go | 43 ++++++ service/app/app.go | 1 + service/app/vanish_subscriber.go | 122 ++++++++++++++++++ 11 files changed, 278 insertions(+), 1 deletion(-) create mode 100644 compose.yml create mode 100644 service/app/vanish_subscriber.go diff --git a/cmd/crossposting-service/di/service.go b/cmd/crossposting-service/di/service.go index 45dac4d..dec0c17 100644 --- a/cmd/crossposting-service/di/service.go +++ b/cmd/crossposting-service/di/service.go @@ -24,6 +24,7 @@ type Service struct { migrationsRunner *migrations.Runner migrations migrations.Migrations migrationsProgressCallback migrations.ProgressCallback + vanishSubscriber *app.VanishSubscriber } func NewService( @@ -37,6 +38,7 @@ func NewService( migrationsRunner *migrations.Runner, migrations migrations.Migrations, migrationsProgressCallback migrations.ProgressCallback, + vanishSubscriber *app.VanishSubscriber, ) Service { return Service{ app: app, @@ -49,6 +51,7 @@ func NewService( migrationsRunner: migrationsRunner, migrations: migrations, migrationsProgressCallback: migrationsProgressCallback, + vanishSubscriber: vanishSubscriber, } } @@ -97,6 +100,11 @@ func (s Service) Run(ctx context.Context) error { errCh <- errors.Wrap(s.metricsTimer.Run(ctx), "metrics timer error") }() + runners++ + go func() { + errCh <- errors.Wrap(s.vanishSubscriber.Run(ctx), "vanish subscriver error") + }() + var err error for i := 0; i < runners; i++ { err = multierror.Append(err, errors.Wrap(<-errCh, "error returned by runner")) diff --git a/cmd/crossposting-service/di/wire.go b/cmd/crossposting-service/di/wire.go index 92ef713..e55ac5c 100644 --- a/cmd/crossposting-service/di/wire.go +++ b/cmd/crossposting-service/di/wire.go @@ -27,6 +27,7 @@ func BuildService(context.Context, config.Config) (Service, func(), error) { applicationSet, sqliteAdaptersSet, downloaderSet, + vanishSubscriberSet, memoryPubsubSet, sqlitePubsubSet, loggingSet, @@ -116,6 +117,10 @@ var downloaderSet = wire.NewSet( app.NewDownloader, ) +var vanishSubscriberSet = wire.NewSet( + app.NewVanishSubscriber, +) + var tweetGeneratorSet = wire.NewSet( content.NewTransformer, domain.NewTweetGenerator, diff --git a/cmd/crossposting-service/di/wire_gen.go b/cmd/crossposting-service/di/wire_gen.go index b465366..0ff249b 100644 --- a/cmd/crossposting-service/di/wire_gen.go +++ b/cmd/crossposting-service/di/wire_gen.go @@ -115,7 +115,8 @@ func BuildService(contextContext context.Context, configConfig config.Config) (S return Service{}, nil, err } loggingMigrationsProgressCallback := adapters.NewLoggingMigrationsProgressCallback(logger) - service := NewService(application, server, metricsServer, downloader, receivedEventSubscriber, tweetCreatedEventSubscriber, metrics, runner, migrationsMigrations, loggingMigrationsProgressCallback) + vanishSubscriber := app.NewVanishSubscriber(genericTransactionProvider, logger) + service := NewService(application, server, metricsServer, downloader, receivedEventSubscriber, tweetCreatedEventSubscriber, metrics, runner, migrationsMigrations, loggingMigrationsProgressCallback, vanishSubscriber) return service, func() { cleanup() }, nil @@ -306,4 +307,6 @@ type buildTransactionSqliteAdaptersDependencies struct { var downloaderSet = wire.NewSet(app.NewDownloader) +var vanishSubscriberSet = wire.NewSet(app.NewVanishSubscriber) + var tweetGeneratorSet = wire.NewSet(content.NewTransformer, domain.NewTweetGenerator, wire.Bind(new(app.TweetGenerator), new(*domain.TweetGenerator))) diff --git a/compose.yml b/compose.yml new file mode 100644 index 0000000..1de690c --- /dev/null +++ b/compose.yml @@ -0,0 +1,36 @@ +services: + crossposting: + platform: linux/amd64 + build: + context: . + dockerfile: Dockerfile + environment: + CROSSPOSTING_TWITTER_KEY: xxx + CROSSPOSTING_TWITTER_KEY_SECRET: xxx + CROSSPOSTING_DATABASE_PATH: /db/database.sqlite + CROSSPOSTING_ENVIRONMENT: DEVELOPMENT + CROSSPOSTING_PUBLIC_FACING_ADDRESS: http://localhost:8008/ + REDIS_URL: redis://redis:6379 + ports: + - 8008:8008 + volumes: + - ./db:/db + depends_on: + - redis + + redis: + platform: linux/amd64 + image: redis:alpine + ports: + - 6379:6379 + + relay: + platform: linux/amd64 + image: ghcr.io/planetary-social/nosrelay:latest + ports: + - "7777:7777" + environment: + - RELAY_URL=wss://example.com + - REDIS_URL=redis://redis:6379 + depends_on: + - redis \ No newline at end of file diff --git a/go.mod b/go.mod index 9f31286..82f5800 100644 --- a/go.mod +++ b/go.mod @@ -19,6 +19,7 @@ require ( github.com/nbd-wtf/go-nostr v0.25.4 github.com/oklog/ulid/v2 v2.1.0 github.com/prometheus/client_golang v1.16.0 + github.com/redis/go-redis/v9 v9.6.2 github.com/rs/zerolog v1.29.1 github.com/sirupsen/logrus v1.9.3 github.com/stretchr/testify v1.8.4 @@ -36,6 +37,7 @@ require ( github.com/dghubble/go-twitter v0.0.0-20221104224141-912508c3888b // indirect github.com/dghubble/sling v1.4.1 // indirect github.com/dgraph-io/ristretto v0.1.1 // indirect + github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect github.com/dustin/go-humanize v1.0.1 // indirect github.com/gobwas/httphead v0.1.0 // indirect github.com/gobwas/pool v0.2.1 // indirect diff --git a/go.sum b/go.sum index b7088af..4e3633e 100644 --- a/go.sum +++ b/go.sum @@ -7,6 +7,10 @@ github.com/boreq/errors v0.1.0 h1:aJIXv9JnyR5KtxFpQ8/AiblH3nfYmr1e1yoTze/5A1k= github.com/boreq/errors v0.1.0/go.mod h1:B3dsXzhYvfgUXp7ViU/moPYM4PojgQ9MiQ21uvY6qqQ= github.com/boreq/rest v0.1.0 h1:bAx31Rp1KrXHkCOlzqAtLKdh74xbly2SHkv9k3vX3iA= github.com/boreq/rest v0.1.0/go.mod h1:Ckfx0qLDdPbS081820aWkkqvwhlrbv0SDu8UBDY4k7w= +github.com/bsm/ginkgo/v2 v2.12.0 h1:Ny8MWAHyOepLGlLKYmXG4IEkioBysk6GpaRTLC8zwWs= +github.com/bsm/ginkgo/v2 v2.12.0/go.mod h1:SwYbGRRDovPVboqFv0tPTcG1sN61LM1Z4ARdbAV9g4c= +github.com/bsm/gomega v1.27.10 h1:yeMWxP2pV2fG3FgAODIY8EiRE3dy0aeFYt4l7wh6yKA= +github.com/bsm/gomega v1.27.10/go.mod h1:JyEr/xRbxbtgWNi8tIEVPUYZ5Dzef52k01W3YH0H+O0= github.com/btcsuite/btcd v0.20.1-beta/go.mod h1:wVuoA8VJLEcwgqHBwHmzLRazpKxTv13Px/pDuV7OomQ= github.com/btcsuite/btcd v0.22.0-beta.0.20220111032746-97732e52810c/go.mod h1:tjmYdS6MLJ5/s0Fj4DbLgSbDHbEqLJrtnHecBFkdz5M= github.com/btcsuite/btcd v0.23.0/go.mod h1:0QJIIN1wwIXF/3G/m87gIwGniDMDQqjVn4SZgnFpsYY= @@ -61,6 +65,8 @@ github.com/dgraph-io/ristretto v0.1.1 h1:6CWw5tJNgpegArSHpNHJKldNeq03FQCwYvfMVWa github.com/dgraph-io/ristretto v0.1.1/go.mod h1:S1GPSBCYCIhmVNfcth17y2zZtQT6wzkzgwUve0VDWWA= github.com/dgryski/go-farm v0.0.0-20190423205320-6a90982ecee2 h1:tdlZCpZ/P9DhczCTSixgIKmwPv6+wP5DGjqLYw5SUiA= github.com/dgryski/go-farm v0.0.0-20190423205320-6a90982ecee2/go.mod h1:SqUrOPUnsFjfmXRMNPybcSiG0BgUW2AuFH8PAnS2iTw= +github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78= +github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc= github.com/dustin/go-humanize v1.0.0/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk= github.com/dustin/go-humanize v1.0.1 h1:GzkhY7T5VNhEkwH0PVJgjz+fX1rhBrR7pRT3mDkpeCY= github.com/dustin/go-humanize v1.0.1/go.mod h1:Mu1zIs6XwVuF/gI1OepvI0qD18qycQx+mFykh5fBlto= @@ -171,6 +177,8 @@ github.com/prometheus/procfs v0.10.1 h1:kYK1Va/YMlutzCGazswoHKo//tZVlFpKYh+Pymzi github.com/prometheus/procfs v0.10.1/go.mod h1:nwNm2aOCAYw8uTR/9bWRREkZFxAUcWzPHWJq+XBB/FM= github.com/puzpuzpuz/xsync/v2 v2.5.1 h1:mVGYAvzDSu52+zaGyNjC+24Xw2bQi3kTr4QJ6N9pIIU= github.com/puzpuzpuz/xsync/v2 v2.5.1/go.mod h1:gD2H2krq/w52MfPLE+Uy64TzJDVY7lP2znR9qmR35kU= +github.com/redis/go-redis/v9 v9.6.2 h1:w0uvkRbc9KpgD98zcvo5IrVUsn0lXpRMuhNgiHDJzdk= +github.com/redis/go-redis/v9 v9.6.2/go.mod h1:0C0c6ycQsdpVNQpxb1njEQIqkx5UcsM8FJCQLgE9+RA= github.com/rogpeppe/go-internal v1.10.0 h1:TMyTOH3F/DB16zRVcYyreMH6GnZZrwQVAoYjRBZyWFQ= github.com/rogpeppe/go-internal v1.10.0/go.mod h1:UQnix2H7Ngw/k4C5ijL5+65zddjncjaFoBhdsK/akog= github.com/rs/xid v1.4.0/go.mod h1:trrq9SKmegXys3aeAKXMUTdJsYXVwGY3RLcfgqegfbg= diff --git a/service/adapters/mocks/public_key_repository.go b/service/adapters/mocks/public_key_repository.go index 2862d94..521bd64 100644 --- a/service/adapters/mocks/public_key_repository.go +++ b/service/adapters/mocks/public_key_repository.go @@ -21,6 +21,10 @@ func (m *PublicKeyRepository) Delete(accountID accounts.AccountID, publicKey dom return errors.New("not implemented") } +func (m *PublicKeyRepository) DeleteByPublicKey(publicKey domain.PublicKey) error { + return errors.New("not implemented") +} + func (m *PublicKeyRepository) List() ([]*domain.LinkedPublicKey, error) { return nil, errors.New("not implemented") } diff --git a/service/adapters/sqlite/public_key_repository.go b/service/adapters/sqlite/public_key_repository.go index 0ea4273..907ed59 100644 --- a/service/adapters/sqlite/public_key_repository.go +++ b/service/adapters/sqlite/public_key_repository.go @@ -49,6 +49,27 @@ WHERE account_id = $1 AND public_key = $2 return nil } +func (m *PublicKeyRepository) DeleteByPublicKey(publicKey domain.PublicKey) error { + var accountID string + row := m.tx.QueryRow(` + SELECT account_id FROM public_keys WHERE public_key = $1 + `, publicKey.Hex()) + + err := row.Scan(&accountID) + if err != nil { + if errors.Is(err, sql.ErrNoRows) { + return errors.New("no account found with the provided public key") + } + return errors.Wrap(err, "error retrieving account_id") + } + + if err := m.deleteAccountData(accountID); err != nil { + return errors.Wrap(err, "error deleting account-related data") + } + + return nil +} + func (m *PublicKeyRepository) List() ([]*domain.LinkedPublicKey, error) { rows, err := m.tx.Query(` SELECT account_id, public_key, created_at @@ -139,3 +160,27 @@ func (m *PublicKeyRepository) readPublicKey(row *sql.Rows) (*domain.LinkedPublic return domain.NewLinkedPublicKey(accountID, publicKey, createdAt) } + +func (m *PublicKeyRepository) deleteAccountData(accountID string) error { + _, err := m.tx.Exec(`DELETE FROM public_keys WHERE account_id = $1`, accountID) + if err != nil { + return errors.Wrap(err, "error deleting from public_keys") + } + + _, err = m.tx.Exec(`DELETE FROM sessions WHERE account_id = $1`, accountID) + if err != nil { + return errors.Wrap(err, "error deleting from sessions") + } + + _, err = m.tx.Exec(`DELETE FROM accounts WHERE account_id = $1`, accountID) + if err != nil { + return errors.Wrap(err, "error deleting from accounts") + } + + _, err = m.tx.Exec(`DELETE FROM user_tokens WHERE account_id = $1`, accountID) + if err != nil { + return errors.Wrap(err, "error deleting from user_tokens") + } + + return nil +} diff --git a/service/adapters/sqlite/public_key_repository_test.go b/service/adapters/sqlite/public_key_repository_test.go index 54cab5e..1b1e83e 100644 --- a/service/adapters/sqlite/public_key_repository_test.go +++ b/service/adapters/sqlite/public_key_repository_test.go @@ -7,6 +7,7 @@ import ( "github.com/planetary-social/nos-crossposting-service/internal/fixtures" "github.com/planetary-social/nos-crossposting-service/service/adapters/sqlite" + "github.com/planetary-social/nos-crossposting-service/service/app" "github.com/planetary-social/nos-crossposting-service/service/domain" "github.com/planetary-social/nos-crossposting-service/service/domain/accounts" "github.com/stretchr/testify/require" @@ -259,3 +260,45 @@ func TestPublicKeyRepository_CountCountsPublicKeys(t *testing.T) { }) require.NoError(t, err) } + +func TestPublicKeyRepository_DeleteByPublicKey(t *testing.T) { + ctx := fixtures.TestContext(t) + adapters := NewTestAdapters(ctx, t) + + accountID := fixtures.SomeAccountID() + twitterID := fixtures.SomeTwitterID() + publicKey := fixtures.SomePublicKey() + + account, err := accounts.NewAccount(accountID, twitterID) + require.NoError(t, err) + + err = adapters.TransactionProvider.Transact(ctx, func(ctx context.Context, adapters sqlite.TestAdapters) error { + err = adapters.AccountRepository.Save(account) + require.NoError(t, err) + + linkedPublicKey, err := domain.NewLinkedPublicKey(accountID, publicKey, time.Now()) + require.NoError(t, err) + err = adapters.PublicKeyRepository.Save(linkedPublicKey) + require.NoError(t, err) + + return nil + }) + require.NoError(t, err) + + err = adapters.TransactionProvider.Transact(ctx, func(ctx context.Context, adapters sqlite.TestAdapters) error { + err := adapters.PublicKeyRepository.DeleteByPublicKey(publicKey) + require.NoError(t, err) + + return nil + }) + require.NoError(t, err) + + err = adapters.TransactionProvider.Transact(ctx, func(ctx context.Context, adapters sqlite.TestAdapters) error { + _, err := adapters.AccountRepository.GetByAccountID(accountID) + require.Error(t, err) + require.Equal(t, app.ErrAccountDoesNotExist, err) + + return nil + }) + require.NoError(t, err) +} diff --git a/service/app/app.go b/service/app/app.go index ac866b1..e026a9a 100644 --- a/service/app/app.go +++ b/service/app/app.go @@ -49,6 +49,7 @@ type SessionRepository interface { type PublicKeyRepository interface { Save(linkedPublicKey *domain.LinkedPublicKey) error Delete(accountID accounts.AccountID, publicKey domain.PublicKey) error + DeleteByPublicKey(publicKey domain.PublicKey) error List() ([]*domain.LinkedPublicKey, error) ListByPublicKey(publicKey domain.PublicKey) ([]*domain.LinkedPublicKey, error) ListByAccountID(accountID accounts.AccountID) ([]*domain.LinkedPublicKey, error) diff --git a/service/app/vanish_subscriber.go b/service/app/vanish_subscriber.go new file mode 100644 index 0000000..e19ced8 --- /dev/null +++ b/service/app/vanish_subscriber.go @@ -0,0 +1,122 @@ +package app + +import ( + "context" + "os" + "time" + + "github.com/planetary-social/nos-crossposting-service/internal/logging" + "github.com/planetary-social/nos-crossposting-service/service/domain" + "github.com/redis/go-redis/v9" +) + +type VanishSubscriber struct { + rdb *redis.Client + transactionProvider TransactionProvider + logger logging.Logger +} + +func NewVanishSubscriber( + transactionProvider TransactionProvider, + logger logging.Logger, +) *VanishSubscriber { + log := logger.New("vanishSubscriber") + redisURL := os.Getenv("REDIS_URL") + + options, err := redis.ParseURL(redisURL) + if err != nil { + log.Error().Message("Error parsing REDIS_URL") + } + + rdb := redis.NewClient(options) + + return &VanishSubscriber{ + rdb: rdb, + transactionProvider: transactionProvider, + logger: log, + } +} + +// Processes messages from the vanish_requests stream and updates the last_id when done +func (f *VanishSubscriber) Run(ctx context.Context) error { + streamName := "vanish_requests" + lastProcessedIDKey := "vanish_requests:crossposting_service:last_id" + + lastProcessedID, err := f.rdb.Get(ctx, lastProcessedIDKey).Result() + if err == redis.Nil { + lastProcessedID = "0-0" + } else if err != nil { + f.logger.Error().Message("Error fetching last processed ID") + return err + } + + f.logger.Debug().WithField("lastProcessedID", lastProcessedID).Message("Starting VanishSubscriber") + + for { + select { + case <-ctx.Done(): + f.logger.Debug().Message("context canceled, shutting down VanishSubscriber") + return nil + + default: + streamEntries, err := f.rdb.XRead(ctx, &redis.XReadArgs{ + Streams: []string{streamName, lastProcessedID}, + Count: 1, + Block: 5 * time.Second, + }).Result() + + if err == redis.Nil { + // No new messages in the stream within the block time, continue the loop + continue + } else if err != nil { + f.logger.Error().Message("Error reading from stream") + return err + } + + for _, stream := range streamEntries { + for _, entry := range stream.Messages { + streamID := entry.ID + f.logger.Debug().WithField("streamId", streamID).Message("Processing stream ID") + + pubkey, err := domain.NewPublicKeyFromHex(entry.Values["pubkey"].(string)) + + if err != nil { + f.logger.Error().Message("Error parsing pubkey") + break + } + + err = f.removePubkeyInfo(ctx, pubkey) + if err != nil { + f.logger.Error().WithField("streamId", streamID).Message("Failed to process entry") + continue + } + + err = f.rdb.Set(ctx, lastProcessedIDKey, streamID, 0).Err() + if err != nil { + f.logger.Error().Message("Error saving last processed ID") + return err + } + + lastProcessedID = streamID + f.logger.Debug().WithField("lastProcessedID", lastProcessedID).Message("Updated last processed ID") + } + } + } + } +} + +// Deletes the public key +func (f *VanishSubscriber) removePubkeyInfo(ctx context.Context, pubkey domain.PublicKey) error { + err := f.transactionProvider.Transact(ctx, func(ctx context.Context, adapters Adapters) error { + return adapters.PublicKeys.DeleteByPublicKey(pubkey) + }) + + if err != nil { + f.logger.Error().WithField("pubkey", pubkey).Message("Failed to remove pubkey info") + return err + } + + f.logger.Debug().WithField("pubkey", pubkey).Message("Removed pubkey info") + + return nil +}