Skip to content

Commit

Permalink
(testing) prototype for async ptrack map file reading #2
Browse files Browse the repository at this point in the history
  • Loading branch information
kulaginm committed Feb 13, 2022
1 parent fa170d7 commit 799c4c4
Show file tree
Hide file tree
Showing 3 changed files with 145 additions and 4 deletions.
3 changes: 3 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -20,3 +20,6 @@ top_builddir = ../..
include $(top_builddir)/src/Makefile.global
include $(top_srcdir)/contrib/contrib-global.mk
endif

engine.o: engine.h

139 changes: 137 additions & 2 deletions engine.c
Original file line number Diff line number Diff line change
Expand Up @@ -126,12 +126,13 @@ ptrackCleanFiles(void)
#endif

#ifndef PTRACK_USE_AIO
#ifndef PTRACK_READ_CHUNK
static bool
ptrackMapReadFromFileSync(const char *ptrack_path)
{
instr_time func_start, func_end, crc_start, func_time, read_time, crc_time;

elog(LOG, "ptrack read map (sync version): start, ptrack_actual_size %zu bytes", PtrackActualSize);
elog(LOG, "ptrack read map (sync greedy version): start, ptrack_actual_size %zu bytes", PtrackActualSize);
INSTR_TIME_SET_CURRENT(func_start);

/* Do actual file read */
Expand Down Expand Up @@ -238,10 +239,144 @@ ptrackMapReadFromFileSync(const char *ptrack_path)
INSTR_TIME_ACCUM_DIFF(read_time, crc_start, func_start);
INSTR_TIME_SET_ZERO(crc_time);
INSTR_TIME_ACCUM_DIFF(crc_time, func_end, crc_start);
elog(LOG, "ptrack read map (sync version): end. Timings (microseconds): file io time = %lu, crc time = %lu, overall time = %lu",
elog(LOG, "ptrack read map (sync greedy version): end. Timings (microseconds): file io time = %lu, crc time = %lu, overall time = %lu",
INSTR_TIME_GET_MICROSEC(read_time), INSTR_TIME_GET_MICROSEC(crc_time), INSTR_TIME_GET_MICROSEC(func_time));
return true;
}
#else
static bool
ptrackMapReadFromFileSync(const char *ptrack_path)
{
instr_time func_start, func_end, func_time,
read_start, read_end, read_time,
crc_start, crc_end, crc_time;
int ptrack_fd;
size_t readed = 0;
size_t summed = 0;
pg_crc32c crc;
pg_crc32c *file_crc;

elog(LOG, "ptrack read map (sync non-greedy version): start, ptrack_actual_size %zu bytes, PTRACK_READ_CHUNK %zu bytes",
PtrackActualSize, (size_t) PTRACK_READ_CHUNK);
INSTR_TIME_SET_ZERO(func_time);
INSTR_TIME_SET_ZERO(read_time);
INSTR_TIME_SET_ZERO(crc_time);
INSTR_TIME_SET_CURRENT(func_start);

INSTR_TIME_SET_CURRENT(crc_start);
INIT_CRC32C(crc);
file_crc = (pg_crc32c *) ((char *) ptrack_map + PtrackCrcOffset);
INSTR_TIME_SET_CURRENT(crc_end);
INSTR_TIME_ACCUM_DIFF(crc_time, crc_end, crc_start);

INSTR_TIME_SET_CURRENT(read_start);
ptrack_fd = BasicOpenFile(ptrack_path, O_RDWR | PG_BINARY);
if (ptrack_fd < 0)
elog(ERROR, "ptrack read map: failed to open map file \"%s\": %m", ptrack_path);
INSTR_TIME_SET_CURRENT(read_end);
INSTR_TIME_ACCUM_DIFF(read_time, read_end, read_start);

do
{
ssize_t last_readed;

INSTR_TIME_SET_CURRENT(read_start);
last_readed = read(ptrack_fd, (char *) ptrack_map + readed, Min(PTRACK_READ_CHUNK, PtrackActualSize - readed));

if (last_readed > 0)
{
elog(DEBUG1, "ptrack read map: read: offset = %zu, nbytes = %zu, readed = %zi",
readed, Min((size_t) PTRACK_READ_CHUNK, PtrackActualSize - readed), last_readed);
readed += last_readed;
}
else if (last_readed == 0)
{
/*
* We don't try to read more that PtrackActualSize and
* file size was already checked in ptrackMapInit()
*/
elog(ERROR, "ptrack read map: unexpected end of file while reading map file \"%s\", expected to read %zu, but read only %zu bytes",
ptrack_path, PtrackActualSize, readed);
}
else if (last_readed < 0 && errno != EINTR)
{
ereport(WARNING,
(errcode_for_file_access(),
errmsg("ptrack read map: could not read map file \"%s\": %m", ptrack_path)));
close(ptrack_fd);
return false;
}
INSTR_TIME_SET_CURRENT(read_end);
INSTR_TIME_ACCUM_DIFF(read_time, read_end, read_start);

if (last_readed > 0)
{
size_t sum_chunk_size = Min(last_readed, PtrackCrcOffset - summed);
INSTR_TIME_SET_CURRENT(crc_start);
elog(DEBUG1, "ptrack read map: COMP_CRC32C: offset = %zu, nbytes = %zu",
summed, sum_chunk_size);
COMP_CRC32C(crc, (char *) ptrack_map + summed, sum_chunk_size);
summed += sum_chunk_size;
INSTR_TIME_SET_CURRENT(crc_end);
INSTR_TIME_ACCUM_DIFF(crc_time, crc_end, crc_start);
}
} while (readed < PtrackActualSize);

INSTR_TIME_SET_CURRENT(read_start);
close(ptrack_fd);
INSTR_TIME_SET_CURRENT(read_end);
INSTR_TIME_ACCUM_DIFF(read_time, read_end, read_start);


INSTR_TIME_SET_CURRENT(crc_start);
/* Check PTRACK_MAGIC */
if (strcmp(ptrack_map->magic, PTRACK_MAGIC) != 0)
{
elog(WARNING, "ptrack read map: wrong map format of file \"%s\"", ptrack_path);
return false;
}

/* Check ptrack version inside old ptrack map */
if (ptrack_map->version_num != PTRACK_MAP_FILE_VERSION_NUM)
{
ereport(WARNING,
(errcode(ERRCODE_DATA_CORRUPTED),
errmsg("ptrack read map: map format version %d in the file \"%s\" is incompatible with file format of extension %d",
ptrack_map->version_num, ptrack_path, PTRACK_MAP_FILE_VERSION_NUM),
errdetail("Deleting file \"%s\" and reinitializing ptrack map.", ptrack_path)));
return false;
}

/* Check CRC */
{
FIN_CRC32C(crc);

/*
* Read ptrack map values without atomics during initialization, since
* postmaster is the only user right now.
*/
elog(DEBUG1, "ptrack read map: crc %u, file_crc %u, init_lsn %X/%X",
crc, *file_crc, (uint32) (ptrack_map->init_lsn.value >> 32), (uint32) ptrack_map->init_lsn.value);

if (!EQ_CRC32C(*file_crc, crc))
{
ereport(WARNING,
(errcode(ERRCODE_DATA_CORRUPTED),
errmsg("ptrack read map: incorrect checksum of file \"%s\"", ptrack_path),
errdetail("Deleting file \"%s\" and reinitializing ptrack map.", ptrack_path)));
return false;
}
}
INSTR_TIME_SET_CURRENT(crc_end);
INSTR_TIME_ACCUM_DIFF(crc_time, crc_end, crc_start);

INSTR_TIME_SET_CURRENT(func_end);
INSTR_TIME_ACCUM_DIFF(func_time, func_end, func_start);
elog(LOG, "ptrack read map (sync non-greedy version): end. Timings (microseconds): file io time = %lu, crc time = %lu, overall time = %lu",
INSTR_TIME_GET_MICROSEC(read_time), INSTR_TIME_GET_MICROSEC(crc_time), INSTR_TIME_GET_MICROSEC(func_time));
return true;
}
#endif
#endif

#ifdef PTRACK_USE_AIO
Expand Down
7 changes: 5 additions & 2 deletions engine.h
Original file line number Diff line number Diff line change
Expand Up @@ -92,10 +92,13 @@ typedef PtrackMapHdr * PtrackMap;
/* async io section*/

/* use aio ptrack map read if defined */
#define PTRACK_USE_AIO
//#define PTRACK_USE_AIO

/* size of one async read operation (bytes) */
#define PTRACK_AIO_READ_CHUNK 1024*1024
#define PTRACK_AIO_READ_CHUNK 2*1024*1024

/* size of one sync read operation (bytes), or try to read whole file if undefined */
#define PTRACK_READ_CHUNK 1024*1024

/* maximum count of pending aio read operations */
#define PTRACK_AIO_READ_QUEUE_DEPTH 4
Expand Down

0 comments on commit 799c4c4

Please sign in to comment.