Add tmp output dir
elahrvivaz committed Jun 14, 2021
1 parent 0270a0c commit 9e1a0a4
Showing 9 changed files with 85 additions and 43 deletions.
7 changes: 5 additions & 2 deletions docs/user/accumulo/commandline.rst
@@ -122,8 +122,11 @@ Argument Description
``-t, --threads`` Number of parallel threads used
``--input-format`` Format of input files (csv, tsv, avro, shp, json, etc)
``--index`` Specify a particular GeoMesa index to write to, instead of all indices
``--no-tracking`` This application closes when the ingest job is submitted. Useful for launching jobs with a script
``--run-mode`` Must be one of ``local`` or ``distributed`` (for map/reduce ingest)
``--temp-path`` A temporary path to write the output. When using Accumulo on S3, it may be faster to write the
output to HDFS first using this parameter
``--no-tracking`` This application closes when the ingest job is submitted. Note that this will require manual import
of the resulting RFiles.
``--run-mode`` Must be ``distributed`` for bulk ingest
``--split-max-size`` Maximum size of a split in bytes (distributed jobs)
``--src-list`` Input files are text files containing lists of files to ingest, one per line
``--skip-import`` Generate the RFiles but skip the bulk import into Accumulo
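For example, a bulk ingest that stages its output in HDFS before copying to S3 might be invoked as follows. This is an illustrative sketch: the command name, connection flags, and all paths are placeholders, and only the long options shown in the table above come from this commit.

    geomesa-accumulo bulk-ingest \
        -u root -p secret -i myinstance -z zoo1,zoo2 -c geomesa.catalog \
        -f my_type -C my_converter --input-format csv \
        --run-mode distributed \
        --output-path s3a://my-bucket/geomesa/rfiles \
        --temp-path hdfs:///tmp/geomesa-bulk-staging \
        hdfs:///data/input/*.csv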
AccumuloBulkIngestCommand.scala
@@ -27,10 +27,10 @@ import org.locationtech.geomesa.jobs.mapreduce.ConverterCombineInputFormat
import org.locationtech.geomesa.jobs.{Awaitable, JobResult, StatusCallback}
import org.locationtech.geomesa.tools.DistributedRunParam.RunModes
import org.locationtech.geomesa.tools.DistributedRunParam.RunModes.RunMode
import org.locationtech.geomesa.tools._
import org.locationtech.geomesa.tools.ingest.IngestCommand.{IngestParams, Inputs}
import org.locationtech.geomesa.tools.ingest._
import org.locationtech.geomesa.tools.utils.Prompt
import org.locationtech.geomesa.tools.{Command, OptionalCqlFilterParam, OptionalIndexParam, OutputPathParam}
import org.locationtech.geomesa.tools.utils.{Prompt, StorageJobUtils}
import org.locationtech.geomesa.utils.index.IndexMode
import org.locationtech.geomesa.utils.io.fs.HadoopDelegate.HiddenFileFilter
import org.opengis.feature.simple.SimpleFeatureType
@@ -66,14 +66,16 @@ class AccumuloBulkIngestCommand extends IngestCommand[AccumuloDataStore] with Ac
}
}


mode match {
case RunModes.Local =>
throw new IllegalArgumentException("Bulk ingest must be run in distributed mode")

case RunModes.Distributed =>
val conf = new Configuration()
// file output format doesn't let you write to an existing directory
val output = new Path(params.outputPath)
val context = FileContext.getFileContext(output.toUri, new Configuration())
val context = FileContext.getFileContext(output.toUri, conf)
if (context.util.exists(output)) {
val warning = s"Output directory '$output' exists"
if (params.force) {
@@ -84,8 +86,20 @@ class AccumuloBulkIngestCommand extends IngestCommand[AccumuloDataStore] with Ac
context.delete(output, true)
}

val tempPath = Option(params.tempPath).map { temp =>
val path = new Path(temp)
// get a new file context, as this is likely to be a different filesystem (e.g. HDFS vs S3)
val tempContext = FileContext.getFileContext(path.toUri, conf)
val dir = tempContext.makeQualified(path)
if (tempContext.util.exists(dir)) {
Command.user.info(s"Deleting temp output path $dir")
tempContext.delete(dir, true)
}
dir
}

Command.user.info(s"Running bulk ingestion in distributed ${if (params.combineInputs) "combine " else "" }mode")
new BulkConverterIngest(ds, connection, sft, converter, inputs.paths, params.outputPath, maxSplitSize,
new BulkConverterIngest(ds, connection, sft, converter, inputs.paths, output, tempPath, maxSplitSize,
index, partitions, libjarsFiles, libjarsPaths)

case _ =>
@@ -99,30 +113,49 @@ class AccumuloBulkIngestCommand extends IngestCommand[AccumuloDataStore] with Ac
sft: SimpleFeatureType,
converterConfig: Config,
paths: Seq[String],
output: String,
output: Path,
tempOutput: Option[Path],
maxSplitSize: Option[Int],
index: Option[String],
partitions: Option[Seq[String]],
libjarsFiles: Seq[String],
libjarsPaths: Iterator[() => Seq[File]]
) extends ConverterIngestJob(dsParams, sft, converterConfig, paths, libjarsFiles, libjarsPaths) {

private var libjars: String = _

override def configureJob(job: Job): Unit = {
super.configureJob(job)
GeoMesaAccumuloFileOutputFormat.configure(job, ds, dsParams, sft, new Path(output), index, partitions)
val dest = tempOutput.getOrElse(output)
GeoMesaAccumuloFileOutputFormat.configure(job, ds, dsParams, sft, dest, index, partitions)
maxSplitSize.foreach { max =>
job.setInputFormatClass(classOf[ConverterCombineInputFormat])
if (max > 0) {
FileInputFormat.setMaxInputSplitSize(job, max.toLong)
}
}
this.libjars = job.getConfiguration.get("tmpjars")
}

override def await(reporter: StatusCallback): JobResult = {
super.await(reporter).merge {
tempOutput.map { dir =>
reporter.reset()
val conf = new Configuration()
conf.set("tmpjars", this.libjars) // copy over out libjars so s3 apis are on the classpath
StorageJobUtils.distCopy(dir, output, reporter, conf) match {
case JobSuccess(message, counts) =>
Command.user.info(message)
JobSuccess("", counts)

case j => j
}
}
}.merge {
if (params.skipImport) {
Command.user.info("Skipping import of RFiles into Accumulo")
Some(JobSuccess(AccumuloBulkIngestCommand.ImportMessage, Map.empty))
Command.user.info(
"Files may be imported for each table through the Accumulo shell with the `importdirectory` command")
} else {
Command.user.info("Importing RFiles into Accumulo")
val tableOps = ds.connector.tableOperations()
@@ -138,21 +171,17 @@ class AccumuloBulkIngestCommand extends IngestCommand[AccumuloDataStore] with Ac
tableOps.importDirectory(path.toString).to(table).load()
}
}
Some(JobSuccess("", Map.empty))
}
None
}
}
}
}

object AccumuloBulkIngestCommand {

private val ImportMessage =
"\nFiles may be imported for each table through the Accumulo shell with the `importdirectory` command"

@Parameters(commandDescription = "Convert various file formats into bulk loaded Accumulo RFiles")
class AccumuloBulkIngestParams extends IngestParams with AccumuloDataStoreParams
with OutputPathParam with OptionalIndexParam with OptionalCqlFilterParam {
with OutputPathParam with OptionalIndexParam with OptionalCqlFilterParam with TempPathParam {
@Parameter(names = Array("--skip-import"), description = "Generate the files but skip the bulk import into Accumulo")
var skipImport: Boolean = false
}
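When ``--skip-import`` is specified, the generated RFiles must be loaded manually, as the new user message above notes. A hypothetical shell session, assuming Accumulo 2.x shell syntax, with placeholder table and directory names (exact arguments vary across shell versions):

    root@myinstance> importdirectory -t geomesa.catalog_my_type_z3 /geomesa/rfiles/my_type_z3 true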
FileSystemCompactionJob.scala
@@ -23,14 +23,13 @@ import org.locationtech.geomesa.fs.storage.common.jobs.StorageConfiguration
import org.locationtech.geomesa.fs.storage.common.utils.StorageUtils.FileType
import org.locationtech.geomesa.fs.storage.orc.jobs.OrcStorageConfiguration
import org.locationtech.geomesa.fs.tools.compact.FileSystemCompactionJob.CompactionMapper
import org.locationtech.geomesa.fs.tools.ingest.StorageJobUtils
import org.locationtech.geomesa.jobs.JobResult.JobSuccess
import org.locationtech.geomesa.jobs.mapreduce.GeoMesaOutputFormat.OutputCounters
import org.locationtech.geomesa.jobs.mapreduce.JobWithLibJars
import org.locationtech.geomesa.jobs.{JobResult, StatusCallback}
import org.locationtech.geomesa.parquet.jobs.ParquetStorageConfiguration
import org.locationtech.geomesa.tools.Command
import org.locationtech.geomesa.tools.utils.JobRunner
import org.locationtech.geomesa.tools.utils.{JobRunner, StorageJobUtils}
import org.locationtech.geomesa.utils.text.TextTools
import org.opengis.feature.simple.SimpleFeature

FsCompactCommand.scala
@@ -20,15 +20,14 @@ import org.locationtech.geomesa.fs.tools.FsDataStoreCommand
import org.locationtech.geomesa.fs.tools.FsDataStoreCommand.{FsDistributedCommand, FsParams, PartitionParam}
import org.locationtech.geomesa.fs.tools.compact.FileSystemCompactionJob.{OrcCompactionJob, ParquetCompactionJob}
import org.locationtech.geomesa.fs.tools.compact.FsCompactCommand.CompactCommand
import org.locationtech.geomesa.fs.tools.ingest.FsIngestCommand.TempDirParam
import org.locationtech.geomesa.jobs.JobResult.{JobFailure, JobSuccess}
import org.locationtech.geomesa.parquet.ParquetFileSystemStorage
import org.locationtech.geomesa.tools.Command.CommandException
import org.locationtech.geomesa.tools.DistributedRunParam.RunModes
import org.locationtech.geomesa.tools.ingest.IngestCommand
import org.locationtech.geomesa.tools.utils.ParameterConverters.BytesConverter
import org.locationtech.geomesa.tools.utils.TerminalCallback.PrintProgress
import org.locationtech.geomesa.tools.{Command, DistributedCommand, DistributedRunParam, RequiredTypeNameParam}
import org.locationtech.geomesa.tools._
import org.locationtech.geomesa.utils.io.PathUtils
import org.locationtech.geomesa.utils.text.TextTools

@@ -119,7 +118,7 @@ object FsCompactCommand {
} else {
throw new ParameterException(s"Compaction is not supported for encoding '$encoding'")
}
val tempDir = Option(params.tempDir).map(t => new Path(t))
val tempDir = Option(params.tempPath).map(t => new Path(t))
job.run(storage, toCompact, fileSize, tempDir, libjarsFiles, libjarsPaths, status) match {
case JobSuccess(message, counts) =>
Command.user.info(s"Distributed compaction complete in ${TextTools.getTime(start)}")
@@ -137,7 +136,7 @@ object FsCompactCommand {

@Parameters(commandDescription = "Compact partitions")
class CompactParams extends FsParams
with RequiredTypeNameParam with TempDirParam with PartitionParam with DistributedRunParam {
with RequiredTypeNameParam with TempPathParam with PartitionParam with DistributedRunParam {

@Parameter(names = Array("-t", "--threads"), description = "Number of threads if using local mode")
var threads: Integer = 4
FileSystemConverterJob.scala
@@ -31,6 +31,7 @@ import org.locationtech.geomesa.parquet.jobs.ParquetStorageConfiguration
import org.locationtech.geomesa.tools.Command
import org.locationtech.geomesa.tools.ingest.ConverterIngestJob
import org.locationtech.geomesa.tools.ingest.IngestCommand.IngestCounters
import org.locationtech.geomesa.tools.utils.StorageJobUtils
import org.opengis.feature.simple.{SimpleFeature, SimpleFeatureType}

abstract class FileSystemConverterJob(
FsIngestCommand.scala
@@ -23,7 +23,7 @@ import org.locationtech.geomesa.fs.tools.ingest.FsIngestCommand.FsIngestParams
import org.locationtech.geomesa.jobs.Awaitable
import org.locationtech.geomesa.jobs.mapreduce.ConverterCombineInputFormat
import org.locationtech.geomesa.parquet.ParquetFileSystemStorage
import org.locationtech.geomesa.tools.Command
import org.locationtech.geomesa.tools.{Command, TempPathParam}
import org.locationtech.geomesa.tools.DistributedRunParam.RunModes
import org.locationtech.geomesa.tools.DistributedRunParam.RunModes.RunMode
import org.locationtech.geomesa.tools.ingest.IngestCommand.{IngestParams, Inputs}
@@ -53,7 +53,7 @@ class FsIngestCommand extends IngestCommand[FileSystemDataStore] with FsDistribu
throw new ParameterException("Please specify --num-reducers for distributed ingest")
}
val storage = ds.storage(sft.getTypeName)
val tmpPath = Option(params.tempDir).map(d => storage.context.fc.makeQualified(new Path(d)))
val tmpPath = Option(params.tempPath).map(d => storage.context.fc.makeQualified(new Path(d)))
val targetFileSize = storage.metadata.get(Metadata.TargetFileSize).map(_.toLong)

tmpPath.foreach { tp =>
@@ -101,16 +101,13 @@ }
}

object FsIngestCommand {

@Parameters(commandDescription = "Ingest/convert various file formats into GeoMesa")
class FsIngestParams extends IngestParams with FsParams with OptionalEncodingParam with OptionalSchemeParams with TempDirParam {
@Parameter(names = Array("--num-reducers"), description = "Num reducers (required for distributed ingest)", required = false)
class FsIngestParams extends IngestParams
with FsParams with OptionalEncodingParam with OptionalSchemeParams with TempPathParam {
@Parameter(
names = Array("--num-reducers"),
description = "Num reducers (required for distributed ingest)",
required = false)
var reducers: java.lang.Integer = _
}

trait TempDirParam {
@Parameter(names = Array("--temp-path"), description = "Path to temp dir for writing output. " +
"Note that this may be useful when using s3 since it is slow as a sink", required = false)
var tempDir: String = _
}
}
12 changes: 8 additions & 4 deletions geomesa-tools/pom.xml
@@ -52,10 +52,6 @@
<groupId>org.locationtech.geomesa</groupId>
<artifactId>geomesa-fs-storage-parquet_${scala.binary.version}</artifactId>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
</dependency>
<dependency>
<groupId>org.locationtech.geomesa</groupId>
<artifactId>geomesa-filter_2.12</artifactId>
@@ -111,6 +107,14 @@
<artifactId>scala-compiler</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-distcp</artifactId>
</dependency>

<!-- test dependencies -->
<dependency>
package.scala (org.locationtech.geomesa.tools)
@@ -272,6 +272,12 @@ trait OutputPathParam {
var outputPath: String = _
}

trait TempPathParam {
@Parameter(names = Array("--temp-path"), description = "Path to temp dir for writing output. " +
"Note that this may be useful when using s3 since it is slow as a sink", required = false)
var tempPath: String = _
}

trait NumReducersParam {
@Parameter(
names = Array("--num-reducers"),
StorageJobUtils.scala
@@ -6,7 +6,9 @@
* http://www.opensource.org/licenses/apache2.0.php.
***********************************************************************/

package org.locationtech.geomesa.fs.tools.ingest
package org.locationtech.geomesa.tools.utils

import java.util.Collections

import com.typesafe.scalalogging.LazyLogging
import org.apache.hadoop.conf.Configuration
@@ -15,19 +17,20 @@ import org.apache.hadoop.tools.{DistCp, DistCpOptions}
import org.locationtech.geomesa.jobs.JobResult.JobSuccess
import org.locationtech.geomesa.jobs.{JobResult, StatusCallback}
import org.locationtech.geomesa.tools.Command
import org.locationtech.geomesa.tools.utils.JobRunner

import java.util.Collections

object StorageJobUtils extends LazyLogging {

def distCopy(srcRoot: Path, destRoot: Path, statusCallback: StatusCallback): JobResult = {
def distCopy(
srcRoot: Path,
destRoot: Path,
statusCallback: StatusCallback,
conf: Configuration = new Configuration()): JobResult = {
statusCallback.reset()

Command.user.info("Submitting job 'DistCp' - please wait...")

val opts = distCpOptions(srcRoot, destRoot)
val job = new DistCp(new Configuration, opts).execute()
val job = new DistCp(conf, opts).execute()

Command.user.info(s"Tracking available at ${job.getStatus.getTrackingUrl}")

@@ -41,11 +44,12 @@

// hadoop 3 API
private def distCpOptions3(src: Path, dest: Path): DistCpOptions = {
val clas = Class.forName("org.apache.hadoop.tools.DistCpOptions.Builder")
val clas = Class.forName("org.apache.hadoop.tools.DistCpOptions$Builder")
val constructor = clas.getConstructor(classOf[java.util.List[Path]], classOf[Path])
val builder = constructor.newInstance(Collections.singletonList(src), dest)
clas.getMethod("withAppend", classOf[Boolean]).invoke(builder, java.lang.Boolean.FALSE)
clas.getMethod("withOverwrite", classOf[Boolean]).invoke(builder, java.lang.Boolean.TRUE)
clas.getMethod("withBlocking", classOf[Boolean]).invoke(builder, java.lang.Boolean.FALSE)
clas.getMethod("withCopyStrategy", classOf[String]).invoke(builder, "dynamic")
clas.getMethod("build").invoke(builder).asInstanceOf[DistCpOptions]
}
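For reference, the reflective calls in `distCpOptions3` correspond roughly to the following direct use of the Hadoop 3 builder API. This sketch compiles only against a Hadoop 3 classpath (which is why the code above resorts to reflection); the method names are taken from the reflective calls themselves.

    import java.util.Collections

    import org.apache.hadoop.fs.Path
    import org.apache.hadoop.tools.DistCpOptions

    // Hadoop 3-only equivalent of the reflective distCpOptions3 above
    def distCpOptions3Direct(src: Path, dest: Path): DistCpOptions =
      new DistCpOptions.Builder(Collections.singletonList(src), dest)
          .withAppend(false)
          .withOverwrite(true)
          .withBlocking(false)
          .withCopyStrategy("dynamic")
          .build()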
