Skip to content

Commit

Permalink
NATS practical content piece and Kafka manual minor corrections (#7764)
Browse files Browse the repository at this point in the history
GitOrigin-RevId: 955b12fc5445c7911881b4f5589c140243567a2c
  • Loading branch information
zxqfd555-pw authored and Manul from Pathway committed Dec 6, 2024
1 parent b9f3f77 commit c0400b6
Show file tree
Hide file tree
Showing 2 changed files with 191 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ t = pw.io.kafka.read(

The way the input connector behaves depends on the format of the input data.
- `raw`: for raw data, there is only one column `data` in which all the entry is dumped.
- `csv` and `json`: the data is formatted according to the pattern. In this case, `value_columns` is expected.
- `csv` and `json`: the data is formatted according to the pattern. In this case, `schema` is expected.

⚠️ For the `csv` format: the first message should start with a header containing the column names, in the correct order, and separated by commas.
The connector will not properly work without this message, however, it must be sent only once: if sent twice, the second message will be treated like a normal row.
Expand All @@ -118,7 +118,7 @@ pw.io.kafka.write(t, rdkafka_settings, topic_name="sum", format="json")
## Complete example

Let's go back to our example on how to compute a sum over the values of the columns' `value` received on a Kafka topic `connector_example` in a CSV format.
The final version of our project contains two files: `realtime_sum.py` which processes the stream using Pathway and `generating_stream.sh` which generates the streams.
The final version of our project contains two files: `realtime_sum.py` which processes the stream using Pathway and `generate_stream.py` which generates the streams.

Here is `realtime_sum.py`:

Expand Down Expand Up @@ -163,9 +163,9 @@ Once `pw.run()` is called, the computation will be run forever until it gets kil
If you need some reminders on Pathway operations, don't hesitate to take a look at our [First-steps guide](/developers/user-guide/data-transformation/table-operations/).


You can use the KafkaProducer API provided by Kafka to send messages to Kafka using Python in a `generating_kafka_stream.py` script:
You can use the KafkaProducer API provided by Kafka to send messages to Kafka using Python in a `generate_stream.py` script:

```python [generating_kafka_stream.py]
```python [generate_stream.py]
from kafka import KafkaProducer
import time

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,187 @@
---
title: 'NATS connectors'
description: 'Tutorial on how to use NATS connectors'
date: '2024-12-06'
tags: ['tutorial', 'connectors']
deployment_tag: ["docker"]
keywords: ['connector', 'NATS', 'Docker', 'input', 'output', 'streaming']
---

# Using NATS connectors
Pathway provides a [`pw.io.nats`](/developers/api-docs/pathway-io/nats) module with connectors to read and send messages from a NATS instance.

In Pathway, you can read and send messages to a NATS subject using [`pw.io.nats.read`](/developers/api-docs/pathway-io/nats#pathway.io.nats.read) and [`pw.io.nats.write`](/developers/api-docs/pathway-io/nats#pathway.io.nats.write).
NATS connectors only work in the [streaming mode](/developers/user-guide/introduction/streaming-and-static-modes/).

## Short version
Consider a simple scenario: messages are sent to a NATS subject `connector_example`, each message containing a single column `value` in a JSON format, and you want to compute the sum of these values and send the resulting output stream to the same NATS instance on a `sum` subject.
You can do it as follows in Pathway:

```python [realtime_sum.py]
import pathway as pw


# Define a schema for the table.
# It sets all the columns and their types.
class InputSchema(pw.Schema):
value: int


# Create connection string for the local NATS instance.
connection_string = "nats://localhost:4222/"

# Use the connection string below if authorization is needed:
# connection_string = "nats://username:password@localhost:4222/"

# Use Pathway NATS connector to read data.
t = pw.io.nats.read(
connection_string,
topic="connector_example",
schema=InputSchema,
format="json",
)

# Compute the sum (this part is independent of the connectors).
t = t.reduce(sum=pw.reducers.sum(t.value))

# Use the NATS connector to send the resulting output stream containing the sum.
pw.io.nats.write(t, connection_string, topic="sum", format="json")

# Launch the computation.
pw.run()
```

## Running NATS locally

If you run this tutorial locally, you'll need to set up a NATS instance on your computer. It can be done in two ways.

The first way is to download the `nats-server` binary from the [Releases page](https://github.com/nats-io/nats-server/releases) and start it. By default, it runs on port `4222` at `localhost`.

The second way is to run a Docker image provided by NATS developers and expose port `4222` so that it is accessible by other programs on your machine:
1. First, you need to pull the image from the repository. It can be done with `docker pull nats`.
2. Then, you need to run this image in the background and expose the required port: `docker run -d --name nats -p 4222:4222 nats`. Now you have NATS running in Docker on your machine.

The second option would be useful in case you need to create a system of several containers running together in `docker-compose`.

⚠️ Note that NATS was built to achieve high performance, and due to that, it doesn't store messages in any kind of buffer. Therefore, your program will receive only the messages that were sent *after* it has started.

## Input connector

**Data stream**:
Consider a stream in the form of NATS messages received on given subjects.
An update is a set of messages: the update is triggered by a commit.
Commits ensure the atomicity of each update and are generated periodically.

**Usage**:
the NATS input connector [`pw.io.nats.read`](/developers/api-docs/pathway-io/nats#pathway.io.nats.read) takes several arguments:
- `uri`: the connection string used to connect to NATS; It must follow the NATS connection string [format](https://docs.nats.io/using-nats/developer/connecting).
- `topic`: the subject which is listened to.
- `format`: format of messages among `plaintext`, `raw`, and `json`.
- `schema`: if the format is `json`, the table schema must be defined. This schema specifies the column names, their data types, and the primary keys. For the `raw` format, the payload is read as raw bytes and inserted directly into the table. If the `plaintext` format is selected, the payload is decoded from UTF-8 and stored as plain text.

```python
class InputSchema(pw.Schema):
value: int


t = pw.io.nats.read(
connection_string,
topic="connector_example",
schema=InputSchema,
format="json",
)
```

The way the input connector behaves depends on the format of the input data.
- `raw` or `plaintext`: for raw or plaintext data, there is only one column `data` in which all the entry is dumped.
- `json`: the data is parsed from JSON. In this case, `schema` field is required.

## Output connector

The output connector [`pw.io.nats.write`](/developers/api-docs/pathway-io/nats#pathway.io.nats.write) sends the updates made to a table `t` to a given NATS instance and *on a single NATS subject*.
The connector can produce table updates following the JSON format. There are also `raw` and `plaintext` formats that are supported for single-column tables or if you explicitly specify the name of the column that needs to be provided.


**Usage**:
The output connector takes the following arguments:
- `table`: the Pathway table to send to NATS.
- `uri`: the connection string used to connect to NATS; It must follow the NATS connection string [format](https://docs.nats.io/using-nats/developer/connecting).
- `topic`: the subject on which the messages are sent.
- `format`: `json`, `raw` or `plaintext`.

```python
pw.io.nats.write(t, connection_string, topic="sum", format="json")
```

## Complete example

Let's go back to our example on how to compute a sum over the values of the columns' `value` received on a NATS subject `connector_example` in a JSON format.
The final version of the project contains two files: `realtime_sum.py` which processes the stream using Pathway and `generate_stream.py` which generates the streams.

Here is `realtime_sum.py`:

```python [realtime_sum.py]
import pathway as pw


# Define a schema for the table.
# It sets all the columns and their types.
class InputSchema(pw.Schema):
value: int


# Create connection string for the local NATS instance.
connection_string = "nats://localhost:4222/"

# Use the connection string below if authorization is needed:
# connection_string = "nats://username:password@localhost:4222/"

# Use Pathway NATS connector to read data.
t = pw.io.nats.read(
connection_string,
topic="connector_example",
schema=InputSchema,
format="json",
)

# Compute the sum (this part is independent of the connectors).
t = t.reduce(sum=pw.reducers.sum(t.value))

# Use the NATS connector to send the resulting output stream containing the sum.
pw.io.nats.write(t, connection_string, topic="sum", format="json")

# Launch the computation.
pw.run()
```

Don't forget the `pw.run()` otherwise no computation will be done!
Once `pw.run()` is called, the computation will be run forever until it gets killed.
If you need some reminders on Pathway operations, don't hesitate to take a look at our [First-steps guide](/developers/user-guide/data-transformation/table-operations/).

You can use the `nats-py` library to send messages to NATS using Python in `generate_stream.py` script:

```python [generate_stream.py]
import asyncio
import json
from nats.aio.client import Client as NATS


async def send_messages():
nc = NATS()

await nc.connect("nats://localhost:4222")
try:
topic = "connector_example"
for i in range(10):
message = {"value": i}
await nc.publish(topic, json.dumps(message).encode("utf-8"))

finally:
await nc.close()


if __name__ == "__main__":
asyncio.run(send_messages())
```

Please note that since the NATS server does not store messages in a buffer, the subject listener must connect first to receive messages. Therefore, you need to run `realtime_sum.py` before sending any messages.

0 comments on commit c0400b6

Please sign in to comment.