Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add alert classifier #2494

Merged
merged 12 commits into from
Jan 30, 2025
16 changes: 10 additions & 6 deletions flow/alerting/alerting.go
Original file line number Diff line number Diff line change
Expand Up @@ -446,12 +446,9 @@ func (a *Alerter) LogFlowError(ctx context.Context, flowName string, err error)
return
}

errorClassString := ""

var tags []string
if errors.Is(err, context.Canceled) {
tags = append(tags, string(shared.ErrTypeCanceled))
errorClassString = "context.Canceled"
}
if errors.Is(err, io.EOF) || errors.Is(err, io.ErrUnexpectedEOF) {
tags = append(tags, string(shared.ErrTypeEOF))
Expand All @@ -477,15 +474,22 @@ func (a *Alerter) LogFlowError(ctx context.Context, flowName string, err error)
tags = append(tags, string(shared.ErrTypeNet))
}

a.sendTelemetryMessage(ctx, logger, flowName, errorWithStack, telemetry.ERROR, tags...)
errorClass := GetErrorClass(ctx, err)
tags = append(tags, "errorClass:"+errorClass.String(), "errorAction:"+errorClass.ErrorAction().String())

if !peerdbenv.PeerDBTelemetryErrorActionBasedAlertingEnabled() || errorClass.ErrorAction() == NotifyTelemetry {
a.sendTelemetryMessage(ctx, logger, flowName, errorWithStack, telemetry.ERROR, tags...)
}
if a.otelManager != nil {
a.otelManager.Metrics.ErrorsEmittedCounter.Add(ctx, 1, metric.WithAttributeSet(attribute.NewSet(
attribute.String(otel_metrics.FlowNameKey, flowName),
attribute.String(otel_metrics.ErrorClassKey, errorClassString),
attribute.String(otel_metrics.ErrorClassKey, errorClass.String()),
attribute.String(otel_metrics.ErrorActionKey, errorClass.ErrorAction().String()),
)))
a.otelManager.Metrics.ErrorEmittedGauge.Record(ctx, 1, metric.WithAttributeSet(attribute.NewSet(
attribute.String(otel_metrics.FlowNameKey, flowName),
attribute.String(otel_metrics.ErrorClassKey, errorClassString),
attribute.String(otel_metrics.ErrorClassKey, errorClass.String()),
attribute.String(otel_metrics.ErrorActionKey, errorClass.ErrorAction().String()),
)))
}
}
Expand Down
168 changes: 168 additions & 0 deletions flow/alerting/classifier.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,168 @@
package alerting

import (
"context"
"errors"
"io"
"net"
"strings"

"github.com/ClickHouse/clickhouse-go/v2"
"github.com/jackc/pgx/v5/pgconn"
"golang.org/x/crypto/ssh"

"github.com/PeerDB-io/peerdb/flow/shared/exceptions"
)

type ErrorAction string

const (
NotifyUser ErrorAction = "notify_user"
Ignore ErrorAction = "ignore"
NotifyTelemetry ErrorAction = "notify_telemetry"
)

func (e ErrorAction) String() string {
return string(e)
}

type ErrorClass struct {
Class string
action ErrorAction
}

