Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Pass client error policy down to snapshot and output plugin #115

Merged
merged 7 commits into from
Aug 31, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 16 additions & 6 deletions client/connect.c
Original file line number Diff line number Diff line change
Expand Up @@ -49,11 +49,17 @@ void db_client_free(client_context_t context) {
if (context->repl.snapshot_name) free(context->repl.snapshot_name);
if (context->repl.output_plugin) free(context->repl.output_plugin);
if (context->repl.slot_name) free(context->repl.slot_name);
if (context->error_policy) free(context->error_policy);
if (context->app_name) free(context->app_name);
if (context->conninfo) free(context->conninfo);
free(context);
}

void db_client_set_error_policy(client_context_t context, const char *policy) {
if (context->error_policy) free(context->error_policy);
context->error_policy = strdup(policy);
}


/* Connects to the Postgres server (using context->conninfo for server info and
* context->app_name as client name), and checks whether replication slot
Expand Down Expand Up @@ -87,7 +93,7 @@ int db_client_start(client_context_t context) {
client_sql_disconnect(context);
context->taking_snapshot = false;

checkRepl(err, context, replication_stream_start(&context->repl));
checkRepl(err, context, replication_stream_start(&context->repl, context->error_policy));

return err;
}
Expand All @@ -113,7 +119,7 @@ int db_client_poll(client_context_t context) {

/* If the snapshot is finished, switch over to the replication stream */
if (!context->sql_conn) {
checkRepl(err, context, replication_stream_start(&context->repl));
checkRepl(err, context, replication_stream_start(&context->repl, context->error_policy));
}
return err;

