Skip to content

Commit

Permalink
list records in pages rather than all at once
Browse files Browse the repository at this point in the history
fixes #23
  • Loading branch information
finn-block committed Feb 1, 2024
1 parent fca6f68 commit 7fc096b
Show file tree
Hide file tree
Showing 12 changed files with 382 additions and 97 deletions.
57 changes: 37 additions & 20 deletions impl/pkg/service/pkarr.go
Original file line number Diff line number Diff line change
Expand Up @@ -232,31 +232,48 @@ func (s *PkarrService) addRecordToCache(id string, resp GetPkarrResponse) error

// TODO(gabe) make this more efficient. create a publish schedule based on each individual record, not all records
func (s *PkarrService) republish() {
allRecords, err := s.db.ListRecords(context.Background())
if err != nil {
logrus.WithError(err).Error("failed to list record(s) for republishing")
return
}
if len(allRecords) == 0 {
logrus.Info("No records to republish")
return
}
logrus.Infof("Republishing [%d] record(s)", len(allRecords))
var nextPageToken []byte
var allRecords []pkarr.Record
var err error
errCnt := 0
for _, record := range allRecords {
put, err := recordToBEP44Put(record)
successCnt := 0
for {
allRecords, nextPageToken, err = s.db.ListRecords(context.Background(), nextPageToken, 1000)
if err != nil {
logrus.WithError(err).Error("failed to convert record to bep44 put")
errCnt++
continue
logrus.WithError(err).Error("failed to list record(s) for republishing")
return
}

if len(allRecords) == 0 {
logrus.Info("No records to republish")
return
}

logrus.WithField("record_count", len(allRecords)).Info("Republishing record")

for _, record := range allRecords {
put, err := recordToBEP44Put(record)
if err != nil {
logrus.WithError(err).Error("failed to convert record to bep44 put")
errCnt++
continue
}

if _, err = s.dht.Put(context.Background(), *put); err != nil {
logrus.WithError(err).Error("failed to republish record")
errCnt++
continue
}

successCnt++
}
if _, err = s.dht.Put(context.Background(), *put); err != nil {
logrus.WithError(err).Error("failed to republish record")
errCnt++
continue

if nextPageToken == nil {
break
}
}
logrus.Infof("Republishing complete. Successfully republished %d out of %d record(s)", len(allRecords)-errCnt, len(allRecords))

logrus.WithField("success", successCnt).WithField("errors", errCnt).Info("Republishing complete")
}

func recordToBEP44Put(record pkarr.Record) (*bep44.Put, error) {
Expand Down
70 changes: 68 additions & 2 deletions impl/pkg/storage/db/bolt/bolt.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,10 @@ type boltdb struct {
db *bolt.DB
}

type boltRecord struct {
key, value []byte
}

// NewBolt creates a BoltDB-based implementation of storage.Storage
func NewBolt(path string) (*boltdb, error) {
if path == "" {
Expand Down Expand Up @@ -68,7 +72,7 @@ func (s *boltdb) ReadRecord(_ context.Context, id []byte) (*pkarr.Record, error)
}

// ListRecords lists all records in the storage
func (s *boltdb) ListRecords(_ context.Context) ([]pkarr.Record, error) {
func (s *boltdb) ListAllRecords(_ context.Context) ([]pkarr.Record, error) {
recordsMap, err := s.readAll(pkarrNamespace)
if err != nil {
return nil, err
Expand All @@ -91,6 +95,37 @@ func (s *boltdb) ListRecords(_ context.Context) ([]pkarr.Record, error) {
return records, nil
}

// ListRecords lists all records in the storage
func (s *boltdb) ListRecords(_ context.Context, nextPageToken []byte, pagesize int) ([]pkarr.Record, []byte, error) {
boltRecords, err := s.readSeveral(pkarrNamespace, nextPageToken, pagesize)
if err != nil {
return nil, nil, err
}

var records []pkarr.Record
for _, recordBytes := range boltRecords {
var encodedRecord base64PkarrRecord
if err = json.Unmarshal(recordBytes.value, &encodedRecord); err != nil {
return nil, nil, err
}

record, err := encodedRecord.Decode()
if err != nil {
return nil, nil, err
}

records = append(records, record)
}

if len(boltRecords) == pagesize {
nextPageToken = boltRecords[len(boltRecords)-1].key
} else {
nextPageToken = nil
}

return records, nextPageToken, nil
}

func (s *boltdb) Close() error {
return s.db.Close()
}
Expand Down Expand Up @@ -127,7 +162,7 @@ func (s *boltdb) readAll(namespace string) (map[string][]byte, error) {
err := s.db.View(func(tx *bolt.Tx) error {
bucket := tx.Bucket([]byte(namespace))
if bucket == nil {
logrus.Warnf("namespace[%s] does not exist", namespace)
logrus.WithField("namespace", namespace).Warn("namespace does not exist")
return nil
}
cursor := bucket.Cursor()
Expand All @@ -138,3 +173,34 @@ func (s *boltdb) readAll(namespace string) (map[string][]byte, error) {
})
return result, err
}

func (s *boltdb) readSeveral(namespace string, after []byte, count int) ([]boltRecord, error) {
var result []boltRecord
err := s.db.View(func(tx *bolt.Tx) error {
bucket := tx.Bucket([]byte(namespace))
if bucket == nil {
logrus.WithField("namespace", namespace).Warn("namespace does not exist")
return nil
}

cursor := bucket.Cursor()

var k []byte
var v []byte
if after != nil {
cursor.Seek(after)
k, v = cursor.Next()
} else {
k, v = cursor.First()
}

for ; k != nil; k, v = cursor.Next() {
result = append(result, boltRecord{key: k, value: v})
if len(result) >= count {
break
}
}
return nil
})
return result, err
}
2 changes: 1 addition & 1 deletion impl/pkg/storage/db/bolt/bolt_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ func TestPKARRStorage(t *testing.T) {
assert.Equal(t, record, *readRecord)

// list and confirm it's there
records, err := db.ListRecords(ctx)
records, _, err := db.ListRecords(ctx, nil, 10)
assert.NoError(t, err)
assert.NotEmpty(t, records)
assert.Equal(t, record, records[0])
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
-- +goose Up
CREATE TABLE pkarr_records (
id SERIAL PRIMARY KEY,
key BYTEA UNIQUE NOT NULL,
value BYTEA NOT NULL,
sig BYTEA NOT NULL,
Expand Down
1 change: 1 addition & 0 deletions impl/pkg/storage/db/postgres/models.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

43 changes: 41 additions & 2 deletions impl/pkg/storage/db/postgres/postgres.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,14 +95,14 @@ func (p postgres) ReadRecord(ctx context.Context, id []byte) (*pkarr.Record, err
}, nil
}

func (p postgres) ListRecords(ctx context.Context) ([]pkarr.Record, error) {
func (p postgres) ListAllRecords(ctx context.Context) ([]pkarr.Record, error) {
queries, db, err := p.connect(ctx)
if err != nil {
return nil, err
}
defer db.Close(ctx)

rows, err := queries.ListRecords(ctx)
rows, err := queries.ListAllRecords(ctx)
if err != nil {
return nil, err
}
Expand All @@ -120,6 +120,45 @@ func (p postgres) ListRecords(ctx context.Context) ([]pkarr.Record, error) {
return records, nil
}

func (p postgres) ListRecords(ctx context.Context, nextPageToken []byte, limit int) ([]pkarr.Record, []byte, error) {
queries, db, err := p.connect(ctx)
if err != nil {
return nil, nil, err
}
defer db.Close(ctx)

var rows []PkarrRecord
if nextPageToken == nil {
rows, err = queries.ListRecordsFirstPage(ctx, int32(limit))
} else {
rows, err = queries.ListRecords(ctx, ListRecordsParams{
Key: nextPageToken,
Limit: int32(limit),
})
}
if err != nil {
return nil, nil, err
}

var records []pkarr.Record
for _, row := range rows {
records = append(records, pkarr.Record{
K: row.Key,
V: row.Value,
Sig: row.Sig,
Seq: row.Seq,
})
}

if len(rows) == limit {
nextPageToken = rows[len(rows)-1].Key
} else {
nextPageToken = nil
}

return records, nextPageToken, nil
}

func (p postgres) Close() error {
// no-op, postgres connection is closed after each request
return nil
Expand Down
75 changes: 71 additions & 4 deletions impl/pkg/storage/db/postgres/queries.sql.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 7 additions & 1 deletion impl/pkg/storage/db/postgres/queries/queries.sql
Original file line number Diff line number Diff line change
Expand Up @@ -4,5 +4,11 @@ INSERT INTO pkarr_records(key, value, sig, seq) VALUES($1, $2, $3, $4);
-- name: ReadRecord :one
SELECT * FROM pkarr_records WHERE key = $1 LIMIT 1;

-- name: ListRecords :many
-- name: ListAllRecords :many
SELECT * FROM pkarr_records;

-- name: ListRecords :many
SELECT * FROM pkarr_records WHERE id > (SELECT id FROM pkarr_records WHERE pkarr_records.key = $1) ORDER BY id ASC LIMIT $2;

-- name: ListRecordsFirstPage :many
SELECT * FROM pkarr_records ORDER BY id ASC LIMIT $1;
10 changes: 10 additions & 0 deletions impl/pkg/storage/pkarr/pkarr.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,10 @@
package pkarr

import (
"encoding/base64"
"fmt"
)

type Record struct {
// Up to an 1000 bytes
V []byte `json:"v" validate:"required"`
Expand All @@ -9,3 +14,8 @@ type Record struct {
Sig []byte `json:"sig" validate:"required"`
Seq int64 `json:"seq" validate:"required"`
}

func (r Record) String() string {
encoding := base64.RawURLEncoding
return fmt.Sprintf("pkarr.Record{K=%s V=%s Sig=%s Seq=%d}", encoding.EncodeToString(r.K), encoding.EncodeToString(r.V), encoding.EncodeToString(r.Sig), r.Seq)
}
Loading

0 comments on commit 7fc096b

Please sign in to comment.