From bd3ae844b2111cfcbb1e54f9ef05266aff19ff93 Mon Sep 17 00:00:00 2001 From: Eric Lidwa Date: Tue, 18 Jun 2024 18:46:07 +0000 Subject: [PATCH 1/4] Fixed memory leaks in ArrowBuilder and Sampler, added luaTests for failed luaCreate calls --- packages/arrow/ArrowBuilder.cpp | 178 ++++++++++++++++---------- packages/arrow/ArrowBuilder.h | 1 + packages/arrow/ArrowSampler.cpp | 70 ++++++---- scripts/selftests/parquet_sampler.lua | 17 ++- 4 files changed, 170 insertions(+), 96 deletions(-) diff --git a/packages/arrow/ArrowBuilder.cpp b/packages/arrow/ArrowBuilder.cpp index 86963ad58..d9f3929ec 100644 --- a/packages/arrow/ArrowBuilder.cpp +++ b/packages/arrow/ArrowBuilder.cpp @@ -80,7 +80,7 @@ int ArrowBuilder::luaCreate (lua_State* L) } catch(const RunTimeException& e) { - if(_parms) _parms->releaseLuaObject(); + /* Constructor releases _parms */ mlog(e.level(), "Error creating %s: %s", LUA_META_NAME, e.what()); return returnLuaStatus(L, false); } @@ -249,98 +249,140 @@ ArrowBuilder::ArrowBuilder (lua_State* L, ArrowParms* _parms, const char* rec_type, const char* id, const char* parms_str, const char* _endpoint): LuaObject(L, OBJECT_TYPE, LUA_META_NAME, LUA_META_TABLE), + builderPid(NULL), parms(_parms), + active(false), + inQ(NULL), + recType(NULL), + timeKey(NULL), + xKey(NULL), + yKey(NULL), hasAncillaryFields(false), - hasAncillaryElements(false) + hasAncillaryElements(false), + outQ(NULL), + dataFile(NULL), + metadataFile(NULL), + outputPath(NULL), + outputMetadataPath(NULL), + parmsAsString(NULL), + endpoint(NULL), + impl(NULL) { - assert(_parms); - assert(outq_name); - assert(inq_name); - assert(rec_type); - assert(id); - assert(parms_str); - assert(_endpoint); - - /* Save Parameters */ - parmsAsString = StringLib::duplicate(parms_str); - endpoint = StringLib::duplicate(_endpoint); - - /* Get Record Meta Data */ - RecordObject::meta_t* rec_meta = RecordObject::getRecordMetaFields(rec_type); - if(rec_meta == NULL) + try { - throw RunTimeException(CRITICAL, RTE_ERROR, "Unable to get meta data for %s", rec_type); - } + /* Validate Parameters */ + if(parms == NULL) + throw RunTimeException(CRITICAL, RTE_ERROR, "Invalid ArrowParms object"); - /* Build Geometry Fields */ - geoData.as_geo = parms->as_geo; - if(geoData.as_geo) - { - /* Check if Record has Geospatial Fields */ - if((rec_meta->x_field == NULL) || (rec_meta->y_field == NULL)) - { - throw RunTimeException(CRITICAL, RTE_ERROR, "Unable to get x and y coordinates for %s", rec_type); - } + if(outq_name == NULL) + throw RunTimeException(CRITICAL, RTE_ERROR, "Invalid output queue name"); + + if(inq_name == NULL) + throw RunTimeException(CRITICAL, RTE_ERROR, "Invalid input queue name"); + + if(rec_type == NULL) + throw RunTimeException(CRITICAL, RTE_ERROR, "Invalid record type"); + + if(id == NULL) + throw RunTimeException(CRITICAL, RTE_ERROR, "Invalid id"); - /* Get X Field */ - geoData.x_field = RecordObject::getDefinedField(rec_type, rec_meta->x_field); - if(geoData.x_field.type == RecordObject::INVALID_FIELD) + if(parms_str == NULL) + throw RunTimeException(CRITICAL, RTE_ERROR, "Invalid parameters"); + + if(_endpoint == NULL) + throw RunTimeException(CRITICAL, RTE_ERROR, "Invalid endpoint"); + + /* Save Parameters */ + parmsAsString = StringLib::duplicate(parms_str); + endpoint = StringLib::duplicate(_endpoint); + + /* Get Record Meta Data */ + RecordObject::meta_t* rec_meta = RecordObject::getRecordMetaFields(rec_type); + if(rec_meta == NULL) { - throw RunTimeException(CRITICAL, RTE_ERROR, "Unable to extract x field [%s] from record type <%s>", rec_meta->x_field, rec_type); + throw RunTimeException(CRITICAL, RTE_ERROR, "Unable to get meta data for %s", rec_type); } - /* Get Y Field */ - geoData.y_field = RecordObject::getDefinedField(rec_type, rec_meta->y_field); - if(geoData.y_field.type == RecordObject::INVALID_FIELD) + /* Build Geometry Fields */ + geoData.as_geo = parms->as_geo; + if(geoData.as_geo) { - throw RunTimeException(CRITICAL, RTE_ERROR, "Unable to extract y field [%s] from record type <%s>", rec_meta->y_field, rec_type); - } - } + /* Check if Record has Geospatial Fields */ + if((rec_meta->x_field == NULL) || (rec_meta->y_field == NULL)) + { + throw RunTimeException(CRITICAL, RTE_ERROR, "Unable to get x and y coordinates for %s", rec_type); + } - /* Get Paths */ - outputPath = ArrowCommon::getOutputPath(parms); - outputMetadataPath = ArrowCommon::createMetadataFileName(outputPath); + /* Get X Field */ + geoData.x_field = RecordObject::getDefinedField(rec_type, rec_meta->x_field); + if(geoData.x_field.type == RecordObject::INVALID_FIELD) + { + throw RunTimeException(CRITICAL, RTE_ERROR, "Unable to extract x field [%s] from record type <%s>", rec_meta->x_field, rec_type); + } + + /* Get Y Field */ + geoData.y_field = RecordObject::getDefinedField(rec_type, rec_meta->y_field); + if(geoData.y_field.type == RecordObject::INVALID_FIELD) + { + throw RunTimeException(CRITICAL, RTE_ERROR, "Unable to extract y field [%s] from record type <%s>", rec_meta->y_field, rec_type); + } + } - /* Create Unique Temporary Filenames */ - dataFile = ArrowCommon::getUniqueFileName(id); - metadataFile = ArrowCommon::createMetadataFileName(dataFile); + /* Get Paths */ + outputPath = ArrowCommon::getOutputPath(parms); + outputMetadataPath = ArrowCommon::createMetadataFileName(outputPath); - /* - * NO THROWING BEYOND THIS POINT - */ + /* Create Unique Temporary Filenames */ + dataFile = ArrowCommon::getUniqueFileName(id); + metadataFile = ArrowCommon::createMetadataFileName(dataFile); - /* Set Record Type */ - recType = StringLib::duplicate(rec_type); + /* Set Record Type */ + recType = StringLib::duplicate(rec_type); - /* Save Keys */ - timeKey = StringLib::duplicate(getSubField(rec_meta->time_field)); - xKey = StringLib::duplicate(getSubField(rec_meta->x_field)); - yKey = StringLib::duplicate(getSubField(rec_meta->y_field)); + /* Save Keys */ + timeKey = StringLib::duplicate(getSubField(rec_meta->time_field)); + xKey = StringLib::duplicate(getSubField(rec_meta->x_field)); + yKey = StringLib::duplicate(getSubField(rec_meta->y_field)); - /* Get Row Size */ - const RecordObject::field_t batch_rec_field = RecordObject::getDefinedField(recType, rec_meta->batch_field); - if(batch_rec_field.type == RecordObject::INVALID_FIELD) batchRowSizeBytes = 0; - else batchRowSizeBytes = RecordObject::getRecordDataSize(batch_rec_field.exttype); - rowSizeBytes = RecordObject::getRecordDataSize(recType) + batchRowSizeBytes; - maxRowsInGroup = ROW_GROUP_SIZE / rowSizeBytes; + /* Get Row Size */ + const RecordObject::field_t batch_rec_field = RecordObject::getDefinedField(recType, rec_meta->batch_field); + if(batch_rec_field.type == RecordObject::INVALID_FIELD) batchRowSizeBytes = 0; + else batchRowSizeBytes = RecordObject::getRecordDataSize(batch_rec_field.exttype); + rowSizeBytes = RecordObject::getRecordDataSize(recType) + batchRowSizeBytes; + maxRowsInGroup = ROW_GROUP_SIZE / rowSizeBytes; - /* Initialize Queues */ - const int qdepth = maxRowsInGroup * QUEUE_BUFFER_FACTOR; - outQ = new Publisher(outq_name, Publisher::defaultFree, qdepth); - inQ = new Subscriber(inq_name, MsgQ::SUBSCRIBER_OF_CONFIDENCE, qdepth); + /* Initialize Queues */ + const int qdepth = maxRowsInGroup * QUEUE_BUFFER_FACTOR; + outQ = new Publisher(outq_name, Publisher::defaultFree, qdepth); + inQ = new Subscriber(inq_name, MsgQ::SUBSCRIBER_OF_CONFIDENCE, qdepth); - /* Allocate Implementation */ - impl = new ArrowBuilderImpl(this); + /* Allocate Implementation */ + impl = new ArrowBuilderImpl(this); - /* Start Builder Thread */ - active = true; - builderPid = new Thread(builderThread, this); + /* Start Builder Thread */ + active = true; + builderPid = new Thread(builderThread, this); + } + catch(const RunTimeException& e) + { + mlog(e.level(), "Error creating ArrowSampler: %s", e.what()); + Delete(); + throw; + } } /*---------------------------------------------------------------------------- * Destructor - *----------------------------------------------------------------------------*/ ArrowBuilder::~ArrowBuilder(void) +{ + Delete(); +} + +/*---------------------------------------------------------------------------- + * Delete - + *----------------------------------------------------------------------------*/ +void ArrowBuilder::Delete(void) { active = false; delete builderPid; diff --git a/packages/arrow/ArrowBuilder.h b/packages/arrow/ArrowBuilder.h index b83f0b286..bbb390f6e 100644 --- a/packages/arrow/ArrowBuilder.h +++ b/packages/arrow/ArrowBuilder.h @@ -184,6 +184,7 @@ class ArrowBuilder: public LuaObject const char* rec_type, const char* id, const char* parms_str, const char* _endpoint); ~ArrowBuilder (void) override; + void Delete (void); static void* builderThread (void* parm); }; diff --git a/packages/arrow/ArrowSampler.cpp b/packages/arrow/ArrowSampler.cpp index 7d824f7c5..bda929494 100644 --- a/packages/arrow/ArrowSampler.cpp +++ b/packages/arrow/ArrowSampler.cpp @@ -58,15 +58,16 @@ const struct luaL_Reg ArrowSampler::LUA_META_TABLE[] = { int ArrowSampler::luaCreate(lua_State* L) { ArrowParms* _parms = NULL; + const char* input_file = NULL; + const char* outq_name = NULL; + std::vector rasters; + /* Get Parameters */ try { - /* Get Parameters */ - _parms = dynamic_cast(getLuaObject(L, 1, ArrowParms::OBJECT_TYPE)); - const char* input_file = getLuaString(L, 2); - const char* outq_name = getLuaString(L, 3); - - std::vector rasters; + _parms = dynamic_cast(getLuaObject(L, 1, ArrowParms::OBJECT_TYPE)); + input_file = getLuaString(L, 2); + outq_name = getLuaString(L, 3); /* Check if the parameter is a table */ luaL_checktype(L, 4, LUA_TTABLE); @@ -78,17 +79,37 @@ int ArrowSampler::luaCreate(lua_State* L) { const char* rkey = getLuaString(L, -2); RasterObject* robj = dynamic_cast(getLuaObject(L, -1, RasterObject::OBJECT_TYPE)); + + if(!rkey) throw RunTimeException(CRITICAL, RTE_ERROR, "Invalid raster key"); + if(!robj) throw RunTimeException(CRITICAL, RTE_ERROR, "Invalid raster object"); + rasters.push_back({rkey, robj}); /* Pop value */ lua_pop(L, 1); } + } + catch(const RunTimeException& e) + { + mlog(e.level(), "Error creating %s: %s", LUA_META_NAME, e.what()); + + /* Release Lua Parameters Objects */ + if(_parms) _parms->releaseLuaObject(); + for(raster_info_t& raster : rasters) + { + raster.robj->releaseLuaObject(); + } + return returnLuaStatus(L, false); + } - /* Create Dispatch */ + /* Create Dispatch */ + try + { return createLuaObject(L, new ArrowSampler(L, _parms, input_file, outq_name, rasters)); } catch(const RunTimeException& e) { + /* Constructor releases all Lua parameters objects on error */ mlog(e.level(), "Error creating %s: %s", LUA_META_NAME, e.what()); return returnLuaStatus(L, false); } @@ -256,39 +277,42 @@ ArrowSampler::ArrowSampler(lua_State* L, ArrowParms* _parms, const char* input_f const char* outq_name, const std::vector& rasters): LuaObject(L, OBJECT_TYPE, LUA_META_NAME, LUA_META_TABLE), parms(_parms), + outQ(NULL), + impl(NULL), + dataFile(NULL), metadataFile(NULL), + outputPath(NULL), + outputMetadataPath(NULL), alreadySampled(false) { /* Add Lua sample function */ LuaEngine::setAttrFunc(L, "sample", luaSample); - if (parms == NULL) - throw RunTimeException(CRITICAL, RTE_ERROR, "Invalid ArrowParms object"); - - if((parms->path == NULL) || (parms->path[0] == '\0')) - throw RunTimeException(CRITICAL, RTE_ERROR, "Invalid output file path"); - - if ((input_file == NULL) || (input_file[0] == '\0')) - throw RunTimeException(CRITICAL, RTE_ERROR, "Invalid input file path"); - - if ((outq_name == NULL) || (outq_name[0] == '\0')) - throw RunTimeException(CRITICAL, RTE_ERROR, "Invalid input queue name"); - try { + /* Copy raster objects, create samplers */ for(std::size_t i = 0; i < rasters.size(); i++) { const raster_info_t& raster = rasters[i]; const char* rkey = raster.rkey; RasterObject* robj = raster.robj; - - if(!rkey) throw RunTimeException(CRITICAL, RTE_ERROR, "Invalid raster key"); - if(!robj) throw RunTimeException(CRITICAL, RTE_ERROR, "Invalid raster object"); - sampler_t* sampler = new sampler_t(rkey, robj, this); samplers.push_back(sampler); } + /* Validate Parameters */ + if(parms == NULL) + throw RunTimeException(CRITICAL, RTE_ERROR, "Invalid ArrowParms object"); + + if((parms->path == NULL) || (parms->path[0] == '\0')) + throw RunTimeException(CRITICAL, RTE_ERROR, "Invalid output file path"); + + if((input_file == NULL) || (input_file[0] == '\0')) + throw RunTimeException(CRITICAL, RTE_ERROR, "Invalid input file path"); + + if((outq_name == NULL) || (outq_name[0] == '\0')) + throw RunTimeException(CRITICAL, RTE_ERROR, "Invalid input queue name"); + /* Allocate Implementation */ impl = new ArrowSamplerImpl(this); diff --git a/scripts/selftests/parquet_sampler.lua b/scripts/selftests/parquet_sampler.lua index d22e52142..1776e6164 100644 --- a/scripts/selftests/parquet_sampler.lua +++ b/scripts/selftests/parquet_sampler.lua @@ -56,7 +56,6 @@ local out_file_size = getFileSize(_out_geoparquet); print("Output geoparquet file size: " .. out_file_size .. " bytes") runner.check(out_file_size > in_file_size, "Output file size is not greater than input file size: ") - print('\n--------------------------------------\nTest02: input/output parquet (x, y)\n--------------------------------------') parquet_sampler = arrow.sampler(arrow.parms({path=out_parquet, format="parquet"}), in_parquet, outq_name, {["mosaic"] = dem1}) runner.check(parquet_sampler ~= nil) @@ -140,17 +139,25 @@ os.remove(_out_metadata) print('\n--------------------------------------\nTest07: input/output geoparquet (geo)\n--------------------------------------') -local parquet_sampler = arrow.sampler(arrow.parms({path=out_geoparquet, format="parquet"}), in_geoparquet, outq_name, {["mosaic"] = dem1, ["strips"] = dem2}) +parquet_sampler = arrow.sampler(arrow.parms({path=out_geoparquet, format="parquet"}), in_geoparquet, outq_name, {["mosaic"] = dem1, ["strips"] = dem2}) runner.check(parquet_sampler ~= nil) -local in_file_size = getFileSize(in_geoparquet); +in_file_size = getFileSize(in_geoparquet); print("Input geoparquet file size: " .. in_file_size .. " bytes") -local status = parquet_sampler:sample() -local out_file_size = getFileSize(_out_geoparquet); +status = parquet_sampler:sample() +out_file_size = getFileSize(_out_geoparquet); print("Output geoparquet file size: " .. out_file_size .. " bytes") runner.check(out_file_size > in_file_size, "Output file size is not greater than input file size: ") +print('\n--------------------------------------\nTest08: failed luaCreate bad arrow parms\n--------------------------------------\n') +parquet_sampler = arrow.sampler(arrow.parms(), in_geoparquet, outq_name, {["mosaic"] = dem1, ["strips"] = dem2}) +runner.check(parquet_sampler == nil) + +print('\n--------------------------------------\nTest09: failed constructor bad paths\n--------------------------------------\n') +parquet_sampler = arrow.sampler(arrow.parms({path=out_geoparquet, format="parquet"}), "", "", {["mosaic"] = dem1, ["strips"] = dem2}) +runner.check(parquet_sampler == nil) + -- There is no easy way to read parquet file in Lua, check the size of the output files -- the files were tested with python scripts From d67e33f2d7c5b381dff934f3e0b618f872b7b537 Mon Sep 17 00:00:00 2001 From: Eric Lidwa Date: Wed, 19 Jun 2024 17:01:10 +0000 Subject: [PATCH 2/4] Proper handling of ParquetStatusException --- packages/arrow/ArrowBuilder.cpp | 198 ++++++++++---------------- packages/arrow/ArrowBuilder.h | 1 - packages/arrow/ArrowBuilderImpl.cpp | 10 +- packages/arrow/ArrowSampler.cpp | 21 +-- packages/arrow/ArrowSamplerImpl.cpp | 50 +++++-- scripts/selftests/parquet_sampler.lua | 13 +- 6 files changed, 140 insertions(+), 153 deletions(-) diff --git a/packages/arrow/ArrowBuilder.cpp b/packages/arrow/ArrowBuilder.cpp index d9f3929ec..6d1d5961c 100644 --- a/packages/arrow/ArrowBuilder.cpp +++ b/packages/arrow/ArrowBuilder.cpp @@ -80,7 +80,7 @@ int ArrowBuilder::luaCreate (lua_State* L) } catch(const RunTimeException& e) { - /* Constructor releases _parms */ + if(_parms) _parms->releaseLuaObject(); mlog(e.level(), "Error creating %s: %s", LUA_META_NAME, e.what()); return returnLuaStatus(L, false); } @@ -249,154 +249,114 @@ ArrowBuilder::ArrowBuilder (lua_State* L, ArrowParms* _parms, const char* rec_type, const char* id, const char* parms_str, const char* _endpoint): LuaObject(L, OBJECT_TYPE, LUA_META_NAME, LUA_META_TABLE), - builderPid(NULL), parms(_parms), - active(false), - inQ(NULL), - recType(NULL), - timeKey(NULL), - xKey(NULL), - yKey(NULL), hasAncillaryFields(false), - hasAncillaryElements(false), - outQ(NULL), - dataFile(NULL), - metadataFile(NULL), - outputPath(NULL), - outputMetadataPath(NULL), - parmsAsString(NULL), - endpoint(NULL), - impl(NULL) + hasAncillaryElements(false) { - try + assert(_parms); + assert(outq_name); + assert(inq_name); + assert(rec_type); + assert(id); + assert(parms_str); + assert(_endpoint); + + /* Get Record Meta Data */ + RecordObject::meta_t* rec_meta = RecordObject::getRecordMetaFields(rec_type); + if(rec_meta == NULL) { - /* Validate Parameters */ - if(parms == NULL) - throw RunTimeException(CRITICAL, RTE_ERROR, "Invalid ArrowParms object"); - - if(outq_name == NULL) - throw RunTimeException(CRITICAL, RTE_ERROR, "Invalid output queue name"); - - if(inq_name == NULL) - throw RunTimeException(CRITICAL, RTE_ERROR, "Invalid input queue name"); - - if(rec_type == NULL) - throw RunTimeException(CRITICAL, RTE_ERROR, "Invalid record type"); - - if(id == NULL) - throw RunTimeException(CRITICAL, RTE_ERROR, "Invalid id"); - - if(parms_str == NULL) - throw RunTimeException(CRITICAL, RTE_ERROR, "Invalid parameters"); - - if(_endpoint == NULL) - throw RunTimeException(CRITICAL, RTE_ERROR, "Invalid endpoint"); + throw RunTimeException(CRITICAL, RTE_ERROR, "Unable to get meta data for %s", rec_type); + } - /* Save Parameters */ - parmsAsString = StringLib::duplicate(parms_str); - endpoint = StringLib::duplicate(_endpoint); + /* Build Geometry Fields */ + geoData.as_geo = parms->as_geo; + if(geoData.as_geo) + { + /* Check if Record has Geospatial Fields */ + if((rec_meta->x_field == NULL) || (rec_meta->y_field == NULL)) + { + throw RunTimeException(CRITICAL, RTE_ERROR, "Unable to get x and y coordinates for %s", rec_type); + } - /* Get Record Meta Data */ - RecordObject::meta_t* rec_meta = RecordObject::getRecordMetaFields(rec_type); - if(rec_meta == NULL) + /* Get X Field */ + geoData.x_field = RecordObject::getDefinedField(rec_type, rec_meta->x_field); + if(geoData.x_field.type == RecordObject::INVALID_FIELD) { - throw RunTimeException(CRITICAL, RTE_ERROR, "Unable to get meta data for %s", rec_type); + throw RunTimeException(CRITICAL, RTE_ERROR, "Unable to extract x field [%s] from record type <%s>", rec_meta->x_field, rec_type); } - /* Build Geometry Fields */ - geoData.as_geo = parms->as_geo; - if(geoData.as_geo) + /* Get Y Field */ + geoData.y_field = RecordObject::getDefinedField(rec_type, rec_meta->y_field); + if(geoData.y_field.type == RecordObject::INVALID_FIELD) { - /* Check if Record has Geospatial Fields */ - if((rec_meta->x_field == NULL) || (rec_meta->y_field == NULL)) - { - throw RunTimeException(CRITICAL, RTE_ERROR, "Unable to get x and y coordinates for %s", rec_type); - } + throw RunTimeException(CRITICAL, RTE_ERROR, "Unable to extract y field [%s] from record type <%s>", rec_meta->y_field, rec_type); + } + } - /* Get X Field */ - geoData.x_field = RecordObject::getDefinedField(rec_type, rec_meta->x_field); - if(geoData.x_field.type == RecordObject::INVALID_FIELD) - { - throw RunTimeException(CRITICAL, RTE_ERROR, "Unable to extract x field [%s] from record type <%s>", rec_meta->x_field, rec_type); - } + /* Get Paths */ + outputPath = ArrowCommon::getOutputPath(parms); // throws - /* Get Y Field */ - geoData.y_field = RecordObject::getDefinedField(rec_type, rec_meta->y_field); - if(geoData.y_field.type == RecordObject::INVALID_FIELD) - { - throw RunTimeException(CRITICAL, RTE_ERROR, "Unable to extract y field [%s] from record type <%s>", rec_meta->y_field, rec_type); - } - } + /* + * NO THROWING BEYOND THIS POINT + */ - /* Get Paths */ - outputPath = ArrowCommon::getOutputPath(parms); - outputMetadataPath = ArrowCommon::createMetadataFileName(outputPath); + /* Get Paths */ + outputMetadataPath = ArrowCommon::createMetadataFileName(outputPath); - /* Create Unique Temporary Filenames */ - dataFile = ArrowCommon::getUniqueFileName(id); - metadataFile = ArrowCommon::createMetadataFileName(dataFile); + /* Save Parameters */ + parmsAsString = StringLib::duplicate(parms_str); + endpoint = StringLib::duplicate(_endpoint); - /* Set Record Type */ - recType = StringLib::duplicate(rec_type); + /* Create Unique Temporary Filenames */ + dataFile = ArrowCommon::getUniqueFileName(id); + metadataFile = ArrowCommon::createMetadataFileName(dataFile); - /* Save Keys */ - timeKey = StringLib::duplicate(getSubField(rec_meta->time_field)); - xKey = StringLib::duplicate(getSubField(rec_meta->x_field)); - yKey = StringLib::duplicate(getSubField(rec_meta->y_field)); + /* Set Record Type */ + recType = StringLib::duplicate(rec_type); - /* Get Row Size */ - const RecordObject::field_t batch_rec_field = RecordObject::getDefinedField(recType, rec_meta->batch_field); - if(batch_rec_field.type == RecordObject::INVALID_FIELD) batchRowSizeBytes = 0; - else batchRowSizeBytes = RecordObject::getRecordDataSize(batch_rec_field.exttype); - rowSizeBytes = RecordObject::getRecordDataSize(recType) + batchRowSizeBytes; - maxRowsInGroup = ROW_GROUP_SIZE / rowSizeBytes; + /* Save Keys */ + timeKey = StringLib::duplicate(getSubField(rec_meta->time_field)); + xKey = StringLib::duplicate(getSubField(rec_meta->x_field)); + yKey = StringLib::duplicate(getSubField(rec_meta->y_field)); - /* Initialize Queues */ - const int qdepth = maxRowsInGroup * QUEUE_BUFFER_FACTOR; - outQ = new Publisher(outq_name, Publisher::defaultFree, qdepth); - inQ = new Subscriber(inq_name, MsgQ::SUBSCRIBER_OF_CONFIDENCE, qdepth); + /* Get Row Size */ + const RecordObject::field_t batch_rec_field = RecordObject::getDefinedField(recType, rec_meta->batch_field); + if(batch_rec_field.type == RecordObject::INVALID_FIELD) batchRowSizeBytes = 0; + else batchRowSizeBytes = RecordObject::getRecordDataSize(batch_rec_field.exttype); + rowSizeBytes = RecordObject::getRecordDataSize(recType) + batchRowSizeBytes; + maxRowsInGroup = ROW_GROUP_SIZE / rowSizeBytes; - /* Allocate Implementation */ - impl = new ArrowBuilderImpl(this); + /* Initialize Queues */ + const int qdepth = maxRowsInGroup * QUEUE_BUFFER_FACTOR; + outQ = new Publisher(outq_name, Publisher::defaultFree, qdepth); + inQ = new Subscriber(inq_name, MsgQ::SUBSCRIBER_OF_CONFIDENCE, qdepth); - /* Start Builder Thread */ - active = true; - builderPid = new Thread(builderThread, this); - } - catch(const RunTimeException& e) - { - mlog(e.level(), "Error creating ArrowSampler: %s", e.what()); - Delete(); - throw; - } + /* Allocate Implementation */ + impl = new ArrowBuilderImpl(this); + + /* Start Builder Thread */ + active = true; + builderPid = new Thread(builderThread, this); } /*---------------------------------------------------------------------------- * Destructor - *----------------------------------------------------------------------------*/ ArrowBuilder::~ArrowBuilder(void) -{ - Delete(); -} - -/*---------------------------------------------------------------------------- - * Delete - - *----------------------------------------------------------------------------*/ -void ArrowBuilder::Delete(void) { active = false; delete builderPid; parms->releaseLuaObject(); - delete [] dataFile; - delete [] metadataFile; - delete [] outputPath; - delete [] outputMetadataPath; - delete [] parmsAsString; - delete [] endpoint; - delete [] recType; - delete [] timeKey; - delete [] xKey; - delete [] yKey; + delete[] dataFile; + delete[] metadataFile; + delete[] outputPath; + delete[] outputMetadataPath; + delete[] parmsAsString; + delete[] endpoint; + delete[] recType; + delete[] timeKey; + delete[] xKey; + delete[] yKey; delete outQ; delete inQ; delete impl; diff --git a/packages/arrow/ArrowBuilder.h b/packages/arrow/ArrowBuilder.h index bbb390f6e..b83f0b286 100644 --- a/packages/arrow/ArrowBuilder.h +++ b/packages/arrow/ArrowBuilder.h @@ -184,7 +184,6 @@ class ArrowBuilder: public LuaObject const char* rec_type, const char* id, const char* parms_str, const char* _endpoint); ~ArrowBuilder (void) override; - void Delete (void); static void* builderThread (void* parm); }; diff --git a/packages/arrow/ArrowBuilderImpl.cpp b/packages/arrow/ArrowBuilderImpl.cpp index f134c2130..8bb24db1f 100644 --- a/packages/arrow/ArrowBuilderImpl.cpp +++ b/packages/arrow/ArrowBuilderImpl.cpp @@ -207,7 +207,15 @@ void ArrowBuilderImpl::createSchema (void) { /* Set Arrow Output Stream */ shared_ptr file_output_stream; - PARQUET_ASSIGN_OR_THROW(file_output_stream, arrow::io::FileOutputStream::Open(arrowBuilder->getDataFile())); + auto _result = arrow::io::FileOutputStream::Open(arrowBuilder->getDataFile()); + if(_result.ok()) + { + file_output_stream = _result.ValueOrDie(); + } + else + { + mlog(CRITICAL, "Failed to open file output stream: %s", _result.status().ToString().c_str()); + } /* Set Writer Properties */ parquet::WriterProperties::Builder writer_props_builder; diff --git a/packages/arrow/ArrowSampler.cpp b/packages/arrow/ArrowSampler.cpp index bda929494..bdd517500 100644 --- a/packages/arrow/ArrowSampler.cpp +++ b/packages/arrow/ArrowSampler.cpp @@ -109,7 +109,7 @@ int ArrowSampler::luaCreate(lua_State* L) } catch(const RunTimeException& e) { - /* Constructor releases all Lua parameters objects on error */ + /* Constructor releases all Lua objects */ mlog(e.level(), "Error creating %s: %s", LUA_META_NAME, e.what()); return returnLuaStatus(L, false); } @@ -288,6 +288,11 @@ ArrowSampler::ArrowSampler(lua_State* L, ArrowParms* _parms, const char* input_f /* Add Lua sample function */ LuaEngine::setAttrFunc(L, "sample", luaSample); + /* Validate Parameters */ + assert(parms); + assert(input_file); + assert(outq_name); + try { /* Copy raster objects, create samplers */ @@ -300,19 +305,6 @@ ArrowSampler::ArrowSampler(lua_State* L, ArrowParms* _parms, const char* input_f samplers.push_back(sampler); } - /* Validate Parameters */ - if(parms == NULL) - throw RunTimeException(CRITICAL, RTE_ERROR, "Invalid ArrowParms object"); - - if((parms->path == NULL) || (parms->path[0] == '\0')) - throw RunTimeException(CRITICAL, RTE_ERROR, "Invalid output file path"); - - if((input_file == NULL) || (input_file[0] == '\0')) - throw RunTimeException(CRITICAL, RTE_ERROR, "Invalid input file path"); - - if((outq_name == NULL) || (outq_name[0] == '\0')) - throw RunTimeException(CRITICAL, RTE_ERROR, "Invalid input queue name"); - /* Allocate Implementation */ impl = new ArrowSamplerImpl(this); @@ -333,7 +325,6 @@ ArrowSampler::ArrowSampler(lua_State* L, ArrowParms* _parms, const char* input_f } catch(const RunTimeException& e) { - mlog(e.level(), "Error creating ArrowSampler: %s", e.what()); Delete(); throw; } diff --git a/packages/arrow/ArrowSamplerImpl.cpp b/packages/arrow/ArrowSamplerImpl.cpp index 591d50f27..4e55a7dbc 100644 --- a/packages/arrow/ArrowSamplerImpl.cpp +++ b/packages/arrow/ArrowSamplerImpl.cpp @@ -89,9 +89,17 @@ ArrowSamplerImpl::~ArrowSamplerImpl (void) *----------------------------------------------------------------------------*/ void ArrowSamplerImpl::processInputFile(const char* file_path, std::vector& points) { - /* Open the input file */ - PARQUET_ASSIGN_OR_THROW(inputFile, arrow::io::ReadableFile::Open(file_path, arrow::default_memory_pool())); - PARQUET_THROW_NOT_OK(parquet::arrow::OpenFile(inputFile, arrow::default_memory_pool(), &reader)); + try + { + /* Open the input file */ + PARQUET_ASSIGN_OR_THROW(inputFile, arrow::io::ReadableFile::Open(file_path, arrow::default_memory_pool())); + PARQUET_THROW_NOT_OK(parquet::arrow::OpenFile(inputFile, arrow::default_memory_pool(), &reader)); + } + catch(const parquet::ParquetStatusException& e) + { + /* Rethrow as a runtime exception */ + throw RunTimeException(CRITICAL, RTE_ERROR, "%s", e.what()); + } getMetadata(); getPoints(points); @@ -116,14 +124,18 @@ bool ArrowSamplerImpl::processSamples(ArrowSampler::sampler_t* sampler) /* Arrow csv writer cannot handle columns with lists of samples */ status = makeColumnsWithOneSample(sampler); } - else throw RunTimeException(CRITICAL, RTE_ERROR, "Unsupported file format"); + else mlog(CRITICAL, "Unsupported file format"); } - catch(const RunTimeException& e) + catch(const parquet::ParquetStatusException& e) + { + mlog(CRITICAL, "Error processing samples: %s", e.what()); + } + + if(!status) { /* No columns will be added */ newFields.clear(); newColumns.clear(); - mlog(e.level(), "Error processing samples: %s", e.what()); } return status; @@ -137,12 +149,14 @@ void ArrowSamplerImpl::createOutpuFiles(void) const ArrowParms* parms = arrowSampler->getParms(); const char* dataFile = arrowSampler->getDataFile(); - auto table = inputFileToTable(); - auto updated_table = addNewColumns(table); - table = nullptr; - - switch (parms->format) + try { + auto table = inputFileToTable(); + auto updated_table = addNewColumns(table); + table = nullptr; + + switch(parms->format) + { case ArrowParms::PARQUET: tableToParquet(updated_table, dataFile); break; @@ -159,12 +173,18 @@ void ArrowSamplerImpl::createOutpuFiles(void) default: throw RunTimeException(CRITICAL, RTE_ERROR, "Unsupported file format"); - } + } - /* Generate metadata file since csv/feather writers ignore it */ - if(parms->format == ArrowParms::CSV || parms->format == ArrowParms::FEATHER) + /* Generate metadata file since csv/feather writers ignore it */ + if(parms->format == ArrowParms::CSV || parms->format == ArrowParms::FEATHER) + { + metadataToJson(updated_table, arrowSampler->getMetadataFile()); + } + } + catch(const parquet::ParquetStatusException& e) { - metadataToJson(updated_table, arrowSampler->getMetadataFile()); + /* Rethrow as a runtime exception */ + throw RunTimeException(CRITICAL, RTE_ERROR, "%s", e.what()); } } diff --git a/scripts/selftests/parquet_sampler.lua b/scripts/selftests/parquet_sampler.lua index 1776e6164..5cfefce4a 100644 --- a/scripts/selftests/parquet_sampler.lua +++ b/scripts/selftests/parquet_sampler.lua @@ -150,14 +150,23 @@ out_file_size = getFileSize(_out_geoparquet); print("Output geoparquet file size: " .. out_file_size .. " bytes") runner.check(out_file_size > in_file_size, "Output file size is not greater than input file size: ") +-- Negative tests + print('\n--------------------------------------\nTest08: failed luaCreate bad arrow parms\n--------------------------------------\n') parquet_sampler = arrow.sampler(arrow.parms(), in_geoparquet, outq_name, {["mosaic"] = dem1, ["strips"] = dem2}) runner.check(parquet_sampler == nil) -print('\n--------------------------------------\nTest09: failed constructor bad paths\n--------------------------------------\n') -parquet_sampler = arrow.sampler(arrow.parms({path=out_geoparquet, format="parquet"}), "", "", {["mosaic"] = dem1, ["strips"] = dem2}) +print('\n--------------------------------------\nTest09: failed constructor empty input file\n--------------------------------------\n') +parquet_sampler = arrow.sampler(arrow.parms({path=out_geoparquet, format="parquet"}), "", outq_name, {["mosaic"] = dem1, ["strips"] = dem2}) +runner.check(parquet_sampler == nil) + +print('\n--------------------------------------\nTest10: failed constructor nil input file\n--------------------------------------\n') +parquet_sampler = arrow.sampler(arrow.parms({path=out_geoparquet, format="parquet"}), nil, outq_name, {["mosaic"] = dem1, ["strips"] = dem2}) runner.check(parquet_sampler == nil) +print('\n--------------------------------------\nTest10: failed constructor nil output queue\n--------------------------------------\n') +parquet_sampler = arrow.sampler(arrow.parms({path=out_geoparquet, format="parquet"}), in_geoparquet, nil, {["mosaic"] = dem1, ["strips"] = dem2}) +runner.check(parquet_sampler == nil) -- There is no easy way to read parquet file in Lua, check the size of the output files -- the files were tested with python scripts From 432e89112613578273021a630009f4a5a0566ffa Mon Sep 17 00:00:00 2001 From: Eric Lidwa Date: Thu, 20 Jun 2024 13:22:00 +0000 Subject: [PATCH 3/4] raster code proper handling of Thread exception --- packages/geo/GeoIndexedRaster.cpp | 33 ++++++++++++++++++++----------- packages/geo/GeoIndexedRaster.h | 2 +- packages/geo/GeoParms.h | 1 + packages/geo/RasterObject.cpp | 12 +++++++++++ 4 files changed, 36 insertions(+), 12 deletions(-) diff --git a/packages/geo/GeoIndexedRaster.cpp b/packages/geo/GeoIndexedRaster.cpp index e281fffce..80ec7819d 100644 --- a/packages/geo/GeoIndexedRaster.cpp +++ b/packages/geo/GeoIndexedRaster.cpp @@ -439,9 +439,6 @@ bool GeoIndexedRaster::openGeoIndex(const OGRGeometry* geo) *----------------------------------------------------------------------------*/ void GeoIndexedRaster::sampleRasters(OGRGeometry* geo) { - /* Create additional reader threads if needed */ - createThreads(); - /* For each raster which is marked to be sampled, give it to the reader thread */ int signaledReaders = 0; int i = 0; @@ -496,8 +493,13 @@ bool GeoIndexedRaster::sample(OGRGeometry* geo, int64_t gps) if(findRasters(geo) && filterRasters(gps) && updateCache()) { - sampleRasters(geo); - status = true; + /* Create additional reader threads if needed */ + status = createThreads(); + if(status) + { + sampleRasters(geo); + status = true; + } } return status; @@ -677,21 +679,30 @@ void* GeoIndexedRaster::readingThread(void *param) /*---------------------------------------------------------------------------- * createThreads *----------------------------------------------------------------------------*/ -void GeoIndexedRaster::createThreads(void) +bool GeoIndexedRaster::createThreads(void) { const int threadsNeeded = cache.length(); const int threadsNow = readers.length(); const int newThreadsCnt = threadsNeeded - threadsNow; if(threadsNeeded <= threadsNow) - return; + return true; - for(int i = 0; i < newThreadsCnt; i++) + try { - Reader* r = new Reader(this); - readers.add(r); + for(int i = 0; i < newThreadsCnt; i++) + { + Reader* r = new Reader(this); + readers.add(r); + } + } + catch (const RunTimeException &e) + { + ssError |= SS_RESOURCE_LIMIT_ERROR; + mlog(CRITICAL, "Failed to create reader threads, needed: %d, created: %d", newThreadsCnt, readers.length() - threadsNow); } - assert(readers.length() == threadsNeeded); + + return readers.length() == threadsNeeded; } /*---------------------------------------------------------------------------- diff --git a/packages/geo/GeoIndexedRaster.h b/packages/geo/GeoIndexedRaster.h index 26506b443..90f9a9a64 100644 --- a/packages/geo/GeoIndexedRaster.h +++ b/packages/geo/GeoIndexedRaster.h @@ -172,7 +172,7 @@ class GeoIndexedRaster: public RasterObject static void* readingThread (void *param); - void createThreads (void); + bool createThreads (void); bool updateCache (void); bool filterRasters (int64_t gps); }; diff --git a/packages/geo/GeoParms.h b/packages/geo/GeoParms.h index 072e71f12..9350b5c89 100644 --- a/packages/geo/GeoParms.h +++ b/packages/geo/GeoParms.h @@ -58,6 +58,7 @@ #define SS_WRITE_ERROR (1 << 4) #define SS_SUBRASTER_ERROR (1 << 5) #define SS_INDEX_FILE_ERROR (1 << 6) +#define SS_RESOURCE_LIMIT_ERROR (1 << 7) /****************************************************************************** diff --git a/packages/geo/RasterObject.cpp b/packages/geo/RasterObject.cpp index 7526bedaa..0c3d0a656 100644 --- a/packages/geo/RasterObject.cpp +++ b/packages/geo/RasterObject.cpp @@ -263,6 +263,12 @@ int RasterObject::luaSamples(lua_State *L) mlog(CRITICAL, "Too many rasters to sample, max allowed: %d, limit your AOI/temporal range or use filters", GeoIndexedRaster::MAX_READER_THREADS); } + if(err & SS_RESOURCE_LIMIT_ERROR) + { + listvalid = false; + mlog(CRITICAL, "System resource limit reached, could not sample rasters"); + } + /* Create return table */ lua_createtable(L, slist.length(), 0); num_ret++; @@ -394,6 +400,12 @@ int RasterObject::slist2table(const List& slist, uint32_t errors, mlog(CRITICAL, "Some rasters could not be subset, requested memory size > max allowed: %ld MB", RasterSubset::MAX_SIZE / (1024 * 1024)); } + if(errors & SS_RESOURCE_LIMIT_ERROR) + { + listvalid = false; + mlog(CRITICAL, "System resource limit reached, could not subset rasters"); + } + /* Create return table */ lua_createtable(L, slist.length(), 0); num_ret++; From cb2db217bcdb8d71ba65a8ed13fb11147af8f406 Mon Sep 17 00:00:00 2001 From: JP Swinski Date: Thu, 20 Jun 2024 16:25:03 +0000 Subject: [PATCH 4/4] fix in arrow builder implementation to handle when a file output stream is not able to be created --- packages/arrow/ArrowBuilderImpl.cpp | 74 +++++++++++++++++------------ packages/arrow/ArrowBuilderImpl.h | 2 +- 2 files changed, 45 insertions(+), 31 deletions(-) diff --git a/packages/arrow/ArrowBuilderImpl.cpp b/packages/arrow/ArrowBuilderImpl.cpp index 8bb24db1f..c132905b0 100644 --- a/packages/arrow/ArrowBuilderImpl.cpp +++ b/packages/arrow/ArrowBuilderImpl.cpp @@ -133,8 +133,13 @@ bool ArrowBuilderImpl::processRecordBatch (batch_list_t& record_batch, int num_r /* Create Parquet Writer (on first time) */ if(firstTime) { - createSchema(); firstTime = false; + const bool schema_status = createSchema(); + if(!schema_status) + { + mlog(CRITICAL, "Failed to create schema... aborting!"); + return false; + } } if(writerFormat == ArrowParms::PARQUET) @@ -198,8 +203,10 @@ bool ArrowBuilderImpl::processRecordBatch (batch_list_t& record_batch, int num_r /*---------------------------------------------------------------------------- * createSchema *----------------------------------------------------------------------------*/ -void ArrowBuilderImpl::createSchema (void) +bool ArrowBuilderImpl::createSchema (void) { + bool status = true; + /* Create Schema */ schema = make_shared(fieldVector); @@ -211,38 +218,40 @@ void ArrowBuilderImpl::createSchema (void) if(_result.ok()) { file_output_stream = _result.ValueOrDie(); - } - else - { - mlog(CRITICAL, "Failed to open file output stream: %s", _result.status().ToString().c_str()); - } - - /* Set Writer Properties */ - parquet::WriterProperties::Builder writer_props_builder; - writer_props_builder.compression(parquet::Compression::SNAPPY); - writer_props_builder.version(parquet::ParquetVersion::PARQUET_2_6); - const shared_ptr writer_props = writer_props_builder.build(); - - /* Set Arrow Writer Properties */ - auto arrow_writer_props = parquet::ArrowWriterProperties::Builder().store_schema()->build(); - /* Set MetaData */ - auto metadata = schema->metadata() ? schema->metadata()->Copy() : std::make_shared(); - if(arrowBuilder->getAsGeo()) appendGeoMetaData(metadata); - appendServerMetaData(metadata); - appendPandasMetaData(metadata); - schema = schema->WithMetadata(metadata); - - /* Create Parquet Writer */ - auto result = parquet::arrow::FileWriter::Open(*schema, ::arrow::default_memory_pool(), file_output_stream, writer_props, arrow_writer_props); - if(result.ok()) - { - parquetWriter = std::move(result).ValueOrDie(); - writerFormat = ArrowParms::PARQUET; + /* Set Writer Properties */ + parquet::WriterProperties::Builder writer_props_builder; + writer_props_builder.compression(parquet::Compression::SNAPPY); + writer_props_builder.version(parquet::ParquetVersion::PARQUET_2_6); + const shared_ptr writer_props = writer_props_builder.build(); + + /* Set Arrow Writer Properties */ + auto arrow_writer_props = parquet::ArrowWriterProperties::Builder().store_schema()->build(); + + /* Set MetaData */ + auto metadata = schema->metadata() ? schema->metadata()->Copy() : std::make_shared(); + if(arrowBuilder->getAsGeo()) appendGeoMetaData(metadata); + appendServerMetaData(metadata); + appendPandasMetaData(metadata); + schema = schema->WithMetadata(metadata); + + /* Create Parquet Writer */ + auto result = parquet::arrow::FileWriter::Open(*schema, ::arrow::default_memory_pool(), file_output_stream, writer_props, arrow_writer_props); + if(result.ok()) + { + parquetWriter = std::move(result).ValueOrDie(); + writerFormat = ArrowParms::PARQUET; + } + else + { + mlog(CRITICAL, "Failed to open parquet writer: %s", result.status().ToString().c_str()); + status = false; + } } else { - mlog(CRITICAL, "Failed to open parquet writer: %s", result.status().ToString().c_str()); + mlog(CRITICAL, "Failed to open file output stream: %s", _result.status().ToString().c_str()); + status = false; } } else if(arrowBuilder->getParms()->format == ArrowParms::FEATHER) @@ -259,6 +268,7 @@ void ArrowBuilderImpl::createSchema (void) else { mlog(CRITICAL, "Failed to open feather writer: %s", result.status().ToString().c_str()); + status = false; } } else if(arrowBuilder->getParms()->format == ArrowParms::CSV) @@ -275,12 +285,16 @@ void ArrowBuilderImpl::createSchema (void) else { mlog(CRITICAL, "Failed to open csv writer: %s", result.status().ToString().c_str()); + status = false; } } else { mlog(CRITICAL, "Unsupported format: %d", arrowBuilder->getParms()->format); + status = false; } + + return status; } /*---------------------------------------------------------------------------- diff --git a/packages/arrow/ArrowBuilderImpl.h b/packages/arrow/ArrowBuilderImpl.h index b2ffc3c40..4fdc00462 100644 --- a/packages/arrow/ArrowBuilderImpl.h +++ b/packages/arrow/ArrowBuilderImpl.h @@ -103,7 +103,7 @@ class ArrowBuilderImpl * Methods *--------------------------------------------------------------------*/ - void createSchema (void); + bool createSchema (void); bool buildFieldList (const char* rec_type, int offset, int flags); static void appendGeoMetaData(const std::shared_ptr& metadata); void appendServerMetaData (const std::shared_ptr& metadata);