Skip to content

Commit

Permalink
Merge branch 'master' into stable
Browse files Browse the repository at this point in the history
  • Loading branch information
ololobus committed Jan 27, 2021
2 parents 32c389d + fc6647b commit b984f33
Show file tree
Hide file tree
Showing 5 changed files with 83 additions and 38 deletions.
4 changes: 2 additions & 2 deletions META.json
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
"name": "pg_wait_sampling",
"abstract": "Sampling based statistics of wait events",
"description": "pg_wait_sampling provides functions for detailed per backend and per query statistics about PostgreSQL wait events",
"version": "1.1.1",
"version": "1.1.3",
"maintainer": [
"Alexander Korotkov <[email protected]>",
"Ildus Kurbangaliev <[email protected]>"
Expand All @@ -21,7 +21,7 @@
"pg_wait_sampling": {
"file": "pg_wait_sampling--1.1.sql",
"docfile": "README.md",
"version": "1.1.1",
"version": "1.1.3",
"abstract": "Sampling based statistics of wait events"
}
},
Expand Down
7 changes: 4 additions & 3 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,11 @@ $(EXTENSION)--$(EXTVERSION).sql: setup.sql
cat $^ > $@

# Prepare the package for PGXN submission
package: dist dist/$(EXTENSION)-$(EXTVERSION).zip
DISTVERSION := $(shell git tag -l | tail -n 1 | cut -d 'v' -f 2)
package: dist dist/$(EXTENSION)-$(DISTVERSION).zip

dist:
mkdir -p dist

dist/$(EXTENSION)-$(EXTVERSION).zip:
git archive --format zip --prefix=$(EXTENSION)-$(EXTVERSION)/ --output $@ master
dist/$(EXTENSION)-$(DISTVERSION).zip:
git archive --format zip --prefix=$(EXTENSION)-$(DISTVERSION)/ --output $@ HEAD
10 changes: 10 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,16 @@ your bug reports.
If you're lacking of some functionality in `pg_wait_sampling` and feeling power
to implement it then you're welcome to make pull requests.

Releases
--------

New features are developed in feature-branches and then merged into [master](https://github.com/postgrespro/pg_wait_sampling/tree/master). To make a new release:

1) Bump `PGXN` version in the `META.json`.
2) Merge [master](https://github.com/postgrespro/pg_wait_sampling/tree/master) into [stable](https://github.com/postgrespro/pg_wait_sampling/tree/stable).
3) Tag new release in the [stable](https://github.com/postgrespro/pg_wait_sampling/tree/stable) with `git tag -a v1.1.X`, where the last digit is used for indicating compatible shared library changes and bugfixes. Second digit is used to indicate extension schema change, i.e. when `ALTER EXTENSION pg_wait_sampling UPDATE;` is required.
4) Merge [stable](https://github.com/postgrespro/pg_wait_sampling/tree/stable) into [debian](https://github.com/postgrespro/pg_wait_sampling/tree/debian). This separate branch is used to independently support `Debian` packaging and @anayrat with @df7cb have an access there.

Authors
-------

Expand Down
10 changes: 5 additions & 5 deletions collector.c
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,7 @@ send_history(History *observations, shm_mq_handle *mqh)
{
ereport(WARNING,
(errmsg("pg_wait_sampling collector: "
"receiver of message queue have been detached")));
"receiver of message queue has been detached")));
return;
}
for (i = 0; i < count; i++)
Expand All @@ -238,7 +238,7 @@ send_history(History *observations, shm_mq_handle *mqh)
{
ereport(WARNING,
(errmsg("pg_wait_sampling collector: "
"receiver of message queue have been detached")));
"receiver of message queue has been detached")));
return;
}
}
Expand All @@ -260,7 +260,7 @@ send_profile(HTAB *profile_hash, shm_mq_handle *mqh)
{
ereport(WARNING,
(errmsg("pg_wait_sampling collector: "
"receiver of message queue have been detached")));
"receiver of message queue has been detached")));
return;
}
hash_seq_init(&scan_status, profile_hash);
Expand All @@ -272,7 +272,7 @@ send_profile(HTAB *profile_hash, shm_mq_handle *mqh)
hash_seq_term(&scan_status);
ereport(WARNING,
(errmsg("pg_wait_sampling collector: "
"receiver of message queue have been detached")));
"receiver of message queue has been detached")));
return;
}
}
Expand Down Expand Up @@ -468,7 +468,7 @@ collector_main(Datum main_arg)
case SHM_MQ_DETACHED:
ereport(WARNING,
(errmsg("pg_wait_sampling collector: "
"receiver of message queue have been "
"receiver of message queue has been "
"detached")));
break;
default:
Expand Down
90 changes: 62 additions & 28 deletions pg_wait_sampling.c
Original file line number Diff line number Diff line change
Expand Up @@ -8,24 +8,27 @@
* contrib/pg_wait_sampling/pg_wait_sampling.c
*/
#include "postgres.h"
#include "fmgr.h"
#include "funcapi.h"

#include "access/htup_details.h"
#include "access/twophase.h"
#include "catalog/pg_type.h"
#include "fmgr.h"
#include "funcapi.h"
#include "miscadmin.h"
#include "optimizer/planner.h"
#include "pgstat.h"
#include "storage/spin.h"
#include "storage/ipc.h"
#include "storage/pg_shmem.h"
#include "storage/procarray.h"
#include "storage/shm_mq.h"
#include "storage/shm_toc.h"
#include "storage/spin.h"
#include "utils/builtins.h"
#include "utils/datetime.h"
#include "utils/guc.h"
#include "utils/guc_tables.h"
#include "utils/guc.h"
#include "utils/memutils.h" /* TopMemoryContext. Actually for PG 9.6 only,
* but there should be no harm for others. */

