Skip to content

Commit

Permalink
Merge pull request #115 from samstokes/policy-passdown
Browse files Browse the repository at this point in the history
Pass client error policy down to snapshot and output plugin
  • Loading branch information
samstokes authored Aug 31, 2016
2 parents caa6f24 + 5d87428 commit b11d925
Show file tree
Hide file tree
Showing 16 changed files with 333 additions and 107 deletions.
22 changes: 16 additions & 6 deletions client/connect.c
Original file line number Diff line number Diff line change
Expand Up @@ -49,11 +49,17 @@ void db_client_free(client_context_t context) {
if (context->repl.snapshot_name) free(context->repl.snapshot_name);
if (context->repl.output_plugin) free(context->repl.output_plugin);
if (context->repl.slot_name) free(context->repl.slot_name);
if (context->error_policy) free(context->error_policy);
if (context->app_name) free(context->app_name);
if (context->conninfo) free(context->conninfo);
free(context);
}

void db_client_set_error_policy(client_context_t context, const char *policy) {
if (context->error_policy) free(context->error_policy);
context->error_policy = strdup(policy);
}


/* Connects to the Postgres server (using context->conninfo for server info and
* context->app_name as client name), and checks whether replication slot
Expand Down Expand Up @@ -87,7 +93,7 @@ int db_client_start(client_context_t context) {
client_sql_disconnect(context);
context->taking_snapshot = false;

checkRepl(err, context, replication_stream_start(&context->repl));
checkRepl(err, context, replication_stream_start(&context->repl, context->error_policy));

return err;
}
Expand All @@ -113,7 +119,7 @@ int db_client_poll(client_context_t context) {

/* If the snapshot is finished, switch over to the replication stream */
if (!context->sql_conn) {
checkRepl(err, context, replication_stream_start(&context->repl));
checkRepl(err, context, replication_stream_start(&context->repl, context->error_policy));
}
return err;

