Skip to content

Commit

Permalink
opensearchapi: add UpdateByQueryRethrottle function to rootClient
Browse files Browse the repository at this point in the history
Signed-off-by: Jakob Hahn <[email protected]>
  • Loading branch information
Jakob3xD committed Nov 9, 2023
1 parent 64abd16 commit 9b520bf
Show file tree
Hide file tree
Showing 3 changed files with 304 additions and 0 deletions.
57 changes: 57 additions & 0 deletions opensearchapi/api_update_by_query_rethrottle-params.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
// SPDX-License-Identifier: Apache-2.0
//
// The OpenSearch Contributors require contributions made to
// this file be licensed under the Apache-2.0 license or a
// compatible open source license.
//
// Modifications Copyright OpenSearch Contributors. See
// GitHub history for details.

// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package opensearchapi

import (
"strconv"
)

// UpdateByQueryRethrottleParams represents possible parameters for the UpdateByQueryRethrottleReq
type UpdateByQueryRethrottleParams struct {
RequestsPerSecond *int

Pretty bool
Human bool
ErrorTrace bool
}

func (r UpdateByQueryRethrottleParams) get() map[string]string {
params := make(map[string]string)

if r.RequestsPerSecond != nil {
params["requests_per_second"] = strconv.FormatInt(int64(*r.RequestsPerSecond), 10)
}

if r.Pretty {
params["pretty"] = "true"
}

if r.Human {
params["human"] = "true"
}

if r.ErrorTrace {
params["error_trace"] = "true"
}

return params
}
131 changes: 131 additions & 0 deletions opensearchapi/api_update_by_query_rethrottle.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
// SPDX-License-Identifier: Apache-2.0
//
// The OpenSearch Contributors require contributions made to
// this file be licensed under the Apache-2.0 license or a
// compatible open source license.
//
// Modifications Copyright OpenSearch Contributors. See
// GitHub history for details.

// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package opensearchapi

import (
"context"
"encoding/json"
"fmt"
"net/http"

"github.com/opensearch-project/opensearch-go/v2"
)

// UpdateByQueryRethrottle executes a / request with the optional UpdateByQueryRethrottleReq
func (c Client) UpdateByQueryRethrottle(ctx context.Context, req UpdateByQueryRethrottleReq) (*UpdateByQueryRethrottleResp, error) {
var (
data UpdateByQueryRethrottleResp
err error
)
if data.response, err = c.do(ctx, req, &data); err != nil {
return &data, err
}

return &data, nil
}

// UpdateByQueryRethrottleReq represents possible options for the / request
type UpdateByQueryRethrottleReq struct {
TaskID string

Header http.Header
Params UpdateByQueryRethrottleParams
}

// GetRequest returns the *http.Request that gets executed by the client
func (r UpdateByQueryRethrottleReq) GetRequest() (*http.Request, error) {
return opensearch.BuildRequest(
"POST",
fmt.Sprintf("/_update_by_query/%s/_rethrottle", r.TaskID),
nil,
r.Params.get(),
r.Header,
)
}

// UpdateByQueryRethrottleResp represents the returned struct of the / response
type UpdateByQueryRethrottleResp struct {
Nodes map[string]struct {
Name string `json:"name"`
TransportAddress string `json:"transport_address"`
Host string `json:"host"`
IP string `json:"ip"`
Roles []string `json:"roles"`
Attributes map[string]string `json:"attributes"`
Tasks map[string]struct {
Node string `json:"node"`
ID int `json:"id"`
Type string `json:"type"`
Action string `json:"action"`
Status struct {
Total int `json:"total"`
Updated int `json:"updated"`
Created int `json:"created"`
Deleted int `json:"deleted"`
Batches int `json:"batches"`
VersionConflicts int `json:"version_conflicts"`
Noops int `json:"noops"`
Retries struct {
Bulk int `json:"bulk"`
Search int `json:"search"`
} `json:"retries"`
ThrottledMillis int `json:"throttled_millis"`
RequestsPerSecond float64 `json:"requests_per_second"`
ThrottledUntilMillis int `json:"throttled_until_millis"`
} `json:"status"`
Description string `json:"description"`
StartTimeInMillis int64 `json:"start_time_in_millis"`
RunningTimeInNanos int `json:"running_time_in_nanos"`
Cancellable bool `json:"cancellable"`
Cancelled bool `json:"cancelled"`
Headers json.RawMessage `json:"headers"`
ResourceStats struct {
Average struct {
CPUTimeInNanos int `json:"cpu_time_in_nanos"`
MemoryInBytes int `json:"memory_in_bytes"`
} `json:"average"`
Total struct {
CPUTimeInNanos int `json:"cpu_time_in_nanos"`
MemoryInBytes int `json:"memory_in_bytes"`
} `json:"total"`
Min struct {
CPUTimeInNanos int `json:"cpu_time_in_nanos"`
MemoryInBytes int `json:"memory_in_bytes"`
} `json:"min"`
Max struct {
CPUTimeInNanos int `json:"cpu_time_in_nanos"`
MemoryInBytes int `json:"memory_in_bytes"`
} `json:"max"`
ThreadInfo struct {
ThreadExecutions int `json:"thread_executions"`
ActiveThreads int `json:"active_threads"`
} `json:"thread_info"`
} `json:"resource_stats"`
} `json:"tasks"`
} `json:"nodes"`
response *opensearch.Response
}

