Skip to content

Commit

Permalink
cli: push tsdump upload failed metrics to datadog logs
Browse files Browse the repository at this point in the history
Previously, we were displaying failed tsdump upload metrics on CLI output. This
was inadequate because CLI output might get truncated in case of high number of
metric failure. To address this, this patch ships failed metrics to datadog as
part of logs so that we can see all failed uploaded metrics as part of logs.

Part of: CRDB-44835
Epic: None
Release note: None
  • Loading branch information
aa-joshi committed Dec 12, 2024
1 parent 5b9cad9 commit 160db6f
Show file tree
Hide file tree
Showing 2 changed files with 54 additions and 14 deletions.
52 changes: 46 additions & 6 deletions pkg/cli/tsdump_upload.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,12 @@ var (
targetURLFormat = "https://api.%s/api/v2/series"
datadogDashboardURLFormat = "https://us5.datadoghq.com/dashboard/bif-kwe-gx2/self-hosted-db-console-tsdump?" +
"tpl_var_cluster=%s&tpl_var_upload_id=%s&tpl_var_upload_day=%d&tpl_var_upload_month=%d&tpl_var_upload_year=%d&from_ts=%d&to_ts=%d"
zipFileSignature = []byte{0x50, 0x4B, 0x03, 0x04}
zipFileSignature = []byte{0x50, 0x4B, 0x03, 0x04}
logMessageFormat = "tsdump upload to datadog is partially failed for metric: %s"
partialFailureMessageFormat = "The Tsdump upload to Datadog succeeded but %d metrics partially failed to upload." +
" These failures can be due to transietnt network errors. If any of these metrics are critical for your investigation," +
" please re-upload the Tsdump:\n%s\n"
datadogLogsURLFormat = "https://us5.datadoghq.com/logs?query=cluster_label:%s+upload_id:%s"
)

// DatadogPoint is a single metric point in Datadog format
Expand Down Expand Up @@ -318,6 +323,7 @@ func (d *datadogWriter) flush(data []DatadogSeries) error {
}
}
return err

}

