Skip to content
This repository has been archived by the owner on Jul 23, 2024. It is now read-only.

Commit

Permalink
HAWQ-1811. Sync with OushuDB - Phase IV
Browse files Browse the repository at this point in the history
  • Loading branch information
ztao1987 committed Mar 16, 2022
1 parent 1aa69cf commit e0a3899
Show file tree
Hide file tree
Showing 124 changed files with 7,811 additions and 21,680 deletions.
3 changes: 1 addition & 2 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,6 @@ include_directories(${CMAKE_SOURCE_DIR}/hornet/univplan/src)
include_directories(${CMAKE_SOURCE_DIR}/hornet/magma/src)
include_directories(${CMAKE_SOURCE_DIR}/hornet/storage/src)
include_directories(${CMAKE_SOURCE_DIR}/hornet/executor/src)
include_directories(${CMAKE_SOURCE_DIR}/hornet/scheduler/src)
include_directories(${CMAKE_BINARY_DIR}/hornet/dbcommon/src)
include_directories(/opt/dependency/package/include)

Expand All @@ -124,7 +123,7 @@ add_dependencies(dxltranslators config)
add_executable(postgres ${cdb_source} ${generate_source})
target_link_libraries(postgres z bz2 lz4 snappy xml2 curl ldap json-c krb5 yarn thrift) # basic
target_link_libraries(postgres gpos xerces-c naucrates gpdbcost gpopt dxltranslators) # gp-orca
target_link_libraries(postgres hdfs3 dbcommon-shared univplan-shared storage-shared magma-client-shared executor-shared scheduler-shared) # hornet
target_link_libraries(postgres hdfs3 dbcommon-shared univplan-shared storage-shared magma-client-shared storage-magma-format-shared executor-shared) # hornet
target_link_libraries(postgres dl)
add_dependencies(postgres config)

Expand Down
131 changes: 131 additions & 0 deletions contrib/hornet/hornet.c
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,22 @@

#include "postgres.h"

#include <inttypes.h>

#include "funcapi.h"

#include "access/fileam.h"
#include "access/filesplit.h"
#include "access/orcam.h"
#include "catalog/pg_exttable.h"
#include "hdfs/hdfs.h"
#include "storage/cwrapper/orc-format-c.h"
#include "storage/fd.h"
#include "storage/filesystem.h"
#include "utils/builtins.h"
#include "utils/datum.h"
#include "utils/hawq_funcoid_mapping.h"
#include "utils/lsyscache.h"

Datum ls_hdfs_dir(PG_FUNCTION_ARGS);

Expand Down Expand Up @@ -133,9 +142,131 @@ Datum ls_hdfs_dir(PG_FUNCTION_ARGS) {
}
}

Datum is_supported_proc_in_NewQE(PG_FUNCTION_ARGS);
PG_FUNCTION_INFO_V1(is_supported_proc_in_NewQE);
Datum is_supported_proc_in_NewQE(PG_FUNCTION_ARGS) {
Oid a = PG_GETARG_OID(0);
int32_t mappingFuncId = HAWQ_FUNCOID_MAPPING(a);
PG_RETURN_BOOL(!(IS_HAWQ_MAPPING_FUNCID_INVALID(mappingFuncId)));
}

/*
 * orc_tid_scan: fetch a single tuple, identified by its logical tid, from an
 * ORC table (internal RELSTORAGE_ORC or an external table whose formatter is
 * "orc") and return it as a composite Datum.
 *
 * Shared backend for the SQL-callable wrappers orc_segno_tid_scan (internal
 * tables, addressed by segment number) and orc_url_tid_scan (external tables,
 * addressed by URL).  Exactly one of the two addressing modes is used:
 * segno > 0 with url == NULL, or segno == 0 with a non-NULL url.
 *
 * fcinfo - call info of the SQL wrapper; argument 0 must be a row type and an
 *          optional 4th argument (int[]) lists 1-based attribute numbers to
 *          project.
 * segno  - segment file number for internal ORC tables, 0 for external.
 * url    - file URL for external ORC tables, NULL for internal.
 * tid    - tuple id within the file; erroring out if past EOF.
 *
 * Raises ERRCODE_DATATYPE_MISMATCH / ERRCODE_FEATURE_NOT_SUPPORTED on
 * invalid argument types, non-ORC storage, bad attribute numbers, or an
 * out-of-range tid.
 */
