-
Notifications
You must be signed in to change notification settings - Fork 101
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: add alert classifier #2494
Changes from all commits
018f40d
b612a8b
64f75a1
58bb2f2
1fabc1c
dff2f57
7a0796f
c90fa12
a0044c9
e3ddf7b
9ca41b3
4deffb7
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,168 @@ | ||
package alerting | ||
|
||
import ( | ||
"context" | ||
"errors" | ||
"io" | ||
"net" | ||
"strings" | ||
|
||
"github.com/ClickHouse/clickhouse-go/v2" | ||
"github.com/jackc/pgx/v5/pgconn" | ||
"golang.org/x/crypto/ssh" | ||
|
||
"github.com/PeerDB-io/peerdb/flow/shared/exceptions" | ||
) | ||
|
||
// ErrorAction names what should be done with an error once it has been
// classified.
type ErrorAction string

// Actions that an ErrorClass can carry.
const (
	// NotifyUser is the action attached to the NOTIFY_* classes.
	NotifyUser = ErrorAction("notify_user")
	// Ignore is the action attached to the IGNORE_* classes.
	Ignore = ErrorAction("ignore")
	// NotifyTelemetry is the action attached to the internal classes; it is
	// also what ErrorClass.ErrorAction falls back to when no action is set.
	NotifyTelemetry = ErrorAction("notify_telemetry")
)