#include "pg_wait_sampling.h"

Expand All @@ -47,6 +50,11 @@ shm_mq *collector_mq = NULL;
uint64 *proc_queryids = NULL;
CollectorShmqHeader *collector_hdr = NULL;

/* Receiver (backend) local shm_mq pointers and lock */
shm_mq *recv_mq = NULL;
shm_mq_handle *recv_mqh = NULL;
LOCKTAG queueTag;

static shmem_startup_hook_type prev_shmem_startup_hook = NULL;
static PGPROC * search_proc(int backendPid);
static PlannedStmt *pgws_planner_hook(Query *parse,
Expand Down Expand Up @@ -290,6 +298,14 @@ check_shmem(void)
}
}

static void
pgws_cleanup_callback(int code, Datum arg)
{
elog(DEBUG3, "pg_wait_sampling cleanup: detaching shm_mq and releasing queue lock");
shm_mq_detach_compat(recv_mqh, recv_mq);
LockRelease(&queueTag, ExclusiveLock, false);
}

/*
* Module load callback
*/
Expand Down Expand Up @@ -499,16 +515,14 @@ init_lock_tag(LOCKTAG *tag, uint32 lock)
static void *
receive_array(SHMRequest request, Size item_size, Size *count)
{
LOCKTAG queueTag;
LOCKTAG collectorTag;
shm_mq *mq;
shm_mq_handle *mqh;
shm_mq_result res;
Size len,
i;
void *data;
Pointer result,
ptr;
MemoryContext oldctx;

/* Ensure nobody else trying to send request to queue */
init_lock_tag(&queueTag, PGWS_QUEUE_LOCK);
Expand All @@ -519,7 +533,7 @@ receive_array(SHMRequest request, Size item_size, Size *count)
LockAcquire(&collectorTag, ExclusiveLock, false, false);
LockRelease(&collectorTag, ExclusiveLock, false);

mq = shm_mq_create(collector_mq, COLLECTOR_QUEUE_SIZE);
recv_mq = shm_mq_create(collector_mq, COLLECTOR_QUEUE_SIZE);
collector_hdr->request = request;

if (!collector_hdr->latch)
Expand All @@ -528,34 +542,54 @@ receive_array(SHMRequest request, Size item_size, Size *count)

SetLatch(collector_hdr->latch);

shm_mq_set_receiver(mq, MyProc);
mqh = shm_mq_attach(mq, NULL, NULL);
shm_mq_set_receiver(recv_mq, MyProc);

/*
* We switch to TopMemoryContext, so that recv_mqh is allocated there
* and is guaranteed to survive until before_shmem_exit callbacks are
* fired. Anyway, shm_mq_detach() will free handler on its own.
*
* NB: we do not pass `seg` to shm_mq_attach(), so it won't set its own
* callback, i.e. we do not interfere here with shm_mq_detach_callback().
*/
oldctx = MemoryContextSwitchTo(TopMemoryContext);
recv_mqh = shm_mq_attach(recv_mq, NULL, NULL);
MemoryContextSwitchTo(oldctx);

res = shm_mq_receive(mqh, &len, &data, false);
if (res != SHM_MQ_SUCCESS || len != sizeof(*count))
/*
* Now we surely attached to the shm_mq and got collector's attention.
* If anything went wrong (e.g. Ctrl+C received from the client) we have
* to cleanup some things, i.e. detach from the shm_mq, so collector was
* able to continue responding to other requests.
*
* PG_ENSURE_ERROR_CLEANUP() guaranties that cleanup callback will be
* fired for both ERROR and FATAL.
*/
PG_ENSURE_ERROR_CLEANUP(pgws_cleanup_callback, 0);
{
shm_mq_detach_compat(mqh, mq);
elog(ERROR, "Error reading mq.");
}
memcpy(count, data, sizeof(*count));
res = shm_mq_receive(recv_mqh, &len, &data, false);
if (res != SHM_MQ_SUCCESS || len != sizeof(*count))
elog(ERROR, "error reading mq");

result = palloc(item_size * (*count));
ptr = result;
memcpy(count, data, sizeof(*count));

for (i = 0; i < *count; i++)
{
res = shm_mq_receive(mqh, &len, &data, false);
if (res != SHM_MQ_SUCCESS || len != item_size)
result = palloc(item_size * (*count));
ptr = result;

for (i = 0; i < *count; i++)
{
shm_mq_detach_compat(mqh, mq);
elog(ERROR, "Error reading mq.");
res = shm_mq_receive(recv_mqh, &len, &data, false);
if (res != SHM_MQ_SUCCESS || len != item_size)
elog(ERROR, "error reading mq");

memcpy(ptr, data, item_size);
ptr += item_size;
}
memcpy(ptr, data, item_size);
ptr += item_size;
}
PG_END_ENSURE_ERROR_CLEANUP(pgws_cleanup_callback, 0);

shm_mq_detach_compat(mqh, mq);

/* We still have to detach and release lock during normal operation. */
shm_mq_detach_compat(recv_mqh, recv_mq);
LockRelease(&queueTag, ExclusiveLock, false);

return result;
Expand Down

0 comments on commit b984f33

Please sign in to comment.