var (
ErrorNotifyOOM = ErrorClass{
// ClickHouse Code 241
Class: "NOTIFY_OOM", action: NotifyUser,
}
ErrorNotifyMVOrView = ErrorClass{
// ClickHouse Code 349 / Code 48 with "while pushing to view"
Class: "NOTIFY_MV_OR_VIEW", action: NotifyUser,
}
ErrorNotifyConnectivity = ErrorClass{
// ClickHouse Code 81 or Postgres Code 28P01
Class: "NOTIFY_CONNECTIVITY", action: NotifyUser,
}
ErrorNotifySlotInvalid = ErrorClass{
// Postgres Code 55000 with "cannot read from logical replication slot"
Class: "NOTIFY_SLOT_INVALID", action: NotifyUser,
}
ErrorNotifyTerminate = ErrorClass{
// Postgres Code 57P01
Class: "NOTIFY_TERMINATE", action: NotifyUser,
}
ErrorNotifyConnectTimeout = ErrorClass{
// TODO(this is mostly done via NOTIFY_CONNECTIVITY, will remove later if not needed)
Class: "NOTIFY_CONNECT_TIMEOUT", action: NotifyUser,
}
ErrorEventInternal = ErrorClass{
// Level <= Info
Class: "EVENT_INTERNAL", action: NotifyTelemetry,
}
ErrorIgnoreEOF = ErrorClass{
// io.EOF || io.ErrUnexpectedEOF
Class: "IGNORE_EOF", action: Ignore,
}
ErrorIgnoreContextCancelled = ErrorClass{
// context.Canceled
Class: "IGNORE_CONTEXT_CANCELLED", action: Ignore,
}
ErrorInternalClickHouse = ErrorClass{
// Code 999 or 341
Class: "INTERNAL_CLICKHOUSE", action: NotifyTelemetry,
}
ErrorOther = ErrorClass{
// These are internal and should not be exposed
Class: "OTHER", action: NotifyTelemetry,
}
)

func (e ErrorClass) String() string {
return e.Class
}

func (e ErrorClass) ErrorAction() ErrorAction {
if e.action != "" {
return e.action
}
return NotifyTelemetry
}

