consolidator enhancements: throw exception on delete failure, add working directory as argument #44

Open · wants to merge 2 commits into base: develop
19 changes: 11 additions & 8 deletions dfs-datastores/src/main/java/com/backtype/hadoop/Consolidator.java
@@ -1,5 +1,4 @@
 package com.backtype.hadoop;

-import com.backtype.hadoop.formats.RecordInputStream;
 import com.backtype.hadoop.formats.RecordOutputStream;
 import com.backtype.hadoop.formats.RecordStreamFactory;
@@ -23,6 +22,7 @@

 public class Consolidator {
     public static final long DEFAULT_CONSOLIDATION_SIZE = 1024*1024*127; //127 MB
+    public static final String DEFAULT_WORKING_DIR = "/tmp/consolidator";
     private static final String ARGS = "consolidator_args";

     private static Thread shutdownHook;
@@ -35,17 +35,19 @@ public static class ConsolidatorArgs implements Serializable {
         public List<String> dirs;
         public long targetSizeBytes;
         public String extension;
+        public String workingDir;

         public ConsolidatorArgs(String fsUri, RecordStreamFactory streams, PathLister pathLister,
-                                List<String> dirs, long targetSizeBytes, String extension) {
+                                List<String> dirs, long targetSizeBytes, String extension, String workingDir) {
             this.fsUri = fsUri;
             this.streams = streams;
             this.pathLister = pathLister;
             this.dirs = dirs;
             this.targetSizeBytes = targetSizeBytes;
             this.extension = extension;
+            this.workingDir = workingDir;
         }

     }

     public static void consolidate(FileSystem fs, String targetDir, RecordStreamFactory streams,
@@ -67,7 +69,7 @@ public static void consolidate(FileSystem fs, String targetDir, RecordStreamFactory streams,
             PathLister pathLister, long targetSizeBytes, String extension) throws IOException {
         List<String> dirs = new ArrayList<String>();
         dirs.add(targetDir);
-        consolidate(fs, streams, pathLister, dirs, targetSizeBytes, extension);
+        consolidate(fs, streams, pathLister, dirs, targetSizeBytes, extension, DEFAULT_WORKING_DIR);
     }

     private static String getDirsString(List<String> targetDirs) {
@@ -82,10 +84,10 @@ private static String getDirsString(List<String> targetDirs) {
     }

     public static void consolidate(FileSystem fs, RecordStreamFactory streams, PathLister lister, List<String> dirs,
-            long targetSizeBytes, String extension) throws IOException {
+            long targetSizeBytes, String extension, String workingDir) throws IOException {
         JobConf conf = new JobConf(fs.getConf(), Consolidator.class);
         String fsUri = fs.getUri().toString();
-        ConsolidatorArgs args = new ConsolidatorArgs(fsUri, streams, lister, dirs, targetSizeBytes, extension);
+        ConsolidatorArgs args = new ConsolidatorArgs(fsUri, streams, lister, dirs, targetSizeBytes, extension, workingDir);
         Utils.setObject(conf, ARGS, args);

         conf.setJobName("Consolidator: " + getDirsString(dirs));
@@ -159,7 +161,7 @@ public void map(ArrayWritable sourcesArr, Text target, OutputCollector<NullWrita
                 //must have failed after succeeding to create file but before task finished - this is valid
                 //because path is selected with a UUID
                 if(!fs.exists(finalFile)) {
-                    Path tmpFile = new Path("/tmp/consolidator/" + UUID.randomUUID().toString());
+                    Path tmpFile = new Path(args.workingDir + Path.SEPARATOR + UUID.randomUUID().toString());
                     fs.mkdirs(tmpFile.getParent());

                     String status = "Consolidating " + sources.size() + " files into " + tmpFile.toString();
@@ -195,7 +197,8 @@ public void map(ArrayWritable sourcesArr, Text target, OutputCollector<NullWrita
                     rprtr.setStatus(status);

                     for(Path p: sources) {
-                        fs.delete(p, false);
+                        if(!fs.delete(p, false))
+                            throw new IOException("could not delete " + p.toString());
                         rprtr.progress();
                     }

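Note on usage: taken together, these changes make the consolidation scratch directory a parameter (the old hardcoded /tmp/consolidator survives as DEFAULT_WORKING_DIR) and turn a silently ignored delete failure into a hard IOException. A minimal sketch of a call site against the new signature; the streams and lister arguments stand in for whatever RecordStreamFactory and PathLister implementations a caller already has, and the /data and /scratch paths plus the empty extension are placeholders:

import com.backtype.hadoop.Consolidator;
import com.backtype.hadoop.PathLister;
import com.backtype.hadoop.formats.RecordStreamFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;

public class ConsolidateSketch {
    static void run(RecordStreamFactory streams, PathLister lister) throws IOException {
        FileSystem fs = FileSystem.get(new Configuration());
        List<String> dirs = Arrays.asList("/data/pail1", "/data/pail2"); // placeholder inputs

        // New final argument: the scratch directory for in-progress merge files,
        // previously hardcoded to /tmp/consolidator.
        Consolidator.consolidate(fs, streams, lister, dirs,
                Consolidator.DEFAULT_CONSOLIDATION_SIZE, "", "/scratch/consolidator");
        // After this patch, a source file that fs.delete() fails to remove
        // aborts the job with an IOException instead of being silently leaked.
    }
}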
31 changes: 13 additions & 18 deletions dfs-datastores/src/main/java/com/backtype/hadoop/pail/Pail.java
@@ -1,29 +1,19 @@
 package com.backtype.hadoop.pail;

-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.UUID;
-
+import com.backtype.hadoop.*;
+import com.backtype.hadoop.formats.RecordInputStream;
+import com.backtype.hadoop.formats.RecordOutputStream;
+import com.backtype.support.Utils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

-import com.backtype.hadoop.BalancedDistcp;
-import com.backtype.hadoop.Coercer;
-import com.backtype.hadoop.Consolidator;
-import com.backtype.hadoop.PathLister;
-import com.backtype.hadoop.RenameMode;
-import com.backtype.hadoop.formats.RecordInputStream;
-import com.backtype.hadoop.formats.RecordOutputStream;
-import com.backtype.support.Utils;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.*;

 public class Pail<T> extends AbstractPail implements Iterable<T>{
     public static Logger LOG = LoggerFactory.getLogger(Pail.class);
@@ -510,6 +500,10 @@ public void consolidate() throws IOException {
     }

     public void consolidate(long maxSize) throws IOException {
+        consolidate(maxSize, Consolidator.DEFAULT_WORKING_DIR);
+    }
+
+    public void consolidate(long maxSize, String workingDir) throws IOException {
         List<String> toCheck = new ArrayList<String>();
         toCheck.add("");
         PailStructure structure = getSpec().getStructure();
@@ -535,9 +529,10 @@ public void consolidate(long maxSize) throws IOException {
             }
         }

-        Consolidator.consolidate(_fs, _format, new PailPathLister(false), consolidatedirs, maxSize, EXTENSION);
+        Consolidator.consolidate(_fs, _format, new PailPathLister(false), consolidatedirs, maxSize, EXTENSION, workingDir);
     }

+
     @Override
     protected RecordInputStream createInputStream(Path path) throws IOException {
         return _format.getInputStream(_fs, path);
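Note on usage: keeping the old consolidate(maxSize) overload as a one-line delegate preserves source compatibility, so only callers that want a different scratch directory need to change. A minimal sketch, assuming a pail opened elsewhere and a placeholder /scratch path:

import com.backtype.hadoop.Consolidator;
import com.backtype.hadoop.pail.Pail;
import java.io.IOException;

public class PailConsolidateSketch {
    static void consolidateWithScratchDir(Pail<?> pail) throws IOException {
        // Same result as pail.consolidate(maxSize), which now delegates to
        // Consolidator.DEFAULT_WORKING_DIR ("/tmp/consolidator"), except that
        // temp files land under the caller-chosen directory.
        pail.consolidate(Consolidator.DEFAULT_CONSOLIDATION_SIZE, "/scratch/consolidator");
    }
}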