// Inspect returns the Inspect type containing the raw *opensearch.Reponse
func (r UpdateByQueryRethrottleResp) Inspect() Inspect {
return Inspect{Response: r.response}
}
116 changes: 116 additions & 0 deletions opensearchapi/api_update_by_query_rethrottle_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
// SPDX-License-Identifier: Apache-2.0
//
// The OpenSearch Contributors require contributions made to
// this file be licensed under the Apache-2.0 license or a
// compatible open source license.
//
// Modifications Copyright OpenSearch Contributors. See
// GitHub history for details.

// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
//go:build integration

package opensearchapi_test

import (
"context"
"strconv"
"strings"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/opensearch-project/opensearch-go/v2/opensearchapi"
osapitest "github.com/opensearch-project/opensearch-go/v2/opensearchapi/internal/test"
"github.com/opensearch-project/opensearch-go/v2/opensearchutil"
)

func TestUpdateByQueryRethrottle(t *testing.T) {
t.Parallel()
client, err := opensearchapi.NewDefaultClient()
require.Nil(t, err)

testIndex := "test-updatebyquery-rethrottle-source"
t.Cleanup(func() {
client.Indices.Delete(
nil,
opensearchapi.IndicesDeleteReq{
Indices: []string{testIndex},
Params: opensearchapi.IndicesDeleteParams{IgnoreUnavailable: opensearchapi.ToPointer(true)},
},
)
})

client.Indices.Create(
nil,
opensearchapi.IndicesCreateReq{
Index: testIndex,
Body: strings.NewReader(`{"settings": {"number_of_shards": 1, "number_of_replicas": 0}}`),
},
)
bi, err := opensearchutil.NewBulkIndexer(opensearchutil.BulkIndexerConfig{
Index: testIndex,
Client: client,
Refresh: "wait_for",
})
for i := 1; i <= 60; i++ {
err := bi.Add(context.Background(), opensearchutil.BulkIndexerItem{
Action: "index",
DocumentID: strconv.Itoa(i),
Body: strings.NewReader(`{"foo": "bar", "counter": 1}`),
})
if err != nil {
t.Fatalf("Unexpected error: %s", err)
}
}
if err := bi.Close(context.Background()); err != nil {
t.Errorf("Unexpected error: %s", err)
}

updatebyquery, err := client.UpdateByQuery(
nil,
opensearchapi.UpdateByQueryReq{
Indices: []string{testIndex},
Body: strings.NewReader(`{"script":{"source":"ctx._source.counter += params.count","lang":"painless","params":{"count":4}}}`),
Params: opensearchapi.UpdateByQueryParams{
WaitForCompletion: opensearchapi.ToPointer(false),
RequestsPerSecond: opensearchapi.ToPointer(1),
},
},
)
require.Nil(t, err)
t.Run("with request", func(t *testing.T) {
resp, err := client.UpdateByQueryRethrottle(
nil,
opensearchapi.UpdateByQueryRethrottleReq{
TaskID: updatebyquery.Task,
Params: opensearchapi.UpdateByQueryRethrottleParams{RequestsPerSecond: opensearchapi.ToPointer(40)},
},
)
require.Nil(t, err)
assert.NotEmpty(t, resp)
osapitest.CompareRawJSONwithParsedJSON(t, resp, resp.Inspect().Response)
})

t.Run("inspect", func(t *testing.T) {
failingClient, err := osapitest.CreateFailingClient()
require.Nil(t, err)

res, err := failingClient.UpdateByQueryRethrottle(nil, opensearchapi.UpdateByQueryRethrottleReq{})
assert.NotNil(t, err)
assert.NotNil(t, res)
osapitest.VerifyInspect(t, res.Inspect())
})
}

0 comments on commit 9b520bf

Please sign in to comment.