From 2036bc75463c9872e2a6a37d5a726f0224f3aab6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fran=C3=A7ois=20Massot?= Date: Sun, 18 Feb 2024 00:30:01 +0100 Subject: [PATCH 1/2] WIP --- pkg/quickwit/timestamp_infos.go | 19 ++++++++++++------- src/datasource.ts | 2 +- 2 files changed, 13 insertions(+), 8 deletions(-) diff --git a/pkg/quickwit/timestamp_infos.go b/pkg/quickwit/timestamp_infos.go index ab1b018..fda9ec5 100644 --- a/pkg/quickwit/timestamp_infos.go +++ b/pkg/quickwit/timestamp_infos.go @@ -8,7 +8,7 @@ import ( "net/http" ) -type QuickwitMapping struct { +type QuickwitIndexMetadata struct { IndexConfig struct { DocMapping struct { TimestampField string `json:"timestamp_field"` @@ -35,7 +35,7 @@ func NewErrorCreationPayload(statusCode int, message string) error { } func DecodeTimestampFieldInfos(statusCode int, body []byte) (string, error) { - var payload QuickwitMapping + var payload []QuickwitIndexMetadata err := json.Unmarshal(body, &payload) if err != nil { @@ -44,15 +44,20 @@ func DecodeTimestampFieldInfos(statusCode int, body []byte) (string, error) { return "", NewErrorCreationPayload(statusCode, errMsg) } - timestampFieldName := payload.IndexConfig.DocMapping.TimestampField + var timestampFieldNames []string + for _, indexMetadata := range payload { + timestampFieldNames = append(timestampFieldNames, indexMetadata.IndexConfig.DocMapping.TimestampField) + } + + // timestampFieldName := payload.IndexConfig.DocMapping.TimestampField - qwlog.Info(fmt.Sprintf("Found timestampFieldName = %s", timestampFieldName)) - return timestampFieldName, nil + // qwlog.Info(fmt.Sprintf("Found timestampFieldName = %s", timestampFieldName)) + // return timestampFieldName, nil } func GetTimestampFieldInfos(index string, qwUrl string, cli *http.Client) (string, error) { - mappingEndpointUrl := qwUrl + "/indexes/" + index - qwlog.Info("Calling quickwit endpoint: " + mappingEndpointUrl) + mappingEndpointUrl := qwUrl + "/indexes?index_id_pattern=" + index + qwlog.Debug("Calling quickwit endpoint: " + mappingEndpointUrl) r, err := cli.Get(mappingEndpointUrl) if err != nil { errMsg := fmt.Sprintf("Error when calling url = %s: err = %s", mappingEndpointUrl, err.Error()) diff --git a/src/datasource.ts b/src/datasource.ts index bdd6b5f..2139821 100644 --- a/src/datasource.ts +++ b/src/datasource.ts @@ -772,7 +772,7 @@ export function enhanceDataFrameWithDataLinks(dataFrame: DataFrame, dataLinks: D config: {}, values: displayedMessages, } - console.log('newField'); + console.log(dataFrame); dataFrame.fields = [newField, ...dataFrame.fields]; } From 09030cca3c82650d0723b08548447cc514ee6e75 Mon Sep 17 00:00:00 2001 From: fmassot Date: Sun, 18 Feb 2024 13:25:33 +0100 Subject: [PATCH 2/2] Support index patterns. --- pkg/quickwit/quickwit.go | 2 +- pkg/quickwit/timestamp_infos.go | 90 ++++++++++++++--- pkg/quickwit/timestamp_infos_test.go | 139 +++++++++------------------ 3 files changed, 118 insertions(+), 113 deletions(-) diff --git a/pkg/quickwit/quickwit.go b/pkg/quickwit/quickwit.go index b43e8e6..990044c 100644 --- a/pkg/quickwit/quickwit.go +++ b/pkg/quickwit/quickwit.go @@ -91,7 +91,7 @@ func NewQuickwitDatasource(settings backend.DataSourceInstanceSettings) (instanc } if !toOk { - timeField, err = GetTimestampFieldInfos(index, settings.URL, httpCli) + timeField, err = GetTimestampField(index, settings.URL, httpCli) if nil != err { return nil, err } diff --git a/pkg/quickwit/timestamp_infos.go b/pkg/quickwit/timestamp_infos.go index fda9ec5..ccbf978 100644 --- a/pkg/quickwit/timestamp_infos.go +++ b/pkg/quickwit/timestamp_infos.go @@ -6,13 +6,13 @@ import ( "fmt" "io" "net/http" + "strings" ) type QuickwitIndexMetadata struct { IndexConfig struct { DocMapping struct { - TimestampField string `json:"timestamp_field"` - FieldMappings []FieldMappings `json:"field_mappings"` + TimestampField string `json:"timestamp_field"` } `json:"doc_mapping"` } `json:"index_config"` } @@ -34,29 +34,47 @@ func NewErrorCreationPayload(statusCode int, message string) error { return errors.New(string(json)) } -func DecodeTimestampFieldInfos(statusCode int, body []byte) (string, error) { - var payload []QuickwitIndexMetadata - err := json.Unmarshal(body, &payload) +// TODO: refactor either by using a timestamp alias suppprted by quickwit +// or by only using the `GetTimestampFieldFromIndexPattern` once the endpoint +// /indexes?index_id_pattern= is supported, which is after the next quickwit release > 0.7.1 +func GetTimestampField(index string, qwickwitUrl string, cli *http.Client) (string, error) { + if strings.Contains(index, "*") || strings.Contains(index, ",") { + return GetTimestampFieldFromIndexPattern(index, qwickwitUrl, cli) + } + return GetTimestampFieldFromIndex(index, qwickwitUrl, cli) +} +func GetTimestampFieldFromIndex(index string, qwickwitUrl string, cli *http.Client) (string, error) { + mappingEndpointUrl := qwickwitUrl + "/indexes/" + index + qwlog.Debug("Calling quickwit endpoint: " + mappingEndpointUrl) + r, err := cli.Get(mappingEndpointUrl) if err != nil { - errMsg := fmt.Sprintf("Unmarshalling body error: err = %s, body = %s", err.Error(), (body)) + errMsg := fmt.Sprintf("Error when calling url = %s: err = %s", mappingEndpointUrl, err.Error()) qwlog.Error(errMsg) - return "", NewErrorCreationPayload(statusCode, errMsg) + return "", err } - var timestampFieldNames []string - for _, indexMetadata := range payload { - timestampFieldNames = append(timestampFieldNames, indexMetadata.IndexConfig.DocMapping.TimestampField) + statusCode := r.StatusCode + + if statusCode < 200 || statusCode >= 400 { + errMsg := fmt.Sprintf("Error when calling url = %s", mappingEndpointUrl) + qwlog.Error(errMsg) + return "", NewErrorCreationPayload(statusCode, errMsg) } - // timestampFieldName := payload.IndexConfig.DocMapping.TimestampField + defer r.Body.Close() + body, err := io.ReadAll(r.Body) + if err != nil { + errMsg := fmt.Sprintf("Error when calling url = %s: err = %s", mappingEndpointUrl, err.Error()) + qwlog.Error(errMsg) + return "", NewErrorCreationPayload(statusCode, errMsg) + } - // qwlog.Info(fmt.Sprintf("Found timestampFieldName = %s", timestampFieldName)) - // return timestampFieldName, nil + return DecodeTimestampFieldFromIndexConfig(body) } -func GetTimestampFieldInfos(index string, qwUrl string, cli *http.Client) (string, error) { - mappingEndpointUrl := qwUrl + "/indexes?index_id_pattern=" + index +func GetTimestampFieldFromIndexPattern(indexPattern string, qwickwitUrl string, cli *http.Client) (string, error) { + mappingEndpointUrl := qwickwitUrl + "/indexes?index_id_pattern=" + indexPattern qwlog.Debug("Calling quickwit endpoint: " + mappingEndpointUrl) r, err := cli.Get(mappingEndpointUrl) if err != nil { @@ -81,5 +99,45 @@ func GetTimestampFieldInfos(index string, qwUrl string, cli *http.Client) (strin return "", NewErrorCreationPayload(statusCode, errMsg) } - return DecodeTimestampFieldInfos(statusCode, body) + return DecodeTimestampFieldFromIndexConfigs(body) +} + +func DecodeTimestampFieldFromIndexConfigs(body []byte) (string, error) { + var payload []QuickwitIndexMetadata + err := json.Unmarshal(body, &payload) + if err != nil { + errMsg := fmt.Sprintf("Unmarshalling body error: err = %s, body = %s", err.Error(), (body)) + qwlog.Error(errMsg) + return "", NewErrorCreationPayload(500, errMsg) + } + + var timestampFieldName string = "" + for _, indexMetadata := range payload { + if timestampFieldName == "" { + timestampFieldName = indexMetadata.IndexConfig.DocMapping.TimestampField + continue + } + + if timestampFieldName != indexMetadata.IndexConfig.DocMapping.TimestampField { + errMsg := fmt.Sprintf("Index matching the pattern should have the same timestamp fields, two found: %s and %s", timestampFieldName, indexMetadata.IndexConfig.DocMapping.TimestampField) + qwlog.Error(errMsg) + return "", NewErrorCreationPayload(400, errMsg) + } + } + + qwlog.Debug(fmt.Sprintf("Found timestampFieldName = %s", timestampFieldName)) + return timestampFieldName, nil +} + +func DecodeTimestampFieldFromIndexConfig(body []byte) (string, error) { + var payload QuickwitIndexMetadata + err := json.Unmarshal(body, &payload) + if err != nil { + errMsg := fmt.Sprintf("Unmarshalling body error: err = %s, body = %s", err.Error(), (body)) + qwlog.Error(errMsg) + return "", NewErrorCreationPayload(500, errMsg) + } + timestampFieldName := payload.IndexConfig.DocMapping.TimestampField + qwlog.Debug(fmt.Sprintf("Found timestampFieldName = %s", timestampFieldName)) + return timestampFieldName, nil } diff --git a/pkg/quickwit/timestamp_infos_test.go b/pkg/quickwit/timestamp_infos_test.go index d24ca31..632d966 100644 --- a/pkg/quickwit/timestamp_infos_test.go +++ b/pkg/quickwit/timestamp_infos_test.go @@ -13,142 +13,89 @@ func TestDecodeTimestampFieldInfos(t *testing.T) { query := []byte(` { "version": "0.6", - "index_uid": "myindex:01HG7ZZK3ZD7XF6BKQCZJHSJ5W", "index_config": { "version": "0.6", - "index_id": "myindex", - "index_uri": "s3://quickwit-indexes/myindex", "doc_mapping": { - "field_mappings": [ - { - "name": "foo", - "type": "text", - "fast": false, - "fieldnorms": false, - "indexed": true, - "record": "basic", - "stored": true, - "tokenizer": "default" - }, - { - "name": "timestamp", - "type": "datetime", - "fast": true, - "fast_precision": "seconds", - "indexed": true, - "input_formats": [ - "rfc3339", - "unix_timestamp" - ], - "output_format": "rfc3339", - "stored": true - } - ], - "tag_fields": [], - "store_source": true, - "index_field_presence": false, "timestamp_field": "timestamp", "mode": "dynamic", - "dynamic_mapping": {}, - "partition_key": "foo", - "max_num_partitions": 1, "tokenizers": [] }, - "indexing_settings": {}, - "search_settings": { - "default_search_fields": [ - "foo" - ] - }, "retention": null }, - "checkpoint": {}, - "create_timestamp": 1701075471, "sources": [] } `) // When - timestampFieldName, err := DecodeTimestampFieldInfos(200, query) + timestampFieldName, err := DecodeTimestampFieldFromIndexConfig(query) // Then require.NoError(t, err) require.Equal(t, timestampFieldName, "timestamp") }) - t.Run("Test decode nested fields", func(t *testing.T) { + t.Run("Test decode from list of index config", func(t *testing.T) { // Given query := []byte(` + [ { "version": "0.6", - "index_uid": "myindex:01HG7ZZK3ZD7XF6BKQCZJHSJ5W", "index_config": { - "version": "0.6", - "index_id": "myindex", - "index_uri": "s3://quickwit-indexes/myindex", "doc_mapping": { - "field_mappings": [ - { - "name": "foo", - "type": "text", - "fast": false, - "fieldnorms": false, - "indexed": true, - "record": "basic", - "stored": true, - "tokenizer": "default" - }, - { - "name": "sub", - "type": "object", - "field_mappings": [ - { - "fast": true, - "fast_precision": "seconds", - "indexed": true, - "input_formats": [ - "rfc3339", - "unix_timestamp" - ], - "name": "timestamp", - "output_format": "rfc3339", - "stored": true, - "type": "datetime" - } - ] - } - ], - "tag_fields": [], - "store_source": true, - "index_field_presence": false, - "timestamp_field": "sub.timestamp", - "mode": "dynamic", - "dynamic_mapping": {}, - "partition_key": "foo", - "max_num_partitions": 1, - "tokenizers": [] + "timestamp_field": "sub.timestamp" }, "indexing_settings": {}, - "search_settings": { - "default_search_fields": [ - "foo" - ] - }, "retention": null }, - "checkpoint": {}, - "create_timestamp": 1701075471, "sources": [] } + ] `) // When - timestampFieldName, err := DecodeTimestampFieldInfos(200, query) + timestampFieldName, err := DecodeTimestampFieldFromIndexConfigs(query) // Then require.NoError(t, err) require.Equal(t, timestampFieldName, "sub.timestamp") }) + + t.Run("Test decode from list of index config with different timestamp fields return an error", func(t *testing.T) { + // Given + query := []byte(` + [ + { + "version": "0.6", + "index_config": { + "doc_mapping": { + "timestamp_field": "sub.timestamp" + }, + "indexing_settings": {}, + "retention": null + }, + "sources": [] + }, + { + "version": "0.6", + "index_config": { + "doc_mapping": { + "timestamp_field": "sub.timestamp2" + }, + "indexing_settings": {}, + "retention": null + }, + "sources": [] + } + ] + `) + + // When + _, err := DecodeTimestampFieldFromIndexConfigs(query) + + // Then + require.Error(t, err) + require.ErrorContains(t, err, "Index matching the pattern should have the same timestamp fields") + }) }) }