Skip to content

Commit

Permalink
[Off-chain] Simplify TxClient with EventsQueryClient (#330)
Browse files Browse the repository at this point in the history
Co-authored-by: Daniel Olshansky <[email protected]>
  • Loading branch information
bryanchriswhite and Olshansk authored Jan 22, 2024
1 parent a158ade commit a3a2c80
Show file tree
Hide file tree
Showing 11 changed files with 158 additions and 215 deletions.
37 changes: 17 additions & 20 deletions pkg/client/block/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,25 +27,22 @@ func (blockEvent *cometBlockEvent) Hash() []byte {
return blockEvent.Block.LastBlockID.Hash.Bytes()
}

// newCometBlockEventFactoryFn is a factory function that returns a function
// that attempts to deserialize the given bytes into a comet block.
// If the resulting block has a height of zero, assume the event was not a block
// event and return an ErrUnmarshalBlockEvent error.
func newCometBlockEventFactoryFn() events.NewEventsFn[client.Block] {
return func(blockMsgBz []byte) (client.Block, error) {
blockMsg := new(cometBlockEvent)
if err := json.Unmarshal(blockMsgBz, blockMsg); err != nil {
return nil, err
}

// The header height should never be zero. If it is, it means that blockMsg
// does not match the expected format which led unmarshaling to fail,
// and blockHeader.height to have a default value.
if blockMsg.Block.Header.Height == 0 {
return nil, events.ErrEventsUnmarshalEvent.
Wrapf("with block data: %s", string(blockMsgBz))
}

return blockMsg, nil
// newCometBlockEvent is a function that attempts to deserialize the given bytes
// into a comet block. If the resulting block has a height of zero, assume the event
// was not a block event and return an ErrUnmarshalBlockEvent error.
func newCometBlockEvent(blockMsgBz []byte) (client.Block, error) {
blockMsg := new(cometBlockEvent)
if err := json.Unmarshal(blockMsgBz, blockMsg); err != nil {
return nil, err
}

// The header height should never be zero. If it is, it means that blockMsg
// does not match the expected format which led unmarshaling to fail,
// and blockHeader.height to have a default value.
if blockMsg.Block.Header.Height == 0 {
return nil, events.ErrEventsUnmarshalEvent.
Wrapf("with block data: %s", string(blockMsgBz))
}

return blockMsg, nil
}
9 changes: 3 additions & 6 deletions pkg/client/block/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,14 +36,11 @@ func NewBlockClient(
ctx context.Context,
deps depinject.Config,
) (client.BlockClient, error) {
client, err := events.NewEventsReplayClient[
client.Block,
client.EventsObservable[client.Block],
](
client, err := events.NewEventsReplayClient[client.Block](
ctx,
deps,
committedBlocksQuery,
newCometBlockEventFactoryFn(),
newCometBlockEvent,
defaultBlocksReplayLimit,
)
if err != nil {
Expand All @@ -60,7 +57,7 @@ type blockClient struct {
// and the BlockReplayObservable type as its generic types.
// These enable the EventsReplayClient to correctly map the raw event bytes
// to Block objects and to correctly return a BlockReplayObservable
eventsReplayClient client.EventsReplayClient[client.Block, client.EventsObservable[client.Block]]
eventsReplayClient client.EventsReplayClient[client.Block]
}

// CommittedBlocksSequence returns a replay observable of new block events.
Expand Down
7 changes: 2 additions & 5 deletions pkg/client/delegation/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,10 +47,7 @@ func NewDelegationClient(
ctx context.Context,
deps depinject.Config,
) (client.DelegationClient, error) {
client, err := events.NewEventsReplayClient[
client.Redelegation,
client.EventsObservable[client.Redelegation],
](
client, err := events.NewEventsReplayClient[client.Redelegation](
ctx,
deps,
delegationEventQuery,
Expand All @@ -71,7 +68,7 @@ type delegationClient struct {
// and the RedelegationReplayObservable type as its generic types.
// These enable the EventsReplayClient to correctly map the raw event bytes
// to Redelegation objects and to correctly return a RedelegationReplayObservable
eventsReplayClient client.EventsReplayClient[client.Redelegation, client.EventsObservable[client.Redelegation]]
eventsReplayClient client.EventsReplayClient[client.Redelegation]
}

// RedelegationsSequence returns a replay observable of Redelgation events
Expand Down
48 changes: 23 additions & 25 deletions pkg/client/events/replay_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,18 +33,15 @@ const (
)

// Enforce the EventsReplayClient interface is implemented by the replayClient type.
var _ client.EventsReplayClient[
any,
observable.ReplayObservable[any],
] = (*replayClient[any, observable.ReplayObservable[any]])(nil)
var _ client.EventsReplayClient[any] = (*replayClient[any])(nil)

// NewEventsFn is a function that takes a byte slice and returns a new instance
// of the generic type T.
type NewEventsFn[T any] func([]byte) (T, error)

// replayClient implements the EventsReplayClient interface for a generic type T,
// and replay observable for type T.
type replayClient[T any, R observable.ReplayObservable[T]] struct {
type replayClient[T any] struct {
// queryString is the query string used to subscribe to events of the
// desired type.
// See: https://docs.cosmos.network/main/learn/advanced/events#subscribing-to-events
Expand All @@ -70,14 +67,15 @@ type replayClient[T any, R observable.ReplayObservable[T]] struct {
// goPublishEvents. This observable (and the one it emits) closes when the
// events bytes observable returns an error and is updated with a new
// "active" observable after a new events query subscription is created.
//
// TODO_REFACTOR(@h5law): Look into making this a regular observable as
// we no depend on it being replayable.
replayObsCache observable.ReplayObservable[R]
// we may no longer depend on it being replayable.
replayObsCache observable.ReplayObservable[observable.ReplayObservable[T]]
// replayObsCachePublishCh is the publish channel for replayObsCache.
// It's used to set and subsequently update replayObsCache the events replay
// observable;
// For example when the connection is re-established after erroring.
replayObsCachePublishCh chan<- R
replayObsCachePublishCh chan<- observable.ReplayObservable[T]
}

// NewEventsReplayClient creates a new EventsReplayClient from the given
Expand All @@ -89,28 +87,28 @@ type replayClient[T any, R observable.ReplayObservable[T]] struct {
//
// Required dependencies:
// - client.EventsQueryClient
func NewEventsReplayClient[T any, R observable.ReplayObservable[T]](
func NewEventsReplayClient[T any](
ctx context.Context,
deps depinject.Config,
queryString string,
newEventFn NewEventsFn[T],
replayObsBufferSize int,
) (client.EventsReplayClient[T, R], error) {
) (client.EventsReplayClient[T], error) {
// Initialize the replay client
rClient := &replayClient[T, R]{
rClient := &replayClient[T]{
queryString: queryString,
eventDecoder: newEventFn,
replayObsBufferSize: replayObsBufferSize,
}
// TODO_REFACTOR(@h5law): Look into making this a regular observable as
// we no depend on it being replayable.
replayObsCache, replayObsCachePublishCh := channel.NewReplayObservable[R](
// we may no longer depend on it being replayable.
replayObsCache, replayObsCachePublishCh := channel.NewReplayObservable[observable.ReplayObservable[T]](
ctx,
// Buffer size of 1 as the cache only needs to hold the latest
// active replay observable.
replayObsCacheBufferSize,
)
rClient.replayObsCache = observable.ReplayObservable[R](replayObsCache)
rClient.replayObsCache = replayObsCache
rClient.replayObsCachePublishCh = replayObsCachePublishCh

// Inject dependencies
Expand All @@ -127,7 +125,7 @@ func NewEventsReplayClient[T any, R observable.ReplayObservable[T]](
// EventsSequence returns a new ReplayObservable, with the buffer size provided
// during the EventsReplayClient construction, which is notified when new
// events are received by the encapsulated EventsQueryClient.
func (rClient *replayClient[T, R]) EventsSequence(ctx context.Context) R {
func (rClient *replayClient[T]) EventsSequence(ctx context.Context) observable.ReplayObservable[T] {
// Create a new replay observable and publish channel for event type T with
// a buffer size matching that provided during the EventsReplayClient
// construction.
Expand All @@ -141,17 +139,17 @@ func (rClient *replayClient[T, R]) EventsSequence(ctx context.Context) R {
go rClient.goRemapEventsSequence(ctx, replayEventTypeObsPublishCh)

// Return the event type observable.
return eventTypeObs.(R)
return eventTypeObs
}

// goRemapEventsSequence publishes events observed by the most recent cached
// events type replay observable to the given publishCh
func (rClient *replayClient[T, R]) goRemapEventsSequence(ctx context.Context, publishCh chan<- T) {
func (rClient *replayClient[T]) goRemapEventsSequence(ctx context.Context, publishCh chan<- T) {
var prevEventTypeObs observable.ReplayObservable[T]
channel.ForEach[R](
channel.ForEach[observable.ReplayObservable[T]](
ctx,
rClient.replayObsCache,
func(ctx context.Context, eventTypeObs R) {
func(ctx context.Context, eventTypeObs observable.ReplayObservable[T]) {
if prevEventTypeObs != nil {
// Just in case the assumption that all transport errors are
// persistent (i.e. they occur once and do not repeat) does not
Expand All @@ -173,13 +171,13 @@ func (rClient *replayClient[T, R]) goRemapEventsSequence(ctx context.Context, pu
// LastNEvents returns the last N typed events that have been received by the
// corresponding events query subscription.
// It blocks until at least one event has been received.
func (rClient *replayClient[T, R]) LastNEvents(ctx context.Context, n int) []T {
func (rClient *replayClient[T]) LastNEvents(ctx context.Context, n int) []T {
return rClient.EventsSequence(ctx).Last(ctx, n)
}

// Close unsubscribes all observers of the committed blocks sequence observable
// and closes the events query client.
func (rClient *replayClient[T, R]) Close() {
func (rClient *replayClient[T]) Close() {
// Closing eventsClient will cascade unsubscribe and close downstream observers.
rClient.eventsClient.Close()
}
Expand All @@ -188,7 +186,7 @@ func (rClient *replayClient[T, R]) Close() {
// re-invoking it according to the arguments to retry.OnError when the events bytes
// observable returns an asynchronous error.
// This function is intended to be called in a goroutine.
func (rClient *replayClient[T, R]) goPublishEvents(ctx context.Context) {
func (rClient *replayClient[T]) goPublishEvents(ctx context.Context) {
// React to errors by getting a new events bytes observable, re-mapping it,
// and send it to replayObsCachePublishCh such that
// replayObsCache.Last(ctx, 1) will return it.
Expand All @@ -213,7 +211,7 @@ func (rClient *replayClient[T, R]) goPublishEvents(ctx context.Context) {
// to retry.OnError. The returned function pipes event bytes from the events
// query client, maps them to typed events, and publishes them to the
// replayObsCache replay observable.
func (rClient *replayClient[T, R]) retryPublishEventsFactory(ctx context.Context) func() chan error {
func (rClient *replayClient[T]) retryPublishEventsFactory(ctx context.Context) func() chan error {
return func() chan error {
errCh := make(chan error, 1)
eventsBytesObs, err := rClient.eventsClient.EventsBytes(ctx, rClient.queryString)
Expand Down Expand Up @@ -247,7 +245,7 @@ func (rClient *replayClient[T, R]) retryPublishEventsFactory(ctx context.Context
}()

// Initially set replayObsCache and update if after retrying on error.
rClient.replayObsCachePublishCh <- typedObs.(R)
rClient.replayObsCachePublishCh <- typedObs

return errCh
}
Expand All @@ -267,7 +265,7 @@ func (rClient *replayClient[T, R]) retryPublishEventsFactory(ctx context.Context
// If deserialisation failed because the event bytes were for a different event
// type, this value is also skipped. If deserialisation failed for some other
// reason, this function panics.
func (rClient *replayClient[T, R]) newMapEventsBytesToTFn(errCh chan<- error) func(
func (rClient *replayClient[T]) newMapEventsBytesToTFn(errCh chan<- error) func(
context.Context,
either.Bytes,
) (T, bool) {
Expand Down
6 changes: 1 addition & 5 deletions pkg/client/events/replay_client_example_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
"cosmossdk.io/depinject"

"github.com/pokt-network/poktroll/pkg/client/events"
"github.com/pokt-network/poktroll/pkg/observable"
"github.com/pokt-network/poktroll/pkg/polylog"
)

Expand Down Expand Up @@ -73,10 +72,7 @@ func ExampleNewEventsReplayClient() {

// Create a new instance of the EventsReplayClient
// See: https://pkg.go.dev/github.com/pokt-network/poktroll/pkg/client/events/#NewEventsReplayClient
client, err := events.NewEventsReplayClient[
EventType,
observable.ReplayObservable[EventType],
](
client, err := events.NewEventsReplayClient[EventType](
ctx,
depConfig,
eventQueryString,
Expand Down
2 changes: 1 addition & 1 deletion pkg/client/events/replay_client_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ func TestReplayClient_Remapping(t *testing.T) {
deps := depinject.Supply(queryClient)

// Create the replay client
replayClient, err := events.NewEventsReplayClient[messageEvent, messageEventReplayObs](
replayClient, err := events.NewEventsReplayClient[messageEvent](
ctx,
deps,
"", // subscription query string
Expand Down
4 changes: 2 additions & 2 deletions pkg/client/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,9 +123,9 @@ type EventsObservable[T any] observable.ReplayObservable[T]

// EventsReplayClient is an interface which provides notifications about newly received
// events as well as direct access to the latest event via some blockchain API.
type EventsReplayClient[T any, R observable.ReplayObservable[T]] interface {
type EventsReplayClient[T any] interface {
// EventsSequence returns an observable which emits new events.
EventsSequence(context.Context) R
EventsSequence(context.Context) observable.ReplayObservable[T]
// LastNEvents returns the latest N events that has been received.
LastNEvents(ctx context.Context, n int) []T
// Close unsubscribes all observers of the events sequence observable
Expand Down
Loading

0 comments on commit a3a2c80

Please sign in to comment.