Postgres Eventide interface for go. Designed for use directly with the Postgres DB, not through Eventide.
For GoDoc documentation, click here
A writer is a function on a message store instance that can write messages as commands or events to the message store database.
import (
gms "github.com/blackhatbrigade/gomessagestore"
"context"
)
func Process(ctx context.Context, ms gms.MessageStore, msg gms.Message) error {
data := {}
packedData, err := gms.Pack(data)
newEvent := NewEvent()
// attempt to write the message to the message store. If an error occurs, return the error.
err = ms.Write(ctx, newEvent, gms.AtPosition(-1))
if err != nil {
return err
}
return nil
}
messageStore = gms.NewMessageStore(postgresDBInstance)
ctx, cancel := context.WithCancel(context.Background())
msg = someMessage
err := Process(ctx, messageStore, msg)
A subscriber is used to retrieve new messages from a specified category or stream. It should only subscribe to a single category or stream. If the specified stream/category has new messages that have not yet been sent to the subscriber, they will be sent in the next poll iteration.
Use the CreateSubscriber() function on a messageStore instance.
Some things to note:
subscriberOptions are set by injecting any of the following functions into the params of the CreateSubscriber function: SubscribeToEntityStream SubscribeToCommandStream SubscribeToCategory PollTime PollErrorDelay UpdatePositionEvery SubscribeBatchSize
See subscriber_options.go for more details on these functions.
In the example below, we set the category being subscribed to, as well as our batch size using the subscriber options functions.
import (
gms "github.com/blackhatbrigade/gomessagestore"
"github.com/blackhatbrigade/gomessagestore/uuid"
"context"
)
// Create a new messageStore instnace
messageStore := gms.NewMessageStore(postgresDB)
// Set up context for handling our routines
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// Create our subscriber
subscriber, error := messageStore.CreateSubscriber(
"subscriberID",
[]gms.MessageHandler{},
gms.SubscribeToCategory("categoryID"),
gms.SubscribeBatchSize(500),
)
if error != nil {
return error
}
// Run our subscriber; our handlers will take care of processing the incoming messages.
go subscriber.Start(ctx)
A projector allows you to take an initial state and update it by processing all of the messages in a specific stream in order to derive the current state of the stream.
Use the CreateProjector() function on a messageStore instance.
Some things to note:
ProjectorOptions are set by injecting the following functions into the params of the CreateProjector function: WithReducer DefaultState
See projector.go for more details on these functions.
import (
gms "github.com/blackhatbrigade/gomessagestore"
"github.com/blackhatbrigade/gomessagestore/uuid"
"context"
)
// Create a new messageStore instnace
messageStore := gms.NewMessageStore(postgresDB)
projector, err := messageStore.CreateProjector(
gms.DefaultState(someStruct{}),
gms.WithReducer(reducer1),
gms.WithReducer(reducer2),
)
projectors are typically passed into handlers. Here is a good example of an aggregator handler that ingests a projector as one of its parameters:
func genericHandler(ctx context.Context, repo ReadModelDatabase, projector gms.Projector, msg gms.Message, expectedType string) error {
event, ok := msg.(*gms.Event)
if !ok || event.Type() != expectedType {
return ErrInvalidEventTypeInHandler
}
projection, err := projector.Run(ctx, event.StreamCategory, event.EntityID)
if err != nil {
return err
}
entity, ok := projection.(EntityDetail)
if !ok {
return ErrInvalidTypeFromProjection
}
// the projector already set everything up for us, so just store it
error := ReadModelDatabase.store(entity)
if error != nil {
return databaseWriteError
}
return nil
}
A reducer should take in a message and the previous state, and update the previous state based on the information contained in the message to derive the current state.
import ( gms "github.com/blackhatbrigade/gomessagestore" )
func AddMessageReduce(msg gms.Message, previousState interface{}) interface{} {
tempState := previousState
tempState.value += previousState.value + msg.data.addValue
return tempState
}
GO MESSAGE STORE includes a built in package for generating UUID's that you can use for message IDs.
import ( uuid "github.com/blackhatbrigade/gomessagestore/uuid" )
// returns a random V4 UUID
uuid := uuid.NewRandom()