Skip to content

Commit

Permalink
Update bgw job table when procedure altered
Browse files Browse the repository at this point in the history
This deals with the following modifications to the name of a procedure
used by a background job:

- If a procedure that exists in the jobs table is renamed, the
  corresponding names in the table will also be changed.

- When a procedure that is used for a background worker
  job is moved to a different schema, modify the job entry as well.

- When a schema is renamed, rename the schema name in the procedures as
  well.

- When a schema is dropped and there are procedures used by jobs, it
  will either remove the entry or throw an error, depending on whether
  `CASCADE` or `RESTRICT` is in effect.

- When a procedure or function used by a job is dropped, it will error
  out or delete the job depending on whether `CASCADE` or `RESTRICT` is
  in effect.
  • Loading branch information
mkindahl committed Nov 14, 2024
1 parent 055d4ad commit 910cbbf
Show file tree
Hide file tree
Showing 4 changed files with 455 additions and 52 deletions.
1 change: 1 addition & 0 deletions .unreleased/pr_7409
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Fixes: #7409 Update bgw job table when altering procedure
262 changes: 251 additions & 11 deletions src/process_utility.c
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
#include <parser/parse_type.h>
#include <parser/parse_utilcmd.h>
#include <storage/lmgr.h>
#include <storage/lockdefs.h>
#include <tcop/utility.h>
#include <utils/acl.h>
#include <utils/builtins.h>
Expand All @@ -42,6 +43,7 @@
#include <utils/inval.h>
#include <utils/lsyscache.h>
#include <utils/palloc.h>
#include <utils/regproc.h>
#include <utils/rel.h>
#include <utils/snapmgr.h>
#include <utils/syscache.h>
Expand Down Expand Up @@ -472,6 +474,225 @@ process_drop_trigger_start(ProcessUtilityArgs *args, DropStmt *stmt)
ts_cache_release(hcache);
}

static DDLResult
process_drop_schema_start(DropStmt *stmt)
{
/*
* An error will be raised when we start dropping the functions used by a
* background worker, so there is no point in doing anything here.
*/
if (stmt->behavior == DROP_RESTRICT)
return DDL_CONTINUE;

/*
* Here we are relying on that if we fail to drop one of the
* procedures/functions, this transaction will be rolled back so these
* changes will not be committed.
*/
ScanIterator iterator =
ts_scan_iterator_create(BGW_JOB, RowExclusiveLock, CurrentMemoryContext);
ts_scanner_foreach(&iterator)
{
ListCell *cell;
TupleInfo *ti = ts_scan_iterator_tuple_info(&iterator);
bool schema_isnull, job_id_isnull;
int32 job_id = DatumGetInt32(slot_getattr(ti->slot, Anum_bgw_job_id, &job_id_isnull));
Name proc_schema =
DatumGetName(slot_getattr(ti->slot, Anum_bgw_job_proc_schema, &schema_isnull));
Ensure(!job_id_isnull, "corrupt job entry: job id is null");
Ensure(!schema_isnull, "corrupt job entry: schema for job %d is null", job_id);
foreach (cell, stmt->objects)
{
#if PG15_GE
String *object = lfirst_node(String, cell);
#else
Value *object = lfirst(cell);
#endif
if (namestrcmp(proc_schema, strVal(object)) == 0)
{
CatalogSecurityContext sec_ctx;
Assert(stmt->behavior == DROP_CASCADE);
ts_catalog_database_info_become_owner(ts_catalog_database_info_get(), &sec_ctx);
ereport(NOTICE, errmsg("drop cascades to job %d", job_id));
ts_catalog_delete_tid(ti->scanrel, ts_scanner_get_tuple_tid(ti));
ts_catalog_restore_user(&sec_ctx);
}
}
}
return DDL_CONTINUE;
}