Expand Down Expand Up @@ -327,12 +333,16 @@ int snapshot_start(client_context_t context) {
check(err, exec_sql(context, query->data));
destroyPQExpBuffer(query);

Oid argtypes[] = { 25, 16 }; // 25 == TEXTOID, 16 == BOOLOID
const char *args[] = { "%", context->allow_unkeyed ? "t" : "f" };
Oid argtypes[] = { 25, 16, 25 }; // 25 == TEXTOID, 16 == BOOLOID
const char *args[] = {
"%",
context->allow_unkeyed ? "t" : "f",
context->error_policy
};

if (!PQsendQueryParams(context->sql_conn,
"SELECT bottledwater_export(table_pattern := $1, allow_unkeyed := $2)",
2, argtypes, args, NULL, NULL, 1)) { // The final 1 requests results in binary format
"SELECT bottledwater_export(table_pattern := $1, allow_unkeyed := $2, error_policy := $3)",
3, argtypes, args, NULL, NULL, 1)) { // The final 1 requests results in binary format
client_error(context, "Could not dispatch snapshot fetch: %s",
PQerrorMessage(context->sql_conn));
return EIO;
Expand Down
2 changes: 2 additions & 0 deletions client/connect.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

typedef struct {
char *conninfo, *app_name;
char *error_policy;
PGconn *sql_conn;
replication_stream repl;
bool allow_unkeyed;
Expand All @@ -21,6 +22,7 @@ typedef client_context *client_context_t;

client_context_t db_client_new(void);
void db_client_free(client_context_t context);
void db_client_set_error_policy(client_context_t context, const char *policy);
int db_client_start(client_context_t context);
int db_client_poll(client_context_t context);
int db_client_wait(client_context_t context);
Expand Down
7 changes: 4 additions & 3 deletions client/replication.c
Original file line number Diff line number Diff line change
Expand Up @@ -143,11 +143,12 @@ int replication_stream_check(replication_stream_t stream) {

/* Starts streaming logical changes from replication slot stream->slot_name,
* starting from position stream->start_lsn. */
int replication_stream_start(replication_stream_t stream) {
int replication_stream_start(replication_stream_t stream, const char *error_policy) {
PQExpBuffer query = createPQExpBuffer();
appendPQExpBuffer(query, "START_REPLICATION SLOT \"%s\" LOGICAL %X/%X",
appendPQExpBuffer(query, "START_REPLICATION SLOT \"%s\" LOGICAL %X/%X (\"error_policy\" '%s')",
stream->slot_name,
(uint32) (stream->start_lsn >> 32), (uint32) stream->start_lsn);
(uint32) (stream->start_lsn >> 32), (uint32) stream->start_lsn,
error_policy);

PGresult *res = PQexec(stream->conn, query->data);

Expand Down
2 changes: 1 addition & 1 deletion client/replication.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ typedef replication_stream *replication_stream_t;
int replication_slot_create(replication_stream_t stream);
int replication_slot_drop(replication_stream_t stream);
int replication_stream_check(replication_stream_t stream);
int replication_stream_start(replication_stream_t stream);
int replication_stream_start(replication_stream_t stream, const char *error_policy);
int replication_stream_poll(replication_stream_t stream);
int replication_stream_keepalive(replication_stream_t stream);

Expand Down
2 changes: 1 addition & 1 deletion ext/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ AVRO_LDFLAGS = $(shell pkg-config --libs avro-c)
PG_CPPFLAGS += $(AVRO_CFLAGS) -std=c99
SHLIB_LINK += $(AVRO_LDFLAGS)

OBJS = io_util.o logdecoder.o oid2avro.o schema_cache.o protocol.o protocol_server.o snapshot.o
OBJS = io_util.o error_policy.o logdecoder.o oid2avro.o schema_cache.o protocol.o protocol_server.o snapshot.o
DATA = bottledwater--0.1.sql

PG_CONFIG = pg_config
Expand Down
11 changes: 10 additions & 1 deletion ext/bottledwater--0.1.sql
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,17 @@ CREATE OR REPLACE FUNCTION bottledwater_row_schema(name) RETURNS text
CREATE OR REPLACE FUNCTION bottledwater_frame_schema() RETURNS text
AS 'bottledwater', 'bottledwater_frame_schema' LANGUAGE C VOLATILE STRICT;

DROP DOMAIN IF EXISTS bottledwater_error_policy;
CREATE DOMAIN bottledwater_error_policy AS text
CONSTRAINT bottledwater_error_policy_valid CHECK (VALUE IN (
-- these values should match the constants defined in protocol.h
'log',
'exit'
));

CREATE OR REPLACE FUNCTION bottledwater_export(
table_pattern text DEFAULT '%',
allow_unkeyed boolean DEFAULT false
allow_unkeyed boolean DEFAULT false,
error_policy bottledwater_error_policy DEFAULT 'exit'
) RETURNS setof bytea
AS 'bottledwater', 'bottledwater_export' LANGUAGE C VOLATILE STRICT;
40 changes: 40 additions & 0 deletions ext/error_policy.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
#include "error_policy.h"
#include "protocol.h"

#include <string.h>
#include "postgres.h"

error_policy_t parse_error_policy(const char *str) {
if (strcmp(PROTOCOL_ERROR_POLICY_LOG, str) == 0) {
return ERROR_POLICY_LOG;
} else if (strcmp(PROTOCOL_ERROR_POLICY_EXIT, str) == 0) {
return ERROR_POLICY_EXIT;
} else {
ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("invalid error_policy: %s", str)));
return ERROR_POLICY_UNDEFINED;
}
}

const char* error_policy_name(error_policy_t policy) {
switch (policy) {
case ERROR_POLICY_LOG: return PROTOCOL_ERROR_POLICY_LOG;
case ERROR_POLICY_EXIT: return PROTOCOL_ERROR_POLICY_EXIT;
case ERROR_POLICY_UNDEFINED: return "undefined (probably a bug)";
default: return "unknown (probably a bug)";
}
}


void error_policy_handle(error_policy_t policy, const char *message, const char *error) {
switch (policy) {
case ERROR_POLICY_LOG:
elog(WARNING, "%s: %s", message, error);
break;
case ERROR_POLICY_EXIT:
elog(ERROR, "%s: %s", message, error);
default:
elog(WARNING, "%s: %s", message, error);
elog(ERROR, "error_policy_handle: unknown error policy!");
}
}
27 changes: 27 additions & 0 deletions ext/error_policy.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
#ifndef ERROR_POLICY_H
#define ERROR_POLICY_H

typedef enum {
ERROR_POLICY_UNDEFINED = 0,
ERROR_POLICY_LOG,
ERROR_POLICY_EXIT
} error_policy_t;

static const error_policy_t DEFAULT_ERROR_POLICY = ERROR_POLICY_EXIT;

error_policy_t parse_error_policy(const char *str);
const char* error_policy_name(error_policy_t policy);

/* Handles an error according to the supplied policy.
*
* `message` should describe the context in which the error occurred, e.g. what
* the calling function was attempting to do.
*
* `error` should describe the error that occurred.
*
* This will call `ereport` with an appropriate severity depending on the
* policy, so this may cause an early exit from the calling function.
*/
void error_policy_handle(error_policy_t policy, const char *message, const char *error);

#endif /* ERROR_POLICY_H */
33 changes: 31 additions & 2 deletions ext/logdecoder.c
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
#include "io_util.h"
#include "protocol_server.h"
#include "oid2avro.h"
#include "error_policy.h"

#include "replication/logical.h"
#include "replication/output_plugin.h"
#include "utils/builtins.h"
#include "utils/memutils.h"

/* Entry point when Postgres loads the plugin */
Expand All @@ -22,6 +24,7 @@ typedef struct {
avro_value_iface_t *frame_iface;
avro_value_t frame_value;
schema_cache_t schema_cache;
error_policy_t error_policy;
} plugin_state;

void reset_frame(plugin_state *state);
Expand All @@ -42,6 +45,8 @@ void _PG_output_plugin_init(OutputPluginCallbacks *cb) {

static void output_avro_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
bool is_init) {
ListCell *option;

plugin_state *state = palloc(sizeof(plugin_state));
ctx->output_plugin_private = state;
opt->output_type = OUTPUT_PLUGIN_BINARY_OUTPUT;
Expand All @@ -53,6 +58,26 @@ static void output_avro_startup(LogicalDecodingContext *ctx, OutputPluginOptions
state->frame_iface = avro_generic_class_from_schema(state->frame_schema);
avro_generic_value_new(state->frame_iface, &state->frame_value);
state->schema_cache = schema_cache_new(ctx->context);

foreach(option, ctx->output_plugin_options) {
DefElem *elem = lfirst(option);
Assert(elem->arg == NULL || IsA(elem->arg, String));

if (strcmp(elem->defname, "error_policy") == 0) {
if (elem->arg == NULL) {
ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("No value specified for parameter \"%s\"",
elem->defname)));
} else {
state->error_policy = parse_error_policy(strVal(elem->arg));
}
} else {
ereport(INFO, (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("Parameter \"%s\" = \"%s\" is unknown",
elem->defname,
elem->arg ? strVal(elem->arg) : "(null)")));
}
}
}

static void output_avro_shutdown(LogicalDecodingContext *ctx) {
Expand Down Expand Up @@ -140,10 +165,14 @@ static void output_avro_change(LogicalDecodingContext *ctx, ReorderBufferTXN *tx

if (err) {
elog(INFO, "Row conversion failed: %s", schema_debug_info(rel, NULL));
elog(ERROR, "output_avro_change: row conversion failed: %s", avro_strerror());
error_policy_handle(state->error_policy, "output_avro_change: row conversion failed", avro_strerror());
/* if handling the error didn't exit early, it should be safe to fall
* through, because we'll just write the frame without the message that
* failed (so potentially it'll be an empty frame)
*/
}
if (write_frame(ctx, state)) {
elog(ERROR, "output_avro_change: writing Avro binary failed: %s", avro_strerror());
error_policy_handle(state->error_policy, "output_avro_change: writing Avro binary failed", avro_strerror());
}

MemoryContextSwitchTo(oldctx);
Expand Down
20 changes: 20 additions & 0 deletions ext/protocol.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,26 @@
#define PROTOCOL_MSG_DELETE 5


/* Error policies, determining what the snapshot function and output plugin
* should do if they encounter an error encoding a row.
*
* These should match the values of the bottledwater_error_policy_valid
* constraint in bottledwater--0.1.sql.
*/
/* The default policy is "exit": an error will terminate the snapshot or
* replication stream. This policy should be used if avoiding data loss is the
* top priority, since after manually resolving the error Bottled Water can be
* restarted to retry the affected rows.
*/
#define PROTOCOL_ERROR_POLICY_EXIT "exit"
/* Under the "log" policy, an error will cause Bottled Water to skip over the
* affected rows and continue, logging the error that occurred. This means the
* snapshot or replication stream may omit some updates that were successfully
* committed to Postgres, if there was a problem encoding those updates.
*/
#define PROTOCOL_ERROR_POLICY_LOG "log"


avro_schema_t schema_for_frame(void);

#endif /* PROTOCOL_H */
24 changes: 20 additions & 4 deletions ext/snapshot.c
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#include "io_util.h"
#include "oid2avro.h"
#include "protocol_server.h"
#include "error_policy.h"

#include <string.h>
#include "postgres.h"
Expand Down Expand Up @@ -30,6 +31,7 @@ typedef struct {
typedef struct {
MemoryContext memcontext;
export_table *tables;
error_policy_t error_policy;
int num_tables, current_table;
avro_schema_t frame_schema;
avro_value_iface_t *frame_iface;
Expand Down Expand Up @@ -103,6 +105,8 @@ Datum bottledwater_export(PG_FUNCTION_ARGS) {
MemoryContext oldcontext;
export_state *state;
int ret;
text *table_pattern;
bool allow_unkeyed;
bytea *result;

oldcontext = CurrentMemoryContext;
Expand Down Expand Up @@ -135,7 +139,11 @@ Datum bottledwater_export(PG_FUNCTION_ARGS) {
state->schema_cache = schema_cache_new(funcctx->multi_call_memory_ctx);
funcctx->user_fctx = state;

get_table_list(state, PG_GETARG_TEXT_P(0), PG_GETARG_BOOL(1));
table_pattern = PG_GETARG_TEXT_P(0);
allow_unkeyed = PG_GETARG_BOOL(1);
state->error_policy = parse_error_policy(TextDatumGetCString(PG_GETARG_TEXT_P(2)));

get_table_list(state, table_pattern, allow_unkeyed);
if (state->num_tables > 0) open_next_table(state);
}

Expand Down Expand Up @@ -166,7 +174,9 @@ Datum bottledwater_export(PG_FUNCTION_ARGS) {
/* don't forget to clear the SPI temp context */
SPI_freetuptable(SPI_tuptable);

SRF_RETURN_NEXT(funcctx, PointerGetDatum(result));
if (result != NULL) {
SRF_RETURN_NEXT(funcctx, PointerGetDatum(result));
}
}
}

Expand Down Expand Up @@ -332,10 +342,16 @@ bytea *format_snapshot_row(export_state *state) {
SPI_tuptable->tupdesc, SPI_tuptable->vals[0])) {
elog(INFO, "Failed tuptable: %s", schema_debug_info(table->rel, SPI_tuptable->tupdesc));
elog(INFO, "Failed relation: %s", schema_debug_info(table->rel, RelationGetDescr(table->rel)));
elog(ERROR, "bottledwater_export: Avro conversion failed: %s", avro_strerror());
error_policy_handle(state->error_policy, "bottledwater_export: Avro conversion failed", avro_strerror());
/* if handling the error didn't exit early, it should be safe to fall
* through, because we'll just write the frame without the message that
* failed (so potentially it'll be an empty frame)
*/
}
if (try_writing(&output, &write_avro_binary, &state->frame_value)) {
elog(ERROR, "bottledwater_export: writing Avro binary failed: %s", avro_strerror());
error_policy_handle(state->error_policy, "bottledwater_export: writing Avro binary failed", avro_strerror());
/* if we didn't exit early, then output remains uninitialised */
return NULL;
}
return output;
}
Expand Down
Loading