Skip to content

Commit

Permalink
Merge pull request #2 from civitaspo/v0.0.2
Browse files Browse the repository at this point in the history
V0.0.2
  • Loading branch information
civitaspo committed Dec 9, 2015
2 parents a23dafc + b53cc00 commit 206e501
Show file tree
Hide file tree
Showing 3 changed files with 40 additions and 22 deletions.
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,9 @@
0.0.2 (2015-12-09)
==================

- Fix a bug: distinctness was not guaranteed when the distinct key included a null value.
- Add a debug log for each filtered key, e.g. `Duplicated key: [value1, value2, value3]`

0.0.1 (2015-12-08)
==================

Expand Down
2 changes: 1 addition & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ configurations {
provided
}

version = "0.0.1"
version = "0.0.2"
sourceCompatibility = 1.7
targetCompatibility = 1.7

Expand Down
54 changes: 33 additions & 21 deletions src/main/java/org/embulk/filter/distinct/FilteredPageOutput.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package org.embulk.filter.distinct;

import com.google.common.base.Optional;
import com.google.common.base.Strings;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ObjectArrays;
Expand Down Expand Up @@ -49,7 +50,7 @@ public void add(Page page)
pageReader.setPage(page);

while (pageReader.nextRecord()) {
if (filter.add(getCurrentDistinctKey())) {
if (isDistinct(getCurrentValues())) {
outputSchema.visitColumns(visitor);
pageBuilder.addRecord();
}
Expand All @@ -69,32 +70,43 @@ public void close()
pageBuilder.close();
}

private List<Object> getCurrentDistinctKey()
private List<Object> getCurrentValues()
{
ImmutableList.Builder<Object> builder = ImmutableList.builder();
for (Column distinctColumn : distinctColumns) {
if (!pageReader.isNull(distinctColumn)) {
if (Types.BOOLEAN.equals(distinctColumn.getType())) {
builder.add(pageReader.getBoolean(distinctColumn));
}
else if (Types.DOUBLE.equals(distinctColumn.getType())) {
builder.add(pageReader.getDouble(distinctColumn));
}
else if (Types.LONG.equals(distinctColumn.getType())) {
builder.add(pageReader.getLong(distinctColumn));
}
else if (Types.STRING.equals(distinctColumn.getType())) {
builder.add(pageReader.getString(distinctColumn));
}
else if (Types.TIMESTAMP.equals(distinctColumn.getType())) {
builder.add(pageReader.getTimestamp(distinctColumn));
}
else {
throw new RuntimeException("unsupported type: " + distinctColumn.getType());
}
if (pageReader.isNull(distinctColumn)) {
builder.add(Optional.absent());
}
else if (Types.BOOLEAN.equals(distinctColumn.getType())) {
builder.add(pageReader.getBoolean(distinctColumn));
}
else if (Types.DOUBLE.equals(distinctColumn.getType())) {
builder.add(pageReader.getDouble(distinctColumn));
}
else if (Types.LONG.equals(distinctColumn.getType())) {
builder.add(pageReader.getLong(distinctColumn));
}
else if (Types.STRING.equals(distinctColumn.getType())) {
builder.add(pageReader.getString(distinctColumn));
}
else if (Types.TIMESTAMP.equals(distinctColumn.getType())) {
builder.add(pageReader.getTimestamp(distinctColumn));
}
else {
throw new RuntimeException("unsupported type: " + distinctColumn.getType());
}
}

return builder.build();
}

/**
 * Registers {@code key} with {@code filter} and reports whether it was accepted.
 *
 * <p>When {@code filter.add(key)} returns {@code false}, the key is logged at
 * debug level as a duplicate. NOTE(review): {@code filter} is declared outside
 * this method; presumably it is a set of keys already seen - confirm.
 *
 * @param key the values of the distinct columns for the current record
 * @return the result of {@code filter.add(key)}: {@code true} if the key was
 *         added, {@code false} if it was rejected as a duplicate
 */
private boolean isDistinct(List<Object> key)
{
    final boolean added = filter.add(key);
    if (!added) {
        logger.debug("Duplicated key: {}", key);
    }
    return added;
}
}

0 comments on commit 206e501

Please sign in to comment.