Skip to content

Commit

Permalink
Define v2 api endpoint (#73)
Browse files Browse the repository at this point in the history
* add service versioning

* add v2 endpoints

* restructure to support versioning

* add v2 handlers

* add mock returns

* update service folder

* update handler and swagger

* resolve build error

* better naming

* resolve comments
  • Loading branch information
jeremy-babylonlabs authored Oct 22, 2024
1 parent 9122661 commit 5c37863
Show file tree
Hide file tree
Showing 162 changed files with 5,399 additions and 2,358 deletions.
8 changes: 5 additions & 3 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -57,13 +57,15 @@ run-unprocessed-events-replay-local:
--replay

generate-mock-interface:
cd internal/db && mockery --name=DBClient --output=../../tests/mocks --outpkg=mocks --filename=mock_db_client.go
cd internal/clients/ordinals && mockery --name=OrdinalsClientInterface --output=../../../tests/mocks --outpkg=mocks --filename=mock_ordinal_client.go
cd internal/shared/db/client && mockery --name=DBClient --output=../../../../tests/mocks --outpkg=mocks --filename=mock_db_client.go
cd internal/v1/db/client && mockery --name=V1DBClient --output=../../../../tests/mocks --outpkg=mocks --filename=mock_v1_db_client.go
cd internal/v2/db/client && mockery --name=V2DBClient --output=../../../../tests/mocks --outpkg=mocks --filename=mock_v2_db_client.go
cd internal/shared/http/clients/ordinals && mockery --name=OrdinalsClient --output=../../../../../tests/mocks --outpkg=mocks --filename=mock_ordinal_client.go

test:
./bin/local-startup.sh;
go test -v -cover -p 1 ./... -count=1


build-swagger:
swag init --parseDependency --parseInternal -d cmd/staking-api-service,internal/api,internal/types
swag init --parseDependency --parseInternal -d cmd/staking-api-service,internal/shared/api,internal/shared/types,internal/v1/api/handlers,internal/v2/api/handlers
39 changes: 24 additions & 15 deletions cmd/staking-api-service/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,16 @@ import (

"github.com/babylonlabs-io/staking-api-service/cmd/staking-api-service/cli"
"github.com/babylonlabs-io/staking-api-service/cmd/staking-api-service/scripts"
"github.com/babylonlabs-io/staking-api-service/internal/api"
"github.com/babylonlabs-io/staking-api-service/internal/clients"
"github.com/babylonlabs-io/staking-api-service/internal/config"
"github.com/babylonlabs-io/staking-api-service/internal/db/model"
"github.com/babylonlabs-io/staking-api-service/internal/observability/healthcheck"
"github.com/babylonlabs-io/staking-api-service/internal/observability/metrics"
"github.com/babylonlabs-io/staking-api-service/internal/queue"
"github.com/babylonlabs-io/staking-api-service/internal/services"
"github.com/babylonlabs-io/staking-api-service/internal/types"
"github.com/babylonlabs-io/staking-api-service/internal/shared/api"
"github.com/babylonlabs-io/staking-api-service/internal/shared/config"
dbclients "github.com/babylonlabs-io/staking-api-service/internal/shared/db/clients"
dbmodel "github.com/babylonlabs-io/staking-api-service/internal/shared/db/model"
"github.com/babylonlabs-io/staking-api-service/internal/shared/http/clients"
"github.com/babylonlabs-io/staking-api-service/internal/shared/observability/healthcheck"
"github.com/babylonlabs-io/staking-api-service/internal/shared/observability/metrics"
queueclients "github.com/babylonlabs-io/staking-api-service/internal/shared/queue/clients"
"github.com/babylonlabs-io/staking-api-service/internal/shared/services"
"github.com/babylonlabs-io/staking-api-service/internal/shared/types"
"github.com/joho/godotenv"
"github.com/rs/zerolog/log"
)
Expand Down Expand Up @@ -63,24 +64,32 @@ func main() {
metricsPort := cfg.Metrics.GetMetricsPort()
metrics.Init(metricsPort)

err = model.Setup(ctx, cfg)
err = dbmodel.Setup(ctx, cfg)
if err != nil {
log.Fatal().Err(err).Msg("error while setting up staking db model")
}

// initialize clients package which is used to interact with external services
clients := clients.New(cfg)
services, err := services.New(ctx, cfg, params, finalityProviders, clients)

dbClients, err := dbclients.New(ctx, cfg)
if err != nil {
log.Fatal().Err(err).Msg("error while setting up staking db clients")
}

services, err := services.New(ctx, cfg, params, finalityProviders, clients, dbClients)
if err != nil {
log.Fatal().Err(err).Msg("error while setting up staking services layer")
}

// Start the event queue processing
queues := queue.New(cfg.Queue, services)
queueClients := queueclients.New(ctx, cfg.Queue, services)

// Check if the scripts flag is set
if cli.GetReplayFlag() {
log.Info().Msg("Replay flag is set. Starting replay of unprocessable messages.")
err := scripts.ReplayUnprocessableMessages(ctx, cfg, queues, services.DbClient)

err := scripts.ReplayUnprocessableMessages(ctx, cfg, queueClients, dbClients.SharedDBClient)
if err != nil {
log.Fatal().Err(err).Msg("error while replaying unprocessable messages")
}
Expand All @@ -94,9 +103,9 @@ func main() {
return
}

queues.StartReceivingMessages()
queueClients.StartReceivingMessages()

healthcheckErr := healthcheck.StartHealthCheckCron(ctx, queues, cfg.Server.HealthCheckInterval)
healthcheckErr := healthcheck.StartHealthCheckCron(ctx, queueClients, cfg.Server.HealthCheckInterval)
if healthcheckErr != nil {
log.Fatal().Err(healthcheckErr).Msg("error while starting health check cron")
}
Expand Down
17 changes: 11 additions & 6 deletions cmd/staking-api-service/scripts/pubkey_address_backfill.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,21 +4,26 @@ import (
"context"
"fmt"

"github.com/babylonlabs-io/staking-api-service/internal/config"
"github.com/babylonlabs-io/staking-api-service/internal/db"
"github.com/babylonlabs-io/staking-api-service/internal/utils"
"github.com/babylonlabs-io/staking-api-service/internal/shared/config"
dbclient "github.com/babylonlabs-io/staking-api-service/internal/shared/db/client"
"github.com/babylonlabs-io/staking-api-service/internal/shared/utils"
v1dbclient "github.com/babylonlabs-io/staking-api-service/internal/v1/db/client"
"github.com/rs/zerolog/log"
)

func BackfillPubkeyAddressesMappings(ctx context.Context, cfg *config.Config) error {
dbClient, err := db.New(ctx, cfg.Db)
client, err := dbclient.NewMongoClient(ctx, cfg.Db)
if err != nil {
return fmt.Errorf("failed to create db client: %w", err)
}
v1dbClient, err := v1dbclient.New(ctx, client, cfg.Db)
if err != nil {
return fmt.Errorf("failed to create db client: %w", err)
}
pageToken := ""
var count int
for {
result, err := dbClient.ScanDelegationsPaginated(ctx, pageToken)
result, err := v1dbClient.ScanDelegationsPaginated(ctx, pageToken)
if err != nil {
return fmt.Errorf("failed to scan delegations: %w", err)
}
Expand All @@ -29,7 +34,7 @@ func BackfillPubkeyAddressesMappings(ctx context.Context, cfg *config.Config) er
if err != nil {
return fmt.Errorf("failed to derive btc addresses: %w", err)
}
if err := dbClient.InsertPkAddressMappings(
if err := v1dbClient.InsertPkAddressMappings(
ctx, delegation.StakerPkHex, addresses.Taproot,
addresses.NativeSegwitOdd, addresses.NativeSegwitEven,
); err != nil {
Expand Down
35 changes: 12 additions & 23 deletions cmd/staking-api-service/scripts/replay_unprocessed_messages.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,9 @@ import (
"errors"
"fmt"

"github.com/babylonlabs-io/staking-api-service/internal/config"
"github.com/babylonlabs-io/staking-api-service/internal/db"
"github.com/babylonlabs-io/staking-api-service/internal/queue"
"github.com/babylonlabs-io/staking-api-service/internal/shared/config"
dbclient "github.com/babylonlabs-io/staking-api-service/internal/shared/db/client"
queueclients "github.com/babylonlabs-io/staking-api-service/internal/shared/queue/clients"
queueClient "github.com/babylonlabs-io/staking-queue-client/client"
"github.com/rs/zerolog/log"
)
Expand All @@ -17,9 +17,7 @@ type GenericEvent struct {
EventType queueClient.EventType `json:"event_type"`
}

func ReplayUnprocessableMessages(ctx context.Context, cfg *config.Config, queues *queue.Queues, db db.DBClient) (err error) {
fmt.Println("Starting to replay unprocessable messages...")

func ReplayUnprocessableMessages(ctx context.Context, cfg *config.Config, queues *queueclients.QueueClients, db dbclient.DBClient) (err error) {
// Fetch unprocessable messages
unprocessableMessages, err := db.FindUnprocessableMessages(ctx)
if err != nil {
Expand All @@ -30,15 +28,12 @@ func ReplayUnprocessableMessages(ctx context.Context, cfg *config.Config, queues
messageCount := len(unprocessableMessages)

// Inform the user of the number of unprocessable messages
fmt.Printf("There are %d unprocessable messages.\n", messageCount)
if messageCount == 0 {
return errors.New("no unprocessable messages to replay")
}

// Process each unprocessable message
for i, msg := range unprocessableMessages {
fmt.Printf("Processing message %d/%d: %s\n", i+1, messageCount, msg.MessageBody)

for _, msg := range unprocessableMessages {
var genericEvent GenericEvent
if err := json.Unmarshal([]byte(msg.MessageBody), &genericEvent); err != nil {
return errors.New("failed to unmarshal event message")
Expand All @@ -53,34 +48,28 @@ func ReplayUnprocessableMessages(ctx context.Context, cfg *config.Config, queues
if err := db.DeleteUnprocessableMessage(ctx, msg.Receipt); err != nil {
return errors.New("failed to delete unprocessable message")
}

fmt.Printf("Message %d/%d processed and deleted successfully.\n", i+1, messageCount)
}

log.Info().Msg("Reprocessing of unprocessable messages completed.")
fmt.Println("Reprocessing of unprocessable messages completed.")
return
}

// processEventMessage processes the event message based on its EventType.
func processEventMessage(ctx context.Context, queues *queue.Queues, event GenericEvent, messageBody string) error {
fmt.Printf("Sending message to the queue for event type: %v\n", event.EventType)

func processEventMessage(ctx context.Context, queues *queueclients.QueueClients, event GenericEvent, messageBody string) error {
switch event.EventType {
case queueClient.ActiveStakingEventType:
return queues.ActiveStakingQueueClient.SendMessage(ctx, messageBody)
return queues.V1QueueClient.ActiveStakingQueueClient.SendMessage(ctx, messageBody)
case queueClient.UnbondingStakingEventType:
return queues.UnbondingStakingQueueClient.SendMessage(ctx, messageBody)
return queues.V1QueueClient.UnbondingStakingQueueClient.SendMessage(ctx, messageBody)
case queueClient.WithdrawStakingEventType:
return queues.WithdrawStakingQueueClient.SendMessage(ctx, messageBody)
return queues.V1QueueClient.WithdrawStakingQueueClient.SendMessage(ctx, messageBody)
case queueClient.ExpiredStakingEventType:
return queues.ExpiredStakingQueueClient.SendMessage(ctx, messageBody)
return queues.V1QueueClient.ExpiredStakingQueueClient.SendMessage(ctx, messageBody)
case queueClient.StatsEventType:
return queues.StatsQueueClient.SendMessage(ctx, messageBody)
return queues.V1QueueClient.StatsQueueClient.SendMessage(ctx, messageBody)
case queueClient.BtcInfoEventType:
return queues.BtcInfoQueueClient.SendMessage(ctx, messageBody)
return queues.V1QueueClient.BtcInfoQueueClient.SendMessage(ctx, messageBody)
default:
fmt.Printf("Error: unknown event type: %v\n", event.EventType)
return fmt.Errorf("unknown event type: %v", event.EventType)
}
}
Loading

0 comments on commit 5c37863

Please sign in to comment.