From d7254e25dba3b0cfbf52c58afbbd1bb2e6fbcdeb Mon Sep 17 00:00:00 2001 From: Sam Stokes Date: Fri, 26 Aug 2016 11:36:08 -0700 Subject: [PATCH 1/7] Prototype error_policy option in output plugin --- client/replication.c | 5 ++-- ext/logdecoder.c | 67 +++++++++++++++++++++++++++++++++++++++++++- 2 files changed, 69 insertions(+), 3 deletions(-) diff --git a/client/replication.c b/client/replication.c index 0cbe0c6..0212c82 100644 --- a/client/replication.c +++ b/client/replication.c @@ -145,9 +145,10 @@ int replication_stream_check(replication_stream_t stream) { * starting from position stream->start_lsn. */ int replication_stream_start(replication_stream_t stream) { PQExpBuffer query = createPQExpBuffer(); - appendPQExpBuffer(query, "START_REPLICATION SLOT \"%s\" LOGICAL %X/%X", + appendPQExpBuffer(query, "START_REPLICATION SLOT \"%s\" LOGICAL %X/%X (error_policy '%s')", stream->slot_name, - (uint32) (stream->start_lsn >> 32), (uint32) stream->start_lsn); + (uint32) (stream->start_lsn >> 32), (uint32) stream->start_lsn, + "log"); PGresult *res = PQexec(stream->conn, query->data); diff --git a/ext/logdecoder.c b/ext/logdecoder.c index a3c08c7..280dc63 100644 --- a/ext/logdecoder.c +++ b/ext/logdecoder.c @@ -4,6 +4,7 @@ #include "replication/logical.h" #include "replication/output_plugin.h" +#include "utils/builtins.h" #include "utils/memutils.h" /* Entry point when Postgres loads the plugin */ @@ -16,12 +17,25 @@ static void output_avro_begin_txn(LogicalDecodingContext *ctx, ReorderBufferTXN static void output_avro_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr commit_lsn); static void output_avro_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, Relation rel, ReorderBufferChange *change); + +typedef enum { + ERROR_POLICY_UNDEFINED = 0, + ERROR_POLICY_LOG, + ERROR_POLICY_EXIT +} error_policy_t; + +static const error_policy_t DEFAULT_ERROR_POLICY = ERROR_POLICY_EXIT; + +static error_policy_t parse_error_policy(const char *str); +static const char* error_policy_name(error_policy_t policy); + typedef struct { MemoryContext memctx; /* reset after every change event, to prevent leaks */ avro_schema_t frame_schema; avro_value_iface_t *frame_iface; avro_value_t frame_value; schema_cache_t schema_cache; + error_policy_t error_policy; } plugin_state; void reset_frame(plugin_state *state); @@ -42,6 +56,8 @@ void _PG_output_plugin_init(OutputPluginCallbacks *cb) { static void output_avro_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt, bool is_init) { + ListCell *option; + plugin_state *state = palloc(sizeof(plugin_state)); ctx->output_plugin_private = state; opt->output_type = OUTPUT_PLUGIN_BINARY_OUTPUT; @@ -53,6 +69,26 @@ static void output_avro_startup(LogicalDecodingContext *ctx, OutputPluginOptions state->frame_iface = avro_generic_class_from_schema(state->frame_schema); avro_generic_value_new(state->frame_iface, &state->frame_value); state->schema_cache = schema_cache_new(ctx->context); + + foreach(option, ctx->output_plugin_options) { + DefElem *elem = lfirst(option); + Assert(elem->arg == NULL || IsA(elem->arg, String)); + + if (strcmp(elem->defname, "error_policy") == 0) { + if (elem->arg == NULL) { + ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("No value specified for parameter \"%s\"", + elem->defname))); + } else { + state->error_policy = parse_error_policy(strVal(elem->arg)); + } + } else { + ereport(INFO, (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("Parameter \"%s\" = \"%s\" is unknown", + elem->defname, + elem->arg ? strVal(elem->arg) : "(null)"))); + } + } } static void output_avro_shutdown(LogicalDecodingContext *ctx) { @@ -140,7 +176,15 @@ static void output_avro_change(LogicalDecodingContext *ctx, ReorderBufferTXN *tx if (err) { elog(INFO, "Row conversion failed: %s", schema_debug_info(rel, NULL)); - elog(ERROR, "output_avro_change: row conversion failed: %s", avro_strerror()); + switch (state->error_policy) { + case ERROR_POLICY_LOG: + elog(WARNING, "output_avro_change: row conversion failed: %s", avro_strerror()); + break; + case ERROR_POLICY_EXIT: + elog(ERROR, "output_avro_change: row conversion failed: %s", avro_strerror()); + default: + elog(ERROR, "AHHH WTF"); + } } if (write_frame(ctx, state)) { elog(ERROR, "output_avro_change: writing Avro binary failed: %s", avro_strerror()); @@ -150,6 +194,27 @@ static void output_avro_change(LogicalDecodingContext *ctx, ReorderBufferTXN *tx MemoryContextReset(state->memctx); } +error_policy_t parse_error_policy(const char *str) { + if (strcmp("log", str) == 0) { + return ERROR_POLICY_LOG; + } else if (strcmp("exit", str) == 0) { + return ERROR_POLICY_EXIT; + } else { + ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("invalid error_policy: %s", str))); + return ERROR_POLICY_UNDEFINED; + } +} + +const char* error_policy_name(error_policy_t policy) { + switch (policy) { + case ERROR_POLICY_LOG: return "log"; + case ERROR_POLICY_EXIT: return "exit"; + case ERROR_POLICY_UNDEFINED: return "undefined (probably a bug)"; + default: return "unknown (probably a bug)"; + } +} + void reset_frame(plugin_state *state) { if (avro_value_reset(&state->frame_value)) { elog(ERROR, "Avro value reset failed: %s", avro_strerror()); From fff01ddecc6465c7b1767ff8b0055127d48c2d32 Mon Sep 17 00:00:00 2001 From: Sam Stokes Date: Wed, 24 Aug 2016 17:20:19 -0700 Subject: [PATCH 2/7] Tests for large values --- ...{outage_spec.rb => error_handling_spec.rb} | 29 ++++++++++++++++++- 1 file changed, 28 insertions(+), 1 deletion(-) rename spec/functional/{outage_spec.rb => error_handling_spec.rb} (68%) diff --git a/spec/functional/outage_spec.rb b/spec/functional/error_handling_spec.rb similarity index 68% rename from spec/functional/outage_spec.rb rename to spec/functional/error_handling_spec.rb index 9b65d90..42da658 100644 --- a/spec/functional/outage_spec.rb +++ b/spec/functional/error_handling_spec.rb @@ -1,11 +1,14 @@ require 'spec_helper' +require 'format_contexts' require 'test_cluster' -describe 'outages', functional: true do +describe 'error handling', functional: true, format: :json do after(:example) do TEST_CLUSTER.stop(dump_logs: false) end + LONG_STRING = ('x' * 2_000_000).freeze + let(:postgres) { TEST_CLUSTER.postgres } describe 'with --on-error=exit' do @@ -38,6 +41,16 @@ expect(TEST_CLUSTER.bottledwater_running?).to be_falsy end end + + example 'writing a large value crashes Bottled Water' do + TEST_CLUSTER.start + + postgres.exec('CREATE TABLE events (id SERIAL PRIMARY KEY, event TEXT)') + postgres.exec_params('INSERT INTO events (event) VALUES ($1)', [LONG_STRING]) + sleep 5 + + expect(TEST_CLUSTER.bottledwater_running?).to be_falsy + end end describe 'with --on-error=log' do @@ -80,5 +93,19 @@ expect(TEST_CLUSTER.bottledwater_running?).to be_truthy end end + + example 'a row with a large value gets skipped without stopping replication' do + TEST_CLUSTER.start + + postgres.exec('CREATE TABLE events (id SERIAL PRIMARY KEY NOT NULL, event TEXT)') + postgres.exec_params('INSERT INTO events (event) VALUES ($1)', ['Wednesday']) + postgres.exec_params('INSERT INTO events (event) VALUES ($1)', [LONG_STRING]) + postgres.exec_params('INSERT INTO events (event) VALUES ($1)', ['Friday']) + sleep 1 + + messages = kafka_take_messages('events', 2) + events = messages.map {|message| fetch_string(decode_value(message.value), 'event') } + expect(events).to eq(['Wednesday', 'Friday']) + end end end From edc25dea14704700d8703b279ba1e7319e7cef7f Mon Sep 17 00:00:00 2001 From: Sam Stokes Date: Wed, 24 Aug 2016 17:21:29 -0700 Subject: [PATCH 3/7] Pass client error_policy down to output plugin --- client/connect.c | 10 ++++++++-- client/connect.h | 2 ++ client/replication.c | 4 ++-- client/replication.h | 2 +- ext/logdecoder.c | 8 ++++---- ext/protocol.h | 20 ++++++++++++++++++++ kafka/bottledwater.c | 13 ++++++++----- 7 files changed, 45 insertions(+), 14 deletions(-) diff --git a/client/connect.c b/client/connect.c index 9fa8613..d6ce623 100644 --- a/client/connect.c +++ b/client/connect.c @@ -49,11 +49,17 @@ void db_client_free(client_context_t context) { if (context->repl.snapshot_name) free(context->repl.snapshot_name); if (context->repl.output_plugin) free(context->repl.output_plugin); if (context->repl.slot_name) free(context->repl.slot_name); + if (context->error_policy) free(context->error_policy); if (context->app_name) free(context->app_name); if (context->conninfo) free(context->conninfo); free(context); } +void db_client_set_error_policy(client_context_t context, const char *policy) { + if (context->error_policy) free(context->error_policy); + context->error_policy = strdup(policy); +} + /* Connects to the Postgres server (using context->conninfo for server info and * context->app_name as client name), and checks whether replication slot @@ -87,7 +93,7 @@ int db_client_start(client_context_t context) { client_sql_disconnect(context); context->taking_snapshot = false; - checkRepl(err, context, replication_stream_start(&context->repl)); + checkRepl(err, context, replication_stream_start(&context->repl, context->error_policy)); return err; } @@ -113,7 +119,7 @@ int db_client_poll(client_context_t context) { /* If the snapshot is finished, switch over to the replication stream */ if (!context->sql_conn) { - checkRepl(err, context, replication_stream_start(&context->repl)); + checkRepl(err, context, replication_stream_start(&context->repl, context->error_policy)); } return err; diff --git a/client/connect.h b/client/connect.h index 4471acb..1731786 100644 --- a/client/connect.h +++ b/client/connect.h @@ -7,6 +7,7 @@ typedef struct { char *conninfo, *app_name; + char *error_policy; PGconn *sql_conn; replication_stream repl; bool allow_unkeyed; @@ -21,6 +22,7 @@ typedef client_context *client_context_t; client_context_t db_client_new(void); void db_client_free(client_context_t context); +void db_client_set_error_policy(client_context_t context, const char *policy); int db_client_start(client_context_t context); int db_client_poll(client_context_t context); int db_client_wait(client_context_t context); diff --git a/client/replication.c b/client/replication.c index 0212c82..920812c 100644 --- a/client/replication.c +++ b/client/replication.c @@ -143,12 +143,12 @@ int replication_stream_check(replication_stream_t stream) { /* Starts streaming logical changes from replication slot stream->slot_name, * starting from position stream->start_lsn. */ -int replication_stream_start(replication_stream_t stream) { +int replication_stream_start(replication_stream_t stream, const char *error_policy) { PQExpBuffer query = createPQExpBuffer(); appendPQExpBuffer(query, "START_REPLICATION SLOT \"%s\" LOGICAL %X/%X (error_policy '%s')", stream->slot_name, (uint32) (stream->start_lsn >> 32), (uint32) stream->start_lsn, - "log"); + error_policy); PGresult *res = PQexec(stream->conn, query->data); diff --git a/client/replication.h b/client/replication.h index 8720368..aaf04ac 100644 --- a/client/replication.h +++ b/client/replication.h @@ -26,7 +26,7 @@ typedef replication_stream *replication_stream_t; int replication_slot_create(replication_stream_t stream); int replication_slot_drop(replication_stream_t stream); int replication_stream_check(replication_stream_t stream); -int replication_stream_start(replication_stream_t stream); +int replication_stream_start(replication_stream_t stream, const char *error_policy); int replication_stream_poll(replication_stream_t stream); int replication_stream_keepalive(replication_stream_t stream); diff --git a/ext/logdecoder.c b/ext/logdecoder.c index 280dc63..cba0b29 100644 --- a/ext/logdecoder.c +++ b/ext/logdecoder.c @@ -195,9 +195,9 @@ static void output_avro_change(LogicalDecodingContext *ctx, ReorderBufferTXN *tx } error_policy_t parse_error_policy(const char *str) { - if (strcmp("log", str) == 0) { + if (strcmp(PROTOCOL_ERROR_POLICY_LOG, str) == 0) { return ERROR_POLICY_LOG; - } else if (strcmp("exit", str) == 0) { + } else if (strcmp(PROTOCOL_ERROR_POLICY_EXIT, str) == 0) { return ERROR_POLICY_EXIT; } else { ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE), @@ -208,8 +208,8 @@ error_policy_t parse_error_policy(const char *str) { const char* error_policy_name(error_policy_t policy) { switch (policy) { - case ERROR_POLICY_LOG: return "log"; - case ERROR_POLICY_EXIT: return "exit"; + case ERROR_POLICY_LOG: return PROTOCOL_ERROR_POLICY_LOG; + case ERROR_POLICY_EXIT: return PROTOCOL_ERROR_POLICY_EXIT; case ERROR_POLICY_UNDEFINED: return "undefined (probably a bug)"; default: return "unknown (probably a bug)"; } diff --git a/ext/protocol.h b/ext/protocol.h index 1709c21..96000cd 100644 --- a/ext/protocol.h +++ b/ext/protocol.h @@ -24,6 +24,26 @@ #define PROTOCOL_MSG_DELETE 5 +/* Error policies, determining what the snapshot function and output plugin + * should do if they encounter an error encoding a row. + * + * These should match the values of the bottledwater_error_policy_valid + * constraint in bottledwater--0.1.sql. + */ +/* The default policy is "exit": an error will terminate the snapshot or + * replication stream. This policy should be used if avoiding data loss is the + * top priority, since after manually resolving the error Bottled Water can be + * restarted to retry the affected rows. + */ +#define PROTOCOL_ERROR_POLICY_EXIT "exit" +/* Under the "log" policy, an error will cause Bottled Water to skip over the + * affected rows and continue, logging the error that occurred. This means the + * snapshot or replication stream may omit some updates that were successfully + * committed to Postgres, if there was a problem encoding those updates. + */ +#define PROTOCOL_ERROR_POLICY_LOG "log" + + avro_schema_t schema_for_frame(void); #endif /* PROTOCOL_H */ diff --git a/kafka/bottledwater.c b/kafka/bottledwater.c index 3b31bb6..cbeb957 100644 --- a/kafka/bottledwater.c +++ b/kafka/bottledwater.c @@ -67,7 +67,7 @@ typedef enum { ERROR_POLICY_EXIT } error_policy_t; -static const char* DEFAULT_ERROR_POLICY_NAME = "exit"; +static const char* DEFAULT_ERROR_POLICY_NAME = PROTOCOL_ERROR_POLICY_EXIT; static const error_policy_t DEFAULT_ERROR_POLICY = ERROR_POLICY_EXIT; @@ -340,20 +340,22 @@ const char* output_format_name(format_t format) { } void set_error_policy(producer_context_t context, char *policy) { - if (!strcmp("log", policy)) { + if (!strcmp(PROTOCOL_ERROR_POLICY_LOG, policy)) { context->error_policy = ERROR_POLICY_LOG; - } else if (!strcmp("exit", policy)) { + } else if (!strcmp(PROTOCOL_ERROR_POLICY_EXIT, policy)) { context->error_policy = ERROR_POLICY_EXIT; } else { config_error("invalid error policy (expected log or exit): %s", policy); exit(1); } + + db_client_set_error_policy(context->client, policy); } const char* error_policy_name(error_policy_t policy) { switch (policy) { - case ERROR_POLICY_LOG: return "log"; - case ERROR_POLICY_EXIT: return "exit"; + case ERROR_POLICY_LOG: return PROTOCOL_ERROR_POLICY_LOG; + case ERROR_POLICY_EXIT: return PROTOCOL_ERROR_POLICY_EXIT; case ERROR_POLICY_UNDEFINED: return "undefined (probably a bug)"; default: return "unknown (probably a bug)"; } @@ -744,6 +746,7 @@ client_context_t init_client() { client_context_t client = db_client_new(); client->app_name = strdup(APP_NAME); + db_client_set_error_policy(client, DEFAULT_ERROR_POLICY_NAME); client->allow_unkeyed = false; client->repl.slot_name = strdup(DEFAULT_REPLICATION_SLOT); client->repl.output_plugin = strdup(OUTPUT_PLUGIN); From b71470e388581b70c00b67360d9bf849397a6b25 Mon Sep 17 00:00:00 2001 From: Sam Stokes Date: Thu, 25 Aug 2016 13:50:23 -0700 Subject: [PATCH 4/7] Pass client error_policy to snapshot too --- client/connect.c | 12 ++++++--- ext/Makefile | 2 +- ext/bottledwater--0.1.sql | 11 +++++++- ext/error_policy.c | 26 ++++++++++++++++++ ext/error_policy.h | 15 +++++++++++ ext/logdecoder.c | 34 +---------------------- ext/snapshot.c | 20 ++++++++++++-- spec/functional/error_handling_spec.rb | 37 ++++++++++++++++++++++++++ 8 files changed, 116 insertions(+), 41 deletions(-) create mode 100644 ext/error_policy.c create mode 100644 ext/error_policy.h diff --git a/client/connect.c b/client/connect.c index d6ce623..f7bf9fe 100644 --- a/client/connect.c +++ b/client/connect.c @@ -333,12 +333,16 @@ int snapshot_start(client_context_t context) { check(err, exec_sql(context, query->data)); destroyPQExpBuffer(query); - Oid argtypes[] = { 25, 16 }; // 25 == TEXTOID, 16 == BOOLOID - const char *args[] = { "%", context->allow_unkeyed ? "t" : "f" }; + Oid argtypes[] = { 25, 16, 25 }; // 25 == TEXTOID, 16 == BOOLOID + const char *args[] = { + "%", + context->allow_unkeyed ? "t" : "f", + context->error_policy + }; if (!PQsendQueryParams(context->sql_conn, - "SELECT bottledwater_export(table_pattern := $1, allow_unkeyed := $2)", - 2, argtypes, args, NULL, NULL, 1)) { // The final 1 requests results in binary format + "SELECT bottledwater_export(table_pattern := $1, allow_unkeyed := $2, error_policy := $3)", + 3, argtypes, args, NULL, NULL, 1)) { // The final 1 requests results in binary format client_error(context, "Could not dispatch snapshot fetch: %s", PQerrorMessage(context->sql_conn)); return EIO; diff --git a/ext/Makefile b/ext/Makefile index a29d668..4ed629e 100644 --- a/ext/Makefile +++ b/ext/Makefile @@ -7,7 +7,7 @@ AVRO_LDFLAGS = $(shell pkg-config --libs avro-c) PG_CPPFLAGS += $(AVRO_CFLAGS) -std=c99 SHLIB_LINK += $(AVRO_LDFLAGS) -OBJS = io_util.o logdecoder.o oid2avro.o schema_cache.o protocol.o protocol_server.o snapshot.o +OBJS = io_util.o error_policy.o logdecoder.o oid2avro.o schema_cache.o protocol.o protocol_server.o snapshot.o DATA = bottledwater--0.1.sql PG_CONFIG = pg_config diff --git a/ext/bottledwater--0.1.sql b/ext/bottledwater--0.1.sql index 0ac0c71..a44fee2 100644 --- a/ext/bottledwater--0.1.sql +++ b/ext/bottledwater--0.1.sql @@ -10,8 +10,17 @@ CREATE OR REPLACE FUNCTION bottledwater_row_schema(name) RETURNS text CREATE OR REPLACE FUNCTION bottledwater_frame_schema() RETURNS text AS 'bottledwater', 'bottledwater_frame_schema' LANGUAGE C VOLATILE STRICT; +DROP DOMAIN IF EXISTS bottledwater_error_policy; +CREATE DOMAIN bottledwater_error_policy AS text + CONSTRAINT bottledwater_error_policy_valid CHECK (VALUE IN ( + -- these values should match the constants defined in protocol.h + 'log', + 'exit' + )); + CREATE OR REPLACE FUNCTION bottledwater_export( table_pattern text DEFAULT '%', - allow_unkeyed boolean DEFAULT false + allow_unkeyed boolean DEFAULT false, + error_policy bottledwater_error_policy DEFAULT 'exit' ) RETURNS setof bytea AS 'bottledwater', 'bottledwater_export' LANGUAGE C VOLATILE STRICT; diff --git a/ext/error_policy.c b/ext/error_policy.c new file mode 100644 index 0000000..f3e92c2 --- /dev/null +++ b/ext/error_policy.c @@ -0,0 +1,26 @@ +#include "error_policy.h" +#include "protocol.h" + +#include +#include "postgres.h" + +error_policy_t parse_error_policy(const char *str) { + if (strcmp(PROTOCOL_ERROR_POLICY_LOG, str) == 0) { + return ERROR_POLICY_LOG; + } else if (strcmp(PROTOCOL_ERROR_POLICY_EXIT, str) == 0) { + return ERROR_POLICY_EXIT; + } else { + ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("invalid error_policy: %s", str))); + return ERROR_POLICY_UNDEFINED; + } +} + +const char* error_policy_name(error_policy_t policy) { + switch (policy) { + case ERROR_POLICY_LOG: return PROTOCOL_ERROR_POLICY_LOG; + case ERROR_POLICY_EXIT: return PROTOCOL_ERROR_POLICY_EXIT; + case ERROR_POLICY_UNDEFINED: return "undefined (probably a bug)"; + default: return "unknown (probably a bug)"; + } +} diff --git a/ext/error_policy.h b/ext/error_policy.h new file mode 100644 index 0000000..08fb369 --- /dev/null +++ b/ext/error_policy.h @@ -0,0 +1,15 @@ +#ifndef ERROR_POLICY_H +#define ERROR_POLICY_H + +typedef enum { + ERROR_POLICY_UNDEFINED = 0, + ERROR_POLICY_LOG, + ERROR_POLICY_EXIT +} error_policy_t; + +static const error_policy_t DEFAULT_ERROR_POLICY = ERROR_POLICY_EXIT; + +error_policy_t parse_error_policy(const char *str); +const char* error_policy_name(error_policy_t policy); + +#endif /* ERROR_POLICY_H */ diff --git a/ext/logdecoder.c b/ext/logdecoder.c index cba0b29..756ce69 100644 --- a/ext/logdecoder.c +++ b/ext/logdecoder.c @@ -1,6 +1,7 @@ #include "io_util.h" #include "protocol_server.h" #include "oid2avro.h" +#include "error_policy.h" #include "replication/logical.h" #include "replication/output_plugin.h" @@ -17,18 +18,6 @@ static void output_avro_begin_txn(LogicalDecodingContext *ctx, ReorderBufferTXN static void output_avro_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr commit_lsn); static void output_avro_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, Relation rel, ReorderBufferChange *change); - -typedef enum { - ERROR_POLICY_UNDEFINED = 0, - ERROR_POLICY_LOG, - ERROR_POLICY_EXIT -} error_policy_t; - -static const error_policy_t DEFAULT_ERROR_POLICY = ERROR_POLICY_EXIT; - -static error_policy_t parse_error_policy(const char *str); -static const char* error_policy_name(error_policy_t policy); - typedef struct { MemoryContext memctx; /* reset after every change event, to prevent leaks */ avro_schema_t frame_schema; @@ -194,27 +183,6 @@ static void output_avro_change(LogicalDecodingContext *ctx, ReorderBufferTXN *tx MemoryContextReset(state->memctx); } -error_policy_t parse_error_policy(const char *str) { - if (strcmp(PROTOCOL_ERROR_POLICY_LOG, str) == 0) { - return ERROR_POLICY_LOG; - } else if (strcmp(PROTOCOL_ERROR_POLICY_EXIT, str) == 0) { - return ERROR_POLICY_EXIT; - } else { - ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE), - errmsg("invalid error_policy: %s", str))); - return ERROR_POLICY_UNDEFINED; - } -} - -const char* error_policy_name(error_policy_t policy) { - switch (policy) { - case ERROR_POLICY_LOG: return PROTOCOL_ERROR_POLICY_LOG; - case ERROR_POLICY_EXIT: return PROTOCOL_ERROR_POLICY_EXIT; - case ERROR_POLICY_UNDEFINED: return "undefined (probably a bug)"; - default: return "unknown (probably a bug)"; - } -} - void reset_frame(plugin_state *state) { if (avro_value_reset(&state->frame_value)) { elog(ERROR, "Avro value reset failed: %s", avro_strerror()); diff --git a/ext/snapshot.c b/ext/snapshot.c index f471898..3513399 100644 --- a/ext/snapshot.c +++ b/ext/snapshot.c @@ -1,6 +1,7 @@ #include "io_util.h" #include "oid2avro.h" #include "protocol_server.h" +#include "error_policy.h" #include #include "postgres.h" @@ -30,6 +31,7 @@ typedef struct { typedef struct { MemoryContext memcontext; export_table *tables; + error_policy_t error_policy; int num_tables, current_table; avro_schema_t frame_schema; avro_value_iface_t *frame_iface; @@ -103,6 +105,8 @@ Datum bottledwater_export(PG_FUNCTION_ARGS) { MemoryContext oldcontext; export_state *state; int ret; + text *table_pattern; + bool allow_unkeyed; bytea *result; oldcontext = CurrentMemoryContext; @@ -135,7 +139,11 @@ Datum bottledwater_export(PG_FUNCTION_ARGS) { state->schema_cache = schema_cache_new(funcctx->multi_call_memory_ctx); funcctx->user_fctx = state; - get_table_list(state, PG_GETARG_TEXT_P(0), PG_GETARG_BOOL(1)); + table_pattern = PG_GETARG_TEXT_P(0); + allow_unkeyed = PG_GETARG_BOOL(1); + state->error_policy = parse_error_policy(TextDatumGetCString(PG_GETARG_TEXT_P(2))); + + get_table_list(state, table_pattern, allow_unkeyed); if (state->num_tables > 0) open_next_table(state); } @@ -332,7 +340,15 @@ bytea *format_snapshot_row(export_state *state) { SPI_tuptable->tupdesc, SPI_tuptable->vals[0])) { elog(INFO, "Failed tuptable: %s", schema_debug_info(table->rel, SPI_tuptable->tupdesc)); elog(INFO, "Failed relation: %s", schema_debug_info(table->rel, RelationGetDescr(table->rel))); - elog(ERROR, "bottledwater_export: Avro conversion failed: %s", avro_strerror()); + switch (state->error_policy) { + case ERROR_POLICY_LOG: + elog(WARNING, "bottledwater_export: Avro conversion failed: %s", avro_strerror()); + break; + case ERROR_POLICY_EXIT: + elog(ERROR, "bottledwater_export: Avro conversion failed: %s", avro_strerror()); + default: + elog(ERROR, "AHHH WTF"); + } } if (try_writing(&output, &write_avro_binary, &state->frame_value)) { elog(ERROR, "bottledwater_export: writing Avro binary failed: %s", avro_strerror()); diff --git a/spec/functional/error_handling_spec.rb b/spec/functional/error_handling_spec.rb index 42da658..47ae746 100644 --- a/spec/functional/error_handling_spec.rb +++ b/spec/functional/error_handling_spec.rb @@ -8,9 +8,18 @@ end LONG_STRING = ('x' * 2_000_000).freeze + LARGE_JSON = JSON.generate({}.tap do |h| + 100_000.times do |i| + h["property#{i}"] = i + end + end).freeze let(:postgres) { TEST_CLUSTER.postgres } + def fetch_json(object, name) + JSON.parse(fetch_string(object, name)) + end + describe 'with --on-error=exit' do before(:example) do TEST_CLUSTER.bottledwater_on_error = :exit @@ -51,6 +60,17 @@ expect(TEST_CLUSTER.bottledwater_running?).to be_falsy end + + example 'if existing data contains a large value, Bottled Water crashes during snapshot' do + TEST_CLUSTER.before_service(TEST_CLUSTER.bottledwater_service, 'Prepopulating users table') do |cluster| + cluster.postgres.exec('CREATE TABLE users (id SERIAL PRIMARY KEY, prefs JSONB)') + cluster.postgres.exec_params(%{INSERT INTO users (prefs) VALUES ($1)}, ['{"colour":"red"}']) + cluster.postgres.exec_params(%{INSERT INTO users (prefs) VALUES ($1)}, [LARGE_JSON]) + cluster.postgres.exec_params(%{INSERT INTO users (prefs) VALUES ($1)}, ['{"colour":"blue"}']) + end + + expect { TEST_CLUSTER.start }.to raise_error(/bottledwater.* not ready/i) + end end describe 'with --on-error=log' do @@ -107,5 +127,22 @@ events = messages.map {|message| fetch_string(decode_value(message.value), 'event') } expect(events).to eq(['Wednesday', 'Friday']) end + + example 'if existing data contains a large value, Bottled Water skips that row during snapshot' do + TEST_CLUSTER.before_service(TEST_CLUSTER.bottledwater_service, 'Prepopulating users table') do |cluster| + cluster.postgres.exec('CREATE TABLE users (id SERIAL PRIMARY KEY, prefs JSONB)') + cluster.postgres.exec_params(%{INSERT INTO users (prefs) VALUES ($1)}, ['{"colour":"red"}']) + cluster.postgres.exec_params(%{INSERT INTO users (prefs) VALUES ($1)}, [LARGE_JSON]) + cluster.postgres.exec_params(%{INSERT INTO users (prefs) VALUES ($1)}, ['{"colour":"blue"}']) + end + + TEST_CLUSTER.start + + messages = kafka_take_messages('users', 2) + prefs = messages.map {|message| fetch_json(decode_value(message.value), 'prefs') } + prefs.each {|pref| expect(pref).to have_key('colour') } + colours = prefs.map {|pref| pref.fetch('colour') } + expect(colours).to eq(['red', 'blue']) + end end end From 6498a1f9c52c01646f706f05ee01e55ce060eae1 Mon Sep 17 00:00:00 2001 From: Sam Stokes Date: Thu, 25 Aug 2016 14:21:44 -0700 Subject: [PATCH 5/7] Exclude our own domain type from tests --- spec/bin/generate_type_specs.rb | 1 + spec/functional/type_specs.rb | 4 ++++ 2 files changed, 5 insertions(+) diff --git a/spec/bin/generate_type_specs.rb b/spec/bin/generate_type_specs.rb index 30e9e4a..498e863 100644 --- a/spec/bin/generate_type_specs.rb +++ b/spec/bin/generate_type_specs.rb @@ -97,6 +97,7 @@ def genvalue(value) INTERNAL_TYPES = Set[*%w( abstime aclitem + bottledwater_error_policy "char" cid ghstore diff --git a/spec/functional/type_specs.rb b/spec/functional/type_specs.rb index 21f6699..f461750 100644 --- a/spec/functional/type_specs.rb +++ b/spec/functional/type_specs.rb @@ -40,6 +40,10 @@ include_examples 'roundtrip type', "boolean", true end + describe 'bottledwater_error_policy' do + example('internal type not supported') {} + end + describe 'box' do include_examples 'geometric type', "box", "(3,4),(0,0)" end From ae9acb99d0f57c114f187a453303521e6353972d Mon Sep 17 00:00:00 2001 From: Sam Stokes Date: Thu, 25 Aug 2016 19:11:46 -0700 Subject: [PATCH 6/7] Clean up error handling --- ext/error_policy.c | 14 ++++++++++++++ ext/error_policy.h | 12 ++++++++++++ ext/logdecoder.c | 16 ++++++---------- ext/snapshot.c | 22 +++++++++++----------- 4 files changed, 43 insertions(+), 21 deletions(-) diff --git a/ext/error_policy.c b/ext/error_policy.c index f3e92c2..b84b21e 100644 --- a/ext/error_policy.c +++ b/ext/error_policy.c @@ -24,3 +24,17 @@ const char* error_policy_name(error_policy_t policy) { default: return "unknown (probably a bug)"; } } + + +void error_policy_handle(error_policy_t policy, const char *message, const char *error) { + switch (policy) { + case ERROR_POLICY_LOG: + elog(WARNING, "%s: %s", message, error); + break; + case ERROR_POLICY_EXIT: + elog(ERROR, "%s: %s", message, error); + default: + elog(WARNING, "%s: %s", message, error); + elog(ERROR, "error_policy_handle: unknown error policy!"); + } +} diff --git a/ext/error_policy.h b/ext/error_policy.h index 08fb369..09a98b5 100644 --- a/ext/error_policy.h +++ b/ext/error_policy.h @@ -12,4 +12,16 @@ static const error_policy_t DEFAULT_ERROR_POLICY = ERROR_POLICY_EXIT; error_policy_t parse_error_policy(const char *str); const char* error_policy_name(error_policy_t policy); +/* Handles an error according to the supplied policy. + * + * `message` should describe the context in which the error occurred, e.g. what + * the calling function was attempting to do. + * + * `error` should describe the error that occurred. + * + * This will call `ereport` with an appropriate severity depending on the + * policy, so this may cause an early exit from the calling function. + */ +void error_policy_handle(error_policy_t policy, const char *message, const char *error); + #endif /* ERROR_POLICY_H */ diff --git a/ext/logdecoder.c b/ext/logdecoder.c index 756ce69..31457cc 100644 --- a/ext/logdecoder.c +++ b/ext/logdecoder.c @@ -165,18 +165,14 @@ static void output_avro_change(LogicalDecodingContext *ctx, ReorderBufferTXN *tx if (err) { elog(INFO, "Row conversion failed: %s", schema_debug_info(rel, NULL)); - switch (state->error_policy) { - case ERROR_POLICY_LOG: - elog(WARNING, "output_avro_change: row conversion failed: %s", avro_strerror()); - break; - case ERROR_POLICY_EXIT: - elog(ERROR, "output_avro_change: row conversion failed: %s", avro_strerror()); - default: - elog(ERROR, "AHHH WTF"); - } + error_policy_handle(state->error_policy, "output_avro_change: row conversion failed", avro_strerror()); + /* if handling the error didn't exit early, it should be safe to fall + * through, because we'll just write the frame without the message that + * failed (so potentially it'll be an empty frame) + */ } if (write_frame(ctx, state)) { - elog(ERROR, "output_avro_change: writing Avro binary failed: %s", avro_strerror()); + error_policy_handle(state->error_policy, "output_avro_change: writing Avro binary failed", avro_strerror()); } MemoryContextSwitchTo(oldctx); diff --git a/ext/snapshot.c b/ext/snapshot.c index 3513399..1d6a9d2 100644 --- a/ext/snapshot.c +++ b/ext/snapshot.c @@ -174,7 +174,9 @@ Datum bottledwater_export(PG_FUNCTION_ARGS) { /* don't forget to clear the SPI temp context */ SPI_freetuptable(SPI_tuptable); - SRF_RETURN_NEXT(funcctx, PointerGetDatum(result)); + if (result != NULL) { + SRF_RETURN_NEXT(funcctx, PointerGetDatum(result)); + } } } @@ -340,18 +342,16 @@ bytea *format_snapshot_row(export_state *state) { SPI_tuptable->tupdesc, SPI_tuptable->vals[0])) { elog(INFO, "Failed tuptable: %s", schema_debug_info(table->rel, SPI_tuptable->tupdesc)); elog(INFO, "Failed relation: %s", schema_debug_info(table->rel, RelationGetDescr(table->rel))); - switch (state->error_policy) { - case ERROR_POLICY_LOG: - elog(WARNING, "bottledwater_export: Avro conversion failed: %s", avro_strerror()); - break; - case ERROR_POLICY_EXIT: - elog(ERROR, "bottledwater_export: Avro conversion failed: %s", avro_strerror()); - default: - elog(ERROR, "AHHH WTF"); - } + error_policy_handle(state->error_policy, "bottledwater_export: Avro conversion failed", avro_strerror()); + /* if handling the error didn't exit early, it should be safe to fall + * through, because we'll just write the frame without the message that + * failed (so potentially it'll be an empty frame) + */ } if (try_writing(&output, &write_avro_binary, &state->frame_value)) { - elog(ERROR, "bottledwater_export: writing Avro binary failed: %s", avro_strerror()); + error_policy_handle(state->error_policy, "bottledwater_export: writing Avro binary failed", avro_strerror()); + /* if we didn't exit early, then output remains uninitialised */ + return NULL; } return output; } From 5d874285bbaeadcac978a73ea54ca7701a31420d Mon Sep 17 00:00:00 2001 From: Sam Stokes Date: Mon, 29 Aug 2016 15:52:05 -0700 Subject: [PATCH 7/7] Quote output plugin param name for consistency --- client/replication.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/client/replication.c b/client/replication.c index 920812c..bc529e0 100644 --- a/client/replication.c +++ b/client/replication.c @@ -145,7 +145,7 @@ int replication_stream_check(replication_stream_t stream) { * starting from position stream->start_lsn. */ int replication_stream_start(replication_stream_t stream, const char *error_policy) { PQExpBuffer query = createPQExpBuffer(); - appendPQExpBuffer(query, "START_REPLICATION SLOT \"%s\" LOGICAL %X/%X (error_policy '%s')", + appendPQExpBuffer(query, "START_REPLICATION SLOT \"%s\" LOGICAL %X/%X (\"error_policy\" '%s')", stream->slot_name, (uint32) (stream->start_lsn >> 32), (uint32) stream->start_lsn, error_policy);