/*
* Start of dropping a procedure.
*
* We can abort the drop here by throwing an error.
*/
static void
process_drop_procedure_start(DropStmt *stmt)
{
ScanIterator iterator =
ts_scan_iterator_create(BGW_JOB, RowExclusiveLock, CurrentMemoryContext);
ts_scanner_foreach(&iterator)
{
ListCell *cell;
TupleInfo *ti = ts_scan_iterator_tuple_info(&iterator);
bool schema_isnull, name_isnull, job_id_isnull;
Name proc_schema =
DatumGetName(slot_getattr(ti->slot, Anum_bgw_job_proc_schema, &schema_isnull));
Name proc_name = DatumGetName(slot_getattr(ti->slot, Anum_bgw_job_proc_name, &name_isnull));
int32 job_id = DatumGetInt32(slot_getattr(ti->slot, Anum_bgw_job_id, &job_id_isnull));
Ensure(!job_id_isnull, "corrupt job entry: job id was null");
Ensure(!schema_isnull, "corrupt job entry: schema for job %d was null", job_id);
Ensure(!name_isnull, "corrupt job entry: name for job %d was null", job_id);
TS_DEBUG_LOG("looking at job %d with %s.%s",
job_id,
NameStr(*proc_schema),
NameStr(*proc_name));
foreach (cell, stmt->objects)
{
ObjectWithArgs *object = castNode(ObjectWithArgs, lfirst(cell));
RangeVar *rel = makeRangeVarFromNameList(object->objname);
if (namestrcmp(proc_schema, rel->schemaname) == 0 &&
namestrcmp(proc_name, rel->relname) == 0)
{
Assert(stmt->removeType == OBJECT_PROCEDURE || stmt->removeType == OBJECT_FUNCTION);
if (stmt->behavior == DROP_RESTRICT)
{
ereport(ERROR,
errcode(ERRCODE_DEPENDENT_OBJECTS_STILL_EXIST),
errmsg("cannot drop %s because background job %d depends on it",
NameListToString(object->objname),
job_id),
errhint("Use delete_job() to drop the job first."));
}
else
{
CatalogSecurityContext sec_ctx;
ts_catalog_database_info_become_owner(ts_catalog_database_info_get(), &sec_ctx);
ereport(NOTICE, errmsg("drop cascades to job %d", job_id));
ts_catalog_delete_tid(ti->scanrel, ts_scanner_get_tuple_tid(ti));
ts_catalog_restore_user(&sec_ctx);
}
}
}
}
}

static void
replace_attr_if_changed(AttrNumber attno, const char *newvalue, Name name_buf, Datum *values,
bool *replace)
{
if (newvalue)
{
const Name orig_value = DatumGetName(values[AttrNumberGetAttrOffset(attno)]);
if (namestrcmp(orig_value, newvalue) != 0)
{
namestrcpy(name_buf, newvalue);
values[AttrNumberGetAttrOffset(attno)] = NameGetDatum(name_buf);
replace[AttrNumberGetAttrOffset(attno)] = true;
}
}
}

/*
* Update the schema or name of a procedure in the jobs tuple.
*/
static void
ts_bgw_job_update_proc(Relation rel, HeapTuple tuple, TupleDesc tupledesc, const char *newschema,
const char *newname)
{
bool isnull[Natts_bgw_job];
Datum values[Natts_bgw_job];
bool replace[Natts_bgw_job] = { false };

/* Allocated here to make sure that they are alive at the call of
* heap_modify_tuple */
NameData proc_name_buf;
NameData proc_schema_buf;

heap_deform_tuple(tuple, tupledesc, values, isnull);

replace_attr_if_changed(Anum_bgw_job_proc_name, newname, &proc_name_buf, values, replace);
replace_attr_if_changed(Anum_bgw_job_proc_schema, newschema, &proc_schema_buf, values, replace);

HeapTuple new_tuple = heap_modify_tuple(tuple, tupledesc, values, isnull, replace);
ts_catalog_update(rel, new_tuple);
heap_freetuple(new_tuple);
}

static void
ts_bgw_job_rename_schema_name(const char *old_schema_name, const char *new_schema_name)
{
ScanIterator iterator =
ts_scan_iterator_create(BGW_JOB, RowExclusiveLock, CurrentMemoryContext);
ts_scanner_foreach(&iterator)
{
bool should_free, curr_schema_isnull, curr_name_isnull;
TupleInfo *ti = ts_scan_iterator_tuple_info(&iterator);
Name curr_proc_schema =
DatumGetName(slot_getattr(ti->slot, Anum_bgw_job_proc_schema, &curr_schema_isnull));
Name curr_proc_name =
DatumGetName(slot_getattr(ti->slot, Anum_bgw_job_proc_name, &curr_name_isnull));
if (!curr_schema_isnull && namestrcmp(curr_proc_schema, old_schema_name) == 0)
{
HeapTuple tuple = ts_scanner_fetch_heap_tuple(ti, false, &should_free);
ts_bgw_job_update_proc(ti->scanrel,
tuple,
ts_scanner_get_tupledesc(ti),
new_schema_name,
NameStr(*curr_proc_name));

if (should_free)
heap_freetuple(tuple);
}
}
}

static DDLResult
ts_bgw_job_rename_proc(ObjectAddress address, const char *newschema, const char *newname)
{
ScanIterator iterator =
ts_scan_iterator_create(BGW_JOB, RowExclusiveLock, CurrentMemoryContext);
ts_scanner_foreach(&iterator)
{
bool should_free, curr_schema_isnull, curr_name_isnull;
TupleInfo *ti = ts_scan_iterator_tuple_info(&iterator);
Name curr_proc_schema =
DatumGetName(slot_getattr(ti->slot, Anum_bgw_job_proc_schema, &curr_schema_isnull));
Name curr_proc_name =
DatumGetName(slot_getattr(ti->slot, Anum_bgw_job_proc_name, &curr_name_isnull));
const char *old_proc_schema = get_namespace_name(get_func_namespace(address.objectId));
const char *old_proc_name = get_func_name(address.objectId);
if (!curr_schema_isnull && !curr_name_isnull &&
namestrcmp(curr_proc_name, old_proc_name) == 0 &&
namestrcmp(curr_proc_schema, old_proc_schema) == 0)
{
HeapTuple tuple = ts_scanner_fetch_heap_tuple(ti, false, &should_free);
ts_bgw_job_update_proc(ti->scanrel,
tuple,
ts_scanner_get_tupledesc(ti),
newschema,
newname);

if (should_free)
heap_freetuple(tuple);
}
}
return DDL_CONTINUE;
}

static void
process_alterprocedureschema(ProcessUtilityArgs *args)
{
AlterObjectSchemaStmt *stmt = (AlterObjectSchemaStmt *) args->parsetree;
Relation relation;

Assert(stmt->objectType == OBJECT_PROCEDURE || stmt->objectType == OBJECT_FUNCTION);
ObjectAddress address =
get_object_address(stmt->objectType, stmt->object, &relation, AccessExclusiveLock, false);
ts_bgw_job_rename_proc(address, stmt->newschema, NULL);
}

/* We use this for both materialized views and views. */
static void
process_alterviewschema(ProcessUtilityArgs *args)
Expand Down Expand Up @@ -558,6 +779,10 @@ process_alterobjectschema(ProcessUtilityArgs *args)
case OBJECT_VIEW:
process_alterviewschema(args);
break;
case OBJECT_PROCEDURE:
case OBJECT_FUNCTION:
process_alterprocedureschema(args);
break;
default:
break;
}
Expand Down Expand Up @@ -1775,6 +2000,13 @@ process_drop_start(ProcessUtilityArgs *args)
case OBJECT_TRIGGER:
process_drop_trigger_start(args, stmt);
break;
case OBJECT_PROCEDURE:
case OBJECT_FUNCTION:
process_drop_procedure_start(stmt);
break;
case OBJECT_SCHEMA:
process_drop_schema_start(stmt);
break;
default:
break;
}
Expand Down Expand Up @@ -2074,12 +2306,23 @@ process_rename_schema(RenameStmt *stmt)
}
}

ts_bgw_job_rename_schema_name(stmt->subname, stmt->newname);
ts_chunks_rename_schema_name(stmt->subname, stmt->newname);
ts_dimensions_rename_schema_name(stmt->subname, stmt->newname);
ts_hypertables_rename_schema_name(stmt->subname, stmt->newname);
ts_continuous_agg_rename_schema_name(stmt->subname, stmt->newname);
}

static void
process_rename_procedure(ProcessUtilityArgs *args)
{
RenameStmt *stmt = (RenameStmt *) args->parsetree;
Relation relation;
ObjectAddress address =
get_object_address(stmt->renameType, stmt->object, &relation, AccessExclusiveLock, false);
ts_bgw_job_rename_proc(address, NULL, stmt->newname);
}

static void
rename_hypertable_constraint(Hypertable *ht, Oid chunk_relid, void *arg)
{
Expand Down Expand Up @@ -2145,6 +2388,8 @@ process_rename_constraint_or_trigger(ProcessUtilityArgs *args, Cache *hcache, Oi

ht = ts_hypertable_cache_get_entry(hcache, relid, CACHE_FLAG_MISSING_OK);

Assert(stmt->relation != NULL);

if (NULL != ht)
{
relation_not_only(stmt->relation);
Expand Down Expand Up @@ -2180,15 +2425,6 @@ process_rename(ProcessUtilityArgs *args)
if (!OidIsValid(relid))
return DDL_CONTINUE;
}
else
{
/*
* stmt->relation never be NULL unless we are renaming a schema or
* other objects, like foreign server
*/
if (stmt->renameType != OBJECT_SCHEMA)
return DDL_CONTINUE;
}

hcache = ts_hypertable_cache_pin();

Expand All @@ -2214,6 +2450,10 @@ process_rename(ProcessUtilityArgs *args)
case OBJECT_SCHEMA:
process_rename_schema(stmt);
break;
case OBJECT_PROCEDURE:
case OBJECT_FUNCTION:
process_rename_procedure(args);
break;
default:
break;
}
Expand Down Expand Up @@ -4628,7 +4868,7 @@ process_drop_table(EventTriggerDropObject *obj)
}

static void
process_drop_schema(EventTriggerDropObject *obj)
process_sql_drop_schema(EventTriggerDropObject *obj)
{
EventTriggerDropSchema *schema = (EventTriggerDropSchema *) obj;
int count;
Expand Down Expand Up @@ -4695,7 +4935,7 @@ process_ddl_sql_drop(EventTriggerDropObject *obj)
process_drop_table(obj);
break;
case EVENT_TRIGGER_DROP_SCHEMA:
process_drop_schema(obj);
process_sql_drop_schema(obj);
break;
case EVENT_TRIGGER_DROP_TRIGGER:
process_drop_trigger(obj);
Expand Down
Loading

0 comments on commit 910cbbf

Please sign in to comment.