-
Notifications
You must be signed in to change notification settings - Fork 0
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: webhook module v1 #101
Conversation
922b374
to
f9663ba
Compare
f9663ba
to
991c51a
Compare
await this.deps.notify( | ||
notification.payload.url, | ||
notification.payload.event, | ||
); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this is where the webhook callback happens. further down you can see we try a backoff strategy if it fails
notify: (params: NotificationPayload) => void; | ||
postgres: DataSource; | ||
}; | ||
export class DepositStatusWebhook implements IWebhook { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this is the specific deposit status processor we care about, its wrapped in a higher level webhooks class in case we want to define different types
assert(webhook, "Webhook does not exist by type: ${event.type}"); | ||
return webhook; | ||
} | ||
write(event: EventType): void { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
as the indexer, you want to call this function to write your events
Signed-off-by: david <[email protected]>
991c51a
to
7c77181
Compare
logger: Logger; | ||
}; | ||
|
||
export function WebhookFactory(config: Config, deps: Dependencies) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
main entry point as an indexer
hooks, | ||
notify: notifier.notify, | ||
}), | ||
); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this is where we instance and register each webhook event type, currently we only have 'DepositStatus'
we can see it has a callback notify
which queues it in the notifier, which persists notification state
}); | ||
}; | ||
|
||
public tick = async (now = Date.now()): Promise<void> => { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
you will manually need to call notifier.tick on an interval somewhere in indexer code ( or whoever consumes this module)
|
||
export class WebhookNotifier extends BaseNotifier { | ||
constructor(deps: Omit<Dependencies, "notify">) { | ||
super({ ...deps, notify: post }); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
makes a specific webhook caller, we could theoretically pass in an event emitter for testing rather than https post
@@ -0,0 +1,107 @@ | |||
import { createClient, RedisClientType } from "redis"; | |||
|
|||
export interface AsyncStore<V> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
abstract store, hopefully it wouldnt be too difficult to hook this up to postgres. will speed up development to have a common store interface
} | ||
} | ||
|
||
export class RedisStore<V> implements AsyncStore<V> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
redis store if we want to use it
@@ -0,0 +1,17 @@ | |||
export async function post(url: string, data: any): Promise<void> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
what we use to post to webhook
isDomainValid, | ||
"The base URL of the provided webhook does not match any of the client domains", | ||
); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this was my attempt at gaurding a webhook for a domain, but this may need to be tested/rethought
app.use(bearerToken()); | ||
|
||
app.post( | ||
"/webhook", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
integrator register a new webhook
Signed-off-by: david <[email protected]>
4547180
to
b36e367
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks good. Left a few comments.
packages/webhooks/src/clients.ts
Outdated
} | ||
|
||
// This class is intended to store integration clients allowed to use the webhook service. | ||
export class WebhookClientManager { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
From what I see this WebhookClientManager
is used as a repository to interact with a store/database, so it should be renamed to WebhookClientRepository
or WebhookClientMemoryRepository
and placed in a adapter/db
directory
id: ss.string(), | ||
}); | ||
|
||
export function WebhookRouter(deps: Dependencies): express.Router { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
all HTTP related logic: routers, controllers, validation, types for query parameters, types for URL parameters, types for HTTP bodies, etc should be placed in a /entrypoint/http
directory
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
following our conversation on Friday, the best for now would be to remove the express logic from the webhooks package and declare a /webhooks/register
endpoint in the indexer API that (1) saves the webhook request in the DB and (2) publishes a message on a queue like WebhookRequestRegistered
to inform the indexer that a request has been received and it needs to send the initial status to the webhook client.
P.S I asked Matt if the initial status should be sent to the webhook request URL or if we can attach it to the response body of the /register
endpoint. But must likely it will be option (1)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
following our conversation on Friday, the best for now would be to remove the express logic from the webhooks package and declare a /webhooks/register endpoint in the indexer API that (1) saves the webhook request in the DB
This logic has been updated to remove references to express app, and only return a router, which is very portable. The reason i think we should at least return a router, is because there needs to be type validation on the routes, and you dont want the consumer of the webhook library to need to know the types the library expects to correctly validate.
P.S I asked Matt if the initial status should be sent to the webhook request URL or if we can attach it to the response body of the /register endpoint. But must likely it will be option (1)
would highly prefer all data updates go through webhook calls. it simplifies both our logic and consumer logic.
import { AsyncStore } from "./store"; | ||
import { Webhook } from "./types"; | ||
|
||
export class WebhookRequests { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
again, this behaves as a repository class and it should be placed in the /adapter/db
directory with a more descriptive name: WebhookRequestRepository
. Also maybe we can make the file to have the same name as the exported class
packages/webhooks/src/notifier.ts
Outdated
public notify = (payload: NotificationPayload): void => { | ||
this.create(payload).catch((error) => { | ||
this.logger.error(`Error creating notification:`, error); | ||
}); | ||
}; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
After creation, the notification should be sent right away, not scheduled to be sent later in a batch.
packages/webhooks/src/types.ts
Outdated
|
||
export type NotificationPayload = { | ||
url: string; | ||
event: JSONValue; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this should be named more generic, like "data" or smth. Not sure what event means in this context
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
event
makes me think about past tense verbs: DepositStatusChanged
, DepositFilled
, etc but here the notification will contain data { status, depositTxHash }
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ive renamed this to "data"
packages/webhooks/src/notifier.ts
Outdated
this.pending = deps.pending; | ||
this.completed = deps.completed; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If it would simplify things for you, we can skip saving the notifications in a store and we will make them stateful later
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
removed any stateful notifications
packages/webhooks/src/types.ts
Outdated
export interface Webhook { | ||
id: string; | ||
url: string; | ||
filter: string; | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Following the language we used so far, I think WebhookRequests
would be a better name for this
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
renamed this
Signed-off-by: david <[email protected]>
Signed-off-by: david <[email protected]>
public async unregisterClient(clientId: string): Promise<void> { | ||
if (!(await this.store.has(clientId))) { | ||
throw new Error(`Client with id ${clientId} does not exist.`); | ||
} | ||
await this.store.delete(clientId); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
public async unregisterClient(clientId: string): Promise<void> { | |
if (!(await this.store.has(clientId))) { | |
throw new Error(`Client with id ${clientId} does not exist.`); | |
} | |
await this.store.delete(clientId); | |
} | |
public unregisterClient(clientId: string): Promise<void> { | |
if (!(await this.store.has(clientId))) { | |
throw new Error(`Client with id ${clientId} does not exist.`); | |
} | |
return this.store.delete(clientId); | |
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
these need async, the stores are async cuz they will be calling a database
await this.store.delete(clientId); | ||
} | ||
|
||
public async getClient(clientId: string): Promise<WebhookClient | undefined> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
public async getClient(clientId: string): Promise<WebhookClient | undefined> { | |
public getClient(clientId: string): WebhookClient | undefined { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
a lot of these functions may not need to be async
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
the store functions are all async, even though right now they are using a memory map, eventually will plugin a real database
} | ||
} | ||
|
||
async *entries(): AsyncIterableIterator<[string, V]> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
OOC what is this *entries invocation?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
its a generator which emits values asyncronously (async iterator). you can iterate over it like:
for await (const [id, client] of repository.entries()) {
console.log(`Client ID: ${id}, Client URL: ${client.url}`);
}
its kind of like a stream where the consumer is pulling the values in
Signed-off-by: david <[email protected]>
43272ef
to
aca4341
Compare
Motivation
We want to be able to handle webhook requests to the indexer, starting with deposit status
changes
This adds a module in the indexer to startup an express based webhook manager, with an interface to add new event types, as well as register many webhooks.
usage
factory
in the main entry point which gives you aWebhookFactory
call, which returns{webooks ,express ,notifier}
. As the indexer, all you need to do is call webhooks.write({type:'DepositStatus',event:EventData})You also need to setup an interval to run the notification worker by calling
notifier.tick()
which will try to send out all pending notifications to webhooksintegrators will call into the express app on
POST /webhook
, passing body ={type:'DepositStatus',filter:{originChainId,depositTxHash}, url:string}
. This will begin the listening process for indexer updates. It should also emit a first deposit status if one exists.testing
this has not been tested
TODO
In future prs: