Skip to content
This repository has been archived by the owner on Jul 23, 2024. It is now read-only.

Commit

Permalink
HAWQ-1734. Resolve insert issue in external table of orc
Browse files Browse the repository at this point in the history
  • Loading branch information
oushu1tuyu1 authored and huor committed Aug 2, 2019
1 parent 30afd65 commit 33ddcf7
Show file tree
Hide file tree
Showing 6 changed files with 189 additions and 24 deletions.
81 changes: 81 additions & 0 deletions src/backend/access/appendonly/appendonlywriter.c
Original file line number Diff line number Diff line change
Expand Up @@ -1185,6 +1185,72 @@ List *SetSegnoForWrite(List *existing_segnos, Oid relid, int segment_num,

}

/*
 * SetSegnoForExternalWrite
 *
 * Build the list of segment file numbers to be written for an external
 * relation. The behavior depends on the current Gp_role:
 *
 *   GP_ROLE_EXECUTE  - 'existing_segnos' was already chosen elsewhere and is
 *                      returned unchanged.
 *   GP_ROLE_UTILITY  - a single-element list holding RESERVED_SEGNO.
 *   GP_ROLE_DISPATCH - a freshly built list of 'segment_num' entries:
 *                      1..segment_num when 'forNewRel' is true,
 *                      0..segment_num-1 otherwise.
 *
 * Parameters:
 *   existing_segnos  - must be non-NIL with 'segment_num' entries in execute
 *                      role, and NIL in utility/dispatch role (asserted).
 *   relid            - relation oid; only used in the debug log message.
 *   segment_num      - number of segnos to produce.
 *   forNewRel        - selects the 1-based numbering in dispatch role.
 *   reuse_segfilenum_in_same_xid, keepHash
 *                    - not consulted by this function; they appear to be
 *                      kept so the signature parallels SetSegnoForWrite.
 *
 * Returns an integer List of segnos, or NIL for an unexpected role.
 */
List *SetSegnoForExternalWrite(List *existing_segnos, Oid relid, int segment_num,
		bool forNewRel, bool reuse_segfilenum_in_same_xid,
		bool keepHash)
{
	/*
	 * NOTE(review): the returned xid is not used here. The call is retained
	 * because the original code issued it unconditionally and it may have
	 * side effects callers rely on -- confirm before removing.
	 */
	(void) GetTopTransactionId();

	switch (Gp_role)
	{
		case GP_ROLE_EXECUTE:

			Assert(existing_segnos != NIL);
			Assert(list_length(existing_segnos) == segment_num);
			return existing_segnos;

		case GP_ROLE_UTILITY:

			Assert(existing_segnos == NIL);
			Assert(segment_num == 1);
			return list_make1_int(RESERVED_SEGNO);

		case GP_ROLE_DISPATCH:

			Assert(existing_segnos == NIL);
			Assert(segment_num > 0);

			if (forNewRel)
			{
				int i;

				/* new relation: segnos are numbered starting at 1 */
				for (i = 1; i <= segment_num; i++)
				{
					existing_segnos = lappend_int(existing_segnos, i);
				}
				return existing_segnos;
			}

			if (Debug_appendonly_print_segfile_choice)
			{
				ereport(LOG, (errmsg("SetSegnoForExternalWrite: Choosing a segno for external "
									 "relation \"%s\" (%u) ",
									 get_rel_name(relid), relid)));
			}

			/* existing relation: segnos are numbered starting at 0 */
			for (int i = 0; i < segment_num; i++)
			{
				existing_segnos = lappend_int(existing_segnos, i);
			}
			Assert(list_length(existing_segnos) == segment_num);

			return existing_segnos;

		/* fix this for dispatch agent. for now it's broken anyway. */
		default:
			Assert(false);
			return NIL;
	}

}


/*
* assignPerRelSegno
*
Expand Down Expand Up @@ -1231,6 +1297,21 @@ List *assignPerRelSegno(List *all_relids, int segment_num)
}

}
else if (RelationIsExternal(rel))
{
SegfileMapNode *n;

n = makeNode(SegfileMapNode);
n->relid = cur_relid;

n->segnos = SetSegnoForExternalWrite(NIL, cur_relid, segment_num,
false, true, true);

Assert(n->relid != InvalidOid);
Assert(n->segnos != NIL);

mapping = lappend(mapping, n);
}

/*
* hold RowExclusiveLock until the end of transaction
Expand Down
33 changes: 26 additions & 7 deletions src/backend/cdb/cdbdatalocality.c
Original file line number Diff line number Diff line change
Expand Up @@ -4125,15 +4125,34 @@ run_allocation_algorithm(SplitAllocResult *result, List *virtual_segments, Query
int fileCountInRelation = list_length(rel_data->files);
bool FileCountBucketNumMismatch = false;
if (targetPolicy->bucketnum > 0) {
FileCountBucketNumMismatch = fileCountInRelation %
targetPolicy->bucketnum == 0 ? false : true;
Relation rel = heap_open(rel_data->relid, NoLock);
targetPolicy->bucketnum == 0 ? false : true;
if (!RelationIsExternal(rel))
{
FileCountBucketNumMismatch = fileCountInRelation %
targetPolicy->bucketnum == 0 ? false : true;
}
else
{
ListCell *lc_file;
int maxsegno = 0;
foreach(lc_file, rel_data->files)
{
Relation_File *rel_file = (Relation_File *) lfirst(lc_file);
if (rel_file->segno > maxsegno)
maxsegno = rel_file->segno;
}
FileCountBucketNumMismatch =
maxsegno > targetPolicy->bucketnum ? true : false;
}
heap_close(rel, NoLock);
}
if (isRelationHash && FileCountBucketNumMismatch && !allow_file_count_bucket_num_mismatch) {
elog(ERROR, "file count %d in catalog is not in proportion to the bucket "
"number %d of hash table with oid=%u, some data may be lost, if you "
"still want to continue the query by considering the table as random, set GUC "
"allow_file_count_bucket_num_mismatch to on and try again.",
fileCountInRelation, targetPolicy->bucketnum, myrelid);
elog(ERROR, "file count %d in catalog is not in proportion to the bucket "
"number %d of hash table with oid=%u, some data may be lost, if you "
"still want to continue the query by considering the table as random, set GUC "
"allow_file_count_bucket_num_mismatch to on and try again.",
fileCountInRelation, targetPolicy->bucketnum, myrelid);
}
/* change the virtual segment order when keep hash.
* order of idMap should also be changed.
Expand Down
26 changes: 20 additions & 6 deletions src/backend/cdb/cdbquerycontextdispatching.c
Original file line number Diff line number Diff line change
Expand Up @@ -2955,10 +2955,6 @@ fetchSegFileInfos(Oid relid, List *segnos)
}

storageChar = get_relation_storage_type(relid);
/*
* Get pg_appendonly information for this table.
*/
aoEntry = GetAppendOnlyEntry(relid, SnapshotNow);

/*
* Based on the pg_appendonly information, fetch
Expand All @@ -2967,13 +2963,31 @@ fetchSegFileInfos(Oid relid, List *segnos)
*/
if (RELSTORAGE_AOROWS == storageChar)
{
/*
* Get pg_appendonly information for this table.
*/
aoEntry = GetAppendOnlyEntry(relid, SnapshotNow);
AOFetchSegFileInfo(aoEntry, result, SnapshotNow);
}
else
else if (RELSTORAGE_PARQUET == storageChar)
{
Assert(RELSTORAGE_PARQUET == storageChar);
/*
* Get pg_appendonly information for this table.
*/
aoEntry = GetAppendOnlyEntry(relid, SnapshotNow);
ParquetFetchSegFileInfo(aoEntry, result, SnapshotNow);
}
else
{
/*
* Get range info for current magma hash table, which is already
* available when get block location and do datalocality
*/
/*
* TODO(Zongtian): vsegno -> rg list -> {range list1, ..., range list N}
*/
Assert(RELSTORAGE_EXTERNAL == storageChar);
}
}
return result;
}
Expand Down
4 changes: 4 additions & 0 deletions src/backend/commands/copy.c
Original file line number Diff line number Diff line change
Expand Up @@ -4331,6 +4331,9 @@ CopyFrom(CopyState cstate)
resultRelInfo->ri_aosegfileinfos,
GetQEIndex());

PlannedStmt* plannedstmt = palloc(sizeof(PlannedStmt));
memset(plannedstmt,0,sizeof(PlannedStmt));
plannedstmt->scantable_splits = cstate->splits;
resultRelInfo->ri_extInsertDesc =
InvokePlugStorageFormatInsertInit(insertInitFunc,
resultRelInfo->ri_RelationDesc,
Expand All @@ -4340,6 +4343,7 @@ CopyFrom(CopyState cstate)
segfileinfo->segno);

pfree(insertInitFunc);
pfree(plannedstmt);
}
else
{
Expand Down
68 changes: 57 additions & 11 deletions src/backend/executor/execMain.c
Original file line number Diff line number Diff line change
Expand Up @@ -875,8 +875,9 @@ ExecutorStart(QueryDesc *queryDesc, int eflags)
result_segfileinfos = GetResultRelSegFileInfos(RelationGetRelid(relinfo->ri_RelationDesc),
estate->es_result_aosegnos, result_segfileinfos);
}

plannedstmt->result_segfileinfos = result_segfileinfos;

if (plannedstmt->intoClause != NULL)
{
List *segment_segnos = SetSegnoForWrite(NIL, 0, GetQEGangNum(), true, true, false);
Expand Down Expand Up @@ -1791,7 +1792,6 @@ InitializeResultRelations(PlannedStmt *plannedstmt, EState *estate, CmdType oper
}

}

estate->es_partition_state = NULL;
if (estate->es_result_partitions)
{
Expand Down Expand Up @@ -2432,7 +2432,7 @@ CreateAppendOnlyParquetSegFileOnMaster(Oid relid, List *mapping)
Relation rel = heap_open(relid, AccessShareLock);

/* only relevant for AO rows, Parquet and external relations */
if(!RelationIsAoRows(rel) && !RelationIsParquet(rel))
if(!RelationIsAoRows(rel) && !RelationIsParquet(rel) && !RelationIsExternal(rel))
{
heap_close(rel, AccessShareLock);
return;
Expand Down Expand Up @@ -2466,6 +2466,45 @@ CreateAppendOnlyParquetSegFileOnMaster(Oid relid, List *mapping)
Assert(found);
}

/*
 * CreateExternalSegFileForRelationOnMaster
 *
 * Append one entry to 'addTasks' for every segno in 'segnos' of the external
 * relation 'rel'. Each entry records the relation's file node (db, rel and
 * tablespace ids), the segno, and a palloc'd copy of the relation name.
 *
 * The task array is doubled with repalloc whenever it is full. This function
 * only fills in the task list; it does not perform the tasks itself.
 *
 * Parameters:
 *   rel      - external relation (asserted with RelationIsExternal).
 *   segnos   - integer List of segment file numbers.
 *   addTasks - task list to append to; must not be NULL.
 */
static void CreateExternalSegFileForRelationOnMaster(Relation rel, List *segnos,
		SharedStorageOpTasks *addTasks)
{
	ListCell *cell;

	Assert(RelationIsExternal(rel));
	Assert(NULL != addTasks);

	char *relname = RelationGetRelationName(rel);

	foreach(cell, segnos)
	{
		int segno = lfirst_int(cell);
		RelFileNode *n;

		Assert(addTasks->sizeTasks >= addTasks->numTasks);

		/* grow the task array geometrically once it is full */
		if (addTasks->sizeTasks == addTasks->numTasks)
		{
			addTasks->tasks = repalloc(addTasks->tasks,
					addTasks->sizeTasks * sizeof(SharedStorageOpTask) * 2);
			addTasks->sizeTasks *= 2;
		}

		n = &addTasks->tasks[addTasks->numTasks].node;
		n->dbNode = rel->rd_node.dbNode;
		n->relNode = rel->rd_node.relNode;
		n->spcNode = rel->rd_node.spcNode;

		addTasks->tasks[addTasks->numTasks].segno = segno;

		/* each task keeps its own copy of the relation name */
		addTasks->tasks[addTasks->numTasks].relname = palloc(strlen(relname) + 1);
		strcpy(addTasks->tasks[addTasks->numTasks].relname, relname);

		addTasks->numTasks++;
	}
}

static void
CreaateAoRowSegFileForRelationOnMaster(Relation rel,
AppendOnlyEntry * aoEntry, List *segnos, SharedStorageOpTasks *addTask, SharedStorageOpTasks *overwriteTask)
Expand Down Expand Up @@ -2647,13 +2686,20 @@ CreateAppendOnlyParquetSegFileForRelationOnMaster(Relation rel, List *segnos)
CreateParquetSegFileForRelationOnMaster(rel, aoEntry, segnos, addTasks, overwriteTasks);

pfree(aoEntry);

PerformSharedStorageOpTasks(addTasks, Op_CreateSegFile);
PostPerformSharedStorageOpTasks(addTasks);
PerformSharedStorageOpTasks(overwriteTasks, Op_OverWriteSegFile);
}
// TODO: Should we create empty files on orc hash distribution table?
// else if (RelationIsExternal(rel))
// {
// CreateExternalSegFileForRelationOnMaster(rel, segnos, addTasks);
// PerformSharedStorageOpTasks(addTasks, Op_CreateSegFile);
// }

PerformSharedStorageOpTasks(addTasks, Op_CreateSegFile);
PostPerformSharedStorageOpTasks(addTasks);
PerformSharedStorageOpTasks(overwriteTasks, Op_OverWriteSegFile);
DropSharedStorageOpTasks(addTasks);
DropSharedStorageOpTasks(overwriteTasks);
DropSharedStorageOpTasks(overwriteTasks);
}

/*
Expand Down Expand Up @@ -2712,10 +2758,10 @@ ResultRelInfoSetSegFileInfo(ResultRelInfo *resultRelInfo, List *mapping)
/*
* Only relevant for AO relations.
*/
if (!relstorage_is_ao(RelinfoGetStorage(resultRelInfo)))
{
return;
}
// if (!relstorage_is_ao(RelinfoGetStorage(resultRelInfo)))
// {
// return;
// }

Assert(mapping);
Assert(resultRelInfo->ri_RelationDesc);
Expand Down
1 change: 1 addition & 0 deletions src/include/access/appendonlywriter.h
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,7 @@ extern void InitAppendOnlyWriter(void);
extern Size AppendOnlyWriterShmemSize(void);
extern bool TestCurrentTspSupportTruncate(Oid tsp);
extern List *SetSegnoForWrite(List *existing_segnos, Oid relid, int segment_num, bool forNewRel, bool reuse_segfilenum_in_same_xid, bool keepHash);
extern List *SetSegnoForExternalWrite(List *existing_segnos, Oid relid, int segment_num, bool forNewRel, bool reuse_segfilenum_in_same_xid, bool keepHash);
extern List *assignPerRelSegno(List *all_rels, int segment_num);
extern void UpdateMasterAosegTotals(Relation parentrel,
int segno,
Expand Down

0 comments on commit 33ddcf7

Please sign in to comment.