-
Notifications
You must be signed in to change notification settings - Fork 58
Filter Pipes
A common Pipes pattern is the filtering of objects. A filter-based pipe will consume objects and either emit them or not. If they are not emitted, then they are considered filtered by the pipe. A useful interface to implement that describes this behavior is FilterPipe<S> extends Pipe<S,S>
.
The generic filter pipe is FilterFunctionPipe
. A FilterFunctionPipe
takes a PipeFunction
(see Pipe Types) that computes on S
and either emits it or not. An example PipeFunction
is provided below:
public class CharCountPipeFunction implements PipeFunction<String,Boolean> {
private final int number;
public CharCountPipeFunction(int number) {
this.number = number;
}
public Boolean compute(String argument) {
return argument.length() == this.number;
}
}
When put in the context of a FilterFunctionPipe
, the code looks as follows:
Pipe<String, String> pipe = new FilterFunctionPipe<String>(new CharCountPipeFunction(4));
pipe.setStarts(Arrays.asList("tell", "me", "your", "name"));
// the results of the iteration are: "tell", "your", "name"
The RandomFilterPipe
comes with the Pipes distribution. RandomFilterPipe
will only allow a consumed object to be emitted if a biased coin toss lands on “heads.” At the extremes, if bias
is 0.0 then no incoming objects are emitted and if bias
is 1.0, then every incoming object is emitted.
public class RandomFilterPipe<S> extends AbstractPipe<S, S> implements FilterPipe<S> {
private static final Random RANDOM = new Random();
private final double bias;
public SampleFilterPipe(final double bias) {
this.bias = bias;
}
protected S processNextStart() {
while (this.starts.hasNext()) {
S s = this.starts.next();
if (bias >= RANDOM.nextDouble()) {
return s;
}
}
throw new NoSuchElementException();
}
}
The processNextStart()
method structure above is a common pattern in filter-based pipes. In short, a while(this.starts.hasNext())
is evaluated until an incoming start object meets the criteria and is returned (i.e. emitted). If there are no more starts and the criteria is not met, then a NoSuchElementException
is thrown. Beware of using recursion to accomplish a similar behavior. As the amount of data grows that is streaming through, recursion-based methods can easily yield a StackOverflowError
.
More complicated filters can be created than what was presented above. These filters usually check objects to determine whether to emit them or not. There is an interface called ComparisonFilterPipe<S,T>
. Implementations of this interface consume objects of type S
. If a check of that object passes (i.e. return true
), then the S
object is emitted, else it is filtered. ComparisonFilterPipe
has the following signature:
public interface ComparisonFilterPipe<S, T> extends FilterPipe<S> {
public enum Filter {
EQUAL, NOT_EQUAL, GREATER_THAN, LESS_THAN, GREATER_THAN_EQUAL, LESS_THAN_EQUAL
}
public boolean compareObjects(T leftObject, T rightObject);
}
The important method is compareObjects()
. This method returns true
if the left hand object is EQUAL
, NOT_EQUAL
, etc. to the right hand object.
Next, there is an abstract class called AbstractComparisonFilterPipe<S,T>
that implements ComparisonFilterPipe
. More specifically, it provides a standard implementation of ComparisonFilterPipe.compareObjects()
. For most situations, this method is sufficient. However, if not, simply override the method with an implementation that meets the requirements of the designed pipe.
public boolean compareObjects(final T leftObject, final T rightObject) {
switch (this.filter) {
case EQUAL:
if (null == leftObject)
return rightObject == null;
return leftObject.equals(rightObject);
case NOT_EQUAL:
if (null == leftObject)
return rightObject != null;
return !leftObject.equals(rightObject);
case GREATER_THAN:
if (null == leftObject || rightObject == null)
return false;
return ((Comparable) leftObject).compareTo(rightObject) == 1;
case LESS_THAN:
if (null == leftObject || rightObject == null)
return false;
return ((Comparable) leftObject).compareTo(rightObject) == -1;
case GREATER_THAN_EQUAL:
if (null == leftObject || rightObject == null)
return false;
return ((Comparable) leftObject).compareTo(rightObject) >= 0;
case LESS_THAN_EQUAL:
if (null == leftObject || rightObject == null)
return false;
return ((Comparable) leftObject).compareTo(rightObject) <= 0;
default:
throw new RuntimeException("Invalid state as no valid filter was provided");
}
}
Note: it many situations, such as ObjectFilterPipe
, the right hand object to allow or disallow is stored in the pipe and compared with each object passed through it.