diff --git a/pkg/quickwit/quickwit.go b/pkg/quickwit/quickwit.go index 45047c4..4937372 100644 --- a/pkg/quickwit/quickwit.go +++ b/pkg/quickwit/quickwit.go @@ -4,7 +4,6 @@ import ( "bytes" "context" "encoding/json" - "errors" "fmt" "io" "net/http" @@ -26,6 +25,13 @@ type QuickwitDatasource struct { dsInfo es.DatasourceInfo } +type FieldMappings struct { + Name string `json:"name"` + Type string `json:"type"` + OutputFormat *string `json:"output_format,omitempty"` + FieldMappings []FieldMappings `json:"field_mappings,omitempty"` +} + // Creates a Quickwit datasource. func NewQuickwitDatasource(settings backend.DataSourceInstanceSettings) (instancemgmt.Instance, error) { qwlog.Debug("Initializing new data source instance") @@ -50,19 +56,8 @@ func NewQuickwitDatasource(settings backend.DataSourceInstanceSettings) (instanc return nil, err } - timeField, ok := jsonData["timeField"].(string) - if !ok { - return nil, errors.New("timeField cannot be cast to string") - } - - if timeField == "" { - return nil, errors.New("a time field name is required") - } - - timeOutputFormat, ok := jsonData["timeOutputFormat"].(string) - if !ok { - return nil, errors.New("timeOutputFormat cannot be cast to string") - } + timeField, toOk := jsonData["timeField"].(string) + timeOutputFormat, tofOk := jsonData["timeOutputFormat"].(string) logLevelField, ok := jsonData["logLevelField"].(string) if !ok { @@ -96,6 +91,13 @@ func NewQuickwitDatasource(settings backend.DataSourceInstanceSettings) (instanc maxConcurrentShardRequests = 256 } + if !toOk || !tofOk { + timeField, timeOutputFormat, err = GetTimestampFieldInfos(index, settings.URL, httpCli) + if nil != err { + return nil, err + } + } + configuredFields := es.ConfiguredFields{ TimeField: timeField, TimeOutputFormat: timeOutputFormat, diff --git a/pkg/quickwit/timestamp_infos.go b/pkg/quickwit/timestamp_infos.go new file mode 100644 index 0000000..5a49bc5 --- /dev/null +++ b/pkg/quickwit/timestamp_infos.go @@ -0,0 +1,111 @@ +package quickwit + +import ( + "encoding/json" + "errors" + "fmt" + "io" + "net/http" +) + +type QuickwitMapping struct { + IndexConfig struct { + DocMapping struct { + TimestampField string `json:"timestamp_field"` + FieldMappings []FieldMappings `json:"field_mappings"` + } `json:"doc_mapping"` + } `json:"index_config"` +} + +type QuickwitCreationErrorPayload struct { + Message string `json:"message"` + StatusCode int `json:"status"` +} + +func NewErrorCreationPayload(statusCode int, message string) error { + var payload QuickwitCreationErrorPayload + payload.Message = message + payload.StatusCode = statusCode + json, err := json.Marshal(payload) + if nil != err { + return err + } + + return errors.New(string(json)) +} + +func FindTimeStampFormat(timestampFieldName string, parentName *string, fieldMappings []FieldMappings) *string { + if nil == fieldMappings { + return nil + } + + for _, field := range fieldMappings { + fieldName := field.Name + if nil != parentName { + fieldName = fmt.Sprintf("%s.%s", *parentName, fieldName) + } + + if field.Type == "datetime" && fieldName == timestampFieldName && nil != field.OutputFormat { + return field.OutputFormat + } else if field.Type == "object" && nil != field.FieldMappings { + format := FindTimeStampFormat(timestampFieldName, &field.Name, field.FieldMappings) + if nil != format { + return format + } + } + } + + return nil +} + +func DecodeTimestampFieldInfos(statusCode int, body []byte) (string, string, error) { + var payload QuickwitMapping + err := json.Unmarshal(body, &payload) + + if err != nil { + errMsg := fmt.Sprintf("Unmarshalling body error: err = %s, body = %s", err.Error(), (body)) + qwlog.Error(errMsg) + return "", "", NewErrorCreationPayload(statusCode, errMsg) + } + + timestampFieldName := payload.IndexConfig.DocMapping.TimestampField + timestampFieldFormat := FindTimeStampFormat(timestampFieldName, nil, payload.IndexConfig.DocMapping.FieldMappings) + + if nil == timestampFieldFormat { + errMsg := fmt.Sprintf("No format found for field: %s", string(timestampFieldName)) + qwlog.Error(errMsg) + return timestampFieldName, "", NewErrorCreationPayload(statusCode, errMsg) + } + + qwlog.Info(fmt.Sprintf("Found timestampFieldName = %s, timestampFieldFormat = %s", timestampFieldName, *timestampFieldFormat)) + return timestampFieldName, *timestampFieldFormat, nil +} + +func GetTimestampFieldInfos(index string, qwUrl string, cli *http.Client) (string, string, error) { + mappingEndpointUrl := qwUrl + "/indexes/" + index + qwlog.Info("Calling quickwit endpoint: " + mappingEndpointUrl) + r, err := cli.Get(mappingEndpointUrl) + if err != nil { + errMsg := fmt.Sprintf("Error when calling url = %s: err = %s", mappingEndpointUrl, err.Error()) + qwlog.Error(errMsg) + return "", "", err + } + + statusCode := r.StatusCode + + if statusCode < 200 || statusCode >= 400 { + errMsg := fmt.Sprintf("Error when calling url = %s", mappingEndpointUrl) + qwlog.Error(errMsg) + return "", "", NewErrorCreationPayload(statusCode, errMsg) + } + + defer r.Body.Close() + body, err := io.ReadAll(r.Body) + if err != nil { + errMsg := fmt.Sprintf("Error when calling url = %s: err = %s", mappingEndpointUrl, err.Error()) + qwlog.Error(errMsg) + return "", "", NewErrorCreationPayload(statusCode, errMsg) + } + + return DecodeTimestampFieldInfos(statusCode, body) +} diff --git a/pkg/quickwit/timestamp_infos_test.go b/pkg/quickwit/timestamp_infos_test.go new file mode 100644 index 0000000..55f5916 --- /dev/null +++ b/pkg/quickwit/timestamp_infos_test.go @@ -0,0 +1,313 @@ +package quickwit + +import ( + "testing" + + "github.com/stretchr/testify/require" +) + +func TestDecodeTimestampFieldInfos(t *testing.T) { + t.Run("Test decode timestam field infos", func(t *testing.T) { + t.Run("Test decode simple fields", func(t *testing.T) { + // Given + query := []byte(` + { + "version": "0.6", + "index_uid": "myindex:01HG7ZZK3ZD7XF6BKQCZJHSJ5W", + "index_config": { + "version": "0.6", + "index_id": "myindex", + "index_uri": "s3://quickwit-indexes/myindex", + "doc_mapping": { + "field_mappings": [ + { + "name": "foo", + "type": "text", + "fast": false, + "fieldnorms": false, + "indexed": true, + "record": "basic", + "stored": true, + "tokenizer": "default" + }, + { + "name": "timestamp", + "type": "datetime", + "fast": true, + "fast_precision": "seconds", + "indexed": true, + "input_formats": [ + "rfc3339", + "unix_timestamp" + ], + "output_format": "rfc3339", + "stored": true + } + ], + "tag_fields": [], + "store_source": true, + "index_field_presence": false, + "timestamp_field": "timestamp", + "mode": "dynamic", + "dynamic_mapping": {}, + "partition_key": "foo", + "max_num_partitions": 1, + "tokenizers": [] + }, + "indexing_settings": {}, + "search_settings": { + "default_search_fields": [ + "foo" + ] + }, + "retention": null + }, + "checkpoint": {}, + "create_timestamp": 1701075471, + "sources": [] + } + `) + + // When + timestampFieldName, timestampFieldFormat, err := DecodeTimestampFieldInfos(200, query) + + // Then + require.NoError(t, err) + require.Equal(t, timestampFieldName, "timestamp") + require.Equal(t, timestampFieldFormat, "rfc3339") + }) + + t.Run("Test decode nested fields", func(t *testing.T) { + // Given + query := []byte(` + { + "version": "0.6", + "index_uid": "myindex:01HG7ZZK3ZD7XF6BKQCZJHSJ5W", + "index_config": { + "version": "0.6", + "index_id": "myindex", + "index_uri": "s3://quickwit-indexes/myindex", + "doc_mapping": { + "field_mappings": [ + { + "name": "foo", + "type": "text", + "fast": false, + "fieldnorms": false, + "indexed": true, + "record": "basic", + "stored": true, + "tokenizer": "default" + }, + { + "name": "sub", + "type": "object", + "field_mappings": [ + { + "fast": true, + "fast_precision": "seconds", + "indexed": true, + "input_formats": [ + "rfc3339", + "unix_timestamp" + ], + "name": "timestamp", + "output_format": "rfc3339", + "stored": true, + "type": "datetime" + } + ] + } + ], + "tag_fields": [], + "store_source": true, + "index_field_presence": false, + "timestamp_field": "sub.timestamp", + "mode": "dynamic", + "dynamic_mapping": {}, + "partition_key": "foo", + "max_num_partitions": 1, + "tokenizers": [] + }, + "indexing_settings": {}, + "search_settings": { + "default_search_fields": [ + "foo" + ] + }, + "retention": null + }, + "checkpoint": {}, + "create_timestamp": 1701075471, + "sources": [] + } + `) + + // When + timestampFieldName, timestampFieldFormat, err := DecodeTimestampFieldInfos(200, query) + + // Then + require.NoError(t, err) + require.Equal(t, timestampFieldName, "sub.timestamp") + require.Equal(t, timestampFieldFormat, "rfc3339") + }) + + t.Run("The timestamp field is not at the expected path", func(t *testing.T) { + // Given + query := []byte(` + { + "version": "0.6", + "index_uid": "myindex:01HG7ZZK3ZD7XF6BKQCZJHSJ5W", + "index_config": { + "version": "0.6", + "index_id": "myindex", + "index_uri": "s3://quickwit-indexes/myindex", + "doc_mapping": { + "field_mappings": [ + { + "name": "foo", + "type": "text", + "fast": false, + "fieldnorms": false, + "indexed": true, + "record": "basic", + "stored": true, + "tokenizer": "default" + }, + { + "name": "sub", + "type": "object", + "field_mappings": [ + { + "fast": true, + "fast_precision": "seconds", + "indexed": true, + "input_formats": [ + "rfc3339", + "unix_timestamp" + ], + "name": "timestamp", + "output_format": "rfc3339", + "stored": true, + "type": "datetime" + } + ] + } + ], + "tag_fields": [], + "store_source": true, + "index_field_presence": false, + "timestamp_field": "timestamp", + "mode": "dynamic", + "dynamic_mapping": {}, + "partition_key": "foo", + "max_num_partitions": 1, + "tokenizers": [] + }, + "indexing_settings": {}, + "search_settings": { + "default_search_fields": [ + "foo" + ] + }, + "retention": null + }, + "checkpoint": {}, + "create_timestamp": 1701075471, + "sources": [] + } + `) + + // When + _, _, err := DecodeTimestampFieldInfos(200, query) + + // Then + require.Error(t, err) + }) + + t.Run("The timestamp field has not the right type", func(t *testing.T) { + // Given + query := []byte(` + { + "version": "0.6", + "index_uid": "myindex:01HG7ZZK3ZD7XF6BKQCZJHSJ5W", + "index_config": { + "version": "0.6", + "index_id": "myindex", + "index_uri": "s3://quickwit-indexes/myindex", + "doc_mapping": { + "field_mappings": [ + { + "name": "foo", + "type": "text", + "fast": false, + "fieldnorms": false, + "indexed": true, + "record": "basic", + "stored": true, + "tokenizer": "default" + }, + { + "name": "sub", + "type": "object", + "field_mappings": [ + { + "fast": true, + "fast_precision": "seconds", + "indexed": true, + "input_formats": [ + "rfc3339", + "unix_timestamp" + ], + "name": "timestamp", + "output_format": "rfc3339", + "stored": true, + "type": "whatever" + } + ] + } + ], + "tag_fields": [], + "store_source": true, + "index_field_presence": false, + "timestamp_field": "sub.timestamp", + "mode": "dynamic", + "dynamic_mapping": {}, + "partition_key": "foo", + "max_num_partitions": 1, + "tokenizers": [] + }, + "indexing_settings": {}, + "search_settings": { + "default_search_fields": [ + "foo" + ] + }, + "retention": null + }, + "checkpoint": {}, + "create_timestamp": 1701075471, + "sources": [] + } + `) + + // When + _, _, err := DecodeTimestampFieldInfos(200, query) + + // Then + require.Error(t, err) + }) + }) +} + +func TestNewErrorCreationPayload(t *testing.T) { + t.Run("Test marshall creation payload error", func(t *testing.T) { + // When + err := NewErrorCreationPayload(400, "No valid format") + + // Then + require.Error(t, err) + require.ErrorContains(t, err, "\"message\":\"No valid format\"") + require.ErrorContains(t, err, "\"status\":400") + }) +} diff --git a/src/configuration/ConfigEditor.tsx b/src/configuration/ConfigEditor.tsx index 18e7dba..f756d3c 100644 --- a/src/configuration/ConfigEditor.tsx +++ b/src/configuration/ConfigEditor.tsx @@ -73,24 +73,6 @@ export const QuickwitDetails = ({ value, onChange }: DetailsProps) => { width={40} /> - - onChange({ ...value, jsonData: {...value.jsonData, timeField: event.currentTarget.value}})} - placeholder="timestamp" - width={40} - /> - - - onChange({ ...value, jsonData: {...value.jsonData, timeOutputFormat: event.currentTarget.value}})} - placeholder="unix_timestamp_millisecs" - width={40} - /> - { + let fields = getAllFields(indexMetadata.index_config.doc_mapping.field_mappings); + let timestampFieldName = indexMetadata.index_config.doc_mapping.timestamp_field + let timestampField = fields.find((field) => field.json_path === timestampFieldName); + let timestampFormat = timestampField ? timestampField.field_mapping.output_format || '' : '' + let timestampFieldInfos = { 'field': timestampFieldName, 'format': timestampFormat } + return timestampFieldInfos + }), + catchError((err) => { + if (!err.data || !err.data.error) { + let err_source = extractJsonPayload(err.data.error) + if(!err_source) { + throw err + } + } + + // the error will be handle in the testDatasource function + return of({'field': '', 'format': ''}) + }) + ).subscribe(result => { + this.timeField = result.field; + this.timeOutputFormat = result.format; + this.queryBuilder = new ElasticQueryBuilder({ + timeField: this.timeField, + }); + }); + + this.logMessageField = settingsData.logMessageField || ''; + this.logLevelField = settingsData.logLevelField || ''; this.dataLinks = settingsData.dataLinks || []; this.languageProvider = new ElasticsearchLanguageProvider(this); } @@ -112,12 +140,7 @@ export class QuickwitDataSource message: 'Cannot save datasource, `index` is required', }; } - if (this.timeField === '' ) { - return { - status: 'error', - message: 'Cannot save datasource, `timeField` is required', - }; - } + return lastValueFrom( from(this.getResource('indexes/' + this.index)).pipe( mergeMap((indexMetadata) => { @@ -131,7 +154,14 @@ export class QuickwitDataSource return of({ status: 'success', message: `Index OK. Time field name OK` }); }), catchError((err) => { - if (err.status === 404) { + if (err.data && err.data.error) { + let err_source = extractJsonPayload(err.data.error) + if (err_source) { + err = err_source + } + } + + if (err.status && err.status === 404) { return of({ status: 'error', message: 'Index does not exists.' }); } else if (err.message) { return of({ status: 'error', message: err.message }); @@ -148,21 +178,19 @@ export class QuickwitDataSource if (this.timeField === '') { return `Time field must not be empty`; } - if (indexMetadata.index_config.doc_mapping.timestamp_field !== this.timeField) { - return `No timestamp field named '${this.timeField}' found`; - } + let fields = getAllFields(indexMetadata.index_config.doc_mapping.field_mappings); let timestampField = fields.find((field) => field.json_path === this.timeField); + // Should never happen. if (timestampField === undefined) { return `No field named '${this.timeField}' found in the doc mapping. This should never happen.`; } - if (timestampField.field_mapping.output_format !== this.timeOutputFormat) { - return `Timestamp output format is declared as '${timestampField.field_mapping.output_format}' in the doc mapping, not '${this.timeOutputFormat}'.`; - } + + let timeOutputFormat = timestampField.field_mapping.output_format || 'unknown'; const supportedTimestampOutputFormats = ['unix_timestamp_secs', 'unix_timestamp_millis', 'unix_timestamp_micros', 'unix_timestamp_nanos', 'iso8601', 'rfc3339']; - if (!supportedTimestampOutputFormats.includes(this.timeOutputFormat)) { - return `Timestamp output format '${this.timeOutputFormat} is not yet supported.`; + if (!supportedTimestampOutputFormats.includes(timeOutputFormat)) { + return `Timestamp output format '${timeOutputFormat} is not yet supported.`; } return; } @@ -311,6 +339,7 @@ export class QuickwitDataSource ignore_unavailable: true, index: this.index, }); + let esQuery = JSON.stringify(this.queryBuilder.getTermsQuery(queryDef)); esQuery = esQuery.replace(/\$timeFrom/g, range.from.valueOf().toString()); esQuery = esQuery.replace(/\$timeTo/g, range.to.valueOf().toString()); diff --git a/src/utils.test.ts b/src/utils.test.ts new file mode 100644 index 0000000..1bcec2c --- /dev/null +++ b/src/utils.test.ts @@ -0,0 +1,20 @@ +import { extractJsonPayload } from "utils"; + +describe('Test utils.extractJsonPayload', () => { + it('Extract valid JSON', () => { + const result = extractJsonPayload('Hey {"foo": "bar"}') + expect(result).toEqual({ + foo: "bar" + }); + }); + + it('Extract non valid JSON', () => { + const result = extractJsonPayload('Hey {"foo": invalid}') + expect(result).toEqual(null); + }); + + it('Extract multiple valid JSONs (not supported)', () => { + const result = extractJsonPayload('Hey {"foo": "bar"} {"foo2": "bar2"}') + expect(result).toEqual(null); + }); +}); diff --git a/src/utils.ts b/src/utils.ts index 52d23cb..ce30f4c 100644 --- a/src/utils.ts +++ b/src/utils.ts @@ -13,6 +13,20 @@ export const describeMetric = (metric: MetricAggregation) => { return `${metricAggregationConfig[metric.type].label} ${metric.field}`; }; +export const extractJsonPayload = (msg: string) => { + const match = msg.match(/{.*}/); + + if (!match) { + return null; + } + + try { + return JSON.parse(match[0]); + } catch (error) { + return null; + } +} + /** * Utility function to clean up aggregations settings objects. * It removes nullish values and empty strings, array and objects