Skip to content

Commit

Permalink
Update service streaming example
Browse files Browse the repository at this point in the history
Signed-off-by: Byron Ruth <[email protected]>
  • Loading branch information
bruth committed Nov 28, 2022
1 parent be5b852 commit 3b729b4
Show file tree
Hide file tree
Showing 4 changed files with 116 additions and 78 deletions.
73 changes: 0 additions & 73 deletions examples/messaging/server-streaming/go/main.go

This file was deleted.

5 changes: 0 additions & 5 deletions examples/messaging/server-streaming/meta.yaml

This file was deleted.

85 changes: 85 additions & 0 deletions examples/messaging/service-streaming/go/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
package main

import (
"fmt"
"math/rand"
"os"
"strconv"
"time"

"github.com/nats-io/nats.go"
)

func main() {
// Use the env varibale if running in the container, otherwise use the default.
url := os.Getenv("NATS_URL")
if url == "" {
url = nats.DefaultURL
}

// Create an unauthenticated connection to NATS.
nc, _ := nats.Connect(url)
defer nc.Drain()

// Setup a simple random number generator (RNG) service that
// streams numbers. Using a queue group ensures that only one
// of the members will receive the request to process.
nc.QueueSubscribe("rng", "rng", func(msg *nats.Msg) {
// Do request validation, etc. If there are any issues, a response
// can be sent with any error prior to the stream starting.
n, _ := strconv.ParseInt(string(msg.Data), 10, 64)

// Extract the unique subject to stream messages on.
subject := msg.Header.Get("x-stream-subject")

// Respond to the initial request with any errors. Otherwise an
// empty reply indicates the stream will start.
msg.Respond(nil)

// Stream the numbers to the client. A publish is used here,
// however, if ack'ing is desired, a request could be used
// per message.
for i := 0; i < int(n); i++ {
r := rand.Intn(100)
nc.Publish(subject, []byte(fmt.Sprintf("%d", r)))
}

// Publish empty data to indicate end-of-stream.
nc.Publish(subject, nil)
})

// Generate a unique inbox subject for the stream of messages
// and subscribe to it.
inbox := nc.NewInbox()
sub, _ := nc.SubscribeSync(inbox)

// Prepare the message to initiate the interaction. The inbox
// subject is included in the header for the service to extract
// and publish to.
msg := nats.NewMsg("rng")
msg.Header.Set("x-stream-subject", inbox)
msg.Data = []byte("10")

// Send the request to initiate the interaction.
nc.RequestMsg(msg, time.Second)

// Loop to receive all messages over the stream.
for {
msg, err := sub.NextMsg(time.Second)
// Handle error.
if err != nil {
sub.Unsubscribe()
// handle error
break
}

// Indicates the end of the stream.
if len(msg.Data) == 0 {
sub.Unsubscribe()
break
}

// Print the random number.
fmt.Println(string(msg.Data))
}
}
31 changes: 31 additions & 0 deletions examples/messaging/service-streaming/meta.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
title: Service streaming pattern
description: |-
This messaging pattern builds upon the built-in [request-reply][req-rep] API.
Prior to the client sending the request, it must generate a unique subject
to be the dedicated *inbox* for receiving the stream of messages from the
service.
The simpliest way to achieve this is to use the client connection's
`NewInbox()` method which also ensures any connection-level inbox prefix
is already prepended.
This unique subject is then added as a header, such as `x-stream-subject`.
Prior to sending the request, the subscription on that subject must be setup
by the client so it can receive messages as soon as the client sends the
request.
When the service receives the request, it can do any request validation, etc.
and perform the standard reply indicating the streaming will be begin.
Now the service can freely publish messages (or send requests if acks are desired)
on the subject passed within the header.
To indicate the end-of-stream, the last message provides an empty message body
with an optional set of headers indicating any status, such as an error
had occurred mid-stream.
For those familiar with gRPC, this is analogous to the
[server streaming RPC][grpc].
[req-rep]: /examples/messaging/request-reply/go
[grpc]: https://grpc.io/docs/what-is-grpc/core-concepts/#server-streaming-rpc

1 comment on commit 3b729b4

@github-actions
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Deploy preview for nats-by-example ready!

✅ Preview
https://nats-by-example-2p2xhoplf-connecteverything.vercel.app

Built with commit 3b729b4.
This pull request is being automatically deployed with vercel-action

Please sign in to comment.