// String returns the action's underlying string value.
func (e ErrorAction) String() string {
	return string(e)
}
|
||
type ErrorClass struct { | ||
Class string | ||
action ErrorAction | ||
} | ||
|
||
var ( | ||
ErrorNotifyOOM = ErrorClass{ | ||
// ClickHouse Code 241 | ||
Class: "NOTIFY_OOM", action: NotifyUser, | ||
} | ||
ErrorNotifyMVOrView = ErrorClass{ | ||
// ClickHouse Code 349 / Code 48 with "while pushing to view" | ||
Class: "NOTIFY_MV_OR_VIEW", action: NotifyUser, | ||
} | ||
ErrorNotifyConnectivity = ErrorClass{ | ||
// ClickHouse Code 81 or Postgres Code 28P01 | ||
Class: "NOTIFY_CONNECTIVITY", action: NotifyUser, | ||
} | ||
ErrorNotifySlotInvalid = ErrorClass{ | ||
// Postgres Code 55000 with "cannot read from logical replication slot" | ||
Class: "NOTIFY_SLOT_INVALID", action: NotifyUser, | ||
} | ||
ErrorNotifyTerminate = ErrorClass{ | ||
// Postgres Code 57P01 | ||
Class: "NOTIFY_TERMINATE", action: NotifyUser, | ||
} | ||
ErrorNotifyConnectTimeout = ErrorClass{ | ||
// TODO(this is mostly done via NOTIFY_CONNECTIVITY, will remove later if not needed) | ||
Class: "NOTIFY_CONNECT_TIMEOUT", action: NotifyUser, | ||
} | ||
ErrorEventInternal = ErrorClass{ | ||
// Level <= Info | ||
Class: "EVENT_INTERNAL", action: NotifyTelemetry, | ||
} | ||
ErrorIgnoreEOF = ErrorClass{ | ||
// io.EOF || io.ErrUnexpectedEOF | ||
Class: "IGNORE_EOF", action: Ignore, | ||
} | ||
ErrorIgnoreContextCancelled = ErrorClass{ | ||
// context.Canceled | ||
Class: "IGNORE_CONTEXT_CANCELLED", action: Ignore, | ||
} | ||
ErrorInternalClickHouse = ErrorClass{ | ||
// Code 999 or 341 | ||
Class: "INTERNAL_CLICKHOUSE", action: NotifyTelemetry, | ||
} | ||
ErrorOther = ErrorClass{ | ||
// These are internal and should not be exposed | ||
Class: "OTHER", action: NotifyTelemetry, | ||
} | ||
) | ||
|
||
func (e ErrorClass) String() string { | ||
return e.Class | ||
} | ||
|
||
func (e ErrorClass) ErrorAction() ErrorAction { | ||
if e.action != "" { | ||
return e.action | ||
} | ||
return NotifyTelemetry | ||
} | ||
|
||
func GetErrorClass(ctx context.Context, err error) ErrorClass { | ||
// PeerDB error types | ||
var peerDBErr *exceptions.PostgresSetupError | ||
if errors.As(err, &peerDBErr) { | ||
return ErrorNotifyConnectivity | ||
} | ||
// Generally happens during workflow cancellation | ||
if errors.Is(err, context.Canceled) { | ||
return ErrorIgnoreContextCancelled | ||
} | ||
// Usually seen in ClickHouse cloud during instance scale-up | ||
if errors.Is(err, io.EOF) || errors.Is(err, io.ErrUnexpectedEOF) { | ||
return ErrorIgnoreEOF | ||
} | ||
// ClickHouse specific errors | ||
var exception *clickhouse.Exception | ||
if errors.As(err, &exception) { | ||
switch exception.Code { | ||
case 241: // MEMORY_LIMIT_EXCEEDED | ||
if isClickHouseMvError(exception) { | ||
return ErrorNotifyMVOrView | ||
} | ||
return ErrorNotifyOOM | ||
case 349: // CANNOT_INSERT_NULL_IN_ORDINARY_COLUMN | ||
if isClickHouseMvError(exception) { | ||
return ErrorNotifyMVOrView | ||
} | ||
case 48: // NOT_IMPLEMENTED | ||
if isClickHouseMvError(exception) { | ||
return ErrorNotifyMVOrView | ||
} | ||
case 81: // UNKNOWN_DATABASE | ||
return ErrorNotifyConnectivity | ||
case 999: // KEEPER_EXCEPTION | ||
return ErrorInternalClickHouse | ||
case 341: // UNFINISHED | ||
return ErrorInternalClickHouse | ||
case 236: // ABORTED | ||
return ErrorInternalClickHouse | ||
} | ||
} | ||
// Postgres specific errors | ||
var pgErr *pgconn.PgError | ||
if errors.As(err, &pgErr) { | ||
switch pgErr.Code { | ||
case "28P01": // invalid_password | ||
return ErrorNotifyConnectivity | ||
case "42P01": // undefined_table | ||
return ErrorNotifyConnectivity | ||
case "57P01": // admin_shutdown | ||
return ErrorNotifyTerminate | ||
case "57P03": // cannot_connect_now | ||
return ErrorNotifyConnectivity | ||
case "55000": // object_not_in_prerequisite_state | ||
if strings.Contains(pgErr.Message, "cannot read from logical replication slot") { | ||
return ErrorNotifySlotInvalid | ||
} | ||
} | ||
} | ||
|
||
// Network related errors | ||
var netErr *net.OpError | ||
if errors.As(err, &netErr) { | ||
return ErrorNotifyConnectivity | ||
} | ||
|
||
// SSH related errors | ||
var sshErr *ssh.OpenChannelError | ||
if errors.As(err, &sshErr) { | ||
return ErrorNotifyConnectivity | ||
} | ||
return ErrorOther | ||
} | ||
|
||
func isClickHouseMvError(exception *clickhouse.Exception) bool { | ||
return strings.Contains(exception.Message, "while pushing to view") | ||
serprex marked this conversation as resolved.
Show resolved
Hide resolved
|
||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,18 @@ | ||
package exceptions | ||
|
||
// PostgresSetupError represents errors during setup of Postgres peers. It gives
// callers a concrete type to match with errors.As instead of comparing error
// strings. It may later be replaced with a more generic error type.
type PostgresSetupError struct {
	// error is the underlying cause; it is exposed through Unwrap.
	error
}

// Error returns the wrapped error's message prefixed with
// "Postgres setup error: ". A nil receiver or a nil wrapped error is rendered
// as "<nil>" rather than panicking on the nil interface call.
func (e *PostgresSetupError) Error() string {
	if e == nil || e.error == nil {
		return "Postgres setup error: <nil>"
	}
	return "Postgres setup error: " + e.error.Error()
}

// Unwrap returns the wrapped error so errors.Is and errors.As can inspect the
// underlying cause. It returns nil for a nil receiver.
func (e *PostgresSetupError) Unwrap() error {
	if e == nil {
		return nil
	}
	return e.error
}

// NewPostgresSetupError wraps err in a PostgresSetupError. The result is never
// nil, even when err is nil.
func NewPostgresSetupError(err error) *PostgresSetupError {
	return &PostgresSetupError{err}
}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This might as well compute the tags too, since there is a lot of overlap. The appending logic will currently cause duplicate tagging. The logic is a bit awkward with the early returns here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Duplicate tags are fine; we will do the same in phases, with a different prefix for classification and action.
I did it this way to avoid going through all the if cases. We can remove the returns from the switch and move them after the switches; IIRC the switches use binary search.