From 33ddcf7aa87eaa47063496319681627158d76de9 Mon Sep 17 00:00:00 2001 From: tuyu Date: Thu, 1 Aug 2019 15:29:47 +0800 Subject: [PATCH] HAWQ-1734. Resolve insert issue in external table of orc --- .../access/appendonly/appendonlywriter.c | 81 +++++++++++++++++++ src/backend/cdb/cdbdatalocality.c | 33 ++++++-- src/backend/cdb/cdbquerycontextdispatching.c | 26 ++++-- src/backend/commands/copy.c | 4 + src/backend/executor/execMain.c | 68 +++++++++++++--- src/include/access/appendonlywriter.h | 1 + 6 files changed, 189 insertions(+), 24 deletions(-) diff --git a/src/backend/access/appendonly/appendonlywriter.c b/src/backend/access/appendonly/appendonlywriter.c index addd33e44d..629ba3a718 100644 --- a/src/backend/access/appendonly/appendonlywriter.c +++ b/src/backend/access/appendonly/appendonlywriter.c @@ -1185,6 +1185,72 @@ List *SetSegnoForWrite(List *existing_segnos, Oid relid, int segment_num, } +List *SetSegnoForExternalWrite(List *existing_segnos, Oid relid, int segment_num, + bool forNewRel, bool reuse_segfilenum_in_same_xid, + bool keepHash) +{ + /* these vars are used in GP_ROLE_DISPATCH only */ + AORelHashEntryData *aoentry = NULL; + TransactionId CurrentXid = GetTopTransactionId(); + int next; + AOSegfileStatus *segfilestatus = NULL; + int remaining_num = segment_num; + bool has_same_txn_status = false; + AOSegfileStatus **maxSegno4Segment = NULL; + + switch(Gp_role) + { + case GP_ROLE_EXECUTE: + + Assert(existing_segnos != NIL); + Assert(list_length(existing_segnos) == segment_num); + return existing_segnos; + + case GP_ROLE_UTILITY: + + Assert(existing_segnos == NIL); + Assert(segment_num == 1); + return list_make1_int(RESERVED_SEGNO); + + case GP_ROLE_DISPATCH: + + Assert(existing_segnos == NIL); + Assert(segment_num > 0); + + if (forNewRel) + { + int i; + for (i = 1; i <= segment_num; i++) + { + existing_segnos = lappend_int(existing_segnos, i); + } + return existing_segnos; + } + + if (Debug_appendonly_print_segfile_choice) + { + ereport(LOG, (errmsg("SetSegnoForWrite: Choosing a segno for external " + "relation \"%s\" (%d) ", + get_rel_name(relid), relid))); + } + + for (int i = 0; i < segment_num; i++) + { + existing_segnos = lappend_int(existing_segnos, i); + } + Assert(list_length(existing_segnos) == segment_num); + + return existing_segnos; + + /* fix this for dispatch agent. for now it's broken anyway. */ + default: + Assert(false); + return NIL; + } + + } + + /* * assignPerRelSegno * @@ -1231,6 +1297,21 @@ List *assignPerRelSegno(List *all_relids, int segment_num) } } + else if (RelationIsExternal(rel)) + { + SegfileMapNode *n; + + n = makeNode(SegfileMapNode); + n->relid = cur_relid; + + n->segnos = SetSegnoForExternalWrite(NIL, cur_relid, segment_num, + false, true, true); + + Assert(n->relid != InvalidOid); + Assert(n->segnos != NIL); + + mapping = lappend(mapping, n); + } /* * hold RowExclusiveLock until the end of transaction diff --git a/src/backend/cdb/cdbdatalocality.c b/src/backend/cdb/cdbdatalocality.c index 55f4ac0952..67fe51d20b 100644 --- a/src/backend/cdb/cdbdatalocality.c +++ b/src/backend/cdb/cdbdatalocality.c @@ -4125,15 +4125,34 @@ run_allocation_algorithm(SplitAllocResult *result, List *virtual_segments, Query int fileCountInRelation = list_length(rel_data->files); bool FileCountBucketNumMismatch = false; if (targetPolicy->bucketnum > 0) { - FileCountBucketNumMismatch = fileCountInRelation % - targetPolicy->bucketnum == 0 ? false : true; + Relation rel = heap_open(rel_data->relid, NoLock); + targetPolicy->bucketnum == 0 ? false : true; + if (!RelationIsExternal(rel)) + { + FileCountBucketNumMismatch = fileCountInRelation % + targetPolicy->bucketnum == 0 ? false : true; + } + else + { + ListCell *lc_file; + int maxsegno = 0; + foreach(lc_file, rel_data->files) + { + Relation_File *rel_file = (Relation_File *) lfirst(lc_file); + if (rel_file->segno > maxsegno) + maxsegno = rel_file->segno; + } + FileCountBucketNumMismatch = + maxsegno > targetPolicy->bucketnum ? true : false; + } + heap_close(rel, NoLock); } if (isRelationHash && FileCountBucketNumMismatch && !allow_file_count_bucket_num_mismatch) { - elog(ERROR, "file count %d in catalog is not in proportion to the bucket " - "number %d of hash table with oid=%u, some data may be lost, if you " - "still want to continue the query by considering the table as random, set GUC " - "allow_file_count_bucket_num_mismatch to on and try again.", - fileCountInRelation, targetPolicy->bucketnum, myrelid); + elog(ERROR, "file count %d in catalog is not in proportion to the bucket " + "number %d of hash table with oid=%u, some data may be lost, if you " + "still want to continue the query by considering the table as random, set GUC " + "allow_file_count_bucket_num_mismatch to on and try again.", + fileCountInRelation, targetPolicy->bucketnum, myrelid); } /* change the virtual segment order when keep hash. * order of idMap should also be changed. diff --git a/src/backend/cdb/cdbquerycontextdispatching.c b/src/backend/cdb/cdbquerycontextdispatching.c index 88d4f44957..167af2af97 100644 --- a/src/backend/cdb/cdbquerycontextdispatching.c +++ b/src/backend/cdb/cdbquerycontextdispatching.c @@ -2955,10 +2955,6 @@ fetchSegFileInfos(Oid relid, List *segnos) } storageChar = get_relation_storage_type(relid); - /* - * Get pg_appendonly information for this table. - */ - aoEntry = GetAppendOnlyEntry(relid, SnapshotNow); /* * Based on the pg_appendonly information, fetch @@ -2967,13 +2963,31 @@ fetchSegFileInfos(Oid relid, List *segnos) */ if (RELSTORAGE_AOROWS == storageChar) { + /* + * Get pg_appendonly information for this table. + */ + aoEntry = GetAppendOnlyEntry(relid, SnapshotNow); AOFetchSegFileInfo(aoEntry, result, SnapshotNow); } - else + else if (RELSTORAGE_PARQUET == storageChar) { - Assert(RELSTORAGE_PARQUET == storageChar); + /* + * Get pg_appendonly information for this table. + */ + aoEntry = GetAppendOnlyEntry(relid, SnapshotNow); ParquetFetchSegFileInfo(aoEntry, result, SnapshotNow); } + else + { + /* + * Get range info for current magma hash table, which is already + * available when get block location and do datalocality + */ + /* + * TODO(Zongtian): vsegno -> rg list -> {range list1, ..., range list N} + */ + Assert(RELSTORAGE_EXTERNAL == storageChar); + } } return result; } diff --git a/src/backend/commands/copy.c b/src/backend/commands/copy.c index 9c2114886f..dbeda093b9 100644 --- a/src/backend/commands/copy.c +++ b/src/backend/commands/copy.c @@ -4331,6 +4331,9 @@ CopyFrom(CopyState cstate) resultRelInfo->ri_aosegfileinfos, GetQEIndex()); + PlannedStmt* plannedstmt = palloc(sizeof(PlannedStmt)); + memset(plannedstmt,0,sizeof(PlannedStmt)); + plannedstmt->scantable_splits = cstate->splits; resultRelInfo->ri_extInsertDesc = InvokePlugStorageFormatInsertInit(insertInitFunc, resultRelInfo->ri_RelationDesc, @@ -4340,6 +4343,7 @@ CopyFrom(CopyState cstate) segfileinfo->segno); pfree(insertInitFunc); + pfree(plannedstmt); } else { diff --git a/src/backend/executor/execMain.c b/src/backend/executor/execMain.c index 81f68f2e59..e21aeb6d48 100644 --- a/src/backend/executor/execMain.c +++ b/src/backend/executor/execMain.c @@ -875,8 +875,9 @@ ExecutorStart(QueryDesc *queryDesc, int eflags) result_segfileinfos = GetResultRelSegFileInfos(RelationGetRelid(relinfo->ri_RelationDesc), estate->es_result_aosegnos, result_segfileinfos); } + plannedstmt->result_segfileinfos = result_segfileinfos; - + if (plannedstmt->intoClause != NULL) { List *segment_segnos = SetSegnoForWrite(NIL, 0, GetQEGangNum(), true, true, false); @@ -1791,7 +1792,6 @@ InitializeResultRelations(PlannedStmt *plannedstmt, EState *estate, CmdType oper } } - estate->es_partition_state = NULL; if (estate->es_result_partitions) { @@ -2432,7 +2432,7 @@ CreateAppendOnlyParquetSegFileOnMaster(Oid relid, List *mapping) Relation rel = heap_open(relid, AccessShareLock); /* only relevant for AO relations */ - if(!RelationIsAoRows(rel) && !RelationIsParquet(rel)) + if(!RelationIsAoRows(rel) && !RelationIsParquet(rel) && !RelationIsExternal(rel)) { heap_close(rel, AccessShareLock); return; @@ -2466,6 +2466,45 @@ CreateAppendOnlyParquetSegFileOnMaster(Oid relid, List *mapping) Assert(found); } +static void CreateExternalSegFileForRelationOnMaster(Relation rel, List *segnos, + SharedStorageOpTasks *addTasks) +{ + ParquetFileSegInfo * fsinfo; + ListCell *cell; + + Assert(RelationIsExternal(rel)); + + char * relname = RelationGetRelationName(rel); + + foreach(cell, segnos) + { + int segno = lfirst_int(cell); + + Assert(NULL != addTasks); + Assert(addTasks->sizeTasks >= addTasks->numTasks); + + RelFileNode *n; + + if (addTasks->sizeTasks == addTasks->numTasks) + { + addTasks->tasks = repalloc(addTasks->tasks, + addTasks->sizeTasks * sizeof(SharedStorageOpTask) * 2); + addTasks->sizeTasks *= 2; + } + + n = &addTasks->tasks[addTasks->numTasks].node; + n->dbNode = rel->rd_node.dbNode; + n->relNode = rel->rd_node.relNode; + n->spcNode = rel->rd_node.spcNode; + + addTasks->tasks[addTasks->numTasks].segno = segno; + addTasks->tasks[addTasks->numTasks].relname = palloc(strlen(relname) + 1); + strcpy(addTasks->tasks[addTasks->numTasks].relname, relname); + + addTasks->numTasks++; + } +} + static void CreaateAoRowSegFileForRelationOnMaster(Relation rel, AppendOnlyEntry * aoEntry, List *segnos, SharedStorageOpTasks *addTask, SharedStorageOpTasks *overwriteTask) @@ -2647,13 +2686,20 @@ CreateAppendOnlyParquetSegFileForRelationOnMaster(Relation rel, List *segnos) CreateParquetSegFileForRelationOnMaster(rel, aoEntry, segnos, addTasks, overwriteTasks); pfree(aoEntry); + + PerformSharedStorageOpTasks(addTasks, Op_CreateSegFile); + PostPerformSharedStorageOpTasks(addTasks); + PerformSharedStorageOpTasks(overwriteTasks, Op_OverWriteSegFile); } + // TODO: Should we create empty files on orc hash distribution table? + // else if (RelationIsExternal(rel)) + // { + // CreateExternalSegFileForRelationOnMaster(rel, segnos, addTasks); + // PerformSharedStorageOpTasks(addTasks, Op_CreateSegFile); + // } - PerformSharedStorageOpTasks(addTasks, Op_CreateSegFile); - PostPerformSharedStorageOpTasks(addTasks); - PerformSharedStorageOpTasks(overwriteTasks, Op_OverWriteSegFile); DropSharedStorageOpTasks(addTasks); - DropSharedStorageOpTasks(overwriteTasks); + DropSharedStorageOpTasks(overwriteTasks); } /* @@ -2712,10 +2758,10 @@ ResultRelInfoSetSegFileInfo(ResultRelInfo *resultRelInfo, List *mapping) /* * Only relevant for AO relations. */ - if (!relstorage_is_ao(RelinfoGetStorage(resultRelInfo))) - { - return; - } +// if (!relstorage_is_ao(RelinfoGetStorage(resultRelInfo))) +// { +// return; +// } Assert(mapping); Assert(resultRelInfo->ri_RelationDesc); diff --git a/src/include/access/appendonlywriter.h b/src/include/access/appendonlywriter.h index a943bcb577..38d93541eb 100755 --- a/src/include/access/appendonlywriter.h +++ b/src/include/access/appendonlywriter.h @@ -129,6 +129,7 @@ extern void InitAppendOnlyWriter(void); extern Size AppendOnlyWriterShmemSize(void); extern bool TestCurrentTspSupportTruncate(Oid tsp); extern List *SetSegnoForWrite(List *existing_segnos, Oid relid, int segment_num, bool forNewRel, bool reuse_segfilenum_in_same_xid, bool keepHash); +extern List *SetSegnoForExternalWrite(List *existing_segnos, Oid relid, int segment_num, bool forNewRel, bool reuse_segfilenum_in_same_xid, bool keepHash); extern List *assignPerRelSegno(List *all_rels, int segment_num); extern void UpdateMasterAosegTotals(Relation parentrel, int segno,