func GetErrorClass(ctx context.Context, err error) ErrorClass {
Copy link
Contributor

@serprex serprex Jan 28, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
func GetErrorClass(ctx context.Context, err error) ErrorClass {
func GetErrorClass(ctx context.Context, err error) (ErrorClass, []string) {

this might as well compute tags, lots of overlap. Appending logic currently will cause duplicate tagging. Logic a bit annoying with early returns here

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Appending logic currently will cause duplicate tagging

Duplicate tags are fine, will do the same in phases, will have a different prefix for classification and action

Logic a bit annoying with early returns here

Did it to prevent going through all if cases, can remove the return from switch and move after the switches, IIRC the switches use binary search

// PeerDB error types
var peerDBErr *exceptions.PostgresSetupError
if errors.As(err, &peerDBErr) {
return ErrorNotifyConnectivity
}
// Generally happens during workflow cancellation
if errors.Is(err, context.Canceled) {
return ErrorIgnoreContextCancelled
}
// Usually seen in ClickHouse cloud during instance scale-up
if errors.Is(err, io.EOF) || errors.Is(err, io.ErrUnexpectedEOF) {
return ErrorIgnoreEOF
}
// ClickHouse specific errors
var exception *clickhouse.Exception
if errors.As(err, &exception) {
switch exception.Code {
case 241: // MEMORY_LIMIT_EXCEEDED
if isClickHouseMvError(exception) {
return ErrorNotifyMVOrView
}
return ErrorNotifyOOM
case 349: // CANNOT_INSERT_NULL_IN_ORDINARY_COLUMN
if isClickHouseMvError(exception) {
return ErrorNotifyMVOrView
}
case 48: // NOT_IMPLEMENTED
if isClickHouseMvError(exception) {
return ErrorNotifyMVOrView
}
case 81: // UNKNOWN_DATABASE
return ErrorNotifyConnectivity
case 999: // KEEPER_EXCEPTION
return ErrorInternalClickHouse
case 341: // UNFINISHED
return ErrorInternalClickHouse
case 236: // ABORTED
return ErrorInternalClickHouse
}
}
// Postgres specific errors
var pgErr *pgconn.PgError
if errors.As(err, &pgErr) {
switch pgErr.Code {
case "28P01": // invalid_password
return ErrorNotifyConnectivity
case "42P01": // undefined_table
return ErrorNotifyConnectivity
case "57P01": // admin_shutdown
return ErrorNotifyTerminate
case "57P03": // cannot_connect_now
return ErrorNotifyConnectivity
case "55000": // object_not_in_prerequisite_state
if strings.Contains(pgErr.Message, "cannot read from logical replication slot") {
return ErrorNotifySlotInvalid
}
}
}

// Network related errors
var netErr *net.OpError
if errors.As(err, &netErr) {
return ErrorNotifyConnectivity
}

// SSH related errors
var sshErr *ssh.OpenChannelError
if errors.As(err, &sshErr) {
return ErrorNotifyConnectivity
}
return ErrorOther
}

func isClickHouseMvError(exception *clickhouse.Exception) bool {
return strings.Contains(exception.Message, "while pushing to view")
serprex marked this conversation as resolved.
Show resolved Hide resolved
}
5 changes: 3 additions & 2 deletions flow/connectors/postgres/postgres.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"github.com/PeerDB-io/peerdb/flow/otel_metrics"
"github.com/PeerDB-io/peerdb/flow/peerdbenv"
"github.com/PeerDB-io/peerdb/flow/shared"
"github.com/PeerDB-io/peerdb/flow/shared/exceptions"
)

type PostgresConnector struct {
Expand Down Expand Up @@ -1334,8 +1335,8 @@ func (c *PostgresConnector) AddTablesToPublication(ctx context.Context, req *pro
}
notPresentTables := shared.ArrayMinus(additionalSrcTables, tableNames)
if len(notPresentTables) > 0 {
return fmt.Errorf("some additional tables not present in custom publication: %s",
strings.Join(notPresentTables, ", "))
return exceptions.NewPostgresSetupError(fmt.Errorf("some additional tables not present in custom publication: %s",
strings.Join(notPresentTables, ", ")))
}
} else {
for _, additionalSrcTable := range additionalSrcTables {
Expand Down
1 change: 1 addition & 0 deletions flow/otel_metrics/attributes.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ const (
FlowNameKey = "flowName"
DeploymentUidKey = "deploymentUID"
ErrorClassKey = "errorClass"
ErrorActionKey = "errorAction"
InstanceStatusKey = "instanceStatus"
WorkflowTypeKey = "workflowType"
BatchIdKey = "batchId"
Expand Down
9 changes: 9 additions & 0 deletions flow/peerdbenv/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,3 +177,12 @@ func PeerDBRAPIRequestLoggingEnabled() bool {
func PeerDBMaintenanceModeWaitAlertSeconds() int {
return getEnvConvert("PEERDB_MAINTENANCE_MODE_WAIT_ALERT_SECONDS", 600, strconv.Atoi)
}

func PeerDBTelemetryErrorActionBasedAlertingEnabled() bool {
enabled, err := strconv.ParseBool(GetEnvString("PEERDB_TELEMETRY_ERROR_ACTION_BASED_ALERTING_ENABLED", "false"))
if err != nil {
slog.Error("failed to parse PEERDB_TELEMETRY_ERROR_ACTION_BASED_ALERTING_ENABLED to bool", "error", err)
return false
}
return enabled
}
18 changes: 18 additions & 0 deletions flow/shared/exceptions/postgres.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package exceptions

// PostgresSetupError represents errors during setup of Postgres peers, maybe we can later replace with a more generic error type
type PostgresSetupError struct {
Copy link
Contributor

@serprex serprex Jan 30, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should implement Unwrap() error to work better with various functions from errors tho as is I don't see what this adds over fmt.Errorf

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't see what this adds over fmt.Errorf

This is just for easier check, so we don't have to do string matching, have used it in 1 place and can gradually increase usage

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ideal would be for there to be interface that has ErrorClass() ErrorClass

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

agreed, I think we can do that once we have more error types

error
}

func (e *PostgresSetupError) Error() string {
return "Postgres setup error: " + e.error.Error()
}

func (e *PostgresSetupError) Unwrap() error {
return e.error
}

func NewPostgresSetupError(err error) *PostgresSetupError {
return &PostgresSetupError{err}
}
Loading