Datum orc_tid_scan(FunctionCallInfo fcinfo, int segno, const char *url,
                   uint64_t tid) {
  Assert(segno == 0 || url == NULL);

  // Argument checking: the first argument must be a row type backed by an
  // ORC relation (either native ORC storage or an external "orc" table).
  Oid argtype = get_fn_expr_argtype(fcinfo->flinfo, 0);
  if (!type_is_rowtype(argtype))
    ereport(ERROR,
            (errcode(ERRCODE_DATATYPE_MISMATCH),
             errmsg("first argument of %s must be a row type", __func__)));
  Oid relId = get_typ_typrelid(argtype);
  char relStorage = get_rel_relstorage(relId);
  if (relstorage_is_external(relStorage)) {
    ExtTableEntry *extEntry = GetExtTableEntry(relId);
    const char *fmtName = getExtTblFormatterTypeInFmtOptsStr(extEntry->fmtopts);
    if (strcasecmp("orc", fmtName) != 0) {
      ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
                      errmsg("Invalid external table type of %s for ORC Table.",
                             fmtName)));
    }
    if (segno > 0) {
      // External tables are addressed by URL, never by segment number.
      // (Fixed: the original errmsg passed a stray fmtName argument with no
      // matching format specifier.)
      ereport(ERROR,
              (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
               errmsg("Expecting URL for external ORC Table.")));
    }
  } else if (RELSTORAGE_ORC != relStorage) {
    ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
                    errmsg("Invalid table type of '%c' for ORC Table.",
                           relStorage)));
  }

  // Retrieve output tuple description
  TupleDesc tupdesc;
  if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
    ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
                    errmsg("function returning record called in context "
                           "that cannot accept type record")));
  TupleTableSlot *slot = TupleDescGetSlot(tupdesc);

  // Setup projection: proj[i] is true when attribute i+1 should be read.
  bool *proj = (bool *)palloc(tupdesc->natts * sizeof(bool));
  if (fcinfo->nargs == 4) {  // specify the attributes to project
    ArrayType *arr = PG_GETARG_ARRAYTYPE_P(3);
    size_t num = (ARR_SIZE(arr) - ARR_DATA_OFFSET(arr)) / sizeof(int32_t);
    int32_t *attrNums = (int32_t *)ARR_DATA_PTR(arr);
    // Fixed: memset length now matches the palloc size (natts * sizeof(bool))
    // instead of assuming sizeof(bool) == 1.
    memset(proj, 0, tupdesc->natts * sizeof(bool));
    for (size_t i = 0; i < num; i++) {
      if (attrNums[i] <= 0 || attrNums[i] > tupdesc->natts)
        ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
                        errmsg("Invalid attribute number of %" PRId32 ".",
                               attrNums[i])));
      proj[attrNums[i] - 1] = 1;
    }
  } else {  // scan the whole tuple
    memset(proj, 1, tupdesc->natts * sizeof(bool));
  }

  // Construct a single file split covering the whole file; the actual
  // positioning is done by the tid-scan call below.
  FileSplit split = makeNode(FileSplitNode);
  split->segno = segno;
  if (segno == 0) split->ext_file_uri_string = (char *)url;
  split->offsets = 0;
  split->lengths = INT64_MAX;
  split->logiceof = INT64_MAX;
  List *fileSplits = list_make1(split);

  Relation rel = RelationIdGetRelation(relId);
  OrcScanDescData *scanDesc =
      orcBeginReadWithOptionsStr(rel, ActiveSnapshot, NULL, fileSplits, proj,
                                 NULL, "{\"format\": \"APACHE_ORC_FORMAT\"}");
  RelationClose(rel);

  // XXX(chiyang): hack way to directly get `ORCFormatC *fmt;`, which is defined
  // inside orcam.c.
  bool scanSucceed =
      ORCFormatTidScanC(*(ORCFormatC **)scanDesc->orcFormatData, tid);
  checkOrcError(scanDesc->orcFormatData);
  if (scanSucceed) {
    orcReadNext(scanDesc, slot);

    // Materialize the tuple: copy by-reference datums out of the scan's
    // buffers before the scan is closed below.
    Datum *values = slot_get_values(slot);
    bool *nulls = slot_get_isnull(slot);
    for (int idx = 0; idx < tupdesc->natts; idx++) {  // natts is int
      if (!nulls[idx]) {
        values[idx] = datumCopy(values[idx], tupdesc->attrs[idx]->attbyval,
                                tupdesc->attrs[idx]->attlen);
      }
    }
  } else {
    orcEndRead(scanDesc);
    ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
                    errmsg("TID %" PRIu64 " exceeds file tuple count.", tid)));
  }
  orcEndRead(scanDesc);

  HeapTuple retTuple =
      heap_form_tuple(tupdesc, slot_get_values(slot), slot_get_isnull(slot));

  PG_RETURN_DATUM(HeapTupleGetDatum(retTuple));
}

