diff --git a/internal/blockchain/ethereum/ethereum.go b/internal/blockchain/ethereum/ethereum.go index 2f6ac6654..91864d276 100644 --- a/internal/blockchain/ethereum/ethereum.go +++ b/internal/blockchain/ethereum/ethereum.go @@ -888,10 +888,12 @@ func (e *Ethereum) AddContractListener(ctx context.Context, listener *core.Contr subName := fmt.Sprintf("ff-sub-%s-%s", listener.Namespace, listener.ID) firstEvent := string(core.SubOptsFirstEventNewest) + var subscriptionOptions *fftypes.JSONAny if listener.Options != nil { firstEvent = listener.Options.FirstEvent + subscriptionOptions = listener.Options.SubscriptionOptions } - result, err := e.streams.createSubscription(ctx, location, e.streamID, subName, firstEvent, abi) + result, err := e.streams.createSubscription(ctx, location, e.streamID, subName, firstEvent, abi, subscriptionOptions) if err != nil { return err } diff --git a/internal/blockchain/ethereum/ethereum_test.go b/internal/blockchain/ethereum/ethereum_test.go index 93e7fed1d..6615684bd 100644 --- a/internal/blockchain/ethereum/ethereum_test.go +++ b/internal/blockchain/ethereum/ethereum_test.go @@ -1950,6 +1950,58 @@ func TestAddSubscription(t *testing.T) { assert.NoError(t, err) } +func TestAddSubscriptionGenericOptionsPassed(t *testing.T) { + e, cancel := newTestEthereum() + defer cancel() + httpmock.ActivateNonDefault(e.client.GetClient()) + defer httpmock.DeactivateAndReset() + e.streamID = "es-1" + e.streams = &streamManager{ + client: e.client, + } + + sub := &core.ContractListener{ + Location: fftypes.JSONAnyPtr(fftypes.JSONObject{ + "address": "0x123", + }.String()), + Event: &core.FFISerializedEvent{ + FFIEventDefinition: fftypes.FFIEventDefinition{ + Name: "Changed", + Params: fftypes.FFIParams{ + { + Name: "value", + Schema: fftypes.JSONAnyPtr(`{"type": "string", "details": {"type": "string"}}`), + }, + }, + }, + }, + Options: &core.ContractListenerOptions{ + FirstEvent: string(core.SubOptsFirstEventOldest), + SubscriptionOptions: fftypes.JSONAnyPtr(`{ "genericOption": "generic" }`), + }, + } + + httpmock.RegisterResponder("POST", `http://localhost:12345/subscriptions`, + func(req *http.Request) (*http.Response, error) { + + var reqBody subscription + if err := json.NewDecoder(req.Body).Decode(&reqBody); err != nil { + return httpmock.NewStringResponse(400, ""), err + } + + expectedOptions := sub.Options.SubscriptionOptions.JSONObject() + actualOptions := reqBody.Options.JSONObject() + + assert.Equal(t, expectedOptions, actualOptions) + + return httpmock.NewJsonResponse(200, &subscription{}) + }) + + err := e.AddContractListener(context.Background(), sub) + + assert.NoError(t, err) +} + func TestAddSubscriptionWithoutLocation(t *testing.T) { e, cancel := newTestEthereum() defer cancel() diff --git a/internal/blockchain/ethereum/eventstream.go b/internal/blockchain/ethereum/eventstream.go index 8b1d505ab..10d324ffc 100644 --- a/internal/blockchain/ethereum/eventstream.go +++ b/internal/blockchain/ethereum/eventstream.go @@ -59,6 +59,7 @@ type subscription struct { EthCompatAddress string `json:"address,omitempty"` EthCompatEvent *abi.Entry `json:"event,omitempty"` Filters []fftypes.JSONAny `json:"filters"` + Options *fftypes.JSONAny `json:"options"` subscriptionCheckpoint } @@ -182,7 +183,7 @@ func (s *streamManager) getSubscriptionName(ctx context.Context, subID string) ( return sub.Name, nil } -func (s *streamManager) createSubscription(ctx context.Context, location *Location, stream, subName, firstEvent string, abi *abi.Entry) (*subscription, error) { +func (s *streamManager) createSubscription(ctx context.Context, location *Location, stream, subName, firstEvent string, abi *abi.Entry, options *fftypes.JSONAny) (*subscription, error) { // Map FireFly "firstEvent" values to Ethereum "fromBlock" values switch firstEvent { case string(core.SubOptsFirstEventOldest): @@ -195,6 +196,7 @@ func (s *streamManager) createSubscription(ctx context.Context, location *Locati Stream: stream, FromBlock: firstEvent, EthCompatEvent: abi, + Options: options, } if location != nil { @@ -267,7 +269,7 @@ func (s *streamManager) ensureFireFlySubscription(ctx context.Context, namespace name = v1Name } location := &Location{Address: instancePath} - if sub, err = s.createSubscription(ctx, location, stream, name, firstEvent, abi); err != nil { + if sub, err = s.createSubscription(ctx, location, stream, name, firstEvent, abi, nil); err != nil { return nil, err } log.L(ctx).Infof("%s subscription: %s", abi.Name, sub.ID) diff --git a/pkg/core/contract_listener.go b/pkg/core/contract_listener.go index fb3fb1241..656583fec 100644 --- a/pkg/core/contract_listener.go +++ b/pkg/core/contract_listener.go @@ -44,7 +44,8 @@ type ContractListenerWithStatus struct { Status interface{} `ffstruct:"ContractListenerWithStatus" json:"status,omitempty" ffexcludeinput:"true"` } type ContractListenerOptions struct { - FirstEvent string `ffstruct:"ContractListenerOptions" json:"firstEvent,omitempty"` + FirstEvent string `ffstruct:"ContractListenerOptions" json:"firstEvent,omitempty"` + SubscriptionOptions *fftypes.JSONAny `ffstruct:"ContractListenerOptions" json:"subscriptionOptions,omitempty"` } type ListenerStatusError struct {