Expand Down Expand Up @@ -327,12 +333,16 @@ int snapshot_start(client_context_t context) {
check(err, exec_sql(context, query->data));
destroyPQExpBuffer(query);

Oid argtypes[] = { 25, 16 }; // 25 == TEXTOID, 16 == BOOLOID
const char *args[] = { "%", context->allow_unkeyed ? "t" : "f" };
Oid argtypes[] = { 25, 16, 25 }; // 25 == TEXTOID, 16 == BOOLOID
const char *args[] = {
"%",
context->allow_unkeyed ? "t" : "f",
context->error_policy
};

if (!PQsendQueryParams(context->sql_conn,
"SELECT bottledwater_export(table_pattern := $1, allow_unkeyed := $2)",
2, argtypes, args, NULL, NULL, 1)) { // The final 1 requests results in binary format
"SELECT bottledwater_export(table_pattern := $1, allow_unkeyed := $2, error_policy := $3)",
3, argtypes, args, NULL, NULL, 1)) { // The final 1 requests results in binary format
client_error(context, "Could not dispatch snapshot fetch: %s",
PQerrorMessage(context->sql_conn));
return EIO;
Expand Down
2 changes: 2 additions & 0 deletions client/connect.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

typedef struct {
char *conninfo, *app_name;
char *error_policy;
PGconn *sql_conn;
replication_stream repl;
bool allow_unkeyed;
Expand All @@ -21,6 +22,7 @@ typedef client_context *client_context_t;

client_context_t db_client_new(void);
void db_client_free(client_context_t context);
void db_client_set_error_policy(client_context_t context, const char *policy);
int db_client_start(client_context_t context);
int db_client_poll(client_context_t context);
int db_client_wait(client_context_t context);
Expand Down
7 changes: 4 additions & 3 deletions client/replication.c
Original file line number Diff line number Diff line change
Expand Up @@ -143,11 +143,12 @@ int replication_stream_check(replication_stream_t stream) {

/* Starts streaming logical changes from replication slot stream->slot_name,
* starting from position stream->start_lsn. */
int replication_stream_start(replication_stream_t stream) {
int replication_stream_start(replication_stream_t stream, const char *error_policy) {
PQExpBuffer query = createPQExpBuffer();
appendPQExpBuffer(query, "START_REPLICATION SLOT \"%s\" LOGICAL %X/%X",
appendPQExpBuffer(query, "START_REPLICATION SLOT \"%s\" LOGICAL %X/%X (\"error_policy\" '%s')",
stream->slot_name,
(uint32) (stream->start_lsn >> 32), (uint32) stream->start_lsn);
(uint32) (stream->start_lsn >> 32), (uint32) stream->start_lsn,
error_policy);

PGresult *res = PQexec(stream->conn, query->data);

Expand Down
2 changes: 1 addition & 1 deletion client/replication.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ typedef replication_stream *replication_stream_t;
int replication_slot_create(replication_stream_t stream);
int replication_slot_drop(replication_stream_t stream);
int replication_stream_check(replication_stream_t stream);
int replication_stream_start(replication_stream_t stream);
int replication_stream_start(replication_stream_t stream, const char *error_policy);
int replication_stream_poll(replication_stream_t stream);
int replication_stream_keepalive(replication_stream_t stream);

Expand Down
2 changes: 1 addition & 1 deletion ext/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ AVRO_LDFLAGS = $(shell pkg-config --libs avro-c)
PG_CPPFLAGS += $(AVRO_CFLAGS) -std=c99
SHLIB_LINK += $(AVRO_LDFLAGS)

OBJS = io_util.o logdecoder.o oid2avro.o schema_cache.o protocol.o protocol_server.o snapshot.o
OBJS = io_util.o error_policy.o logdecoder.o oid2avro.o schema_cache.o protocol.o protocol_server.o snapshot.o
DATA = bottledwater--0.1.sql

PG_CONFIG = pg_config
Expand Down
11 changes: 10 additions & 1 deletion ext/bottledwater--0.1.sql
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,17 @@ CREATE OR REPLACE FUNCTION bottledwater_row_schema(name) RETURNS text
CREATE OR REPLACE FUNCTION bottledwater_frame_schema() RETURNS text
AS 'bottledwater', 'bottledwater_frame_schema' LANGUAGE C VOLATILE STRICT;

DROP DOMAIN IF EXISTS bottledwater_error_policy;
CREATE DOMAIN bottledwater_error_policy AS text
CONSTRAINT bottledwater_error_policy_valid CHECK (VALUE IN (
-- these values should match the constants defined in protocol.h
'log',
'exit'
));

CREATE OR REPLACE FUNCTION bottledwater_export(
table_pattern text DEFAULT '%',
allow_unkeyed boolean DEFAULT false
allow_unkeyed boolean DEFAULT false,
error_policy bottledwater_error_policy DEFAULT 'exit'
) RETURNS setof bytea
AS 'bottledwater', 'bottledwater_export' LANGUAGE C VOLATILE STRICT;
40 changes: 40 additions & 0 deletions ext/error_policy.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
#include "error_policy.h"
#include "protocol.h"

#include <string.h>
#include "postgres.h"

error_policy_t parse_error_policy(const char *str) {
if (strcmp(PROTOCOL_ERROR_POLICY_LOG, str) == 0) {
return ERROR_POLICY_LOG;
} else if (strcmp(PROTOCOL_ERROR_POLICY_EXIT, str) == 0) {
return ERROR_POLICY_EXIT;
} else {
ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("invalid error_policy: %s", str)));
return ERROR_POLICY_UNDEFINED;
}
}

const char* error_policy_name(error_policy_t policy) {
switch (policy) {
case ERROR_POLICY_LOG: return PROTOCOL_ERROR_POLICY_LOG;
case ERROR_POLICY_EXIT: return PROTOCOL_ERROR_POLICY_EXIT;
case ERROR_POLICY_UNDEFINED: return "undefined (probably a bug)";
default: return "unknown (probably a bug)";
}
}


void error_policy_handle(error_policy_t policy, const char *message, const char *error) {
switch (policy) {
case ERROR_POLICY_LOG:
elog(WARNING, "%s: %s", message, error);
break;
case ERROR_POLICY_EXIT:
elog(ERROR, "%s: %s", message, error);
default:
elog(WARNING, "%s: %s", message, error);
elog(ERROR, "error_policy_handle: unknown error policy!");
}
}
27 changes: 27 additions & 0 deletions ext/error_policy.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
#ifndef ERROR_POLICY_H
#define ERROR_POLICY_H

typedef enum {
ERROR_POLICY_UNDEFINED = 0,
ERROR_POLICY_LOG,
ERROR_POLICY_EXIT
} error_policy_t;

static const error_policy_t DEFAULT_ERROR_POLICY = ERROR_POLICY_EXIT;

error_policy_t parse_error_policy(const char *str);
const char* error_policy_name(error_policy_t policy);

/* Handles an error according to the supplied policy.
*
* `message` should describe the context in which the error occurred, e.g. what
* the calling function was attempting to do.
*
* `error` should describe the error that occurred.
*
* This will call `ereport` with an appropriate severity depending on the
* policy, so this may cause an early exit from the calling function.
*/
void error_policy_handle(error_policy_t policy, const char *message, const char *error);

#endif /* ERROR_POLICY_H */
33 changes: 31 additions & 2 deletions ext/logdecoder.c
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
#include "io_util.h"
#include "protocol_server.h"
#include "oid2avro.h"
#include "error_policy.h"

#include "replication/logical.h"
#include "replication/output_plugin.h"
#include "utils/builtins.h"
#include "utils/memutils.h"

/* Entry point when Postgres loads the plugin */
Expand All @@ -22,6 +24,7 @@ typedef struct {
avro_value_iface_t *frame_iface;
avro_value_t frame_value;
schema_cache_t schema_cache;
error_policy_t error_policy;
} plugin_state;

void reset_frame(plugin_state *state);
Expand All @@ -42,6 +45,8 @@ void _PG_output_plugin_init(OutputPluginCallbacks *cb) {

static void output_avro_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
bool is_init) {
ListCell *option;

plugin_state *state = palloc(sizeof(plugin_state));
ctx->output_plugin_private = state;
opt->output_type = OUTPUT_PLUGIN_BINARY_OUTPUT;
Expand All @@ -53,6 +58,26 @@ static void output_avro_startup(LogicalDecodingContext *ctx, OutputPluginOptions
state->frame_iface = avro_generic_class_from_schema(state->frame_schema);
avro_generic_value_new(state->frame_iface, &state->frame_value);
state->schema_cache = schema_cache_new(ctx->context);

foreach(option, ctx->output_plugin_options) {
DefElem *elem = lfirst(option);
Assert(elem->arg == NULL || IsA(elem->arg, String));

if (strcmp(elem->defname, "error_policy") == 0) {
if (elem->arg == NULL) {
ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("No value specified for parameter \"%s\"",
elem->defname)));
} else {
state->error_policy = parse_error_policy(strVal(elem->arg));
}
} else {
ereport(INFO, (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("Parameter \"%s\" = \"%s\" is unknown",
elem->defname,
elem->arg ? strVal(elem->arg) : "(null)")));
}
}
}

static void output_avro_shutdown(LogicalDecodingContext *ctx) {
Expand Down Expand Up @@ -140,10 +165,14 @@ static void output_avro_change(LogicalDecodingContext *ctx, ReorderBufferTXN *tx

if (err) {
elog(INFO, "Row conversion failed: %s", schema_debug_info(rel, NULL));
elog(ERROR, "output_avro_change: row conversion failed: %s", avro_strerror());
error_policy_handle(state->error_policy, "output_avro_change: row conversion failed", avro_strerror());
/* if handling the error didn't exit early, it should be safe to fall
* through, because we'll just write the frame without the message that
* failed (so potentially it'll be an empty frame)
*/
}
if (write_frame(ctx, state)) {
elog(ERROR, "output_avro_change: writing Avro binary failed: %s", avro_strerror());
error_policy_handle(state->error_policy, "output_avro_change: writing Avro binary failed", avro_strerror());
}

MemoryContextSwitchTo(oldctx);
Expand Down
20 changes: 20 additions & 0 deletions ext/protocol.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,26 @@
#define PROTOCOL_MSG_DELETE 5


/* Error policies, determining what the snapshot function and output plugin
* should do if they encounter an error encoding a row.
*
* These should match the values of the bottledwater_error_policy_valid
* constraint in bottledwater--0.1.sql.
*/
/* The default policy is "exit": an error will terminate the snapshot or
* replication stream. This policy should be used if avoiding data loss is the
* top priority, since after manually resolving the error Bottled Water can be
* restarted to retry the affected rows.
*/
#define PROTOCOL_ERROR_POLICY_EXIT "exit"
/* Under the "log" policy, an error will cause Bottled Water to skip over the
* affected rows and continue, logging the error that occurred. This means the
* snapshot or replication stream may omit some updates that were successfully
* committed to Postgres, if there was a problem encoding those updates.
*/
#define PROTOCOL_ERROR_POLICY_LOG "log"


avro_schema_t schema_for_frame(void);

#endif /* PROTOCOL_H */
24 changes: 20 additions & 4 deletions ext/snapshot.c
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#include "io_util.h"
#include "oid2avro.h"
#include "protocol_server.h"
#include "error_policy.h"

#include <string.h>
#include "postgres.h"
Expand Down Expand Up @@ -30,6 +31,7 @@ typedef struct {
typedef struct {
MemoryContext memcontext;
export_table *tables;
error_policy_t error_policy;
int num_tables, current_table;
avro_schema_t frame_schema;
avro_value_iface_t *frame_iface;
Expand Down Expand Up @@ -103,6 +105,8 @@ Datum bottledwater_export(PG_FUNCTION_ARGS) {
MemoryContext oldcontext;
export_state *state;
int ret;
text *table_pattern;
bool allow_unkeyed;
bytea *result;

oldcontext = CurrentMemoryContext;
Expand Down Expand Up @@ -135,7 +139,11 @@ Datum bottledwater_export(PG_FUNCTION_ARGS) {
state->schema_cache = schema_cache_new(funcctx->multi_call_memory_ctx);
funcctx->user_fctx = state;

get_table_list(state, PG_GETARG_TEXT_P(0), PG_GETARG_BOOL(1));
table_pattern = PG_GETARG_TEXT_P(0);
allow_unkeyed = PG_GETARG_BOOL(1);
state->error_policy = parse_error_policy(TextDatumGetCString(PG_GETARG_TEXT_P(2)));

get_table_list(state, table_pattern, allow_unkeyed);
if (state->num_tables > 0) open_next_table(state);
}

Expand Down Expand Up @@ -166,7 +174,9 @@ Datum bottledwater_export(PG_FUNCTION_ARGS) {
/* don't forget to clear the SPI temp context */
SPI_freetuptable(SPI_tuptable);

SRF_RETURN_NEXT(funcctx, PointerGetDatum(result));
if (result != NULL) {
SRF_RETURN_NEXT(funcctx, PointerGetDatum(result));
}
}
}

Expand Down Expand Up @@ -332,10 +342,16 @@ bytea *format_snapshot_row(export_state *state) {
SPI_tuptable->tupdesc, SPI_tuptable->vals[0])) {
elog(INFO, "Failed tuptable: %s", schema_debug_info(table->rel, SPI_tuptable->tupdesc));
elog(INFO, "Failed relation: %s", schema_debug_info(table->rel, RelationGetDescr(table->rel)));
elog(ERROR, "bottledwater_export: Avro conversion failed: %s", avro_strerror());
error_policy_handle(state->error_policy, "bottledwater_export: Avro conversion failed", avro_strerror());
/* if handling the error didn't exit early, it should be safe to fall
* through, because we'll just write the frame without the message that
* failed (so potentially it'll be an empty frame)
*/
}
if (try_writing(&output, &write_avro_binary, &state->frame_value)) {
elog(ERROR, "bottledwater_export: writing Avro binary failed: %s", avro_strerror());
error_policy_handle(state->error_policy, "bottledwater_export: writing Avro binary failed", avro_strerror());
/* if we didn't exit early, then output remains uninitialised */
return NULL;
}
return output;
}
Expand Down
Loading

0 comments on commit b11d925

Please sign in to comment.