#167 Added table support (#172)
* #167 Added table support
* #167 Return error if row contains field with unsupported type
unflag authored Oct 12, 2023
1 parent fa574fd commit e0fd170
Showing 19 changed files with 1,589 additions and 528 deletions.
51 changes: 46 additions & 5 deletions README.md
@@ -91,19 +91,60 @@ Query Editor unlocks all possibilities of CQL including User-Defined Functions,
Example (using the sample table from the Query Configurator case):

```
SELECT sensor_id, CAST(temperature as double), registered_at FROM test.test WHERE sensor_id IN (99051fe9-6a9c-46c2-b949-38ef78858dd1, 99051fe9-6a9c-46c2-b949-38ef78858dd0) AND registered_at > $__timeFrom and registered_at < $__timeTo
SELECT sensor_id, temperature, registered_at, location FROM test.test WHERE sensor_id IN (99051fe9-6a9c-46c2-b949-38ef78858dd1, 99051fe9-6a9c-46c2-b949-38ef78858dd0) AND registered_at > $__timeFrom and registered_at < $__timeTo
```

1. Follow the order of the SELECT expressions; it's important!
1. The order of fields in the SELECT expression doesn't matter, except for the `ID` field. This field is used to distinguish different time series, so it is important to keep it in the first position.
* **Identifier** - the first property in the SELECT expression must be the ID, something that uniquely identifies the data (e.g. `sensor_id`)
* **Value** - The second property must be the value that you are going to show
* **Timestamp** - The third property must be the timestamp of the value.
All other properties will be ignored.
* **Value** - There should be at least one numeric value among the returned fields if the query result will be used to draw a graph.
* **Timestamp** - There should be one timestamp value if the query result will be used to draw a graph.
* There can be any number of additional fields; however, be cautious when using multiple numeric fields, as they are interpreted as values by Grafana and therefore drawn on the TimeSeries graph.
* Any field returned by the query is available for use in the `Alias` template, e.g. `{{ location }}`. The datasource interpolates such strings and updates the graph legend.
* The datasource will try to keep all the fields; however, this is not always possible since Cassandra and Grafana use different sets of supported types. Unsupported fields will be removed from the response.

2. To filter data by time, use the `$__timeFrom` and `$__timeTo` placeholders as in the example. The datasource will replace them with time values from the panel. **Notice:** it's important to add the placeholders, otherwise the query will try to fetch data for the whole period of time. Don't try to specify the timeframe on your own, just put the placeholders; it's Grafana's job to specify the time limits.

![103153625-1fd85280-4792-11eb-9c00-085297802117](https://user-images.githubusercontent.com/1742301/148654522-8e50617d-0ba9-4c5a-a3f0-7badec92e31f.png)
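As context for how these placeholders work, here is a minimal sketch of the substitution step. It is a hypothetical helper, not this plugin's actual code; the real datasource performs its own substitution and chooses the literal format it injects.

```
package main

import (
	"fmt"
	"strconv"
	"strings"
	"time"
)

// interpolateTimeRange replaces the $__timeFrom / $__timeTo placeholders
// with the panel's time range, rendered here as epoch milliseconds.
// Purely illustrative: the actual datasource does this substitution itself.
func interpolateTimeRange(query string, from, to time.Time) string {
	query = strings.ReplaceAll(query, "$__timeFrom", strconv.FormatInt(from.UnixMilli(), 10))
	return strings.ReplaceAll(query, "$__timeTo", strconv.FormatInt(to.UnixMilli(), 10))
}

func main() {
	q := "SELECT temperature FROM test.test WHERE registered_at > $__timeFrom AND registered_at < $__timeTo"
	fmt.Println(interpolateTimeRange(q, time.Now().Add(-time.Hour), time.Now()))
}
```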

#### Table Mode
In addition to TimeSeries mode, the datasource supports Table mode to draw tables using Cassandra query results. Use the `Merge`, `Sort by`, `Organize fields` and other transformations to shape the table in any desirable way.
There are two ways to plot only the last (most recent) values instead of a whole time series.
1. Inefficient way

If the table was created with the default ascending ordering, the most recent value is always stored at the end of the partition. To retrieve it, the `ORDER BY` and `LIMIT` clauses must be used in the query:
```
SELECT sensor_id, temperature, registered_at, location
FROM test.test
WHERE sensor_id = 99051fe9-6a9c-46c2-b949-38ef78858dd0
AND registered_at > $__timeFrom and registered_at < $__timeTo
ORDER BY registered_at DESC
LIMIT 1
```
Note that the `WHERE IN ()` clause cannot be used together with `ORDER BY`, so the query must be duplicated for each additional `sensor_id`.

2. Efficient way

To query the most recent values efficiently, the ordering must be specified during table creation:
```
CREATE TABLE IF NOT EXISTS temperature (
sensor_id uuid,
registered_at timestamp,
temperature int,
location text,
PRIMARY KEY ((sensor_id), registered_at)
) WITH CLUSTERING ORDER BY (registered_at DESC);
```
After that, the most recent value will always be stored at the beginning of the partition and can be queried with just a `LIMIT` clause:
```
SELECT sensor_id, temperature, registered_at, location
FROM test.test
WHERE sensor_id IN (99051fe9-6a9c-46c2-b949-38ef78858dd1, 99051fe9-6a9c-46c2-b949-38ef78858dd0)
AND registered_at > $__timeFrom and registered_at < $__timeTo
PER PARTITION LIMIT 1
```
Note that `PER PARTITION LIMIT 1` is used instead of `LIMIT 1` to query one row for each partition and not just one row in total.

## Development

[Developer documentation](https://github.com/HadesArchitect/GrafanaCassandraDatasource/wiki/Developer-Guide)
36 changes: 0 additions & 36 deletions backend/cassandra/dto.go

This file was deleted.

40 changes: 40 additions & 0 deletions backend/cassandra/row.go
@@ -0,0 +1,40 @@
package cassandra

import (
"fmt"
"net"
"time"

"github.com/gocql/gocql"
)

type Row struct {
Columns []string
Fields map[string]interface{}
}

// normalize checks the type of each returned field and, in case it is
// not supported by Grafana, tries to convert it to a supported type.
// If a field has a type that cannot be converted, an error is returned.
// Type mappings are based on these:
// Cassandra gocql types: https://github.com/gocql/gocql/blob/master/marshal.go#L164
// Grafana field types: https://github.com/grafana/grafana-plugin-sdk-go/blob/main/data/field.go#L39
func (r *Row) normalize() error {
for _, colName := range r.Columns {
switch v := r.Fields[colName].(type) {
case int8, int16, int32, int64, float32, float64, string, bool, time.Time:
case int:
r.Fields[colName] = int64(v)
case []byte:
r.Fields[colName] = string(v)
case net.IP:
r.Fields[colName] = v.String()
case gocql.UUID:
r.Fields[colName] = v.String()
default:
return fmt.Errorf("field %s has unsupported type %T", colName, v)
}
}

return nil
}
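For illustration, a minimal usage sketch of `normalize`: it is unexported, so code like this would live inside the `cassandra` package, and the column names and values below are made up.

```
package cassandra

import (
	"fmt"
	"net"
	"time"

	"github.com/gocql/gocql"
)

// demoNormalize shows how normalize rewrites gocql-specific values
// into types Grafana can render. Illustrative only.
func demoNormalize() {
	r := &Row{
		Columns: []string{"sensor_id", "registered_at", "temperature", "node_ip"},
		Fields: map[string]interface{}{
			"sensor_id":     gocql.UUID{},             // becomes the string form of the UUID
			"registered_at": time.Now().UTC(),         // time.Time is already supported, kept as-is
			"temperature":   int(42),                  // plain int is widened to int64
			"node_ip":       net.ParseIP("127.0.0.1"), // becomes the string "127.0.0.1"
		},
	}

	if err := r.normalize(); err != nil {
		fmt.Println("unsupported field:", err)
		return
	}
	fmt.Printf("%T %v\n", r.Fields["sensor_id"], r.Fields["sensor_id"])
	fmt.Printf("%T %v\n", r.Fields["temperature"], r.Fields["temperature"])
}
```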
111 changes: 111 additions & 0 deletions backend/cassandra/row_test.go
@@ -0,0 +1,111 @@
package cassandra

import (
"fmt"
"net"
"testing"
"time"

"github.com/gocql/gocql"
"github.com/stretchr/testify/assert"
)

func TestRow_normalize(t *testing.T) {
testCases := []struct {
name string
input *Row
want *Row
wantErr error
}{
{
name: "empty",
input: &Row{},
want: &Row{},
wantErr: nil,
},
{
name: "normal row",
input: &Row{
Columns: []string{"id", "time", "value"},
Fields: map[string]interface{}{"id": "id", "time": time.UnixMilli(1257894000000).UTC(), "value": 0.1},
},
want: &Row{
Columns: []string{"id", "time", "value"},
Fields: map[string]interface{}{"id": "id", "time": time.UnixMilli(1257894000000).UTC(), "value": 0.1},
},
wantErr: nil,
},
{
name: "normal row with nil in the beginning",
input: &Row{
Columns: []string{"field1", "id", "time", "value"},
Fields: map[string]interface{}{"field1": nil, "id": "id", "time": time.UnixMilli(1257894000000).UTC(), "value": 0.1},
},
want: &Row{
Columns: []string{"field1", "id", "time", "value"},
Fields: map[string]interface{}{"field1": nil, "id": "id", "time": time.UnixMilli(1257894000000).UTC(), "value": 0.1},
},
wantErr: fmt.Errorf("field %s has unsupported type %T", "field1", nil),
},
{
name: "normal row with nil in the end",
input: &Row{
Columns: []string{"id", "time", "value", "field1"},
Fields: map[string]interface{}{"id": "id", "time": time.UnixMilli(1257894000000).UTC(), "value": 0.1, "field1": nil},
},
want: &Row{
Columns: []string{"id", "time", "value", "field1"},
Fields: map[string]interface{}{"id": "id", "time": time.UnixMilli(1257894000000).UTC(), "value": 0.1, "field1": nil},
},
wantErr: fmt.Errorf("field %s has unsupported type %T", "field1", nil),
},
{
name: "normal row with unsupported",
input: &Row{
Columns: []string{"id", "field1", "time", "value"},
Fields: map[string]interface{}{"id": "id", "field1": struct{}{}, "time": time.UnixMilli(1257894000000).UTC(), "value": 0.1},
},
want: &Row{
Columns: []string{"id", "field1", "time", "value"},
Fields: map[string]interface{}{"id": "id", "field1": struct{}{}, "time": time.UnixMilli(1257894000000).UTC(), "value": 0.1},
},
wantErr: fmt.Errorf("field %s has unsupported type %T", "field1", struct{}{}),
},
{
name: "normal row with multiple unsupported fields",
input: &Row{
Columns: []string{"field1", "id", "field2", "time", "field3", "value"},
Fields: map[string]interface{}{"field1": struct{}{}, "id": "id", "field2": struct{}{}, "time": time.UnixMilli(1257894000000).UTC(), "field3": struct{}{}, "value": 0.1},
},
want: &Row{
Columns: []string{"field1", "id", "field2", "time", "field3", "value"},
Fields: map[string]interface{}{"field1": struct{}{}, "id": "id", "field2": struct{}{}, "time": time.UnixMilli(1257894000000).UTC(), "field3": struct{}{}, "value": 0.1},
},
wantErr: fmt.Errorf("field %s has unsupported type %T", "field1", struct{}{}),
},
{
name: "normal row with conversion",
input: &Row{
Columns: []string{"field1", "id", "field2", "time", "field3", "value"},
Fields: map[string]interface{}{"field1": gocql.UUID{}, "id": "id", "field2": net.ParseIP("127.0.0.1"), "time": time.UnixMilli(1257894000000).UTC(), "field3": []byte("some string"), "value": 0.1},
},
want: &Row{
Columns: []string{"field1", "id", "field2", "time", "field3", "value"},
Fields: map[string]interface{}{"field1": "00000000-0000-0000-0000-000000000000", "id": "id", "field2": "127.0.0.1", "time": time.UnixMilli(1257894000000).UTC(), "field3": "some string", "value": 0.1},
},
wantErr: nil,
},
}

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
err := tc.input.normalize()
if tc.wantErr == nil {
assert.NoError(t, err)
} else {
assert.EqualError(t, err, tc.wantErr.Error())
}
assert.Equal(t, tc.want, tc.input)
})
}
}
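The doc comment in `row.go` points at the Grafana SDK's field types. As additional context, here is a hedged sketch of how normalized rows with an assumed fixed schema could be packed into a `data.Frame` for Table mode; this is not part of the commit's shown diff, and the real plugin builds fields dynamically from the returned columns.

```
package cassandra

import (
	"time"

	"github.com/grafana/grafana-plugin-sdk-go/data"
)

// rowsToFrame packs rows of a known, fixed schema
// (sensor_id string, registered_at time.Time, temperature float64)
// into a single data frame. Illustration of the SDK API only.
func rowsToFrame(rows []*Row) *data.Frame {
	ids := make([]string, 0, len(rows))
	times := make([]time.Time, 0, len(rows))
	temps := make([]float64, 0, len(rows))

	for _, r := range rows {
		ids = append(ids, r.Fields["sensor_id"].(string))
		times = append(times, r.Fields["registered_at"].(time.Time))
		temps = append(temps, r.Fields["temperature"].(float64))
	}

	return data.NewFrame("test.test",
		data.NewField("sensor_id", nil, ids),
		data.NewField("registered_at", nil, times),
		data.NewField("temperature", nil, temps),
	)
}
```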