diff --git a/contrib/exthdfs/exthdfs.c b/contrib/exthdfs/exthdfs.c index 2c501ece5a..28808a7129 100644 --- a/contrib/exthdfs/exthdfs.c +++ b/contrib/exthdfs/exthdfs.c @@ -37,10 +37,6 @@ Datum hdfsprotocol_validate(PG_FUNCTION_ARGS); Datum hdfsprotocol_blocklocation(PG_FUNCTION_ARGS) { - - // Build the result instance - int nsize = 0; - int numOfBlock = 0; ExtProtocolBlockLocationData *bldata = palloc0(sizeof(ExtProtocolBlockLocationData)); if (bldata == NULL) @@ -66,7 +62,7 @@ Datum hdfsprotocol_blocklocation(PG_FUNCTION_ARGS) uri->hostname, uri->port); // Create file system instance - hdfsFS fs = hdfsConnect(uri->hostname, uri->port); + FscHdfsFileSystemC *fs = FscHdfsNewFileSystem(uri->hostname, uri->port); if (fs == NULL) { elog(ERROR, "hdfsprotocol_blocklocation : " @@ -97,35 +93,38 @@ Datum hdfsprotocol_blocklocation(PG_FUNCTION_ARGS) // get files contained in the path. - hdfsFileInfo *fiarray = hdfsListDirectory(fs, uri->path,&nsize); - if (fiarray == NULL) + FscHdfsFileInfoArrayC *fiarray = FscHdfsDirPath(fs, uri->path); + if (FscHdfsHasErrorRaised(fs)) { - elog(ERROR, "hdfsprotocol_blocklocation : " - "failed to get files of path %s", - uri->path); + Assert(fiarray == NULL); + CatchedError *ce = FscHdfsGetFileSystemError(fs); + elog(ERROR, "hdfsprotocol_blocklocation : " + "failed to get files of path %s. %s (%d)", + uri->path, + ce->errMessage, ce->errCode); } - int i = 0 ; // Call block location api to get data location for each file - for (i = 0 ; i < nsize ; i++) + for (int i = 0 ; true ; i++) { - hdfsFileInfo *fi = &fiarray[i]; + FscHdfsFileInfoC *fi = FscHdfsGetFileInfoFromArray(fiarray, i); // break condition of this for loop if (fi == NULL) {break;} // Build file name full path. 
- const char *fname = fi->mName; - char *fullpath = palloc0( // slash - strlen(fname) + // name - 1); // \0 - sprintf(fullpath, "%s", fname); + const char *fname = FscHdfsGetFileInfoName(fi); + char *fullpath = palloc0(strlen(uri->path) + /* path */ + 1 + /* slash */ + strlen(fname) + /* name */ + 1); /* \0 */ + sprintf(fullpath, "%s/%s", uri->path, fname); elog(DEBUG3, "hdfsprotocol_blocklocation : " "built full path file %s", fullpath); // Get file full length. - int64_t len = fi->mSize; + int64_t len = FscHdfsGetFileInfoLength(fi); elog(DEBUG3, "hdfsprotocol_blocklocation : " "got file %s length " INT64_FORMAT, @@ -137,65 +136,77 @@ Datum hdfsprotocol_blocklocation(PG_FUNCTION_ARGS) } // Get block location data for this file - BlockLocation *bla = hdfsGetFileBlockLocations(fs, fullpath, 0, len,&numOfBlock); - if (bla == NULL) + FscHdfsFileBlockLocationArrayC *bla = + FscHdfsGetPathFileBlockLocation(fs, fullpath, 0, len); + if (FscHdfsHasErrorRaised(fs)) { - elog(ERROR, "hdfsprotocol_blocklocation : " - "failed to get block location of path %s. " - "It is reported generally due to HDFS service errors or " - "another session's ongoing writing.", - fullpath); + Assert(bla == NULL); + CatchedError *ce = FscHdfsGetFileSystemError(fs); + elog(ERROR, "hdfsprotocol_blocklocation : " + "failed to get block location of path %s. %s (%d). " + "It is reported generally due to HDFS service errors or " + "another session's ongoing writing.", + fullpath, + ce->errMessage, + ce->errCode); } // Add file full path and its block number as result. 
blocklocation_file *blf = palloc0(sizeof(blocklocation_file)); blf->file_uri = pstrdup(fullpath); - blf->block_num = numOfBlock; + blf->block_num = FscHdfsGetFileBlockLocationArraySize(bla); blf->locations = palloc0(sizeof(BlockLocation) * blf->block_num); elog(DEBUG3, "hdfsprotocol_blocklocation : file %s has %d blocks", - fullpath, blf->block_num); + fullpath, blf->block_num); // We don't need it any longer pfree(fullpath); - int bidx = 0; + // Add block information as a list. - for (bidx = 0 ; bidx < blf->block_num ; bidx++) + for (int bidx = 0 ; bidx < blf->block_num ; bidx++) { - BlockLocation *blo = &bla[bidx]; - BlockLocation *bl = &(blf->locations[bidx]); - bl->numOfNodes = blo->numOfNodes; - bl->hosts = (char **)palloc0(sizeof(char *) * bl->numOfNodes); - bl->names = (char **)palloc0(sizeof(char *) * bl->numOfNodes); - bl->topologyPaths = (char **)palloc0(sizeof(char *) * bl->numOfNodes); - bl->offset = blo->offset; - bl->length = blo->length; - bl->corrupt = blo->corrupt; - - int nidx = 0 ; - for (nidx = 0 ; nidx < bl->numOfNodes ; nidx++) - { - bl->hosts[nidx] = pstrdup(*blo[nidx].hosts); - bl->names[nidx] = pstrdup(*blo[nidx].names); - bl->topologyPaths[nidx] =pstrdup(*blo[nidx].topologyPaths); - } + FscHdfsFileBlockLocationC *blo = + FscHdfsGetFileBlockLocationFromArray(bla, bidx); + BlockLocation *bl = &(blf->locations[bidx]); + bl->numOfNodes = FscHdfsGetFileBlockLocationNNodes(blo); + bl->rangeId = -1; + bl->replicaGroupId = -1; + bl->hosts = (char **) palloc0(sizeof(char *) * bl->numOfNodes); + bl->names = (char **) palloc0(sizeof(char *) * bl->numOfNodes); + bl->topologyPaths = (char **) palloc0( + sizeof(char *) * bl->numOfNodes); + bl->offset = FscHdfsGetFileBlockLocationOffset(blo); + bl->length = FscHdfsGetFileBlockLocationLength(blo); + bl->corrupt = FscHdfsGetFileBlockLocationCorrupt(blo); + + for (int nidx = 0; nidx < bl->numOfNodes; nidx++) + { + bl->hosts[nidx] = pstrdup( + FscHdfsGetFileBlockLocationNodeHost(blo, nidx)); + 
bl->names[nidx] = pstrdup( + FscHdfsGetFileBlockLocationNodeName(blo, nidx)); +// elog (LOG,"the host of bidx %d nidx %d is %s, name is %s.",bidx, nidx, bl->hosts[nidx], bl->names[nidx]); + bl->topologyPaths[nidx] = pstrdup( + FscHdfsGetFileBlockLocationNodeTopoPath(blo, nidx)); + } } bldata->files = lappend(bldata->files, (void *)(blf)); // Clean up block location instances created by the lib. - hdfsFreeFileBlockLocations(bla,numOfBlock); + FscHdfsFreeFileBlockLocationArrayC(&bla); } - // Clean up URI instance in loop as we don't need it any longer + /* Clean up URI instance in loop as we don't need it any longer */ FreeExternalTableUri(uri); - // Clean up file info array created by the lib for this location. - hdfsFreeFileInfo(fiarray,nsize); + /* Clean up file info array created by the lib for this location. */ + FscHdfsFreeFileInfoArrayC(&fiarray); } // destroy fs instance - hdfsDisconnect(fs); + FscHdfsFreeFileSystemC(&fs); PG_RETURN_VOID(); @@ -459,7 +470,7 @@ Datum hdfsprotocol_validate(PG_FUNCTION_ARGS) } /* Clean up temporarily created instances */ - pfree(uri); + FreeExternalTableUri(uri); if (nnaddr != NULL) { pfree(nnaddr);