diff --git a/examples/messaging/server-streaming/go/main.go b/examples/messaging/server-streaming/go/main.go deleted file mode 100644 index e3e2d99f..00000000 --- a/examples/messaging/server-streaming/go/main.go +++ /dev/null @@ -1,73 +0,0 @@ -package main - -import ( - "fmt" - "math/rand" - "os" - "strconv" - "time" - - "github.com/nats-io/nats.go" -) - -func main() { - // Use the env varibale if running in the container, otherwise use the default. - url := os.Getenv("NATS_URL") - if url == "" { - url = nats.DefaultURL - } - - // Create an unauthenticated connection to NATS. - nc, _ := nats.Connect(url) - defer nc.Drain() - - // Emulate a Random Number Generator service. - nc.Subscribe("rng", func(msg *nats.Msg) { - // Assume the message payload is a valid integer. - n, _ := strconv.ParseInt(string(msg.Data), 10, 64) - - // Get the reply subject to stream messages to. - reply := msg.Header.Get("x-stream-subject") - - // Respond to the initial request with a nil message indicating everything - // is OK and the messages will be streamed. If there was a validation issue - // or other problem, an error response could be returned to tell the client - // to close the subscription. - msg.Respond(nil) - - // Stream the responds to the client. - for i := 0; i < int(n); i++ { - r := rand.Intn(100) - nc.Publish(reply, []byte(fmt.Sprintf("%d", r))) - } - - // Publish nil data to indicate end-of-stream. - nc.Publish(reply, nil) - }) - - // Generate a new random inbox subject for the server stream. - streamSubject := nc.NewInbox() - - // Setup a subscription on that unique subject to buffer messages. - sub, _ := nc.SubscribeSync(streamSubject) - - // Construct the request message, including the stream header and the - // number of random numbers to generate. - msg := nats.NewMsg("rng") - msg.Data = []byte("10") - msg.Header.Add("x-stream-subject", streamSubject) - - // Assume all is ok for the example.. - nc.RequestMsg(msg, time.Second) - - for { - msg, _ := sub.NextMsg(time.Second) - // Indicates the end of the stream. - if len(msg.Data) == 0 { - break - } - - // Print the random number. - fmt.Println(string(msg.Data)) - } -} diff --git a/examples/messaging/server-streaming/meta.yaml b/examples/messaging/server-streaming/meta.yaml deleted file mode 100644 index 69a3cfca..00000000 --- a/examples/messaging/server-streaming/meta.yaml +++ /dev/null @@ -1,5 +0,0 @@ -title: Server-side streaming -description: |- - This messaging pattern is initiated by a client request which will result in one or more messages replied from the server. - - This is achieved by the client generating a one-off, random subject and subscribing to it. The subject is include in the initial request that the service can publish messages back to. diff --git a/examples/messaging/service-streaming/go/main.go b/examples/messaging/service-streaming/go/main.go new file mode 100644 index 00000000..e20031a4 --- /dev/null +++ b/examples/messaging/service-streaming/go/main.go @@ -0,0 +1,85 @@ +package main + +import ( + "fmt" + "math/rand" + "os" + "strconv" + "time" + + "github.com/nats-io/nats.go" +) + +func main() { + // Use the env varibale if running in the container, otherwise use the default. + url := os.Getenv("NATS_URL") + if url == "" { + url = nats.DefaultURL + } + + // Create an unauthenticated connection to NATS. + nc, _ := nats.Connect(url) + defer nc.Drain() + + // Setup a simple random number generator (RNG) service that + // streams numbers. Using a queue group ensures that only one + // of the members will receive the request to process. + nc.QueueSubscribe("rng", "rng", func(msg *nats.Msg) { + // Do request validation, etc. If there are any issues, a response + // can be sent with any error prior to the stream starting. + n, _ := strconv.ParseInt(string(msg.Data), 10, 64) + + // Extract the unique subject to stream messages on. + subject := msg.Header.Get("x-stream-subject") + + // Respond to the initial request with any errors. Otherwise an + // empty reply indicates the stream will start. + msg.Respond(nil) + + // Stream the numbers to the client. A publish is used here, + // however, if ack'ing is desired, a request could be used + // per message. + for i := 0; i < int(n); i++ { + r := rand.Intn(100) + nc.Publish(subject, []byte(fmt.Sprintf("%d", r))) + } + + // Publish empty data to indicate end-of-stream. + nc.Publish(subject, nil) + }) + + // Generate a unique inbox subject for the stream of messages + // and subscribe to it. + inbox := nc.NewInbox() + sub, _ := nc.SubscribeSync(inbox) + + // Prepare the message to initiate the interaction. The inbox + // subject is included in the header for the service to extract + // and publish to. + msg := nats.NewMsg("rng") + msg.Header.Set("x-stream-subject", inbox) + msg.Data = []byte("10") + + // Send the request to initiate the interaction. + nc.RequestMsg(msg, time.Second) + + // Loop to receive all messages over the stream. + for { + msg, err := sub.NextMsg(time.Second) + // Handle error. + if err != nil { + sub.Unsubscribe() + // handle error + break + } + + // Indicates the end of the stream. + if len(msg.Data) == 0 { + sub.Unsubscribe() + break + } + + // Print the random number. + fmt.Println(string(msg.Data)) + } +} diff --git a/examples/messaging/service-streaming/meta.yaml b/examples/messaging/service-streaming/meta.yaml new file mode 100644 index 00000000..074bb028 --- /dev/null +++ b/examples/messaging/service-streaming/meta.yaml @@ -0,0 +1,31 @@ +title: Service streaming pattern +description: |- + This messaging pattern builds upon the built-in [request-reply][req-rep] API. + + Prior to the client sending the request, it must generate a unique subject + to be the dedicated *inbox* for receiving the stream of messages from the + service. + + The simpliest way to achieve this is to use the client connection's + `NewInbox()` method which also ensures any connection-level inbox prefix + is already prepended. + + This unique subject is then added as a header, such as `x-stream-subject`. + Prior to sending the request, the subscription on that subject must be setup + by the client so it can receive messages as soon as the client sends the + request. + + When the service receives the request, it can do any request validation, etc. + and perform the standard reply indicating the streaming will be begin. + Now the service can freely publish messages (or send requests if acks are desired) + on the subject passed within the header. + + To indicate the end-of-stream, the last message provides an empty message body + with an optional set of headers indicating any status, such as an error + had occurred mid-stream. + + For those familiar with gRPC, this is analogous to the + [server streaming RPC][grpc]. + + [req-rep]: /examples/messaging/request-reply/go + [grpc]: https://grpc.io/docs/what-is-grpc/core-concepts/#server-streaming-rpc