Cursors allow Pinot clients to consume query results in smaller chunks. With this approach,
- clients use less memory because client-side caching of results is not required.
- application logic is simpler. For example an app that paginates through results in a table or a graph can get the required slice of results when a page refreshes.
Features of the cursor implementation in Apache Pinot are:
- A query is run once and its results are stored in a temporary location. The results are cleaned up after a configurable period of time.
- The first page of results is returned in the response.
- A client can iterate through the rest of the result set by using the responseStore API.
- The client can seek forward and backward as well as change the number of rows in the repsonse.
- Cursors can be used with Single-Stage and Multi-Stage Query Engines.
System Diagram of Cursor Components
A ResponseStore stores the results of a query. The ResponseStore is created and managed by the broker which executes the query.
A client should access a response store from the same broker where it submitted a query.
Clients can determine the broker host & port from the client response. An error is thrown if clients try to access ResponseStores from another broker.
A ResponseStore is identified by the requestId of the query.
Any user that has READ permissions on all tables in the query can read from the response store.
New implementations of ResponseStore can be added by implementing the ResponseStore SPI. A specific implementation of the ResponseStore can be chosen at startup by specifying the config parameter pinot.broker.cursor.response.store.type.
Note that only ONE implementation of the ResponseStore can be used in a cluster.
FsResponseStore is the default implementation of the ResponseStore. Internally it uses PinotFileSystem. FsResponseStore can be configured to use any filesystem supported by PinotFileSystem such as HDFS, Amazon S3, Google Cloud Storage or Azure DataLake.
By default, the broker's local storage is used to store responses.
# Example configuration for file using local storage
pinot.broker.cursor.result.store.type=file
pinot.broker.cursor.result.store.file.temp.dir=/home/pinot/broker/data/cursors/temp
pinot.broker.cursor.result.store.file.data.dir=file:///home/pinot/data/cursors/data
#Example configuration for file using S3
pinot.broker.cursor.result.store.type=file
pinot.broker.storage.factory.s3.region=us-west-2
pinot.broker.storage.factory.class.s3=org.apache.pinot.plugin.filesystem.S3PinotFS
pinot.broker.cursor.result.store.file.temp.dir=/home/pinot/broker/data/cursors/temp
pinot.broker.cursor.result.store.file.data.dir.data.dir=s3://bucket/dir/query-results/
ResponseStore using Blob Store like AWS S3
This is a periodic job that runs on the controller. A ResponseStore has an expiration time. The ResponseStoreCleaner sends a DELETE request to brokers to delete expired ResponseStores.
A new API parameter has been added to trigger pagination.
The API accepts the following new optional query parameters:
- getCursor(boolean):
- numRows (int): The number of rows to return in the first page.
curl --request POST http://localhost:8000/query/sql?getCursor=true&numRows=1 \
--data '{"sql":"SELECT * FROM nation limit 100"}' | jq
Response:
{
"resultTable": {
"dataSchema": {
"columnNames": [
"n_comment",
"n_name",
"n_nationkey",
"n_regionkey"
],
"columnDataTypes": [
"STRING",
"STRING",
"INT",
"INT"
]
},
"rows": [
[
" haggle. carefully final deposits detect slyly agai",
"ALGERIA",
0,
0
]
]
},
"numRowsResultSet": 25,
"requestId": "236490978000000006",
"offset": 0,
"numRows": 1,
"cursorResultWriteTimeMs": 4,
"submissionTimeMs": 1734928302801,
"expirationTimeMs": 1734931902801,
"brokerHost": "127.0.0.1",
"brokerPort": 8000,
"bytesWritten": 2489,
"cursorFetchTimeMs": 0,
}
The output above shows response fields that are specific to cursor responses. Other than numRowsResultSet and requestId, fields common with BrokerResponse are not shown for brevity.
Field | Description |
---|---|
numRowsResultSet | Total numbers of rows in the result set. Same as in default BrokerResponse |
requestId | The unique ID for the query. It has to be used in subsequent calls to cursor API. Same as in default BrokerResponse |
offset | The offset of the first row in the resultTable. |
numRows | The number of rows in the resultTable. |
cursorResultWriteTimeMs | Time in milliseconds to write the response to ResponseStore. It is applicable only for the query submission API. |
submissionTimeMs | Unix timestamp in milliseconds when the query was submitted. |
expirationTimeMs | Expiration time of the ResponseStore in unix timestamp in milliseconds. |
brokerHost | Hostname or IP address of the broker that manages the ResponseStore. All subsequent cursor API calls should be directed to this broker. |
brokerPort | The port of the broker that manages the ResponseStore |
bytesWritten | The number of bytes written to ResponseStore when storing the result set. |
cursorFetchTimeMs | Time in milliseconds to fetch the cursor from ResponseStore. It is applicable for cursor fetch API. |
This is broker API that can be used to iterate over the result set of a query in a ResponseStore.
The API accepts the following query parameters:
- offset (int) (required): Offset of the first row to be fetched. Offset starts from 0 for the first row in the resultset.
- numRows (int) (optional): The number of rows in the page. If not specified, the value specified by the config parameter "pinot.broker.cursor.fetch.rows" is used.
curl -X GET http://localhost:8000/responseStore/236490978000000006/results\?offset\=1\&numRows\=1 | jq
{
"resultTable": {
"dataSchema": {
"columnNames": [
"n_comment",
"n_name",
"n_nationkey",
"n_regionkey"
],
"columnDataTypes": [
"STRING",
"STRING",
"INT",
"INT"
]
},
"rows": [
[
"al foxes promise slyly according to the regular accounts. bold requests alon",
"ARGENTINA",
1,
1
]
]
},
"numRowsResultSet": 25,
"requestId": "236490978000000006",
"offset": 1,
"numRows": 1,
"cursorResultWriteTimeMs": 0,
"submissionTimeMs": 1734928302801,
"expirationTimeMs": 1734931902801,
"brokerHost": "127.0.0.1",
"brokerPort": 8000,
"bytesWritten": 2489,
"cursorFetchTimeMs": 1,
}
Returns the BrokerResponse metadata of the query.
The API accepts the following URL parameters:
- requestId (required)
Returns a list of ResponseStores. Only the response metadata is returned.
curl -X GET http://localhost:8000/responseStore | jq
[
{
"requestId": "236490978000000005",
...
},
{
"requestId": "236490978000000006",
...
}
]
Delete the results of a query.
The API accepts the following URL parameters:
- requestId (required)
Configuration parameters with pinot.broker prefix are Broker configuration parameters.
Configuration parameters with controller prefix are Controller configuration parameters.
Configuration | Default | Description |
---|---|---|
pinot.broker.cursor.response.store.type | file | Specifies the ResponseStore type to instantiate. |
pinot.broker.cursor.response.store.file.data.dir | {java.io.tmpdir}/broker/responseStore/data | Directory where the responses will be stored. |
pinot.broker.cursor.response.store.file.temp.dir | {java.io.tmpdir}/broker/responseStore/temp | Directory where temporary files will be stored. |
pinot.broker.cursor.response.store.expiration | 1h | Time To Live for a response store. |
pinot.broker.cursor.fetch.rows | 10000 | The default number of rows in a cursor response. |
controller.cluster.response.store.cleaner.frequencyPeriod | 1h | The frequency of ResponseStoreCleaner |
controller.cluster.response.store.cleaner.initialDelay | random delay between 0-300 seconds | The initial delay before the first run of the periodic task. |