Skip to content
This repository has been archived by the owner on Jul 23, 2024. It is now read-only.

Commit

Permalink
HAWQ-1766. Fix retrieval of HDFS file block locations
Browse files Browse the repository at this point in the history
  • Loading branch information
Librago committed Sep 17, 2020
1 parent 7dee8e5 commit bc7108d
Showing 1 changed file with 64 additions and 53 deletions.
117 changes: 64 additions & 53 deletions contrib/exthdfs/exthdfs.c
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,6 @@ Datum hdfsprotocol_validate(PG_FUNCTION_ARGS);

Datum hdfsprotocol_blocklocation(PG_FUNCTION_ARGS)
{

// Build the result instance
int nsize = 0;
int numOfBlock = 0;
ExtProtocolBlockLocationData *bldata =
palloc0(sizeof(ExtProtocolBlockLocationData));
if (bldata == NULL)
Expand All @@ -66,7 +62,7 @@ Datum hdfsprotocol_blocklocation(PG_FUNCTION_ARGS)
uri->hostname, uri->port);

// Create file system instance
hdfsFS fs = hdfsConnect(uri->hostname, uri->port);
FscHdfsFileSystemC *fs = FscHdfsNewFileSystem(uri->hostname, uri->port);
if (fs == NULL)
{
elog(ERROR, "hdfsprotocol_blocklocation : "
Expand Down Expand Up @@ -97,35 +93,38 @@ Datum hdfsprotocol_blocklocation(PG_FUNCTION_ARGS)


// get files contained in the path.
hdfsFileInfo *fiarray = hdfsListDirectory(fs, uri->path,&nsize);
if (fiarray == NULL)
FscHdfsFileInfoArrayC *fiarray = FscHdfsDirPath(fs, uri->path);
if (FscHdfsHasErrorRaised(fs))
{
elog(ERROR, "hdfsprotocol_blocklocation : "
"failed to get files of path %s",
uri->path);
Assert(fiarray == NULL);
CatchedError *ce = FscHdfsGetFileSystemError(fs);
elog(ERROR, "hdfsprotocol_blocklocation : "
"failed to get files of path %s. %s (%d)",
uri->path,
ce->errMessage, ce->errCode);
}

int i = 0 ;
// Call block location api to get data location for each file
for (i = 0 ; i < nsize ; i++)
for (int i = 0 ; true ; i++)
{
hdfsFileInfo *fi = &fiarray[i];
FscHdfsFileInfoC *fi = FscHdfsGetFileInfoFromArray(fiarray, i);

// break condition of this for loop
if (fi == NULL) {break;}

// Build file name full path.
const char *fname = fi->mName;
char *fullpath = palloc0( // slash
strlen(fname) + // name
1); // \0
sprintf(fullpath, "%s", fname);
const char *fname = FscHdfsGetFileInfoName(fi);
char *fullpath = palloc0(strlen(uri->path) + /* path */
1 + /* slash */
strlen(fname) + /* name */
1); /* \0 */
sprintf(fullpath, "%s/%s", uri->path, fname);

elog(DEBUG3, "hdfsprotocol_blocklocation : "
"built full path file %s", fullpath);

// Get file full length.
int64_t len = fi->mSize;
int64_t len = FscHdfsGetFileInfoLength(fi);

elog(DEBUG3, "hdfsprotocol_blocklocation : "
"got file %s length " INT64_FORMAT,
Expand All @@ -137,65 +136,77 @@ Datum hdfsprotocol_blocklocation(PG_FUNCTION_ARGS)
}

// Get block location data for this file
BlockLocation *bla = hdfsGetFileBlockLocations(fs, fullpath, 0, len,&numOfBlock);
if (bla == NULL)
FscHdfsFileBlockLocationArrayC *bla =
FscHdfsGetPathFileBlockLocation(fs, fullpath, 0, len);
if (FscHdfsHasErrorRaised(fs))
{
elog(ERROR, "hdfsprotocol_blocklocation : "
"failed to get block location of path %s. "
"It is reported generally due to HDFS service errors or "
"another session's ongoing writing.",
fullpath);
Assert(bla == NULL);
CatchedError *ce = FscHdfsGetFileSystemError(fs);
elog(ERROR, "hdfsprotocol_blocklocation : "
"failed to get block location of path %s. %s (%d)"
"It is reported generally due to HDFS service errors or "
"another session's ongoing writing.",
fullpath,
ce->errMessage,
ce->errCode);
}

// Add file full path and its block number as result.
blocklocation_file *blf = palloc0(sizeof(blocklocation_file));
blf->file_uri = pstrdup(fullpath);
blf->block_num = numOfBlock;
blf->block_num = FscHdfsGetFileBlockLocationArraySize(bla);
blf->locations = palloc0(sizeof(BlockLocation) * blf->block_num);

elog(DEBUG3, "hdfsprotocol_blocklocation : file %s has %d blocks",
fullpath, blf->block_num);
fullpath, blf->block_num);

// We don't need it any longer
pfree(fullpath);
int bidx = 0;

// Add block information as a list.
for (bidx = 0 ; bidx < blf->block_num ; bidx++)
for (int bidx = 0 ; bidx < blf->block_num ; bidx++)
{
BlockLocation *blo = &bla[bidx];
BlockLocation *bl = &(blf->locations[bidx]);
bl->numOfNodes = blo->numOfNodes;
bl->hosts = (char **)palloc0(sizeof(char *) * bl->numOfNodes);
bl->names = (char **)palloc0(sizeof(char *) * bl->numOfNodes);
bl->topologyPaths = (char **)palloc0(sizeof(char *) * bl->numOfNodes);
bl->offset = blo->offset;
bl->length = blo->length;
bl->corrupt = blo->corrupt;

int nidx = 0 ;
for (nidx = 0 ; nidx < bl->numOfNodes ; nidx++)
{
bl->hosts[nidx] = pstrdup(*blo[nidx].hosts);
bl->names[nidx] = pstrdup(*blo[nidx].names);
bl->topologyPaths[nidx] =pstrdup(*blo[nidx].topologyPaths);
}
FscHdfsFileBlockLocationC *blo =
FscHdfsGetFileBlockLocationFromArray(bla, bidx);
BlockLocation *bl = &(blf->locations[bidx]);
bl->numOfNodes = FscHdfsGetFileBlockLocationNNodes(blo);
bl->rangeId = -1;
bl->replicaGroupId = -1;
bl->hosts = (char **) palloc0(sizeof(char *) * bl->numOfNodes);
bl->names = (char **) palloc0(sizeof(char *) * bl->numOfNodes);
bl->topologyPaths = (char **) palloc0(
sizeof(char *) * bl->numOfNodes);
bl->offset = FscHdfsGetFileBlockLocationOffset(blo);
bl->length = FscHdfsGetFileBlockLocationLength(blo);
bl->corrupt = FscHdfsGetFileBlockLocationCorrupt(blo);

for (int nidx = 0; nidx < bl->numOfNodes; nidx++)
{
bl->hosts[nidx] = pstrdup(
FscHdfsGetFileBlockLocationNodeHost(blo, nidx));
bl->names[nidx] = pstrdup(
FscHdfsGetFileBlockLocationNodeName(blo, nidx));
// elog (LOG,"the host of bidx %d nidx %d is %s, name is %s.",bidx, nidx, bl->hosts[nidx], bl->names[nidx]);
bl->topologyPaths[nidx] = pstrdup(
FscHdfsGetFileBlockLocationNodeTopoPath(blo, nidx));
}
}

bldata->files = lappend(bldata->files, (void *)(blf));

// Clean up block location instances created by the lib.
hdfsFreeFileBlockLocations(bla,numOfBlock);
FscHdfsFreeFileBlockLocationArrayC(&bla);
}

// Clean up URI instance in loop as we don't need it any longer
/* Clean up URI instance in loop as we don't need it any longer */
FreeExternalTableUri(uri);

// Clean up file info array created by the lib for this location.
hdfsFreeFileInfo(fiarray,nsize);
/* Clean up file info array created by the lib for this location. */
FscHdfsFreeFileInfoArrayC(&fiarray);
}

// destroy fs instance
hdfsDisconnect(fs);
FscHdfsFreeFileSystemC(&fs);

PG_RETURN_VOID();

Expand Down Expand Up @@ -459,7 +470,7 @@ Datum hdfsprotocol_validate(PG_FUNCTION_ARGS)
}

/* Clean up temporarily created instances */
pfree(uri);
FreeExternalTableUri(uri);
if (nnaddr != NULL)
{
pfree(nnaddr);
Expand Down

0 comments on commit bc7108d

Please sign in to comment.