Merge pull request #415 from SlideRuleEarth/luaCreateCleanup
Fixed memory leaks in ArrowBuilder and ArrowSampler
jpswinski authored Jun 20, 2024
2 parents d0c6358 + faac861 commit 1b7604e
Showing 10 changed files with 200 additions and 101 deletions.
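The leak pattern addressed by this PR is a C++ construction-order hazard: when a constructor throws after storing raw allocations in member pointers, the destructor never runs and those allocations are orphaned. A minimal sketch of the failure mode, using simplified stand-in names rather than the actual SlideRule classes:

```cpp
#include <cstring>
#include <stdexcept>

// Stand-in for StringLib::duplicate(): a heap copy the caller must delete[].
static char* duplicate(const char* s)
{
    char* copy = new char[std::strlen(s) + 1];
    std::strcpy(copy, s);
    return copy;
}

class Builder
{
    public:
        explicit Builder(const char* parms_str)
        {
            parmsAsString = duplicate(parms_str); // raw allocation first...
            getOutputPath();                      // ...then a call that may throw
        }

        ~Builder(void) { delete[] parmsAsString; } // never runs if the ctor throws

    private:
        static void getOutputPath(void) { throw std::runtime_error("bad path"); }
        char* parmsAsString;
};

// If getOutputPath() throws, ~Builder() is not invoked and parmsAsString leaks.
// The commit's remedy: finish every throwing call first, then perform the raw
// StringLib::duplicate() copies in the "NO THROWING BEYOND THIS POINT" region.
```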
40 changes: 21 additions & 19 deletions packages/arrow/ArrowBuilder.cpp
@@ -261,10 +261,6 @@ ArrowBuilder::ArrowBuilder (lua_State* L, ArrowParms* _parms,
assert(parms_str);
assert(_endpoint);

/* Save Parameters */
parmsAsString = StringLib::duplicate(parms_str);
endpoint = StringLib::duplicate(_endpoint);

/* Get Record Meta Data */
RecordObject::meta_t* rec_meta = RecordObject::getRecordMetaFields(rec_type);
if(rec_meta == NULL)
@@ -298,17 +294,23 @@ }
}

/* Get Paths */
outputPath = ArrowCommon::getOutputPath(parms);
outputPath = ArrowCommon::getOutputPath(parms); // throws

/*
* NO THROWING BEYOND THIS POINT
*/

/* Get Paths */
outputMetadataPath = ArrowCommon::createMetadataFileName(outputPath);

/* Save Parameters */
parmsAsString = StringLib::duplicate(parms_str);
endpoint = StringLib::duplicate(_endpoint);

/* Create Unique Temporary Filenames */
dataFile = ArrowCommon::getUniqueFileName(id);
metadataFile = ArrowCommon::createMetadataFileName(dataFile);

/*
* NO THROWING BEYOND THIS POINT
*/

/* Set Record Type */
recType = StringLib::duplicate(rec_type);
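The reordering works because a destructor only runs for a fully constructed object, so all throwing work must finish before the first raw allocation. For comparison, an RAII alternative would remove the ordering constraint entirely; this is a hedged sketch using std::string, not how the codebase (which manages raw char* via StringLib) is actually written:

```cpp
#include <stdexcept>
#include <string>

class Builder
{
    public:
        Builder(const char* parms_str, const char* endpoint_str)
            : parmsAsString(parms_str), // std::string copies immediately...
              endpoint(endpoint_str)
        {
            // ...and fully constructed members are destroyed automatically
            // if the constructor body throws, so ordering no longer matters.
            throw std::runtime_error("simulated failure: nothing leaks");
        }

    private:
        std::string parmsAsString;
        std::string endpoint;
};
```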

@@ -345,16 +347,16 @@ ArrowBuilder::~ArrowBuilder(void)
active = false;
delete builderPid;
parms->releaseLuaObject();
delete [] dataFile;
delete [] metadataFile;
delete [] outputPath;
delete [] outputMetadataPath;
delete [] parmsAsString;
delete [] endpoint;
delete [] recType;
delete [] timeKey;
delete [] xKey;
delete [] yKey;
delete[] dataFile;
delete[] metadataFile;
delete[] outputPath;
delete[] outputMetadataPath;
delete[] parmsAsString;
delete[] endpoint;
delete[] recType;
delete[] timeKey;
delete[] xKey;
delete[] yKey;
delete outQ;
delete inQ;
delete impl;
74 changes: 48 additions & 26 deletions packages/arrow/ArrowBuilderImpl.cpp
@@ -133,8 +133,13 @@ bool ArrowBuilderImpl::processRecordBatch (batch_list_t& record_batch, int num_r
/* Create Parquet Writer (on first time) */
if(firstTime)
{
createSchema();
firstTime = false;
const bool schema_status = createSchema();
if(!schema_status)
{
mlog(CRITICAL, "Failed to create schema... aborting!");
return false;
}
}

if(writerFormat == ArrowParms::PARQUET)
@@ -198,43 +203,55 @@ bool ArrowBuilderImpl::processRecordBatch (batch_list_t& record_batch, int num_r
/*----------------------------------------------------------------------------
* createSchema
*----------------------------------------------------------------------------*/
void ArrowBuilderImpl::createSchema (void)
bool ArrowBuilderImpl::createSchema (void)
{
bool status = true;

/* Create Schema */
schema = make_shared<arrow::Schema>(fieldVector);

if(arrowBuilder->getParms()->format == ArrowParms::PARQUET)
{
/* Set Arrow Output Stream */
shared_ptr<arrow::io::FileOutputStream> file_output_stream;
PARQUET_ASSIGN_OR_THROW(file_output_stream, arrow::io::FileOutputStream::Open(arrowBuilder->getDataFile()));

/* Set Writer Properties */
parquet::WriterProperties::Builder writer_props_builder;
writer_props_builder.compression(parquet::Compression::SNAPPY);
writer_props_builder.version(parquet::ParquetVersion::PARQUET_2_6);
const shared_ptr<parquet::WriterProperties> writer_props = writer_props_builder.build();

/* Set Arrow Writer Properties */
auto arrow_writer_props = parquet::ArrowWriterProperties::Builder().store_schema()->build();

/* Set MetaData */
auto metadata = schema->metadata() ? schema->metadata()->Copy() : std::make_shared<arrow::KeyValueMetadata>();
if(arrowBuilder->getAsGeo()) appendGeoMetaData(metadata);
appendServerMetaData(metadata);
appendPandasMetaData(metadata);
schema = schema->WithMetadata(metadata);

/* Create Parquet Writer */
auto result = parquet::arrow::FileWriter::Open(*schema, ::arrow::default_memory_pool(), file_output_stream, writer_props, arrow_writer_props);
if(result.ok())
auto _result = arrow::io::FileOutputStream::Open(arrowBuilder->getDataFile());
if(_result.ok())
{
parquetWriter = std::move(result).ValueOrDie();
writerFormat = ArrowParms::PARQUET;
file_output_stream = _result.ValueOrDie();

/* Set Writer Properties */
parquet::WriterProperties::Builder writer_props_builder;
writer_props_builder.compression(parquet::Compression::SNAPPY);
writer_props_builder.version(parquet::ParquetVersion::PARQUET_2_6);
const shared_ptr<parquet::WriterProperties> writer_props = writer_props_builder.build();

/* Set Arrow Writer Properties */
auto arrow_writer_props = parquet::ArrowWriterProperties::Builder().store_schema()->build();

/* Set MetaData */
auto metadata = schema->metadata() ? schema->metadata()->Copy() : std::make_shared<arrow::KeyValueMetadata>();
if(arrowBuilder->getAsGeo()) appendGeoMetaData(metadata);
appendServerMetaData(metadata);
appendPandasMetaData(metadata);
schema = schema->WithMetadata(metadata);

/* Create Parquet Writer */
auto result = parquet::arrow::FileWriter::Open(*schema, ::arrow::default_memory_pool(), file_output_stream, writer_props, arrow_writer_props);
if(result.ok())
{
parquetWriter = std::move(result).ValueOrDie();
writerFormat = ArrowParms::PARQUET;
}
else
{
mlog(CRITICAL, "Failed to open parquet writer: %s", result.status().ToString().c_str());
status = false;
}
}
else
{
mlog(CRITICAL, "Failed to open parquet writer: %s", result.status().ToString().c_str());
mlog(CRITICAL, "Failed to open file output stream: %s", _result.status().ToString().c_str());
status = false;
}
}
else if(arrowBuilder->getParms()->format == ArrowParms::FEATHER)
@@ -251,6 +268,7 @@ void ArrowBuilderImpl::createSchema (void)
else
{
mlog(CRITICAL, "Failed to open feather writer: %s", result.status().ToString().c_str());
status = false;
}
}
else if(arrowBuilder->getParms()->format == ArrowParms::CSV)
@@ -267,12 +285,16 @@ void ArrowBuilderImpl::createSchema (void)
else
{
mlog(CRITICAL, "Failed to open csv writer: %s", result.status().ToString().c_str());
status = false;
}
}
else
{
mlog(CRITICAL, "Unsupported format: %d", arrowBuilder->getParms()->format);
status = false;
}

return status;
}

/*----------------------------------------------------------------------------
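Inside createSchema(), the throwing PARQUET_ASSIGN_OR_THROW on the file output stream is replaced by an explicit check of the arrow::Result, which is what lets the function report failure through its new bool return instead of an exception. The pattern in isolation, with an illustrative file name:

```cpp
#include <arrow/io/file.h>
#include <cstdio>
#include <memory>

bool openOutputStream(std::shared_ptr<arrow::io::FileOutputStream>& stream)
{
    // Open() returns arrow::Result<T>; no exception is thrown on failure.
    auto result = arrow::io::FileOutputStream::Open("/tmp/example.parquet");
    if(!result.ok())
    {
        // status() carries the error; ToString() renders it for logging
        std::printf("Failed to open file output stream: %s\n",
                    result.status().ToString().c_str());
        return false;
    }
    stream = result.ValueOrDie(); // safe: ok() was verified above
    return true;
}
```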
2 changes: 1 addition & 1 deletion packages/arrow/ArrowBuilderImpl.h
@@ -103,7 +103,7 @@ class ArrowBuilderImpl
* Methods
*--------------------------------------------------------------------*/

void createSchema (void);
bool createSchema (void);
bool buildFieldList (const char* rec_type, int offset, int flags);
static void appendGeoMetaData(const std::shared_ptr<arrow::KeyValueMetadata>& metadata);
void appendServerMetaData (const std::shared_ptr<arrow::KeyValueMetadata>& metadata);
61 changes: 38 additions & 23 deletions packages/arrow/ArrowSampler.cpp
@@ -58,15 +58,16 @@ const struct luaL_Reg ArrowSampler::LUA_META_TABLE[] = {
int ArrowSampler::luaCreate(lua_State* L)
{
ArrowParms* _parms = NULL;
const char* input_file = NULL;
const char* outq_name = NULL;
std::vector<raster_info_t> rasters;

/* Get Parameters */
try
{
/* Get Parameters */
_parms = dynamic_cast<ArrowParms*>(getLuaObject(L, 1, ArrowParms::OBJECT_TYPE));
const char* input_file = getLuaString(L, 2);
const char* outq_name = getLuaString(L, 3);

std::vector<raster_info_t> rasters;
_parms = dynamic_cast<ArrowParms*>(getLuaObject(L, 1, ArrowParms::OBJECT_TYPE));
input_file = getLuaString(L, 2);
outq_name = getLuaString(L, 3);

/* Check if the parameter is a table */
luaL_checktype(L, 4, LUA_TTABLE);
@@ -78,17 +79,37 @@
{
const char* rkey = getLuaString(L, -2);
RasterObject* robj = dynamic_cast<RasterObject*>(getLuaObject(L, -1, RasterObject::OBJECT_TYPE));

if(!rkey) throw RunTimeException(CRITICAL, RTE_ERROR, "Invalid raster key");
if(!robj) throw RunTimeException(CRITICAL, RTE_ERROR, "Invalid raster object");

rasters.push_back({rkey, robj});

/* Pop value */
lua_pop(L, 1);
}
}
catch(const RunTimeException& e)
{
mlog(e.level(), "Error creating %s: %s", LUA_META_NAME, e.what());

/* Create Dispatch */
/* Release Lua Parameter Objects */
if(_parms) _parms->releaseLuaObject();
for(raster_info_t& raster : rasters)
{
raster.robj->releaseLuaObject();
}
return returnLuaStatus(L, false);
}

/* Create Dispatch */
try
{
return createLuaObject(L, new ArrowSampler(L, _parms, input_file, outq_name, rasters));
}
catch(const RunTimeException& e)
{
/* Constructor releases all Lua objects */
mlog(e.level(), "Error creating %s: %s", LUA_META_NAME, e.what());
return returnLuaStatus(L, false);
}
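The reworked luaCreate() separates argument parsing from construction so each phase has exactly one cleanup owner: the first try block releases whatever Lua objects it acquired before failing, and from the second phase on, the constructor is responsible for releasing everything. A schematic sketch of that two-phase shape; the types and helpers below are stand-ins, not the actual SlideRule API:

```cpp
#include <vector>

// Minimal stand-in for a reference-counted Lua-side object.
struct LuaRef { void release(void) { /* decrement refcount */ } };

static bool luaCreateSketch(std::vector<LuaRef*>& acquired)
{
    /* Phase 1: parse arguments and acquire references */
    try
    {
        // ... push acquired LuaRef pointers into `acquired`; may throw ...
    }
    catch(...)
    {
        // Undo exactly what phase 1 acquired, then report failure.
        for(LuaRef* ref : acquired) ref->release();
        return false;
    }

    /* Phase 2: construct the object */
    try
    {
        // ... constructor takes ownership; its error path releases `acquired` ...
        return true;
    }
    catch(...)
    {
        return false; // constructor already released everything
    }
}
```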
@@ -256,35 +277,30 @@ ArrowSampler::ArrowSampler(lua_State* L, ArrowParms* _parms, const char* input_f
const char* outq_name, const std::vector<raster_info_t>& rasters):
LuaObject(L, OBJECT_TYPE, LUA_META_NAME, LUA_META_TABLE),
parms(_parms),
outQ(NULL),
impl(NULL),
dataFile(NULL),
metadataFile(NULL),
outputPath(NULL),
outputMetadataPath(NULL),
alreadySampled(false)
{
/* Add Lua sample function */
LuaEngine::setAttrFunc(L, "sample", luaSample);

if (parms == NULL)
throw RunTimeException(CRITICAL, RTE_ERROR, "Invalid ArrowParms object");

if((parms->path == NULL) || (parms->path[0] == '\0'))
throw RunTimeException(CRITICAL, RTE_ERROR, "Invalid output file path");

if ((input_file == NULL) || (input_file[0] == '\0'))
throw RunTimeException(CRITICAL, RTE_ERROR, "Invalid input file path");

if ((outq_name == NULL) || (outq_name[0] == '\0'))
throw RunTimeException(CRITICAL, RTE_ERROR, "Invalid input queue name");
/* Validate Parameters */
assert(parms);
assert(input_file);
assert(outq_name);

try
{
/* Copy raster objects, create samplers */
for(std::size_t i = 0; i < rasters.size(); i++)
{
const raster_info_t& raster = rasters[i];
const char* rkey = raster.rkey;
RasterObject* robj = raster.robj;

if(!rkey) throw RunTimeException(CRITICAL, RTE_ERROR, "Invalid raster key");
if(!robj) throw RunTimeException(CRITICAL, RTE_ERROR, "Invalid raster object");

sampler_t* sampler = new sampler_t(rkey, robj, this);
samplers.push_back(sampler);
}
@@ -309,7 +325,6 @@ ArrowSampler::ArrowSampler(lua_State* L, ArrowParms* _parms, const char* input_f
}
catch(const RunTimeException& e)
{
mlog(e.level(), "Error creating ArrowSampler: %s", e.what());
Delete();
throw;
}
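The catch block calls Delete() and rethrows because C++ never invokes the destructor of an object whose constructor exits via exception; without the explicit teardown, everything allocated before the failure would leak. The idiom in miniature, with Delete() as a stand-in for the class's own teardown helper:

```cpp
#include <stdexcept>

class Sampler
{
    public:
        Sampler(void)
        {
            try
            {
                buffer = new char[64];
                riskyStep(); // may throw mid-construction
            }
            catch(...)
            {
                Delete(); // ~Sampler() will NOT run, so tear down manually...
                throw;    // ...then let the caller see the original error
            }
        }

        ~Sampler(void) { Delete(); }

    private:
        void Delete(void) { delete[] buffer; buffer = nullptr; }
        static void riskyStep(void) { throw std::runtime_error("failed"); }
        char* buffer{nullptr};
};
```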
50 changes: 35 additions & 15 deletions packages/arrow/ArrowSamplerImpl.cpp
@@ -89,9 +89,17 @@ ArrowSamplerImpl::~ArrowSamplerImpl (void)
*----------------------------------------------------------------------------*/
void ArrowSamplerImpl::processInputFile(const char* file_path, std::vector<ArrowSampler::point_info_t*>& points)
{
/* Open the input file */
PARQUET_ASSIGN_OR_THROW(inputFile, arrow::io::ReadableFile::Open(file_path, arrow::default_memory_pool()));
PARQUET_THROW_NOT_OK(parquet::arrow::OpenFile(inputFile, arrow::default_memory_pool(), &reader));
try
{
/* Open the input file */
PARQUET_ASSIGN_OR_THROW(inputFile, arrow::io::ReadableFile::Open(file_path, arrow::default_memory_pool()));
PARQUET_THROW_NOT_OK(parquet::arrow::OpenFile(inputFile, arrow::default_memory_pool(), &reader));
}
catch(const parquet::ParquetStatusException& e)
{
/* Rethrow as a runtime exception */
throw RunTimeException(CRITICAL, RTE_ERROR, "%s", e.what());
}

getMetadata();
getPoints(points);
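PARQUET_ASSIGN_OR_THROW and PARQUET_THROW_NOT_OK raise parquet::ParquetStatusException, which callers in this codebase catch only as RunTimeException, so the new wrapper translates one into the other at the package boundary. A sketch of the same translation idiom, using std::runtime_error as a generic stand-in for RunTimeException:

```cpp
#include <arrow/io/file.h>
#include <memory>
#include <parquet/exception.h>
#include <stdexcept>

void openWithTranslation(const char* path)
{
    try
    {
        // Errors inside these macros surface as parquet::ParquetStatusException.
        std::shared_ptr<arrow::io::ReadableFile> file;
        PARQUET_ASSIGN_OR_THROW(file, arrow::io::ReadableFile::Open(path));
    }
    catch(const parquet::ParquetStatusException& e)
    {
        // Rethrow in the exception vocabulary the callers actually catch.
        throw std::runtime_error(e.what());
    }
}
```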
@@ -116,14 +124,18 @@ bool ArrowSamplerImpl::processSamples(ArrowSampler::sampler_t* sampler)
/* Arrow csv writer cannot handle columns with lists of samples */
status = makeColumnsWithOneSample(sampler);
}
else throw RunTimeException(CRITICAL, RTE_ERROR, "Unsupported file format");
else mlog(CRITICAL, "Unsupported file format");
}
catch(const RunTimeException& e)
catch(const parquet::ParquetStatusException& e)
{
mlog(e.level(), "Error processing samples: %s", e.what());
mlog(CRITICAL, "Error processing samples: %s", e.what());
}

if(!status)
{
/* No columns will be added */
newFields.clear();
newColumns.clear();
}

return status;
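Clearing newFields and newColumns on failure keeps the two parallel containers in sync, so a later pass that adds columns sees "nothing new" rather than a half-built set. The same roll-back-to-empty idiom in isolation; the types and names below are illustrative:

```cpp
#include <string>
#include <vector>

struct Column { std::string name; std::vector<double> values; };

static bool appendSamples(std::vector<std::string>& fields,
                          std::vector<Column>& columns)
{
    bool status = true;

    fields.push_back("sample.value");
    // ... building the matching Column fails partway ...
    status = false;

    if(!status)
    {
        // Roll back to empty so the parallel vectors never disagree:
        // downstream code treats "no new fields" as "add no columns".
        fields.clear();
        columns.clear();
    }
    return status;
}
```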
@@ -137,12 +149,14 @@ void ArrowSamplerImpl::createOutpuFiles(void)
const ArrowParms* parms = arrowSampler->getParms();
const char* dataFile = arrowSampler->getDataFile();

auto table = inputFileToTable();
auto updated_table = addNewColumns(table);
table = nullptr;

switch (parms->format)
try
{
auto table = inputFileToTable();
auto updated_table = addNewColumns(table);
table = nullptr;

switch(parms->format)
{
case ArrowParms::PARQUET:
tableToParquet(updated_table, dataFile);
break;
@@ -159,12 +173,18 @@

default:
throw RunTimeException(CRITICAL, RTE_ERROR, "Unsupported file format");
}
}

/* Generate metadata file since csv/feather writers ignore it */
if(parms->format == ArrowParms::CSV || parms->format == ArrowParms::FEATHER)
/* Generate metadata file since csv/feather writers ignore it */
if(parms->format == ArrowParms::CSV || parms->format == ArrowParms::FEATHER)
{
metadataToJson(updated_table, arrowSampler->getMetadataFile());
}
}
catch(const parquet::ParquetStatusException& e)
{
metadataToJson(updated_table, arrowSampler->getMetadataFile());
/* Rethrow as a runtime exception */
throw RunTimeException(CRITICAL, RTE_ERROR, "%s", e.what());
}
}