/*
 * SQL entry point: orc_tid_scan(anyelement, int, bigint [, int[]]).
 * Addresses an internal ORC table by segment number; delegates to
 * orc_tid_scan with a NULL url.
 */
Datum orc_segno_tid_scan(PG_FUNCTION_ARGS);
PG_FUNCTION_INFO_V1(orc_segno_tid_scan);
Datum orc_segno_tid_scan(PG_FUNCTION_ARGS) {
  uint64_t tupleId = PG_GETARG_INT64(2);
  int segmentNo = PG_GETARG_INT32(1);
  return orc_tid_scan(fcinfo, segmentNo, NULL, tupleId);
}

/*
 * SQL entry point: orc_tid_scan(anyelement, text, bigint [, int[]]).
 * Addresses an external ORC table by file URL; delegates to orc_tid_scan
 * with segno == 0.
 */
Datum orc_url_tid_scan(PG_FUNCTION_ARGS);
PG_FUNCTION_INFO_V1(orc_url_tid_scan);
Datum orc_url_tid_scan(PG_FUNCTION_ARGS) {
  uint64_t tupleId = PG_GETARG_INT64(2);
  // Convert the text argument to a C string via the textout output function.
  const char *fileUrl = DatumGetCString(
      DirectFunctionCall1(textout, PointerGetDatum(PG_GETARG_TEXT_P(1))));
  return orc_tid_scan(fcinfo, 0, fileUrl, tupleId);
}
24 changes: 23 additions & 1 deletion contrib/hornet/load_hornet_helper_function.sql
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
begin;

DROP SCHEMA IF EXISTS hornet_helper CASCADE;
CREATE SCHEMA hornet_helper;
SET SEARCH_PATH = hornet_helper;
Expand Down Expand Up @@ -193,4 +195,24 @@ $$ LANGUAGE PLPGSQL;

drop function if exists is_supported_proc_in_NewQE(oid);

create function is_supported_proc_in_NewQE(oid) returns boolean as '$libdir/hornet','is_supported_proc_in_NewQE'language c immutable;
create function is_supported_proc_in_NewQE(oid) returns boolean as '$libdir/hornet','is_supported_proc_in_NewQE'language c immutable;



-- orc_tid_scan overloads: fetch one tuple from an ORC table by tid.
-- The first argument supplies the row type; the optional int[] argument
-- lists 1-based attribute numbers to project.

-- External ORC table, addressed by URL, with projection.
drop function if exists orc_tid_scan(anyelement, text, bigint, int[]);
create function orc_tid_scan(anyelement, text, bigint, int[]) returns anyelement
as '$libdir/hornet','orc_url_tid_scan' language c stable;

-- External ORC table, addressed by URL, full-tuple scan.
drop function if exists orc_tid_scan(anyelement, text, bigint);
create function orc_tid_scan(anyelement, text, bigint) returns anyelement
as '$libdir/hornet','orc_url_tid_scan' language c stable;

-- Internal ORC table, addressed by segment number, with projection.
drop function if exists orc_tid_scan(anyelement, int, bigint, int[]);
create function orc_tid_scan(anyelement, int, bigint, int[]) returns anyelement
as '$libdir/hornet','orc_segno_tid_scan' language c stable;

-- Internal ORC table, addressed by segment number, full-tuple scan.
drop function if exists orc_tid_scan(anyelement, int, bigint);
create function orc_tid_scan(anyelement, int, bigint) returns anyelement
as '$libdir/hornet','orc_segno_tid_scan' language c stable;

commit;
Loading

0 comments on commit e0a3899

Please sign in to comment.