Skip to content

Commit

Permalink
Add support for setting DFDL external variables
Browse files Browse the repository at this point in the history
The name/value of NiFi dynamic properties are now treated as DFDL
external variables. Property values are allowed to be NiFi expressions,
which are evaluated and set as the variable values. If a property value
evaluates to the empty string, it is ignored--this helps when a NiFi
expression determines that a variable does not apply for a schema, since
DFDL requires that all variables passed in externally must be valid for
the schema. This does mean it is not possible to have a variable with
the value of an empty string, but this should be rare and worked around.

This also refactored exceptions, so that functions throw correct
exceptions instead of turning everything into an IOException, which
worked but is not technically correct.

Also added a new "External Variables" section to additional details
pages documening this capability, and cleaned up the additional details
pages.
  • Loading branch information
stevedlawrence committed Jan 29, 2024
1 parent 34c582a commit 67ca394
Show file tree
Hide file tree
Showing 9 changed files with 344 additions and 127 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.MalformedURLException;
import java.net.URISyntaxException;
import java.net.URL;
import java.nio.channels.Channels;
Expand All @@ -28,7 +29,9 @@
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ExecutionException;
Expand All @@ -37,8 +40,10 @@
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.PropertyValue;
import org.apache.nifi.components.resource.ResourceCardinality;
import org.apache.nifi.components.resource.ResourceType;
import org.apache.nifi.components.Validator;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
Expand All @@ -61,6 +66,7 @@
import org.apache.daffodil.japi.Daffodil;
import org.apache.daffodil.japi.DataProcessor;
import org.apache.daffodil.japi.Diagnostic;
import org.apache.daffodil.japi.ExternalVariableException;
import org.apache.daffodil.japi.ProcessorFactory;
import org.apache.daffodil.japi.WithDiagnostics;
import org.apache.daffodil.japi.ValidationMode;
Expand Down Expand Up @@ -261,7 +267,7 @@ public boolean equals(Object obj) {
* happen outside of this function, as those changes will not be cached and will need to
* be done for every flow file, which could have performance implications.
*/
DataProcessor newDataProcessor(ComponentLog logger) throws IOException {
DataProcessor newDataProcessor(ComponentLog logger) throws DaffodilCompileException {

// Try to find the schema to compile or reload. If dfdlSchema is a file that exists,
// we just use that. If dfdlSchema is not a file, try to find it on the classpath,
Expand All @@ -273,7 +279,11 @@ DataProcessor newDataProcessor(ComponentLog logger) throws IOException {
URL schemaURL = null;
File f = new File(this.dfdlSchema);
if (f.isFile()) {
schemaURL = f.toURI().toURL();
try {
schemaURL = f.toURI().toURL();
} catch (MalformedURLException e) {
throw new DaffodilCompileException("Invalid 'DFDL Schema File' property: " + e);
}
} else {
// it is important to use getClassLoader.getResource() here. If we just do
// getClass.getResource() then Java will prepend the classes package to
Expand All @@ -296,7 +306,7 @@ DataProcessor newDataProcessor(ComponentLog logger) throws IOException {
dp = c.reload(rbc);
rbc.close();
is.close();
} catch (InvalidParserException e) {
} catch (InvalidParserException|IOException e) {
logger.error("Failed to reload pre-compiled DFDL schema: " + this.dfdlSchema + ". " + e.getMessage());
throw new DaffodilCompileException("Failed to reload pre-compiled DFDL schema: " + this.dfdlSchema + ". " + e.getMessage());
}
Expand All @@ -314,25 +324,27 @@ DataProcessor newDataProcessor(ComponentLog logger) throws IOException {
AbstractDaffodilProcessor.logDiagnostics(logger, dp);
throw new DaffodilCompileException("Failed to compile DFDL schema: " + this.dfdlSchema);
}
} catch (IOException e) {
throw new DaffodilCompileException("Failed to compile DFDL schema: " + this.dfdlSchema, e);
} catch (URISyntaxException e) {
throw new AssertionError("invalid URI should no be possible: " + e);
}
}
try {
dp = dp.withValidationMode(this.validationMode);
} catch (InvalidUsageException e) {
throw new IOException(e);
throw new AssertionError("invalid usage of Daffodil API: " + e);
}
return dp;
}
}

protected DataProcessor getDataProcessor(CompilationParams params) throws IOException {
protected DataProcessor getDataProcessor(CompilationParams params) throws DaffodilCompileException {
if (cache != null) {
try {
return cache.get(params);
} catch (ExecutionException e) {
throw new IOException(e);
throw new DaffodilCompileException(e);
}
} else {
return params.newDataProcessor(getLogger());
Expand All @@ -349,6 +361,22 @@ protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return properties;
}

@Override
protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) {
// all dynamic properties are treated as variables, with the property name/value set as the variable
// name/value. NiFi expressions are allowed to support dynamic variable values, such as from flow file
// attributes. We do not need a validator, since a variable value could theoretically contain
// anyting--we rely on Daffodil to check the variable value according to its variable type when we
// provide the variables at parse/unparse time.
return new PropertyDescriptor.Builder()
.name(propertyDescriptorName)
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.addValidator(Validator.VALID)
.dynamic(true)
.build();
}


@OnScheduled
public void onScheduled(final ProcessContext context) {
final ComponentLog logger = getLogger();
Expand All @@ -363,7 +391,7 @@ public void onScheduled(final ProcessContext context) {

cache = cacheBuilder.build(
new CacheLoader<CompilationParams, DataProcessor>() {
public DataProcessor load(CompilationParams params) throws IOException {
public DataProcessor load(CompilationParams params) throws DaffodilCompileException {
return params.newDataProcessor(logger);
}
});
Expand Down Expand Up @@ -413,33 +441,68 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
infosetType = infosetTypeValue;
}



try {
// Get the DataProcessor, likely from a cache of already compiled data processors. The only change
// that should happen to the cached DataProcessor is setting variables specific to the flowfile.
// We don't cache DataProcessor with variables preset because variables are expressions and could
// change per flow file. Also, assigning external variables is pretty efficient so not worth
// caching.
final DataProcessor cachedDP;
final DataProcessor dpForProcessing;
try {
cachedDP = getDataProcessor(params);
} catch (DaffodilCompileException e) {
throw new ProcessException(e);
}

// Treat dynamic properties as variables. If the value of the variable is the empty
// string (or an expression that evaluates to the empty string), the dynamic property is
// ignored and is not added as a variable. This supports expressions that determine that
// a variable does not apply to a schema and to ignore it, since all varibles passed to
// withExternalVariables must be valid for that schema.
final LinkedHashMap<String, String> variableMap = new LinkedHashMap<>();
for (final PropertyDescriptor pd : context.getProperties().keySet()) {
if (pd.isDynamic()) {
final String value = context.getProperty(pd.getName()).evaluateAttributeExpressions(original).getValue();
if (!value.isEmpty()) {
variableMap.put(pd.getName(), value);
}
}
}

if (variableMap.isEmpty()) {
dpForProcessing = cachedDP;
} else {
try {
dpForProcessing = cachedDP.withExternalVariables(variableMap);
} catch (ExternalVariableException ex) {
throw new ProcessException("variables not valid for schema: " + ex.getMessage());
}
}

FlowFile output = session.write(original, new StreamCallback() {
@Override
public void process(final InputStream in, final OutputStream out) throws IOException {
// Get the DataProcessor, likely from a cache of already compiled data
// processors. Note that no changes to the DataProcessor should happen. Any
// changes to a DataProcessor should happen in the CompilationParams
// newDataProcessor() function using only the parameters passed to the
// CompilationParams constructor
DataProcessor dp = getDataProcessor(params);

// Parse or unparse the flow file, reading from he input stream and writing
// Parse or unparse the flow file, reading from the input stream and writing
// to the output stream
processWithDaffodil(dp, original, in, out, infosetType);
processWithDaffodil(dpForProcessing, original, in, out, infosetType);
}
});

final String outputMimeType = getOutputMimeType(infosetType);
if (outputMimeType != null) {
output = session.putAttribute(output, CoreAttributes.MIME_TYPE.key(), outputMimeType);
} else {
output = session.removeAttribute(output, CoreAttributes.MIME_TYPE.key());
}

session.transfer(output, REL_SUCCESS);
session.getProvenanceReporter().modifyContent(output, stopWatch.getElapsed(TimeUnit.MILLISECONDS));
logger.debug("Processed {}", new Object[]{original});
} catch (ProcessException e) {
logger.error("Failed to process {} due to {}", new Object[]{original, e});
logger.error("Failed to process {} due to {}", new Object[]{original, e.getMessage()});
session.transfer(original, REL_FAILURE);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@

import java.io.IOException;

public class DaffodilCompileException extends IOException {
public class DaffodilCompileException extends Exception {

public DaffodilCompileException() {
super();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,15 @@
import java.nio.channels.ReadableByteChannel;

import org.apache.nifi.annotation.behavior.EventDriven;
import org.apache.nifi.annotation.behavior.DynamicProperty;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
import org.apache.nifi.annotation.behavior.SideEffectFree;
import org.apache.nifi.annotation.behavior.SupportsBatching;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.annotation.behavior.RequiresInstanceClassLoading;

Expand All @@ -51,6 +53,12 @@
@Tags({"xml", "json", "daffodil", "dfdl", "schema", "xsd"})
@CapabilityDescription("Use Daffodil and a user-specified DFDL schema to transform data to an infoset, represented by either XML or JSON.")
@WritesAttribute(attribute = "mime.type", description = "Sets the mime type to application/json or application/xml based on the infoset type.")
@DynamicProperty(
name = "Name of external variable defined in a DFDL schema",
value = "Value to set for the DFDL external variable. May be an expression. The DFDL variable is not set if the value expression evaluates to an empty string.",
description = "Defines an external variable to be used when parsing",
expressionLanguageScope = ExpressionLanguageScope.FLOWFILE_ATTRIBUTES
)
@RequiresInstanceClassLoading
public class DaffodilParse extends AbstractDaffodilProcessor {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,15 @@
import java.nio.channels.WritableByteChannel;

import org.apache.nifi.annotation.behavior.EventDriven;
import org.apache.nifi.annotation.behavior.DynamicProperty;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
import org.apache.nifi.annotation.behavior.SideEffectFree;
import org.apache.nifi.annotation.behavior.SupportsBatching;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.annotation.behavior.RequiresInstanceClassLoading;

Expand All @@ -48,6 +50,12 @@
@Tags({"xml", "json", "daffodil", "dfdl", "schema", "xsd"})
@CapabilityDescription("Use Daffodil and a user-specified DFDL schema to transform an XML or JSON representation of data back to the original data format.")
@WritesAttribute(attribute = "mime.type", description = "If the FlowFile is successfully unparsed, this attriute is removed, as the MIME Type is no longer known.")
@DynamicProperty(
name = "Name of external variable defined in a DFDL schema",
value = "Value to set for the DFDL external variable. May be an expression. The DFDL variable is not set if the value expression evaluates to an empty string.",
description = "Defines an external variable to be used when parsing",
expressionLanguageScope = ExpressionLanguageScope.FLOWFILE_ATTRIBUTES
)
@RequiresInstanceClassLoading
public class DaffodilUnparse extends AbstractDaffodilProcessor {

Expand Down
Loading

0 comments on commit 67ca394

Please sign in to comment.