Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for index pattern #75

Merged
merged 2 commits into from
Feb 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pkg/quickwit/quickwit.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ func NewQuickwitDatasource(settings backend.DataSourceInstanceSettings) (instanc
}

if !toOk {
timeField, err = GetTimestampFieldInfos(index, settings.URL, httpCli)
timeField, err = GetTimestampField(index, settings.URL, httpCli)
if nil != err {
return nil, err
}
Expand Down
91 changes: 77 additions & 14 deletions pkg/quickwit/timestamp_infos.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,13 @@ import (
"fmt"
"io"
"net/http"
"strings"
)

type QuickwitMapping struct {
type QuickwitIndexMetadata struct {
IndexConfig struct {
DocMapping struct {
TimestampField string `json:"timestamp_field"`
FieldMappings []FieldMappings `json:"field_mappings"`
TimestampField string `json:"timestamp_field"`
} `json:"doc_mapping"`
} `json:"index_config"`
}
Expand All @@ -34,25 +34,48 @@ func NewErrorCreationPayload(statusCode int, message string) error {
return errors.New(string(json))
}

func DecodeTimestampFieldInfos(statusCode int, body []byte) (string, error) {
var payload QuickwitMapping
err := json.Unmarshal(body, &payload)
// TODO: refactor either by using a timestamp alias suppprted by quickwit
// or by only using the `GetTimestampFieldFromIndexPattern` once the endpoint
// /indexes?index_id_pattern= is supported, which is after the next quickwit release > 0.7.1
func GetTimestampField(index string, qwickwitUrl string, cli *http.Client) (string, error) {
if strings.Contains(index, "*") || strings.Contains(index, ",") {
return GetTimestampFieldFromIndexPattern(index, qwickwitUrl, cli)
}
return GetTimestampFieldFromIndex(index, qwickwitUrl, cli)
}

func GetTimestampFieldFromIndex(index string, qwickwitUrl string, cli *http.Client) (string, error) {
mappingEndpointUrl := qwickwitUrl + "/indexes/" + index
qwlog.Debug("Calling quickwit endpoint: " + mappingEndpointUrl)
r, err := cli.Get(mappingEndpointUrl)
if err != nil {
errMsg := fmt.Sprintf("Unmarshalling body error: err = %s, body = %s", err.Error(), (body))
errMsg := fmt.Sprintf("Error when calling url = %s: err = %s", mappingEndpointUrl, err.Error())
qwlog.Error(errMsg)
return "", err
}

statusCode := r.StatusCode

if statusCode < 200 || statusCode >= 400 {
errMsg := fmt.Sprintf("Error when calling url = %s", mappingEndpointUrl)
qwlog.Error(errMsg)
return "", NewErrorCreationPayload(statusCode, errMsg)
}

timestampFieldName := payload.IndexConfig.DocMapping.TimestampField
defer r.Body.Close()
body, err := io.ReadAll(r.Body)
if err != nil {
errMsg := fmt.Sprintf("Error when calling url = %s: err = %s", mappingEndpointUrl, err.Error())
qwlog.Error(errMsg)
return "", NewErrorCreationPayload(statusCode, errMsg)
}

qwlog.Info(fmt.Sprintf("Found timestampFieldName = %s", timestampFieldName))
return timestampFieldName, nil
return DecodeTimestampFieldFromIndexConfig(body)
}

func GetTimestampFieldInfos(index string, qwUrl string, cli *http.Client) (string, error) {
mappingEndpointUrl := qwUrl + "/indexes/" + index
qwlog.Info("Calling quickwit endpoint: " + mappingEndpointUrl)
func GetTimestampFieldFromIndexPattern(indexPattern string, qwickwitUrl string, cli *http.Client) (string, error) {
mappingEndpointUrl := qwickwitUrl + "/indexes?index_id_pattern=" + indexPattern
qwlog.Debug("Calling quickwit endpoint: " + mappingEndpointUrl)
r, err := cli.Get(mappingEndpointUrl)
if err != nil {
errMsg := fmt.Sprintf("Error when calling url = %s: err = %s", mappingEndpointUrl, err.Error())
Expand All @@ -76,5 +99,45 @@ func GetTimestampFieldInfos(index string, qwUrl string, cli *http.Client) (strin
return "", NewErrorCreationPayload(statusCode, errMsg)
}

return DecodeTimestampFieldInfos(statusCode, body)
return DecodeTimestampFieldFromIndexConfigs(body)
}

func DecodeTimestampFieldFromIndexConfigs(body []byte) (string, error) {
var payload []QuickwitIndexMetadata
err := json.Unmarshal(body, &payload)
if err != nil {
errMsg := fmt.Sprintf("Unmarshalling body error: err = %s, body = %s", err.Error(), (body))
qwlog.Error(errMsg)
return "", NewErrorCreationPayload(500, errMsg)
}

var timestampFieldName string = ""
for _, indexMetadata := range payload {
if timestampFieldName == "" {
timestampFieldName = indexMetadata.IndexConfig.DocMapping.TimestampField
continue
}

if timestampFieldName != indexMetadata.IndexConfig.DocMapping.TimestampField {
errMsg := fmt.Sprintf("Index matching the pattern should have the same timestamp fields, two found: %s and %s", timestampFieldName, indexMetadata.IndexConfig.DocMapping.TimestampField)
qwlog.Error(errMsg)
return "", NewErrorCreationPayload(400, errMsg)
}
}

qwlog.Debug(fmt.Sprintf("Found timestampFieldName = %s", timestampFieldName))
return timestampFieldName, nil
}

func DecodeTimestampFieldFromIndexConfig(body []byte) (string, error) {
var payload QuickwitIndexMetadata
err := json.Unmarshal(body, &payload)
if err != nil {
errMsg := fmt.Sprintf("Unmarshalling body error: err = %s, body = %s", err.Error(), (body))
qwlog.Error(errMsg)
return "", NewErrorCreationPayload(500, errMsg)
}
timestampFieldName := payload.IndexConfig.DocMapping.TimestampField
qwlog.Debug(fmt.Sprintf("Found timestampFieldName = %s", timestampFieldName))
return timestampFieldName, nil
}
139 changes: 43 additions & 96 deletions pkg/quickwit/timestamp_infos_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,142 +13,89 @@ func TestDecodeTimestampFieldInfos(t *testing.T) {
query := []byte(`
{
"version": "0.6",
"index_uid": "myindex:01HG7ZZK3ZD7XF6BKQCZJHSJ5W",
"index_config": {
"version": "0.6",
"index_id": "myindex",
"index_uri": "s3://quickwit-indexes/myindex",
"doc_mapping": {
"field_mappings": [
{
"name": "foo",
"type": "text",
"fast": false,
"fieldnorms": false,
"indexed": true,
"record": "basic",
"stored": true,
"tokenizer": "default"
},
{
"name": "timestamp",
"type": "datetime",
"fast": true,
"fast_precision": "seconds",
"indexed": true,
"input_formats": [
"rfc3339",
"unix_timestamp"
],
"output_format": "rfc3339",
"stored": true
}
],
"tag_fields": [],
"store_source": true,
"index_field_presence": false,
"timestamp_field": "timestamp",
"mode": "dynamic",
"dynamic_mapping": {},
"partition_key": "foo",
"max_num_partitions": 1,
"tokenizers": []
},
"indexing_settings": {},
"search_settings": {
"default_search_fields": [
"foo"
]
},
"retention": null
},
"checkpoint": {},
"create_timestamp": 1701075471,
"sources": []
}
`)

// When
timestampFieldName, err := DecodeTimestampFieldInfos(200, query)
timestampFieldName, err := DecodeTimestampFieldFromIndexConfig(query)

// Then
require.NoError(t, err)
require.Equal(t, timestampFieldName, "timestamp")
})

t.Run("Test decode nested fields", func(t *testing.T) {
t.Run("Test decode from list of index config", func(t *testing.T) {
// Given
query := []byte(`
[
{
"version": "0.6",
"index_uid": "myindex:01HG7ZZK3ZD7XF6BKQCZJHSJ5W",
"index_config": {
"version": "0.6",
"index_id": "myindex",
"index_uri": "s3://quickwit-indexes/myindex",
"doc_mapping": {
"field_mappings": [
{
"name": "foo",
"type": "text",
"fast": false,
"fieldnorms": false,
"indexed": true,
"record": "basic",
"stored": true,
"tokenizer": "default"
},
{
"name": "sub",
"type": "object",
"field_mappings": [
{
"fast": true,
"fast_precision": "seconds",
"indexed": true,
"input_formats": [
"rfc3339",
"unix_timestamp"
],
"name": "timestamp",
"output_format": "rfc3339",
"stored": true,
"type": "datetime"
}
]
}
],
"tag_fields": [],
"store_source": true,
"index_field_presence": false,
"timestamp_field": "sub.timestamp",
"mode": "dynamic",
"dynamic_mapping": {},
"partition_key": "foo",
"max_num_partitions": 1,
"tokenizers": []
"timestamp_field": "sub.timestamp"
},
"indexing_settings": {},
"search_settings": {
"default_search_fields": [
"foo"
]
},
"retention": null
},
"checkpoint": {},
"create_timestamp": 1701075471,
"sources": []
}
]
`)

// When
timestampFieldName, err := DecodeTimestampFieldInfos(200, query)
timestampFieldName, err := DecodeTimestampFieldFromIndexConfigs(query)

// Then
require.NoError(t, err)
require.Equal(t, timestampFieldName, "sub.timestamp")
})

t.Run("Test decode from list of index config with different timestamp fields return an error", func(t *testing.T) {
// Given
query := []byte(`
[
{
"version": "0.6",
"index_config": {
"doc_mapping": {
"timestamp_field": "sub.timestamp"
},
"indexing_settings": {},
"retention": null
},
"sources": []
},
{
"version": "0.6",
"index_config": {
"doc_mapping": {
"timestamp_field": "sub.timestamp2"
},
"indexing_settings": {},
"retention": null
},
"sources": []
}
]
`)

// When
_, err := DecodeTimestampFieldFromIndexConfigs(query)

// Then
require.Error(t, err)
require.ErrorContains(t, err, "Index matching the pattern should have the same timestamp fields")
})
})
}

Expand Down
2 changes: 1 addition & 1 deletion src/datasource.ts
Original file line number Diff line number Diff line change
Expand Up @@ -772,7 +772,7 @@ export function enhanceDataFrameWithDataLinks(dataFrame: DataFrame, dataLinks: D
config: {},
values: displayedMessages,
}
console.log('newField');
console.log(dataFrame);
dataFrame.fields = [newField, ...dataFrame.fields];
}

Expand Down
Loading