From d054bceae0a88563ae40b1ed34fe23415dfcb571 Mon Sep 17 00:00:00 2001 From: Owen Cabalceta Date: Tue, 7 Jan 2025 16:13:14 -0500 Subject: [PATCH] refactor: move argus client code to ancla --- chrysom/README.md | 21 -- chrysom/basicClient.go | 275 --------------- chrysom/basicClient_test.go | 612 --------------------------------- chrysom/doc.go | 5 - chrysom/listenerClient.go | 183 ---------- chrysom/listenerClient_test.go | 203 ----------- chrysom/metrics.go | 44 --- chrysom/pushResult.go | 28 -- chrysom/store.go | 47 --- 9 files changed, 1418 deletions(-) delete mode 100644 chrysom/README.md delete mode 100644 chrysom/basicClient.go delete mode 100644 chrysom/basicClient_test.go delete mode 100644 chrysom/doc.go delete mode 100644 chrysom/listenerClient.go delete mode 100644 chrysom/listenerClient_test.go delete mode 100644 chrysom/metrics.go delete mode 100644 chrysom/pushResult.go delete mode 100644 chrysom/store.go diff --git a/chrysom/README.md b/chrysom/README.md deleted file mode 100644 index 83f6cec0..00000000 --- a/chrysom/README.md +++ /dev/null @@ -1,21 +0,0 @@ -# chrysom -The client library for communicating with Argus over HTTP. - -[![Go Reference](https://pkg.go.dev/badge/github.com/xmidt-org/argus/chrysom.svg)](https://pkg.go.dev/github.com/xmidt-org/argus/chrysom) - -## Summary -This package enables CRUD operations on the items stored in Argus. The items being stored are valid JSON objects. The package also provides a listener that is able to poll from Argus on an interval. - -## CRUD Operations - -- On a `GetItems()` call, chrysom returns a list of all items belonging to the provided owner. -- `PushItem()` updates an item if the item exists and the ownership matches. If the item does not exist then a new item will be created. It will return an error or a string saying whether the item was successfully created or updated. -- `RemoveItem()` removes the item if it exists and returns the data associated to it. - -## Listener -The client contains a listener that will listen for item updates from Argus on an interval based on the client configuration. - -Listener provides a mechanism to fetch a copy of all items within a bucket on an interval. If not provided, listening won't be enable for this client. - -- Start begins listening for updates on an interval. A Listener must be given in the ListenerConfig for this to work. If a listener process is already in progress, calling Start() is a NoOp. If you want to restart the current listener process, call Stop() first. -- Stop requests the current listener process to stop and waits for its goroutine to complete. Calling Stop() when a listener is not running (or while one is getting stopped) returns an error. \ No newline at end of file diff --git a/chrysom/basicClient.go b/chrysom/basicClient.go deleted file mode 100644 index f80d0459..00000000 --- a/chrysom/basicClient.go +++ /dev/null @@ -1,275 +0,0 @@ -// SPDX-FileCopyrightText: 2021 Comcast Cable Communications Management, LLC -// SPDX-License-Identifier: Apache-2.0 - -package chrysom - -import ( - "bytes" - "context" - "encoding/json" - "errors" - "fmt" - "io" - "net/http" - - "github.com/xmidt-org/argus/model" - "github.com/xmidt-org/argus/store" - "github.com/xmidt-org/bascule/acquire" - "go.uber.org/zap" -) - -var ( - ErrNilMeasures = errors.New("measures cannot be nil") - ErrAddressEmpty = errors.New("argus address is required") - ErrBucketEmpty = errors.New("bucket name is required") - ErrItemIDEmpty = errors.New("item ID is required") - ErrItemDataEmpty = errors.New("data field in item is required") - ErrUndefinedIntervalTicker = errors.New("interval ticker is nil. Can't listen for updates") - ErrAuthAcquirerFailure = errors.New("failed acquiring auth token") - ErrBadRequest = errors.New("argus rejected the request as invalid") -) - -var ( - errNonSuccessResponse = errors.New("argus responded with a non-success status code") - errNewRequestFailure = errors.New("failed creating an HTTP request") - errDoRequestFailure = errors.New("http client failed while sending request") - errReadingBodyFailure = errors.New("failed while reading http response body") - errJSONUnmarshal = errors.New("failed unmarshaling JSON response payload") - errJSONMarshal = errors.New("failed marshaling item as JSON payload") -) - -// BasicClientConfig contains config data for the client that will be used to -// make requests to the Argus client. -type BasicClientConfig struct { - // Address is the Argus URL (i.e. https://example-argus.io:8090) - Address string - - // Bucket partition to be used by this client. - Bucket string - - // HTTPClient refers to the client that will be used to send requests. - // (Optional) Defaults to http.DefaultClient. - HTTPClient *http.Client - - // Auth provides the mechanism to add auth headers to outgoing requests. - // (Optional) If not provided, no auth headers are added. - Auth Auth -} - -// BasicClient is the client used to make requests to Argus. -type BasicClient struct { - client *http.Client - auth acquire.Acquirer - storeBaseURL string - bucket string - getLogger func(context.Context) *zap.Logger -} - -// Auth contains authorization data for requests to Argus. -type Auth struct { - JWT acquire.RemoteBearerTokenAcquirerOptions - Basic string -} - -type response struct { - Body []byte - ArgusErrorHeader string - Code int -} - -const ( - storeAPIPath = "/api/v1/store" - errWrappedFmt = "%w: %s" - errStatusCodeFmt = "%w: received status %v" - errorHeaderKey = "errorHeader" -) - -// Items is a slice of model.Item(s) . -type Items []model.Item - -// NewBasicClient creates a new BasicClient that can be used to -// make requests to Argus. -func NewBasicClient(config BasicClientConfig, - getLogger func(context.Context) *zap.Logger) (*BasicClient, error) { - err := validateBasicConfig(&config) - if err != nil { - return nil, err - } - - tokenAcquirer, err := buildTokenAcquirer(config.Auth) - if err != nil { - return nil, err - } - clientStore := &BasicClient{ - client: config.HTTPClient, - auth: tokenAcquirer, - bucket: config.Bucket, - storeBaseURL: config.Address + storeAPIPath, - getLogger: getLogger, - } - - return clientStore, nil -} - -// GetItems fetches all items that belong to a given owner. -func (c *BasicClient) GetItems(ctx context.Context, owner string) (Items, error) { - response, err := c.sendRequest(ctx, owner, http.MethodGet, fmt.Sprintf("%s/%s", c.storeBaseURL, c.bucket), nil) - if err != nil { - return nil, err - } - - if response.Code != http.StatusOK { - c.getLogger(ctx).Error("Argus responded with non-200 response for GetItems request", - zap.Int("code", response.Code), zap.String(errorHeaderKey, response.ArgusErrorHeader)) - return nil, fmt.Errorf(errStatusCodeFmt, translateNonSuccessStatusCode(response.Code), response.Code) - } - - var items Items - - err = json.Unmarshal(response.Body, &items) - if err != nil { - return nil, fmt.Errorf("GetItems: %w: %s", errJSONUnmarshal, err.Error()) - } - - return items, nil -} - -// PushItem creates a new item if one doesn't already exist. If an item exists -// and the ownership matches, the item is simply updated. -func (c *BasicClient) PushItem(ctx context.Context, owner string, item model.Item) (PushResult, error) { - err := validatePushItemInput(owner, item) - if err != nil { - return NilPushResult, err - } - - data, err := json.Marshal(item) - if err != nil { - return NilPushResult, fmt.Errorf(errWrappedFmt, errJSONMarshal, err.Error()) - } - - response, err := c.sendRequest(ctx, owner, http.MethodPut, fmt.Sprintf("%s/%s/%s", c.storeBaseURL, c.bucket, item.ID), bytes.NewReader(data)) - if err != nil { - return NilPushResult, err - } - - if response.Code == http.StatusCreated { - return CreatedPushResult, nil - } - - if response.Code == http.StatusOK { - return UpdatedPushResult, nil - } - - c.getLogger(ctx).Error("Argus responded with a non-successful status code for a PushItem request", - zap.Int("code", response.Code), zap.String(errorHeaderKey, response.ArgusErrorHeader)) - - return NilPushResult, fmt.Errorf(errStatusCodeFmt, translateNonSuccessStatusCode(response.Code), response.Code) -} - -// RemoveItem removes the item if it exists and returns the data associated to it. -func (c *BasicClient) RemoveItem(ctx context.Context, id, owner string) (model.Item, error) { - if len(id) < 1 { - return model.Item{}, ErrItemIDEmpty - } - - resp, err := c.sendRequest(ctx, owner, http.MethodDelete, fmt.Sprintf("%s/%s/%s", c.storeBaseURL, c.bucket, id), nil) - if err != nil { - return model.Item{}, err - } - - if resp.Code != http.StatusOK { - c.getLogger(ctx).Error("Argus responded with a non-successful status code for a RemoveItem request", - zap.Int("code", resp.Code), zap.String(errorHeaderKey, resp.ArgusErrorHeader)) - return model.Item{}, fmt.Errorf(errStatusCodeFmt, translateNonSuccessStatusCode(resp.Code), resp.Code) - } - - var item model.Item - err = json.Unmarshal(resp.Body, &item) - if err != nil { - return item, fmt.Errorf("RemoveItem: %w: %s", errJSONUnmarshal, err.Error()) - } - return item, nil -} - -func validatePushItemInput(owner string, item model.Item) error { - if len(item.ID) < 1 { - return ErrItemIDEmpty - } - - if len(item.Data) < 1 { - return ErrItemDataEmpty - } - - return nil -} - -func (c *BasicClient) sendRequest(ctx context.Context, owner, method, url string, body io.Reader) (response, error) { - r, err := http.NewRequestWithContext(ctx, method, url, body) - if err != nil { - return response{}, fmt.Errorf(errWrappedFmt, errNewRequestFailure, err.Error()) - } - err = acquire.AddAuth(r, c.auth) - if err != nil { - return response{}, fmt.Errorf(errWrappedFmt, ErrAuthAcquirerFailure, err.Error()) - } - if len(owner) > 0 { - r.Header.Set(store.ItemOwnerHeaderKey, owner) - } - resp, err := c.client.Do(r) - if err != nil { - return response{}, fmt.Errorf(errWrappedFmt, errDoRequestFailure, err.Error()) - } - defer resp.Body.Close() - var sqResp = response{ - Code: resp.StatusCode, - ArgusErrorHeader: resp.Header.Get(store.XmidtErrorHeaderKey), - } - bodyBytes, err := io.ReadAll(resp.Body) - if err != nil { - return sqResp, fmt.Errorf(errWrappedFmt, errReadingBodyFailure, err.Error()) - } - sqResp.Body = bodyBytes - return sqResp, nil -} - -func isEmpty(options acquire.RemoteBearerTokenAcquirerOptions) bool { - return len(options.AuthURL) < 1 || options.Buffer == 0 || options.Timeout == 0 -} - -// translateNonSuccessStatusCode returns as specific error -// for known Argus status codes. -func translateNonSuccessStatusCode(code int) error { - switch code { - case http.StatusBadRequest: - return ErrBadRequest - case http.StatusUnauthorized, http.StatusForbidden: - return ErrFailedAuthentication - default: - return errNonSuccessResponse - } -} - -func buildTokenAcquirer(auth Auth) (acquire.Acquirer, error) { - if !isEmpty(auth.JWT) { - return acquire.NewRemoteBearerTokenAcquirer(auth.JWT) - } else if len(auth.Basic) > 0 { - return acquire.NewFixedAuthAcquirer(auth.Basic) - } - return &acquire.DefaultAcquirer{}, nil -} - -func validateBasicConfig(config *BasicClientConfig) error { - if config.Address == "" { - return ErrAddressEmpty - } - - if config.Bucket == "" { - return ErrBucketEmpty - } - - if config.HTTPClient == nil { - config.HTTPClient = http.DefaultClient - } - - return nil -} diff --git a/chrysom/basicClient_test.go b/chrysom/basicClient_test.go deleted file mode 100644 index 3a5cd36f..00000000 --- a/chrysom/basicClient_test.go +++ /dev/null @@ -1,612 +0,0 @@ -// SPDX-FileCopyrightText: 2021 Comcast Cable Communications Management, LLC -// SPDX-License-Identifier: Apache-2.0 - -package chrysom - -import ( - "bytes" - "context" - "encoding/json" - "errors" - "fmt" - "io" - "net/http" - "net/http/httptest" - "testing" - "time" - - "github.com/aws/aws-sdk-go/aws" - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" - "github.com/xmidt-org/argus/model" - "github.com/xmidt-org/argus/store" - "github.com/xmidt-org/sallust" -) - -const failingURL = "nowhere://" - -var ( - _ Pusher = &BasicClient{} - _ Reader = &BasicClient{} -) - -func TestValidateBasicConfig(t *testing.T) { - type testCase struct { - Description string - Input *BasicClientConfig - ExpectedErr error - ExpectedConfig *BasicClientConfig - } - - allDefaultsCaseConfig := &BasicClientConfig{ - HTTPClient: http.DefaultClient, - Address: "http://awesome-argus-hostname.io", - Bucket: "bucket-name", - } - myAmazingClient := &http.Client{Timeout: time.Hour} - allDefinedCaseConfig := &BasicClientConfig{ - HTTPClient: myAmazingClient, - Address: "http://legit-argus-hostname.io", - Auth: Auth{}, - Bucket: "amazing-bucket", - } - - tcs := []testCase{ - { - Description: "No address", - Input: &BasicClientConfig{ - Bucket: "bucket-name", - }, - ExpectedErr: ErrAddressEmpty, - }, - { - Description: "No bucket", - Input: &BasicClientConfig{ - Address: "http://awesome-argus-hostname.io", - }, - ExpectedErr: ErrBucketEmpty, - }, - { - Description: "All default values", - Input: &BasicClientConfig{ - Address: "http://awesome-argus-hostname.io", - Bucket: "bucket-name", - }, - ExpectedConfig: allDefaultsCaseConfig, - }, - { - Description: "All defined", - Input: &BasicClientConfig{ - Address: "http://legit-argus-hostname.io", - Bucket: "amazing-bucket", - HTTPClient: myAmazingClient, - }, - ExpectedConfig: allDefinedCaseConfig, - }, - } - - for _, tc := range tcs { - t.Run(tc.Description, func(t *testing.T) { - assert := assert.New(t) - err := validateBasicConfig(tc.Input) - assert.Equal(tc.ExpectedErr, err) - if tc.ExpectedErr == nil { - assert.Equal(tc.ExpectedConfig, tc.Input) - } - }) - } -} - -func TestSendRequest(t *testing.T) { - type testCase struct { - Description string - Owner string - Method string - URL string - Body []byte - AcquirerFails bool - ClientDoFails bool - ExpectedResponse response - ExpectedErr error - } - - tcs := []testCase{ - { - Description: "New Request fails", - Method: "what method?", - URL: "http://argus-hostname.io", - ExpectedErr: errNewRequestFailure, - }, - { - Description: "Auth acquirer fails", - Method: http.MethodGet, - URL: "http://argus-hostname.io", - AcquirerFails: true, - ExpectedErr: ErrAuthAcquirerFailure, - }, - { - Description: "Client Do fails", - Method: http.MethodPut, - ClientDoFails: true, - ExpectedErr: errDoRequestFailure, - }, - { - Description: "Happy path", - Method: http.MethodPut, - URL: "http://argus-hostname.io", - Body: []byte("testing"), - Owner: "HappyCaseOwner", - ExpectedResponse: response{ - Code: http.StatusOK, - Body: []byte("testing"), - }, - }, - } - for _, tc := range tcs { - t.Run(tc.Description, func(t *testing.T) { - assert := assert.New(t) - require := require.New(t) - - echoHandler := http.HandlerFunc(func(rw http.ResponseWriter, r *http.Request) { - assert.Equal(tc.Owner, r.Header.Get(store.ItemOwnerHeaderKey)) - rw.WriteHeader(http.StatusOK) - bodyBytes, err := io.ReadAll(r.Body) - require.Nil(err) - rw.Write(bodyBytes) - }) - - server := httptest.NewServer(echoHandler) - defer server.Close() - - client, err := NewBasicClient(BasicClientConfig{ - HTTPClient: server.Client(), - Address: "http://argus-hostname.io", - Bucket: "bucket-name", - }, sallust.Get) - - if tc.AcquirerFails { - client.auth = acquirerFunc(failAcquirer) - } - - var URL = server.URL - if tc.ClientDoFails { - URL = "http://should-definitely-fail.net" - } - - assert.Nil(err) - resp, err := client.sendRequest(context.TODO(), tc.Owner, tc.Method, URL, bytes.NewBuffer(tc.Body)) - - if tc.ExpectedErr == nil { - assert.Equal(http.StatusOK, resp.Code) - assert.Equal(tc.ExpectedResponse, resp) - } else { - assert.True(errors.Is(err, tc.ExpectedErr)) - } - }) - } -} - -func TestGetItems(t *testing.T) { - type testCase struct { - Description string - ResponsePayload []byte - ResponseCode int - ShouldMakeRequestFail bool - ShouldDoRequestFail bool - ExpectedErr error - ExpectedOutput Items - } - - tcs := []testCase{ - { - - Description: "Make request fails", - ShouldMakeRequestFail: true, - ExpectedErr: ErrAuthAcquirerFailure, - }, - { - Description: "Do request fails", - ShouldDoRequestFail: true, - ExpectedErr: errDoRequestFailure, - }, - { - Description: "Unauthorized", - ResponseCode: http.StatusForbidden, - ExpectedErr: ErrFailedAuthentication, - }, - { - Description: "Bad request", - ResponseCode: http.StatusBadRequest, - ExpectedErr: ErrBadRequest, - }, - { - Description: "Other non-success", - ResponseCode: http.StatusInternalServerError, - ExpectedErr: errNonSuccessResponse, - }, - { - Description: "Payload unmarshal error", - ResponseCode: http.StatusOK, - ResponsePayload: []byte("[{}"), - ExpectedErr: errJSONUnmarshal, - }, - { - Description: "Happy path", - ResponseCode: http.StatusOK, - ResponsePayload: getItemsValidPayload(), - ExpectedOutput: getItemsHappyOutput(), - }, - } - - for _, tc := range tcs { - t.Run(tc.Description, func(t *testing.T) { - var ( - assert = assert.New(t) - require = require.New(t) - bucket = "bucket-name" - owner = "owner-name" - ) - - server := httptest.NewServer(http.HandlerFunc(func(rw http.ResponseWriter, r *http.Request) { - assert.Equal(http.MethodGet, r.Method) - assert.Equal(owner, r.Header.Get(store.ItemOwnerHeaderKey)) - assert.Equal(fmt.Sprintf("%s/%s", storeAPIPath, bucket), r.URL.Path) - - rw.WriteHeader(tc.ResponseCode) - rw.Write(tc.ResponsePayload) - })) - - client, err := NewBasicClient(BasicClientConfig{ - HTTPClient: server.Client(), - Address: server.URL, - Bucket: bucket, - }, sallust.Get) - - require.Nil(err) - - if tc.ShouldMakeRequestFail { - client.auth = acquirerFunc(failAcquirer) - } - - if tc.ShouldDoRequestFail { - client.storeBaseURL = failingURL - } - - output, err := client.GetItems(context.TODO(), owner) - - assert.True(errors.Is(err, tc.ExpectedErr)) - if tc.ExpectedErr == nil { - assert.EqualValues(tc.ExpectedOutput, output) - } - }) - } -} - -func TestPushItem(t *testing.T) { - type testCase struct { - Description string - Item model.Item - Owner string - ResponseCode int - ShouldEraseBucket bool - ShouldRespNonSuccess bool - ShouldMakeRequestFail bool - ShouldDoRequestFail bool - ExpectedErr error - ExpectedOutput PushResult - } - - validItem := model.Item{ - ID: "252f10c83610ebca1a059c0bae8255eba2f95be4d1d7bcfa89d7248a82d9f111", - Data: map[string]interface{}{ - "field0": float64(0), - "nested": map[string]interface{}{ - "response": "wow", - }, - }} - - tcs := []testCase{ - { - Description: "Item ID Missing", - Item: model.Item{Data: map[string]interface{}{}}, - ExpectedErr: ErrItemIDEmpty, - }, - { - Description: "Item Data missing", - Item: model.Item{ID: validItem.ID}, - ExpectedErr: ErrItemDataEmpty, - }, - { - Description: "Make request fails", - Item: validItem, - ShouldMakeRequestFail: true, - ExpectedErr: ErrAuthAcquirerFailure, - }, - { - Description: "Do request fails", - Item: validItem, - ShouldDoRequestFail: true, - ExpectedErr: errDoRequestFailure, - }, - { - Description: "Unauthorized", - Item: validItem, - ResponseCode: http.StatusForbidden, - ExpectedErr: ErrFailedAuthentication, - }, - { - Description: "Bad request", - Item: validItem, - ResponseCode: http.StatusBadRequest, - ExpectedErr: ErrBadRequest, - }, - { - Description: "Other non-success", - Item: validItem, - ResponseCode: http.StatusInternalServerError, - ExpectedErr: errNonSuccessResponse, - }, - { - Description: "Create success", - Item: validItem, - ResponseCode: http.StatusCreated, - ExpectedOutput: CreatedPushResult, - }, - { - Description: "Update success", - Item: validItem, - ResponseCode: http.StatusOK, - ExpectedOutput: UpdatedPushResult, - }, - { - Description: "Update success with owner", - Item: validItem, - ResponseCode: http.StatusOK, - Owner: "owner-name", - ExpectedOutput: UpdatedPushResult, - }, - } - - for _, tc := range tcs { - t.Run(tc.Description, func(t *testing.T) { - var ( - assert = assert.New(t) - require = require.New(t) - bucket = "bucket-name" - id = "252f10c83610ebca1a059c0bae8255eba2f95be4d1d7bcfa89d7248a82d9f111" - ) - - server := httptest.NewServer(http.HandlerFunc(func(rw http.ResponseWriter, r *http.Request) { - assert.Equal(fmt.Sprintf("%s/%s/%s", storeAPIPath, bucket, id), r.URL.Path) - assert.Equal(tc.Owner, r.Header.Get(store.ItemOwnerHeaderKey)) - rw.WriteHeader(tc.ResponseCode) - - if tc.ResponseCode == http.StatusCreated || tc.ResponseCode == http.StatusOK { - payload, err := io.ReadAll(r.Body) - require.Nil(err) - var item model.Item - err = json.Unmarshal(payload, &item) - require.Nil(err) - assert.EqualValues(tc.Item, item) - } - })) - - client, err := NewBasicClient(BasicClientConfig{ - HTTPClient: server.Client(), - Address: server.URL, - Bucket: bucket, - }, sallust.Get) - - if tc.ShouldMakeRequestFail { - client.auth = acquirerFunc(failAcquirer) - } - - if tc.ShouldDoRequestFail { - client.storeBaseURL = failingURL - } - - if tc.ShouldEraseBucket { - bucket = "" - } - - require.Nil(err) - output, err := client.PushItem(context.TODO(), tc.Owner, tc.Item) - - if tc.ExpectedErr == nil { - assert.EqualValues(tc.ExpectedOutput, output) - } else { - assert.True(errors.Is(err, tc.ExpectedErr)) - } - }) - } -} - -func TestRemoveItem(t *testing.T) { - type testCase struct { - Description string - ResponsePayload []byte - ResponseCode int - Owner string - ShouldRespNonSuccess bool - ShouldMakeRequestFail bool - ShouldDoRequestFail bool - ExpectedErr error - ExpectedOutput model.Item - } - - tcs := []testCase{ - { - Description: "Make request fails", - ShouldMakeRequestFail: true, - ExpectedErr: ErrAuthAcquirerFailure, - }, - { - Description: "Do request fails", - ShouldDoRequestFail: true, - ExpectedErr: errDoRequestFailure, - }, - { - Description: "Unauthorized", - ResponseCode: http.StatusForbidden, - ExpectedErr: ErrFailedAuthentication, - }, - { - Description: "Bad request", - ResponseCode: http.StatusBadRequest, - ExpectedErr: ErrBadRequest, - }, - { - Description: "Other non-success", - ResponseCode: http.StatusInternalServerError, - ExpectedErr: errNonSuccessResponse, - }, - { - Description: "Unmarshal failure", - ResponseCode: http.StatusOK, - ResponsePayload: []byte("{{}"), - ExpectedErr: errJSONUnmarshal, - }, - { - Description: "Succcess", - ResponseCode: http.StatusOK, - ResponsePayload: getRemoveItemValidPayload(), - ExpectedOutput: getRemoveItemHappyOutput(), - }, - } - - for _, tc := range tcs { - t.Run(tc.Description, func(t *testing.T) { - var ( - assert = assert.New(t) - require = require.New(t) - bucket = "bucket-name" - // nolint:gosec - id = "7e8c5f378b4addbaebc70897c4478cca06009e3e360208ebd073dbee4b3774e7" - ) - server := httptest.NewServer(http.HandlerFunc(func(rw http.ResponseWriter, r *http.Request) { - assert.Equal(fmt.Sprintf("%s/%s/%s", storeAPIPath, bucket, id), r.URL.Path) - assert.Equal(http.MethodDelete, r.Method) - rw.WriteHeader(tc.ResponseCode) - rw.Write(tc.ResponsePayload) - })) - - client, err := NewBasicClient(BasicClientConfig{ - HTTPClient: server.Client(), - Address: server.URL, - Bucket: bucket, - }, sallust.Get) - - if tc.ShouldMakeRequestFail { - client.auth = acquirerFunc(failAcquirer) - } - - if tc.ShouldDoRequestFail { - client.storeBaseURL = failingURL - } - - require.Nil(err) - output, err := client.RemoveItem(context.TODO(), id, tc.Owner) - - if tc.ExpectedErr == nil { - assert.EqualValues(tc.ExpectedOutput, output) - } else { - assert.True(errors.Is(err, tc.ExpectedErr)) - } - }) - } -} - -func TestTranslateStatusCode(t *testing.T) { - type testCase struct { - Description string - Code int - ExpectedErr error - } - - tcs := []testCase{ - { - Code: http.StatusForbidden, - ExpectedErr: ErrFailedAuthentication, - }, - { - Code: http.StatusUnauthorized, - ExpectedErr: ErrFailedAuthentication, - }, - { - Code: http.StatusBadRequest, - ExpectedErr: ErrBadRequest, - }, - { - Code: http.StatusInternalServerError, - ExpectedErr: errNonSuccessResponse, - }, - } - - for _, tc := range tcs { - t.Run(tc.Description, func(t *testing.T) { - assert := assert.New(t) - assert.Equal(tc.ExpectedErr, translateNonSuccessStatusCode(tc.Code)) - }) - } -} - -func failAcquirer() (string, error) { - return "", errors.New("always fail") -} - -type acquirerFunc func() (string, error) - -func (a acquirerFunc) Acquire() (string, error) { - return a() -} - -func getRemoveItemValidPayload() []byte { - return []byte(` - { - "id": "7e8c5f378b4addbaebc70897c4478cca06009e3e360208ebd073dbee4b3774e7", - "data": { - "words": [ - "Hello","World" - ], - "year": 2021 - }, - "ttl": 100 - }`) -} - -func getRemoveItemHappyOutput() model.Item { - return model.Item{ - ID: "7e8c5f378b4addbaebc70897c4478cca06009e3e360208ebd073dbee4b3774e7", - Data: map[string]interface{}{ - "words": []interface{}{"Hello", "World"}, - "year": float64(2021), - }, - TTL: aws.Int64(100), - } -} - -func getItemsValidPayload() []byte { - return []byte(`[{ - "id": "7e8c5f378b4addbaebc70897c4478cca06009e3e360208ebd073dbee4b3774e7", - "data": { - "words": [ - "Hello","World" - ], - "year": 2021 - }, - "ttl": 255 - }]`) -} - -func getItemsHappyOutput() Items { - return []model.Item{ - { - ID: "7e8c5f378b4addbaebc70897c4478cca06009e3e360208ebd073dbee4b3774e7", - Data: map[string]interface{}{ - "words": []interface{}{"Hello", "World"}, - "year": float64(2021), - }, - TTL: aws.Int64(255), - }, - } -} diff --git a/chrysom/doc.go b/chrysom/doc.go deleted file mode 100644 index 8ad7f375..00000000 --- a/chrysom/doc.go +++ /dev/null @@ -1,5 +0,0 @@ -// SPDX-FileCopyrightText: 2021 Comcast Cable Communications Management, LLC -// SPDX-License-Identifier: Apache-2.0 - -// Package chrysom provides a client in order to interact with argus. -package chrysom diff --git a/chrysom/listenerClient.go b/chrysom/listenerClient.go deleted file mode 100644 index b02cbf67..00000000 --- a/chrysom/listenerClient.go +++ /dev/null @@ -1,183 +0,0 @@ -// SPDX-FileCopyrightText: 2021 Comcast Cable Communications Management, LLC -// SPDX-License-Identifier: Apache-2.0 - -package chrysom - -import ( - "context" - "errors" - "sync/atomic" - "time" - - "github.com/prometheus/client_golang/prometheus" - "github.com/xmidt-org/sallust" - "go.uber.org/zap" -) - -// Errors that can be returned by this package. Since some of these errors are returned wrapped, it -// is safest to use errors.Is() to check for them. -// Some internal errors might be unwrapped from output errors but unless these errors become exported, -// they are not part of the library API and may change in future versions. -var ( - ErrFailedAuthentication = errors.New("failed to authentication with argus") - - ErrListenerNotStopped = errors.New("listener is either running or starting") - ErrListenerNotRunning = errors.New("listener is either stopped or stopping") - ErrNoListenerProvided = errors.New("no listener provided") - ErrNoReaderProvided = errors.New("no reader provided") -) - -// listening states -const ( - stopped int32 = iota - running - transitioning -) - -const ( - defaultPullInterval = time.Second * 5 -) - -// ListenerConfig contains config data for polling the Argus client. -type ListenerClientConfig struct { - // Listener provides a mechanism to fetch a copy of all items within a bucket on - // an interval. - // (Optional). If not provided, listening won't be enabled for this client. - Listener Listener - - // PullInterval is how often listeners should get updates. - // (Optional). Defaults to 5 seconds. - PullInterval time.Duration - - // Logger to be used by the client. - // (Optional). By default a no op logger will be used. - Logger *zap.Logger -} - -// ListenerClient is the client used to poll Argus for updates. -type ListenerClient struct { - observer *observerConfig - logger *zap.Logger - setLogger func(context.Context, *zap.Logger) context.Context - reader Reader -} - -type observerConfig struct { - listener Listener - ticker *time.Ticker - pullInterval time.Duration - measures *Measures - shutdown chan struct{} - state int32 -} - -// NewListenerClient creates a new ListenerClient to be used to poll Argus -// for updates. -func NewListenerClient(config ListenerClientConfig, - setLogger func(context.Context, *zap.Logger) context.Context, - measures *Measures, r Reader, -) (*ListenerClient, error) { - err := validateListenerConfig(&config) - if err != nil { - return nil, err - } - if measures == nil { - return nil, ErrNilMeasures - } - if setLogger == nil { - setLogger = func(ctx context.Context, _ *zap.Logger) context.Context { - return ctx - } - } - if r == nil { - return nil, ErrNoReaderProvided - } - return &ListenerClient{ - observer: &observerConfig{ - listener: config.Listener, - ticker: time.NewTicker(config.PullInterval), - pullInterval: config.PullInterval, - measures: measures, - shutdown: make(chan struct{}), - }, - logger: config.Logger, - setLogger: setLogger, - reader: r, - }, nil -} - -// Start begins listening for updates on an interval given that client configuration -// is setup correctly. If a listener process is already in progress, calling Start() -// is a NoOp. If you want to restart the current listener process, call Stop() first. -func (c *ListenerClient) Start(ctx context.Context) error { - if c.observer == nil || c.observer.listener == nil { - c.logger.Warn("No listener was setup to receive updates.") - return nil - } - if c.observer.ticker == nil { - c.logger.Error("Observer ticker is nil", zap.Error(ErrUndefinedIntervalTicker)) - return ErrUndefinedIntervalTicker - } - - if !atomic.CompareAndSwapInt32(&c.observer.state, stopped, transitioning) { - c.logger.Error("Start called when a listener was not in stopped state", zap.Error(ErrListenerNotStopped)) - return ErrListenerNotStopped - } - - c.observer.ticker.Reset(c.observer.pullInterval) - go func() { - for { - select { - case <-c.observer.shutdown: - return - case <-c.observer.ticker.C: - outcome := SuccessOutcome - ctx := c.setLogger(context.Background(), c.logger) - items, err := c.reader.GetItems(ctx, "") - if err == nil { - c.observer.listener.Update(items) - } else { - outcome = FailureOutcome - c.logger.Error("Failed to get items for listeners", zap.Error(err)) - } - c.observer.measures.Polls.With(prometheus.Labels{ - OutcomeLabel: outcome}).Add(1) - } - } - }() - - atomic.SwapInt32(&c.observer.state, running) - return nil -} - -// Stop requests the current listener process to stop and waits for its goroutine to complete. -// Calling Stop() when a listener is not running (or while one is getting stopped) returns an -// error. -func (c *ListenerClient) Stop(ctx context.Context) error { - if c.observer == nil || c.observer.ticker == nil { - return nil - } - - if !atomic.CompareAndSwapInt32(&c.observer.state, running, transitioning) { - c.logger.Error("Stop called when a listener was not in running state", zap.Error(ErrListenerNotStopped)) - return ErrListenerNotRunning - } - - c.observer.ticker.Stop() - c.observer.shutdown <- struct{}{} - atomic.SwapInt32(&c.observer.state, stopped) - return nil -} - -func validateListenerConfig(config *ListenerClientConfig) error { - if config.Listener == nil { - return ErrNoListenerProvided - } - if config.Logger == nil { - config.Logger = sallust.Default() - } - if config.PullInterval == 0 { - config.PullInterval = defaultPullInterval - } - return nil -} diff --git a/chrysom/listenerClient_test.go b/chrysom/listenerClient_test.go deleted file mode 100644 index f469dc06..00000000 --- a/chrysom/listenerClient_test.go +++ /dev/null @@ -1,203 +0,0 @@ -// SPDX-FileCopyrightText: 2021 Comcast Cable Communications Management, LLC -// SPDX-License-Identifier: Apache-2.0 - -package chrysom - -import ( - "context" - "errors" - "fmt" - "net/http" - "net/http/httptest" - "strconv" - "testing" - "time" - - "github.com/prometheus/client_golang/prometheus" - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" - "github.com/xmidt-org/sallust" -) - -var ( - mockListener = ListenerFunc((func(_ Items) { - fmt.Println("Doing amazing work for 100ms") - time.Sleep(time.Millisecond * 100) - })) - mockMeasures = &Measures{ - Polls: prometheus.NewCounterVec( - prometheus.CounterOpts{ - Name: "testPollsCounter", - Help: "testPollsCounter", - }, - []string{OutcomeLabel}, - )} - happyListenerClientConfig = ListenerClientConfig{ - Listener: mockListener, - PullInterval: time.Second, - Logger: sallust.Default(), - } -) - -func TestListenerStartStopPairsParallel(t *testing.T) { - require := require.New(t) - client, close, err := newStartStopClient(true) - assert.Nil(t, err) - defer close() - - t.Run("ParallelGroup", func(t *testing.T) { - for i := 0; i < 20; i++ { - testNumber := i - t.Run(strconv.Itoa(testNumber), func(t *testing.T) { - t.Parallel() - assert := assert.New(t) - fmt.Printf("%d: Start\n", testNumber) - errStart := client.Start(context.Background()) - if errStart != nil { - assert.Equal(ErrListenerNotStopped, errStart) - } - time.Sleep(time.Millisecond * 400) - errStop := client.Stop(context.Background()) - if errStop != nil { - assert.Equal(ErrListenerNotRunning, errStop) - } - fmt.Printf("%d: Done\n", testNumber) - }) - } - }) - - require.Equal(stopped, client.observer.state) -} - -func TestListenerStartStopPairsSerial(t *testing.T) { - require := require.New(t) - client, close, err := newStartStopClient(true) - assert.Nil(t, err) - defer close() - - for i := 0; i < 5; i++ { - testNumber := i - t.Run(strconv.Itoa(testNumber), func(t *testing.T) { - assert := assert.New(t) - fmt.Printf("%d: Start\n", testNumber) - assert.Nil(client.Start(context.Background())) - assert.Nil(client.Stop(context.Background())) - fmt.Printf("%d: Done\n", testNumber) - }) - } - require.Equal(stopped, client.observer.state) -} - -func TestListenerEdgeCases(t *testing.T) { - t.Run("NoListener", func(t *testing.T) { - _, _, err := newStartStopClient(false) - assert.Equal(t, ErrNoListenerProvided, err) - }) - - t.Run("NilTicker", func(t *testing.T) { - assert := assert.New(t) - client, stopServer, err := newStartStopClient(true) - assert.Nil(err) - defer stopServer() - client.observer.ticker = nil - assert.Equal(ErrUndefinedIntervalTicker, client.Start(context.Background())) - }) -} - -func newStartStopClient(includeListener bool) (*ListenerClient, func(), error) { - server := httptest.NewServer(http.HandlerFunc(func(rw http.ResponseWriter, r *http.Request) { - rw.Write(getItemsValidPayload()) - })) - - config := ListenerClientConfig{ - PullInterval: time.Millisecond * 200, - Logger: sallust.Default(), - } - if includeListener { - config.Listener = mockListener - } - client, err := NewListenerClient(config, nil, mockMeasures, &BasicClient{}) - if err != nil { - return nil, nil, err - } - - return client, server.Close, nil -} - -func TestValidateListenerConfig(t *testing.T) { - tcs := []struct { - desc string - expectedErr error - config ListenerClientConfig - }{ - { - desc: "Happy case Success", - config: happyListenerClientConfig, - }, - { - desc: "No listener Failure", - config: ListenerClientConfig{}, - expectedErr: ErrNoListenerProvided, - }, - { - desc: "No logger and no pull interval Success", - config: ListenerClientConfig{ - Listener: mockListener, - }, - }, - } - for _, tc := range tcs { - t.Run(tc.desc, func(t *testing.T) { - assert := assert.New(t) - c := tc.config - err := validateListenerConfig(&c) - assert.True(errors.Is(err, tc.expectedErr), - fmt.Errorf("error [%v] doesn't contain error [%v] in its err chain", - err, tc.expectedErr), - ) - }) - } -} - -func TestNewListenerClient(t *testing.T) { - tcs := []struct { - desc string - config ListenerClientConfig - expectedErr error - measures *Measures - reader Reader - }{ - { - desc: "Listener Config Failure", - config: ListenerClientConfig{}, - expectedErr: ErrNoListenerProvided, - }, - { - desc: "No measures Failure", - config: happyListenerClientConfig, - expectedErr: ErrNilMeasures, - }, - { - desc: "No reader Failure", - config: happyListenerClientConfig, - measures: mockMeasures, - expectedErr: ErrNoReaderProvided, - }, - { - desc: "Happy case Success", - config: happyListenerClientConfig, - measures: mockMeasures, - reader: &BasicClient{}, - }, - } - for _, tc := range tcs { - t.Run(tc.desc, func(t *testing.T) { - assert := assert.New(t) - _, err := NewListenerClient(tc.config, nil, tc.measures, tc.reader) - assert.True(errors.Is(err, tc.expectedErr), - fmt.Errorf("error [%v] doesn't contain error [%v] in its err chain", - err, tc.expectedErr), - ) - }) - } -} diff --git a/chrysom/metrics.go b/chrysom/metrics.go deleted file mode 100644 index 14da104c..00000000 --- a/chrysom/metrics.go +++ /dev/null @@ -1,44 +0,0 @@ -// SPDX-FileCopyrightText: 2021 Comcast Cable Communications Management, LLC -// SPDX-License-Identifier: Apache-2.0 - -package chrysom - -import ( - "github.com/prometheus/client_golang/prometheus" - "github.com/xmidt-org/touchstone" - "go.uber.org/fx" -) - -// Names -const ( - PollCounter = "chrysom_polls_total" -) - -// Labels -const ( - OutcomeLabel = "outcome" -) - -// Label Values -const ( - SuccessOutcome = "success" - FailureOutcome = "failure" -) - -// Metrics returns the Metrics relevant to this package -func ProvideMetrics() fx.Option { - return fx.Options( - touchstone.CounterVec( - prometheus.CounterOpts{ - Name: PollCounter, - Help: "Counter for the number of polls (and their success/failure outcomes) to fetch new items.", - }, - OutcomeLabel, - ), - ) -} - -type Measures struct { - fx.In - Polls *prometheus.CounterVec `name:"chrysom_polls_total"` -} diff --git a/chrysom/pushResult.go b/chrysom/pushResult.go deleted file mode 100644 index 653eab7e..00000000 --- a/chrysom/pushResult.go +++ /dev/null @@ -1,28 +0,0 @@ -// SPDX-FileCopyrightText: 2021 Comcast Cable Communications Management, LLC -// SPDX-License-Identifier: Apache-2.0 - -package chrysom - -// PushResult is a simple type to indicate the result type for the -// PushItem operation. -type PushResult int64 - -// Types of pushItem successful results. -const ( - UnknownPushResult PushResult = iota - CreatedPushResult - UpdatedPushResult - NilPushResult -) - -func (p PushResult) String() string { - switch p { - case NilPushResult: - return "" - case CreatedPushResult: - return "created" - case UpdatedPushResult: - return "ok" - } - return "unknown" -} diff --git a/chrysom/store.go b/chrysom/store.go deleted file mode 100644 index 28ef771f..00000000 --- a/chrysom/store.go +++ /dev/null @@ -1,47 +0,0 @@ -// SPDX-FileCopyrightText: 2021 Comcast Cable Communications Management, LLC -// SPDX-License-Identifier: Apache-2.0 - -package chrysom - -import ( - "context" - - "github.com/xmidt-org/argus/model" -) - -type PushReader interface { - Pusher - Reader -} - -type Pusher interface { - // PushItem adds the item and establishes its link to the given owner in the store. - PushItem(ctx context.Context, owner string, item model.Item) (PushResult, error) - - // Remove will remove the matching item from the store and return it. - RemoveItem(ctx context.Context, id, owner string) (model.Item, error) -} - -type Listener interface { - // Update is called when we get changes to our item listeners with either - // additions, or updates. - // - // The list of hooks must contain only the current items. - Update(items Items) -} - -type ListenerFunc func(items Items) - -func (l ListenerFunc) Update(items Items) { - l(items) -} - -type Reader interface { - // GeItems returns all the items that belong to this owner. - GetItems(ctx context.Context, owner string) (Items, error) -} - -type ConfigureListener interface { - // SetListener will attempt to set the lister. - SetListener(listener Listener) error -}