From 9e1a0a429c3ae7dfa0a373e7b30e2f827e91532b Mon Sep 17 00:00:00 2001 From: Emilio Lahr-Vivaz Date: Mon, 14 Jun 2021 10:27:51 -0400 Subject: [PATCH] Add tmp output dir --- docs/user/accumulo/commandline.rst | 7 ++- .../ingest/AccumuloBulkIngestCommand.scala | 55 ++++++++++++++----- .../compact/FileSystemCompactionJob.scala | 3 +- .../fs/tools/compact/FsCompactCommand.scala | 7 +-- .../tools/ingest/FileSystemConverterJob.scala | 1 + .../fs/tools/ingest/FsIngestCommand.scala | 19 +++---- geomesa-tools/pom.xml | 12 ++-- .../geomesa/tools/CommonParams.scala | 6 ++ .../tools/utils}/StorageJobUtils.scala | 18 +++--- 9 files changed, 85 insertions(+), 43 deletions(-) rename {geomesa-fs/geomesa-fs-tools/src/main/scala/org/locationtech/geomesa/fs/tools/ingest => geomesa-tools/src/main/scala/org/locationtech/geomesa/tools/utils}/StorageJobUtils.scala (88%) diff --git a/docs/user/accumulo/commandline.rst b/docs/user/accumulo/commandline.rst index b6ad4f5e7fa3..b666f597ac9d 100644 --- a/docs/user/accumulo/commandline.rst +++ b/docs/user/accumulo/commandline.rst @@ -122,8 +122,11 @@ Argument Description ``-t, --threads`` Number of parallel threads used ``--input-format`` Format of input files (csv, tsv, avro, shp, json, etc) ```--index`` Specify a particular GeoMesa index to write to, instead of all indices -``--no-tracking`` This application closes when ingest job is submitted. Useful for launching jobs with a script -``--run-mode`` Must be one of ``local`` or ``distributed`` (for map/reduce ingest) +``--temp-path`` A temporary path to write the output. When using Accumulo on S3, it may be faster to write the + output to HDFS first using this parameter +``--no-tracking`` This application closes when ingest job is submitted. Note that this will require manual import + of the resulting RFiles. +``--run-mode`` Must be ``distributed`` for bulk ingest ``--split-max-size`` Maximum size of a split in bytes (distributed jobs) ``--src-list`` Input files are text files with lists of files, one per line, to ingest ``--skip-import`` Generate the RFiles but skip the bulk import into Accumulo diff --git a/geomesa-accumulo/geomesa-accumulo-tools/src/main/scala/org/locationtech/geomesa/accumulo/tools/ingest/AccumuloBulkIngestCommand.scala b/geomesa-accumulo/geomesa-accumulo-tools/src/main/scala/org/locationtech/geomesa/accumulo/tools/ingest/AccumuloBulkIngestCommand.scala index 6520ec27803f..25f9f444e819 100644 --- a/geomesa-accumulo/geomesa-accumulo-tools/src/main/scala/org/locationtech/geomesa/accumulo/tools/ingest/AccumuloBulkIngestCommand.scala +++ b/geomesa-accumulo/geomesa-accumulo-tools/src/main/scala/org/locationtech/geomesa/accumulo/tools/ingest/AccumuloBulkIngestCommand.scala @@ -27,10 +27,10 @@ import org.locationtech.geomesa.jobs.mapreduce.ConverterCombineInputFormat import org.locationtech.geomesa.jobs.{Awaitable, JobResult, StatusCallback} import org.locationtech.geomesa.tools.DistributedRunParam.RunModes import org.locationtech.geomesa.tools.DistributedRunParam.RunModes.RunMode +import org.locationtech.geomesa.tools._ import org.locationtech.geomesa.tools.ingest.IngestCommand.{IngestParams, Inputs} import org.locationtech.geomesa.tools.ingest._ -import org.locationtech.geomesa.tools.utils.Prompt -import org.locationtech.geomesa.tools.{Command, OptionalCqlFilterParam, OptionalIndexParam, OutputPathParam} +import org.locationtech.geomesa.tools.utils.{Prompt, StorageJobUtils} import org.locationtech.geomesa.utils.index.IndexMode import org.locationtech.geomesa.utils.io.fs.HadoopDelegate.HiddenFileFilter import org.opengis.feature.simple.SimpleFeatureType @@ -66,14 +66,16 @@ class AccumuloBulkIngestCommand extends IngestCommand[AccumuloDataStore] with Ac } } + mode match { case RunModes.Local => throw new IllegalArgumentException("Bulk ingest must be run in distributed mode") case RunModes.Distributed => + val conf = new Configuration() // file output format doesn't let you write to an existing directory val output = new Path(params.outputPath) - val context = FileContext.getFileContext(output.toUri, new Configuration()) + val context = FileContext.getFileContext(output.toUri, conf) if (context.util.exists(output)) { val warning = s"Output directory '$output' exists" if (params.force) { @@ -84,8 +86,20 @@ class AccumuloBulkIngestCommand extends IngestCommand[AccumuloDataStore] with Ac context.delete(output, true) } + val tempPath = Option(params.tempPath).map { temp => + val path = new Path(temp) + // get a new file context as this is likely to be a different filesystem (i.e. hdfs vs s3) + val tempContext = FileContext.getFileContext(path.toUri, conf) + val dir = tempContext.makeQualified(path) + if (tempContext.util.exists(dir)) { + Command.user.info(s"Deleting temp output path $dir") + tempContext.delete(dir, true) + } + dir + } + Command.user.info(s"Running bulk ingestion in distributed ${if (params.combineInputs) "combine " else "" }mode") - new BulkConverterIngest(ds, connection, sft, converter, inputs.paths, params.outputPath, maxSplitSize, + new BulkConverterIngest(ds, connection, sft, converter, inputs.paths, output, tempPath, maxSplitSize, index, partitions, libjarsFiles, libjarsPaths) case _ => @@ -99,7 +113,8 @@ class AccumuloBulkIngestCommand extends IngestCommand[AccumuloDataStore] with Ac sft: SimpleFeatureType, converterConfig: Config, paths: Seq[String], - output: String, + output: Path, + tempOutput: Option[Path], maxSplitSize: Option[Int], index: Option[String], partitions: Option[Seq[String]], @@ -107,22 +122,40 @@ class AccumuloBulkIngestCommand extends IngestCommand[AccumuloDataStore] with Ac libjarsPaths: Iterator[() => Seq[File]] ) extends ConverterIngestJob(dsParams, sft, converterConfig, paths, libjarsFiles, libjarsPaths) { + private var libjars: String = _ + override def configureJob(job: Job): Unit = { super.configureJob(job) - GeoMesaAccumuloFileOutputFormat.configure(job, ds, dsParams, sft, new Path(output), index, partitions) + val dest = tempOutput.getOrElse(output) + GeoMesaAccumuloFileOutputFormat.configure(job, ds, dsParams, sft, dest, index, partitions) maxSplitSize.foreach { max => job.setInputFormatClass(classOf[ConverterCombineInputFormat]) if (max > 0) { FileInputFormat.setMaxInputSplitSize(job, max.toLong) } } + this.libjars = job.getConfiguration.get("tmpjars") } override def await(reporter: StatusCallback): JobResult = { super.await(reporter).merge { + tempOutput.map { dir => + reporter.reset() + val conf = new Configuration() + conf.set("tmpjars", this.libjars) // copy over out libjars so s3 apis are on the classpath + StorageJobUtils.distCopy(dir, output, reporter, conf) match { + case JobSuccess(message, counts) => + Command.user.info(message) + JobSuccess("", counts) + + case j => j + } + } + }.merge { if (params.skipImport) { Command.user.info("Skipping import of RFiles into Accumulo") - Some(JobSuccess(AccumuloBulkIngestCommand.ImportMessage, Map.empty)) + Command.user.info( + "Files may be imported for each table through the Accumulo shell with the `importdirectory` command") } else { Command.user.info("Importing RFiles into Accumulo") val tableOps = ds.connector.tableOperations() @@ -138,21 +171,17 @@ class AccumuloBulkIngestCommand extends IngestCommand[AccumuloDataStore] with Ac tableOps.importDirectory(path.toString).to(table).load() } } - Some(JobSuccess("", Map.empty)) } + None } } } } object AccumuloBulkIngestCommand { - - private val ImportMessage = - "\nFiles may be imported for each table through the Accumulo shell with the `importdirectory` command" - @Parameters(commandDescription = "Convert various file formats into bulk loaded Accumulo RFiles") class AccumuloBulkIngestParams extends IngestParams with AccumuloDataStoreParams - with OutputPathParam with OptionalIndexParam with OptionalCqlFilterParam { + with OutputPathParam with OptionalIndexParam with OptionalCqlFilterParam with TempPathParam { @Parameter(names = Array("--skip-import"), description = "Generate the files but skip the bulk import into Accumulo") var skipImport: Boolean = false } diff --git a/geomesa-fs/geomesa-fs-tools/src/main/scala/org/locationtech/geomesa/fs/tools/compact/FileSystemCompactionJob.scala b/geomesa-fs/geomesa-fs-tools/src/main/scala/org/locationtech/geomesa/fs/tools/compact/FileSystemCompactionJob.scala index bf728f624d5d..b5abf236ee45 100644 --- a/geomesa-fs/geomesa-fs-tools/src/main/scala/org/locationtech/geomesa/fs/tools/compact/FileSystemCompactionJob.scala +++ b/geomesa-fs/geomesa-fs-tools/src/main/scala/org/locationtech/geomesa/fs/tools/compact/FileSystemCompactionJob.scala @@ -23,14 +23,13 @@ import org.locationtech.geomesa.fs.storage.common.jobs.StorageConfiguration import org.locationtech.geomesa.fs.storage.common.utils.StorageUtils.FileType import org.locationtech.geomesa.fs.storage.orc.jobs.OrcStorageConfiguration import org.locationtech.geomesa.fs.tools.compact.FileSystemCompactionJob.CompactionMapper -import org.locationtech.geomesa.fs.tools.ingest.StorageJobUtils import org.locationtech.geomesa.jobs.JobResult.JobSuccess import org.locationtech.geomesa.jobs.mapreduce.GeoMesaOutputFormat.OutputCounters import org.locationtech.geomesa.jobs.mapreduce.JobWithLibJars import org.locationtech.geomesa.jobs.{JobResult, StatusCallback} import org.locationtech.geomesa.parquet.jobs.ParquetStorageConfiguration import org.locationtech.geomesa.tools.Command -import org.locationtech.geomesa.tools.utils.JobRunner +import org.locationtech.geomesa.tools.utils.{JobRunner, StorageJobUtils} import org.locationtech.geomesa.utils.text.TextTools import org.opengis.feature.simple.SimpleFeature diff --git a/geomesa-fs/geomesa-fs-tools/src/main/scala/org/locationtech/geomesa/fs/tools/compact/FsCompactCommand.scala b/geomesa-fs/geomesa-fs-tools/src/main/scala/org/locationtech/geomesa/fs/tools/compact/FsCompactCommand.scala index d0112e73cb8e..5e2ae96ca93c 100644 --- a/geomesa-fs/geomesa-fs-tools/src/main/scala/org/locationtech/geomesa/fs/tools/compact/FsCompactCommand.scala +++ b/geomesa-fs/geomesa-fs-tools/src/main/scala/org/locationtech/geomesa/fs/tools/compact/FsCompactCommand.scala @@ -20,7 +20,6 @@ import org.locationtech.geomesa.fs.tools.FsDataStoreCommand import org.locationtech.geomesa.fs.tools.FsDataStoreCommand.{FsDistributedCommand, FsParams, PartitionParam} import org.locationtech.geomesa.fs.tools.compact.FileSystemCompactionJob.{OrcCompactionJob, ParquetCompactionJob} import org.locationtech.geomesa.fs.tools.compact.FsCompactCommand.CompactCommand -import org.locationtech.geomesa.fs.tools.ingest.FsIngestCommand.TempDirParam import org.locationtech.geomesa.jobs.JobResult.{JobFailure, JobSuccess} import org.locationtech.geomesa.parquet.ParquetFileSystemStorage import org.locationtech.geomesa.tools.Command.CommandException @@ -28,7 +27,7 @@ import org.locationtech.geomesa.tools.DistributedRunParam.RunModes import org.locationtech.geomesa.tools.ingest.IngestCommand import org.locationtech.geomesa.tools.utils.ParameterConverters.BytesConverter import org.locationtech.geomesa.tools.utils.TerminalCallback.PrintProgress -import org.locationtech.geomesa.tools.{Command, DistributedCommand, DistributedRunParam, RequiredTypeNameParam} +import org.locationtech.geomesa.tools._ import org.locationtech.geomesa.utils.io.PathUtils import org.locationtech.geomesa.utils.text.TextTools @@ -119,7 +118,7 @@ object FsCompactCommand { } else { throw new ParameterException(s"Compaction is not supported for encoding '$encoding'") } - val tempDir = Option(params.tempDir).map(t => new Path(t)) + val tempDir = Option(params.tempPath).map(t => new Path(t)) job.run(storage, toCompact, fileSize, tempDir, libjarsFiles, libjarsPaths, status) match { case JobSuccess(message, counts) => Command.user.info(s"Distributed compaction complete in ${TextTools.getTime(start)}") @@ -137,7 +136,7 @@ object FsCompactCommand { @Parameters(commandDescription = "Compact partitions") class CompactParams extends FsParams - with RequiredTypeNameParam with TempDirParam with PartitionParam with DistributedRunParam { + with RequiredTypeNameParam with TempPathParam with PartitionParam with DistributedRunParam { @Parameter(names = Array("-t", "--threads"), description = "Number of threads if using local mode") var threads: Integer = 4 diff --git a/geomesa-fs/geomesa-fs-tools/src/main/scala/org/locationtech/geomesa/fs/tools/ingest/FileSystemConverterJob.scala b/geomesa-fs/geomesa-fs-tools/src/main/scala/org/locationtech/geomesa/fs/tools/ingest/FileSystemConverterJob.scala index 0cdf8df36561..2bff6b9ff642 100644 --- a/geomesa-fs/geomesa-fs-tools/src/main/scala/org/locationtech/geomesa/fs/tools/ingest/FileSystemConverterJob.scala +++ b/geomesa-fs/geomesa-fs-tools/src/main/scala/org/locationtech/geomesa/fs/tools/ingest/FileSystemConverterJob.scala @@ -31,6 +31,7 @@ import org.locationtech.geomesa.parquet.jobs.ParquetStorageConfiguration import org.locationtech.geomesa.tools.Command import org.locationtech.geomesa.tools.ingest.ConverterIngestJob import org.locationtech.geomesa.tools.ingest.IngestCommand.IngestCounters +import org.locationtech.geomesa.tools.utils.StorageJobUtils import org.opengis.feature.simple.{SimpleFeature, SimpleFeatureType} abstract class FileSystemConverterJob( diff --git a/geomesa-fs/geomesa-fs-tools/src/main/scala/org/locationtech/geomesa/fs/tools/ingest/FsIngestCommand.scala b/geomesa-fs/geomesa-fs-tools/src/main/scala/org/locationtech/geomesa/fs/tools/ingest/FsIngestCommand.scala index 678acd6de814..fd2373c948d7 100644 --- a/geomesa-fs/geomesa-fs-tools/src/main/scala/org/locationtech/geomesa/fs/tools/ingest/FsIngestCommand.scala +++ b/geomesa-fs/geomesa-fs-tools/src/main/scala/org/locationtech/geomesa/fs/tools/ingest/FsIngestCommand.scala @@ -23,7 +23,7 @@ import org.locationtech.geomesa.fs.tools.ingest.FsIngestCommand.FsIngestParams import org.locationtech.geomesa.jobs.Awaitable import org.locationtech.geomesa.jobs.mapreduce.ConverterCombineInputFormat import org.locationtech.geomesa.parquet.ParquetFileSystemStorage -import org.locationtech.geomesa.tools.Command +import org.locationtech.geomesa.tools.{Command, TempPathParam} import org.locationtech.geomesa.tools.DistributedRunParam.RunModes import org.locationtech.geomesa.tools.DistributedRunParam.RunModes.RunMode import org.locationtech.geomesa.tools.ingest.IngestCommand.{IngestParams, Inputs} @@ -53,7 +53,7 @@ class FsIngestCommand extends IngestCommand[FileSystemDataStore] with FsDistribu throw new ParameterException("Please specify --num-reducers for distributed ingest") } val storage = ds.storage(sft.getTypeName) - val tmpPath = Option(params.tempDir).map(d => storage.context.fc.makeQualified(new Path(d))) + val tmpPath = Option(params.tempPath).map(d => storage.context.fc.makeQualified(new Path(d))) val targetFileSize = storage.metadata.get(Metadata.TargetFileSize).map(_.toLong) tmpPath.foreach { tp => @@ -101,16 +101,13 @@ class FsIngestCommand extends IngestCommand[FileSystemDataStore] with FsDistribu } object FsIngestCommand { - @Parameters(commandDescription = "Ingest/convert various file formats into GeoMesa") - class FsIngestParams extends IngestParams with FsParams with OptionalEncodingParam with OptionalSchemeParams with TempDirParam { - @Parameter(names = Array("--num-reducers"), description = "Num reducers (required for distributed ingest)", required = false) + class FsIngestParams extends IngestParams + with FsParams with OptionalEncodingParam with OptionalSchemeParams with TempPathParam { + @Parameter( + names = Array("--num-reducers"), + description = "Num reducers (required for distributed ingest)", + required = false) var reducers: java.lang.Integer = _ } - - trait TempDirParam { - @Parameter(names = Array("--temp-path"), description = "Path to temp dir for writing output. " + - "Note that this may be useful when using s3 since it is slow as a sink", required = false) - var tempDir: String = _ - } } diff --git a/geomesa-tools/pom.xml b/geomesa-tools/pom.xml index 99c0007f9065..ed3dc8d454fd 100644 --- a/geomesa-tools/pom.xml +++ b/geomesa-tools/pom.xml @@ -52,10 +52,6 @@ org.locationtech.geomesa geomesa-fs-storage-parquet_${scala.binary.version} - - org.apache.hadoop - hadoop-client - org.locationtech.geomesa geomesa-filter_2.12 @@ -111,6 +107,14 @@ scala-compiler provided + + org.apache.hadoop + hadoop-client + + + org.apache.hadoop + hadoop-distcp + diff --git a/geomesa-tools/src/main/scala/org/locationtech/geomesa/tools/CommonParams.scala b/geomesa-tools/src/main/scala/org/locationtech/geomesa/tools/CommonParams.scala index e7b6fffd0558..c732adb723ea 100644 --- a/geomesa-tools/src/main/scala/org/locationtech/geomesa/tools/CommonParams.scala +++ b/geomesa-tools/src/main/scala/org/locationtech/geomesa/tools/CommonParams.scala @@ -272,6 +272,12 @@ trait OutputPathParam { var outputPath: String = _ } +trait TempPathParam { + @Parameter(names = Array("--temp-path"), description = "Path to temp dir for writing output. " + + "Note that this may be useful when using s3 since it is slow as a sink", required = false) + var tempPath: String = _ +} + trait NumReducersParam { @Parameter( names = Array("--num-reducers"), diff --git a/geomesa-fs/geomesa-fs-tools/src/main/scala/org/locationtech/geomesa/fs/tools/ingest/StorageJobUtils.scala b/geomesa-tools/src/main/scala/org/locationtech/geomesa/tools/utils/StorageJobUtils.scala similarity index 88% rename from geomesa-fs/geomesa-fs-tools/src/main/scala/org/locationtech/geomesa/fs/tools/ingest/StorageJobUtils.scala rename to geomesa-tools/src/main/scala/org/locationtech/geomesa/tools/utils/StorageJobUtils.scala index f350d727b48c..41d1feb1eed5 100644 --- a/geomesa-fs/geomesa-fs-tools/src/main/scala/org/locationtech/geomesa/fs/tools/ingest/StorageJobUtils.scala +++ b/geomesa-tools/src/main/scala/org/locationtech/geomesa/tools/utils/StorageJobUtils.scala @@ -6,7 +6,9 @@ * http://www.opensource.org/licenses/apache2.0.php. ***********************************************************************/ -package org.locationtech.geomesa.fs.tools.ingest +package org.locationtech.geomesa.tools.utils + +import java.util.Collections import com.typesafe.scalalogging.LazyLogging import org.apache.hadoop.conf.Configuration @@ -15,19 +17,20 @@ import org.apache.hadoop.tools.{DistCp, DistCpOptions} import org.locationtech.geomesa.jobs.JobResult.JobSuccess import org.locationtech.geomesa.jobs.{JobResult, StatusCallback} import org.locationtech.geomesa.tools.Command -import org.locationtech.geomesa.tools.utils.JobRunner - -import java.util.Collections object StorageJobUtils extends LazyLogging { - def distCopy(srcRoot: Path, destRoot: Path, statusCallback: StatusCallback): JobResult = { + def distCopy( + srcRoot: Path, + destRoot: Path, + statusCallback: StatusCallback, + conf: Configuration = new Configuration()): JobResult = { statusCallback.reset() Command.user.info("Submitting job 'DistCp' - please wait...") val opts = distCpOptions(srcRoot, destRoot) - val job = new DistCp(new Configuration, opts).execute() + val job = new DistCp(conf, opts).execute() Command.user.info(s"Tracking available at ${job.getStatus.getTrackingUrl}") @@ -41,11 +44,12 @@ object StorageJobUtils extends LazyLogging { // hadoop 3 API private def distCpOptions3(src: Path, dest: Path): DistCpOptions = { - val clas = Class.forName("org.apache.hadoop.tools.DistCpOptions.Builder") + val clas = Class.forName("org.apache.hadoop.tools.DistCpOptions$Builder") val constructor = clas.getConstructor(classOf[java.util.List[Path]], classOf[Path]) val builder = constructor.newInstance(Collections.singletonList(src), dest) clas.getMethod("withAppend", classOf[Boolean]).invoke(builder, java.lang.Boolean.FALSE) clas.getMethod("withOverwrite", classOf[Boolean]).invoke(builder, java.lang.Boolean.TRUE) + clas.getMethod("withBlocking", classOf[Boolean]).invoke(builder, java.lang.Boolean.FALSE) clas.getMethod("withCopyStrategy", classOf[String]).invoke(builder, "dynamic") clas.getMethod("build").invoke(builder).asInstanceOf[DistCpOptions] }