func (d *datadogWriter) upload(fileName string) error {
Expand Down Expand Up @@ -421,10 +427,12 @@ func (d *datadogWriter) upload(fileName string) error {
fmt.Printf("\nUpload status: %s!\n", uploadStatus)

if metricsUploadState.isSingleUploadSucceeded {
var isDatadogUploadFailed = false
markDatadogUploadFailedOnce := sync.OnceFunc(func() {
isDatadogUploadFailed = true
})
if len(metricsUploadState.uploadFailedMetrics) != 0 {
fmt.Printf("The Tsdump upload to Datadog succeeded but %d metrics partially failed to upload."+
" These failures can be due to transietnt network errors. If any of these metrics are critical for your investigation,"+
" please re-upload the Tsdump:\n%s\n", len(metricsUploadState.uploadFailedMetrics), strings.Join(func() []string {
fmt.Printf(partialFailureMessageFormat, len(metricsUploadState.uploadFailedMetrics), strings.Join(func() []string {
var failedMetricsList []string
index := 1
for metric := range metricsUploadState.uploadFailedMetrics {
Expand All @@ -434,9 +442,41 @@ func (d *datadogWriter) upload(fileName string) error {
}
return failedMetricsList
}(), "\n"))
}

fmt.Println("\nupload id: ", d.uploadID)
tags := strings.Join(getUploadTags(d), ",")
fmt.Println("\nPushing logs of metric upload failures to datadog...")
for metric := range metricsUploadState.uploadFailedMetrics {
wg.Add(1)
go func(metric string) {
logMessage := fmt.Sprintf(logMessageFormat, metric)

logEntryJSON, _ := json.Marshal(struct {
Message any `json:"message,omitempty"`
Tags string `json:"ddtags,omitempty"`
Source string `json:"ddsource,omitempty"`
}{
Message: logMessage,
Tags: tags,
Source: "tsdump_upload",
})

_, err := uploadLogsToDatadog(logEntryJSON, d.apiKey, debugTimeSeriesDumpOpts.ddSite)
if err != nil {
markDatadogUploadFailedOnce()
}
wg.Done()
}(metric)
}

wg.Wait()
if isDatadogUploadFailed {
fmt.Println("Failed to pushed some metrics to datadog logs. Please refer CLI output for all failed metrics.")
} else {
fmt.Println("Pushing logs of metric upload failures to datadog...done")
fmt.Printf("datadog logs for metric upload failures link: %s\n", fmt.Sprintf(datadogLogsURLFormat, debugTimeSeriesDumpOpts.clusterLabel, d.uploadID))
}
}
fmt.Println("\nupload id:", d.uploadID)
fmt.Printf("datadog dashboard link: %s\n", dashboardLink)
} else {
fmt.Println("All metric upload is failed. Please re-upload the Tsdump.")
Expand Down
16 changes: 8 additions & 8 deletions pkg/cli/zip_upload.go
Original file line number Diff line number Diff line change
Expand Up @@ -332,7 +332,7 @@ func newProfileUploadReq(
return nil, err
}

req, err := http.NewRequestWithContext(ctx, http.MethodPost, makeDDURL(datadogProfileUploadURLTmpl), &body)
req, err := http.NewRequestWithContext(ctx, http.MethodPost, makeDDURL(datadogProfileUploadURLTmpl, debugZipUploadOpts.ddSite), &body)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -655,7 +655,7 @@ func setupDDArchive(ctx context.Context, pathPrefix, archiveName string) error {
}

req, err := http.NewRequestWithContext(
ctx, http.MethodPost, makeDDURL(datadogCreateArchiveURLTmpl), bytes.NewReader(rawPayload),
ctx, http.MethodPost, makeDDURL(datadogCreateArchiveURLTmpl, debugZipUploadOpts.ddSite), bytes.NewReader(rawPayload),
)
if err != nil {
return err
Expand Down Expand Up @@ -798,18 +798,18 @@ func ddLogUpload(ctx context.Context, sig logUploadSig) (int, error) {
buf.Write(bytes.Join(sig.logLines, []byte(",")))
buf.WriteByte(']')

return uploadLogsToDatadog(ctx, buf.Bytes())
return uploadLogsToDatadog(buf.Bytes(), debugZipUploadOpts.ddAPIKey, debugZipUploadOpts.ddSite)
}

// uploadLogsToDatadog is a generic function that uploads the given payload of
// logs to datadog. This exists because artifacts other than logs might also
// need to be uploaded to datadog in the form of logs (example: table dumps,
// events etc.).
func uploadLogsToDatadog(ctx context.Context, payload []byte) (int, error) {
func uploadLogsToDatadog(payload []byte, ddApiKey string, ddSite string) (int, error) {
var (
compressedLogs bytes.Buffer
compressedlogWriter = gzip.NewWriter(&compressedLogs)
url = makeDDURL(datadogLogIntakeURLTmpl)
url = makeDDURL(datadogLogIntakeURLTmpl, ddSite)
)

if _, err := compressedlogWriter.Write(payload); err != nil {
Expand All @@ -832,7 +832,7 @@ func uploadLogsToDatadog(ctx context.Context, payload []byte) (int, error) {

req.Header.Set(httputil.ContentTypeHeader, httputil.JSONContentType)
req.Header.Set(httputil.ContentEncodingHeader, httputil.GzipEncoding)
req.Header.Set(datadogAPIKeyHeader, debugZipUploadOpts.ddAPIKey)
req.Header.Set(datadogAPIKeyHeader, ddApiKey)

if _, err = doUploadReq(req); err == nil {
break
Expand Down Expand Up @@ -1077,8 +1077,8 @@ You will receive an email notification once the rehydration is complete.
// placeholder in the template. This is a simple convenience
// function. It assumes that the site is valid. This assumption is
// fine because we are validating the site early on in the flow.
func makeDDURL(tmpl string) string {
return fmt.Sprintf(tmpl, ddSiteToHostMap[debugZipUploadOpts.ddSite])
func makeDDURL(tmpl string, ddSite string) string {
return fmt.Sprintf(tmpl, ddSiteToHostMap[ddSite])
}

// humanReadableSize converts the given number of bytes to a human readable
Expand Down

0 comments on commit 160db6f

Please sign in to comment.