From a91822511a0515ee2aee6d4abdeabaaf09fb9485 Mon Sep 17 00:00:00 2001 From: Keyur Date: Fri, 19 Jun 2020 18:07:25 -0400 Subject: [PATCH] Refactor Moesif pump --- README.md | 15 ++- go.mod | 2 +- go.sum | 4 + pumps/moesif.go | 320 ++++++++++++++++++++++++++++++++++++++++++++---- 4 files changed, 309 insertions(+), 32 deletions(-) diff --git a/README.md b/README.md index e48a43396..b7ae8c6e9 100644 --- a/README.md +++ b/README.md @@ -317,10 +317,17 @@ The Tyk Dashboard uses the "mongo-pump-aggregate" collection to display analytic * `bulk_size`: Specifies the size (in bytes) needed to flush the data and send it to ES. Defaults to 5MB. If it is needed, can be disabled with -1. ### Moesif Config -[Moesif](https://www.moesif.com) is a logging and analytics service for APIs. The Moesif pump will -move analytics data from Tyk to Moesif. - -`"application_id"` - Moesif App Id JWT. Multiple api_id's will go under the same app id. +[Moesif](https://www.moesif.com/?language=tyk-api-gateway) is a user-centric API analytics and monitoring service for APIs. [More Info on Moesif for Tyk](https://www.moesif.com/solutions/track-api-program?language=tyk-api-gateway) + +- `"application_id"` - Moesif App Id JWT. Multiple api_id's will go under the same app id. +- `"request_header_masks"` - (optional) An option to mask a specific request header field. Type: String Array `[] string` +- `"request_body_masks"` - (optional) An option to mask a specific - request body field. Type: String Array `[] string` +- `"response_header_masks"` - (optional) An option to mask a specific response header field. Type: String Array `[] string` +- `"response_body_masks"` - (optional) An option to mask a specific response body field. Type: String Array `[] string` +- `"disable_capture_request_body"` - (optional) An option to disable logging of request body. Type: Boolean. Default value is `false`. +- `"disable_capture_response_body"` - (optional) An option to disable logging of response body. Type: Boolean. Default value is `false`. +- `"user_id_header"` - (optional) An optional field name to identify User from a request or response header. Type: String. +- `"company_id_header"` - (optional) An optional field name to identify Company (Account) from a request or response header. Type: String. ### Prometheus Prometheus is an open-source monitoring system with a dimensional data model, flexible query language, efficient time series database and modern alerting approach. diff --git a/go.mod b/go.mod index 410eb43f5..489365c6d 100644 --- a/go.mod +++ b/go.mod @@ -25,7 +25,7 @@ require ( github.com/logzio/logzio-go v0.0.0-20190331100143-1138f714b3b6 github.com/lonelycode/mgohacks v0.0.0-20150820024025-f9c291f7e57e github.com/mitchellh/mapstructure v1.1.2 - github.com/moesif/moesifapi-go v0.0.0-20170216233325-69242ec5159a + github.com/moesif/moesifapi-go v1.0.5 github.com/olivere/elastic v6.2.31+incompatible // indirect github.com/pkg/errors v0.9.1 github.com/prometheus/client_golang v0.9.1 diff --git a/go.sum b/go.sum index b5af68ea6..efa6beb5f 100644 --- a/go.sum +++ b/go.sum @@ -301,6 +301,8 @@ github.com/mitchellh/mapstructure v1.1.2/go.mod h1:FVVH3fgwuzCH5S8UJGiWEs2h04kUh github.com/mitchellh/reflectwalk v1.0.0/go.mod h1:mSTlrgnPZtwu0c4WaC2kGObEpuNDbx0jmZXqmk4esnw= github.com/moesif/moesifapi-go v0.0.0-20170216233325-69242ec5159a h1:/PkqjUozGqWVAZZmT8rFmmI6JlMrSxUbtl8YZvqy+HM= github.com/moesif/moesifapi-go v0.0.0-20170216233325-69242ec5159a/go.mod h1:HGxp2XZwvI6M063YPSybdTrNYNUTYiJ8W+0sgl+6o2c= +github.com/moesif/moesifapi-go v1.0.5 h1:SQAQvzAD8Qbvjj2ZFhIw1ERQWQ/F4njcXQ32s81Nv9E= +github.com/moesif/moesifapi-go v1.0.5/go.mod h1:qlUk62wTdzP5a0NNoSsSUSwdYRSOcf+gyxv+ffZ/8eQ= github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U= github.com/newrelic/go-agent v2.13.0+incompatible/go.mod h1:a8Fv1b/fYhFSReoTU6HDkTYIMZeSVNffmoS726Y0LzQ= github.com/oklog/run v1.0.0/go.mod h1:dlhp/R75TPv97u0XWUtDeV/lRKWPKSdTuV0TZvrmrQA= @@ -485,6 +487,8 @@ golang.org/x/net v0.0.0-20200222125558-5a598a2470a0/go.mod h1:z5CRVTTTmAJ677TzLL golang.org/x/net v0.0.0-20200226121028-0de0cce0169b/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20200324143707-d3edc9973b7e h1:3G+cUijn7XD+S4eJFddp53Pv7+slrESplyjG25HgL+k= golang.org/x/net v0.0.0-20200324143707-d3edc9973b7e/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A= +golang.org/x/net v0.0.0-20200602114024-627f9648deb9 h1:pNX+40auqi2JqRfOP1akLGtYcn15TUbkhwuCO3foqqM= +golang.org/x/net v0.0.0-20200602114024-627f9648deb9/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be h1:vEDujvNQGv4jgYKudGeI/+DAX4Jffq6hpD55MmoEvKs= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= diff --git a/pumps/moesif.go b/pumps/moesif.go index 5a4ef3d77..6a7c99273 100644 --- a/pumps/moesif.go +++ b/pumps/moesif.go @@ -4,25 +4,37 @@ import ( "bufio" "context" "encoding/base64" + "encoding/json" "fmt" - "strings" - "time" - + "github.com/TykTechnologies/logrus" + "github.com/TykTechnologies/tyk-pump/analytics" "github.com/mitchellh/mapstructure" "github.com/moesif/moesifapi-go" "github.com/moesif/moesifapi-go/models" - - "github.com/TykTechnologies/logrus" - "github.com/TykTechnologies/tyk-pump/analytics" + "io/ioutil" + "math" + "math/rand" + "net/http" + "strconv" + "strings" + "time" ) type MoesifPump struct { - moesifApi moesifapi.API - moesifConf *MoesifConf + moesifAPI moesifapi.API + moesifConf *MoesifConf + filters analytics.AnalyticsFilters + timeout int + samplingPercentage int + eTag string + lastUpdatedTime time.Time + appConfig map[string]interface{} + userSampleRateMap map[string]interface{} + companySampleRateMap map[string]interface{} CommonPumpConfig } -type RawDecoded struct { +type rawDecoded struct { headers map[string]interface{} body interface{} } @@ -30,10 +42,18 @@ type RawDecoded struct { var moesifPrefix = "moesif-pump" type MoesifConf struct { - ApplicationId string `mapstructure:"application_id"` + ApplicationID string `mapstructure:"application_id"` + RequestHeaderMasks []string `mapstructure:"request_header_masks"` + ResponseHeaderMasks []string `mapstructure:"response_header_masks"` + RequestBodyMasks []string `mapstructure:"request_body_masks"` + ResponseBodyMasks []string `mapstructure:"response_body_masks"` + DisableCaptureRequestBody bool `mapstructure:"disable_capture_request_body"` + DisableCaptureResponseBody bool `mapstructure:"disable_capture_response_body"` + UserIDHeader string `mapstructure:"user_id_header"` + CompanyIDHeader string `mapstructure:"company_id_header"` } -func (e *MoesifPump) New() Pump { +func (p *MoesifPump) New() Pump { newPump := MoesifPump{} return &newPump } @@ -42,6 +62,143 @@ func (p *MoesifPump) GetName() string { return "Moesif Pump" } +func (p *MoesifPump) parseConfiguration(response *http.Response) (int, string, time.Time) { + // Get X-Moesif-Config-Etag header from response + if configETag, ok := response.Header["X-Moesif-Config-Etag"]; ok { + p.eTag = configETag[0] + } + + // Read the response body + respBody, err := ioutil.ReadAll(response.Body) + if err != nil { + log.WithFields(logrus.Fields{ + "prefix": moesifPrefix, + }).Fatal("Couldn't parse configuration: ", err) + return p.samplingPercentage, p.eTag, time.Now().UTC() + } + // Parse the response Body + if jsonRespParseErr := json.Unmarshal(respBody, &p.appConfig); jsonRespParseErr == nil { + // Fetch sample rate from appConfig + if getSampleRate, found := p.appConfig["sample_rate"]; found { + if rate, ok := getSampleRate.(float64); ok { + p.samplingPercentage = int(rate) + } + } + // Fetch User Sample rate from appConfig + if userSampleRate, ok := p.appConfig["user_sample_rate"]; ok { + if userRates, ok := userSampleRate.(map[string]interface{}); ok { + p.userSampleRateMap = userRates + } + } + // Fetch Company Sample rate from appConfig + if companySampleRate, ok := p.appConfig["company_sample_rate"]; ok { + if companyRates, ok := companySampleRate.(map[string]interface{}); ok { + p.companySampleRateMap = companyRates + } + } + } + + return p.samplingPercentage, p.eTag, time.Now().UTC() +} + +func (p *MoesifPump) getSamplingPercentage(userID string, companyID string) int { + if userID != "" { + if userRate, ok := p.userSampleRateMap[userID].(float64); ok { + return int(userRate) + } + } + + if companyID != "" { + if companyRate, ok := p.companySampleRateMap[companyID].(float64); ok { + return int(companyRate) + } + } + + if getSampleRate, found := p.appConfig["sample_rate"]; found { + if rate, ok := getSampleRate.(float64); ok { + return int(rate) + } + } + + return 100 +} + +func fetchIDFromHeader(requestHeaders map[string]interface{}, responseHeaders map[string]interface{}, headerName string) string { + var id string + if requid, ok := requestHeaders[headerName].(string); ok { + id = requid + } + if resuid, ok := responseHeaders[headerName].(string); ok { + id = resuid + } + return id +} + +func toLowerCase(headers map[string]interface{}) map[string]interface{} { + transformMap := make(map[string]interface{}, len(headers)) + for k, v := range headers { + transformMap[strings.ToLower(k)] = v + } + return transformMap +} + +func contains(arr []string, str string) bool { + for _, value := range arr { + if value == str { + return true + } + } + return false +} + +func maskData(data map[string]interface{}, maskBody []string) map[string]interface{} { + for key, val := range data { + switch val.(type) { + case map[string]interface{}: + if contains(maskBody, key) { + data[key] = "*****" + } else { + maskData(val.(map[string]interface{}), maskBody) + } + default: + if contains(maskBody, key) { + data[key] = "*****" + } + } + } + return data +} + +func maskRawBody(rawBody string, maskBody []string) string { + // Mask body + var maskedBody map[string]interface{} + if err := json.Unmarshal([]byte(rawBody), &maskedBody); err == nil { + + if len(maskBody) > 0 { + maskedBody = maskData(maskedBody, maskBody) + } + + out, _ := json.Marshal(maskedBody) + return base64.StdEncoding.EncodeToString([]byte(out)) + } + + return base64.StdEncoding.EncodeToString([]byte(rawBody)) +} + +func buildURI(raw string, defaultPath string) string { + pathHeadersBody := strings.SplitN(raw, "\r\n", 2) + + if len(pathHeadersBody) >= 2 { + requestPath := strings.Fields(pathHeadersBody[0]) + if len(requestPath) >= 3 { + url := requestPath[1] + return url + } + return defaultPath + } + return defaultPath +} + func (p *MoesifPump) Init(config interface{}) error { p.moesifConf = &MoesifConf{} loadConfigErr := mapstructure.Decode(config, &p.moesifConf) @@ -52,8 +209,23 @@ func (p *MoesifPump) Init(config interface{}) error { }).Fatal("Failed to decode configuration: ", loadConfigErr) } - api := moesifapi.NewAPI(p.moesifConf.ApplicationId) - p.moesifApi = api + api := moesifapi.NewAPI(p.moesifConf.ApplicationID) + p.moesifAPI = api + + // Default samplingPercentage and DateTime + p.samplingPercentage = 100 + p.lastUpdatedTime = time.Now().UTC() + + // Fetch application config + response, err := p.moesifAPI.GetAppConfig() + + if err == nil { + p.samplingPercentage, p.eTag, p.lastUpdatedTime = p.parseConfiguration(response) + } else { + log.WithFields(logrus.Fields{ + "prefix": moesifPrefix, + }).Debug("Error fetching application configuration on initilization with err - " + err.Error()) + } return nil } @@ -78,7 +250,8 @@ func (p *MoesifPump) WriteData(ctx context.Context, data []interface{}) error { }).Fatal(err) } - decodedReqBody, err := decodeRawData(string(rawReq)) + decodedReqBody, err := decodeRawData(string(rawReq), p.moesifConf.RequestHeaderMasks, + p.moesifConf.RequestBodyMasks, p.moesifConf.DisableCaptureRequestBody) if err != nil { log.WithFields(logrus.Fields{ @@ -86,9 +259,15 @@ func (p *MoesifPump) WriteData(ctx context.Context, data []interface{}) error { }).Fatal(err) } + // Request URL + requestURL := buildURI(string(rawReq), record.Path) + + // Request Time + reqTime := record.TimeStamp.UTC() + req := models.EventRequestModel{ - Time: &record.TimeStamp, - Uri: record.Path, + Time: &reqTime, + Uri: requestURL, Verb: record.Method, ApiVersion: &record.APIVersion, IpAddress: &record.IPAddress, @@ -105,7 +284,8 @@ func (p *MoesifPump) WriteData(ctx context.Context, data []interface{}) error { }).Fatal(err) } - decodedRspBody, err := decodeRawData(string(rawRsp)) + decodedRspBody, err := decodeRawData(string(rawRsp), p.moesifConf.ResponseHeaderMasks, + p.moesifConf.ResponseBodyMasks, p.moesifConf.DisableCaptureResponseBody) if err != nil { log.WithFields(logrus.Fields{ @@ -113,7 +293,8 @@ func (p *MoesifPump) WriteData(ctx context.Context, data []interface{}) error { }).Fatal(err) } - rspTime := record.TimeStamp.Add(time.Duration(record.RequestTime) * time.Millisecond) + // Response Time + rspTime := record.TimeStamp.Add(time.Duration(record.RequestTime) * time.Millisecond).UTC() rsp := models.EventResponseModel{ Time: &rspTime, @@ -124,40 +305,112 @@ func (p *MoesifPump) WriteData(ctx context.Context, data []interface{}) error { TransferEncoding: &transferEncoding, } + // Add Metadata + metadata := map[string]interface{}{ + "tyk": map[string]interface{}{ + "api_name": record.APIName, + "tags": record.Tags, + }, + } + + // Direction to the event + direction := "Incoming" + + // User Id + var userID string + if p.moesifConf.UserIDHeader != "" { + userID = fetchIDFromHeader(decodedReqBody.headers, decodedRspBody.headers, p.moesifConf.UserIDHeader) + } + + if userID == "" { + if record.Alias != "" { + userID = record.Alias + } else if record.OauthID != "" { + userID = record.OauthID + } + } + + // Company Id + var companyID string + if p.moesifConf.CompanyIDHeader != "" { + companyID = fetchIDFromHeader(decodedReqBody.headers, decodedRspBody.headers, p.moesifConf.CompanyIDHeader) + } + + // Generate random percentage + rand.Seed(time.Now().UnixNano()) + randomPercentage := rand.Intn(100) + + // Parse sampling percentage based on user/company + p.samplingPercentage = p.getSamplingPercentage(userID, companyID) + + if p.samplingPercentage < randomPercentage { + log.WithFields(logrus.Fields{ + "prefix": moesifPrefix, + }).Debug("Skipped Event due to sampling percentage: " + strconv.Itoa(p.samplingPercentage) + " and random percentage: " + strconv.Itoa(randomPercentage)) + continue + } + // Add Weight to the Event Model + var eventWeight int + if p.samplingPercentage == 0 { + eventWeight = 1 + } else { + eventWeight = int(math.Floor(float64(100 / p.samplingPercentage))) + } + event := models.EventModel{ Request: req, Response: rsp, SessionToken: &record.APIKey, Tags: nil, - UserId: nil, + UserId: &userID, + CompanyId: &companyID, + Metadata: &metadata, + Direction: &direction, + Weight: &eventWeight, } - err = p.moesifApi.QueueEvent(&event) + err = p.moesifAPI.QueueEvent(&event) if err != nil { log.WithFields(logrus.Fields{ "prefix": moesifPrefix, }).Error("Error while writing ", data[dataIndex], err) } + + if p.moesifAPI.GetETag() != "" && + p.eTag != "" && + p.eTag != p.moesifAPI.GetETag() && + time.Now().UTC().After(p.lastUpdatedTime.Add(time.Minute*1)) { + + // Call Endpoint to fetch config + response, err := p.moesifAPI.GetAppConfig() + if err != nil { + log.WithFields(logrus.Fields{ + "prefix": moesifPrefix, + }).Debug("Error fetching application configuration with err - " + err.Error()) + continue + } + p.samplingPercentage, p.eTag, p.lastUpdatedTime = p.parseConfiguration(response) + } } return nil } -func decodeRawData(raw string) (*RawDecoded, error) { +func decodeRawData(raw string, maskHeaders []string, maskBody []string, disableCaptureBody bool) (*rawDecoded, error) { headersBody := strings.SplitN(raw, "\r\n\r\n", 2) if len(headersBody) == 0 { return nil, fmt.Errorf("Error while splitting raw data") } - headers := decodeHeaders(headersBody[0]) + headers := decodeHeaders(headersBody[0], maskHeaders) var body interface{} - if len(headersBody) == 2 { - body = base64.StdEncoding.EncodeToString([]byte(headersBody[1])) + if len(headersBody) == 2 && !disableCaptureBody { + body = maskRawBody(headersBody[1], maskBody) } - ret := &RawDecoded{ + ret := &rawDecoded{ headers: headers, body: body, } @@ -165,8 +418,7 @@ func decodeRawData(raw string) (*RawDecoded, error) { return ret, nil } -func decodeHeaders(headers string) map[string]interface{} { - +func decodeHeaders(headers string, maskHeaders []string) map[string]interface{} { scanner := bufio.NewScanner(strings.NewReader(headers)) ret := make(map[string]interface{}, strings.Count(headers, "\r\n")) @@ -183,5 +435,19 @@ func decodeHeaders(headers string) map[string]interface{} { ret[kv[0]] = strings.TrimSpace(kv[1]) } + // Mask Headers + ret = maskData(ret, maskHeaders) + + // Transform Map to lowercase + ret = toLowerCase(ret) + return ret } + +func (p *MoesifPump) SetTimeout(timeout int) { + p.timeout = timeout +} + +func (p *MoesifPump) GetTimeout() int { + return p.timeout +}