This sample outlines two ways to accomplish the following set of requirements using Azure Functions. One way uses the "traditional" serverless approach, and the other uses Azure Functions' new Durable Functions feature.
Given a set of customers, assume each customer uploads data to our backend for historical record keeping and analysis. This data arrives in the form of a set of `.csv` files, with each file containing different data. Think of them almost as SQL table dumps in CSV format.
When the customer uploads the files, we have two primary objectives:
- Ensure that all the files required for the customer are present for a particular "set" (aka "batch") of data
- Only when we have all the files for a set, continue on to validate the structure of each file, ensuring a handful of requirements (a sketch of these checks follows this list):
  - Each file must be UTF-8 encoded
  - Depending on the file type (type1, type2, etc.), ensure the correct number of columns is present in the CSV file
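To make those checks concrete, here's a minimal sketch of how a per-file check might look. The `IsValidCsv` helper, the naive comma split, and the expected-column-count parameter are illustrative assumptions, not code from this repo:

```csharp
using System.IO;
using System.Text;

public static class CsvChecks
{
    // Hypothetical helper: true when the stream is valid UTF-8 and every line
    // has the number of columns expected for this file type.
    public static bool IsValidCsv(Stream file, int expectedColumnCount)
    {
        // throwOnInvalidBytes: true makes the reader throw on non-UTF-8 input.
        var strictUtf8 = new UTF8Encoding(
            encoderShouldEmitUTF8Identifier: false, throwOnInvalidBytes: true);

        using (var reader = new StreamReader(file, strictUtf8))
        {
            try
            {
                string line;
                while ((line = reader.ReadLine()) != null)
                {
                    // Naive split; a real implementation would handle quoted commas.
                    if (line.Split(',').Length != expectedColumnCount)
                        return false;
                }
            }
            catch (DecoderFallbackException)
            {
                return false; // not valid UTF-8
            }
        }

        return true;
    }
}
```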
To run this sample, you'll need to set up a few things:
- Azure General Purpose Storage
  - For the Functions SDK to store its dashboard info, and for the Durable Functions to store their state data
- Azure Blob Storage
  - For the customer files to be uploaded into
- Azure Event Grid (with Storage Events)
- ngrok to enable local Azure Function triggering from Event Grid (see this blog post for more)
- Visual Studio 2017 v15.5.4+ with the Azure Workload installed.
- The Azure Functions and Web Jobs Tools extension to VS, version 15.0.40108+
- Azure Storage Explorer (makes testing easier)
Pull down the code.
Create a new file in the `AzureFunctions.v2` project called `local.settings.json` with the following content:
```json
{
  "IsEncrypted": false,
  "Values": {
    "AzureWebJobsStorage": "<general purpose storage connection string>",
    "AzureWebJobsDashboard": "<general purpose storage connection string>",
    "CustomerBlobStorage": "<blob storage connection string>",
    "ValidateFunctionUrl": "http://localhost:7071/api/Validate"
  }
}
```
This file will be used across the functions, durable or otherwise.
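When running locally, these values surface to the Functions host as app settings/environment variables. As a rough illustration (the helper below is an assumption for this sketch, not code from the repo), the `CustomerBlobStorage` connection string might be consumed like this:

```csharp
using System;
using Microsoft.WindowsAzure.Storage;
using Microsoft.WindowsAzure.Storage.Blob;

public static class StorageHelpers
{
    // Values from local.settings.json are exposed as environment variables
    // by the Functions host when running locally.
    public static CloudBlobContainer GetCustomerContainer(string containerName)
    {
        var account = CloudStorageAccount.Parse(
            Environment.GetEnvironmentVariable("CustomerBlobStorage"));

        return account.CreateCloudBlobClient().GetContainerReference(containerName);
    }
}
```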
Next, run any of the Function apps in this solution. You can use either the v1 (.NET Framework) or the v2 (.NET Core) version; a running endpoint is only needed so Event Grid can validate the subscription you'll create next.
With the function running, add an Event Grid Subscription to the Blob Storage account (from step 2), pointing to the ngrok-piped endpoint you created in step 4. The URL should look something like this: https://b3252cc3.ngrok.io/api/Orchestrator
Upon saving this subscription, you'll see your locally-running Function get hit with a request and return HTTP OK, then the Subscription will go green in Azure and you're set.
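For reference, that initial request is Event Grid's subscription validation handshake: it posts a `Microsoft.EventGrid.SubscriptionValidationEvent` and expects the validation code echoed back. A rough sketch of handling it by hand (the sample's actual functions may do this differently, e.g. via a binding or shared helper):

```csharp
using System.Net;
using System.Net.Http;
using System.Threading.Tasks;
using Newtonsoft.Json.Linq;

public static class EventGridValidation
{
    // Sketch: echo Event Grid's validation code so the subscription handshake succeeds.
    public static async Task<HttpResponseMessage> HandleAsync(HttpRequestMessage req)
    {
        var events = JArray.Parse(await req.Content.ReadAsStringAsync());
        var first = (JObject)events[0];

        if ((string)first["eventType"] == "Microsoft.EventGrid.SubscriptionValidationEvent")
        {
            var code = (string)first["data"]["validationCode"];
            return req.CreateResponse(HttpStatusCode.OK, new { validationResponse = code });
        }

        // Not a handshake; normal event processing would go here.
        return req.CreateResponse(HttpStatusCode.OK);
    }
}
```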
Now, open Azure Storage Explorer and connect to the Blob Storage account you've created. In here, create a container named `cust1`. Inside the container, create a new folder called `inbound`.
Take one of the `.csv` files from the `sampledata` folder of this repo, and drop it into the `inbound` folder.
If you're using one of the Durable Functions implementations, you should see your local function's `/api/Orchestrator` endpoint get hit. That endpoint then does the following:
- Determine the "batch prefix" of the file that was dropped. This consists of the customer name (cust1), and a datetime stamp in the format YYYYMMDD_HHMM, making the batch prefix for the first batch in
sampledata
defined ascust1_20171010_1112
- Check to see if a sub-orchestration for this batch already exists (a sketch of this dispatch logic follows this list).
  - If not, spin one up and pass along the Event Grid data that triggered this execution.
  - If so, use `RaiseEvent` to pass the filename along to the instance.
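A rough sketch of that dispatch, assuming the batch prefix doubles as the orchestration instance id and that blob names look like `cust1_20171010_1112_type1.csv` (the parsing helper and input shape are illustrative assumptions; `EnsureAllFiles` and the `newfile` event come from the flow described here):

```csharp
using System.Threading.Tasks;
using Microsoft.Azure.WebJobs;

public static class OrchestratorSketch
{
    // Sketch of the dispatch logic: one singleton orchestration per batch prefix.
    public static async Task DispatchAsync(DurableOrchestrationClient client, string blobUrl)
    {
        // Using the batch prefix as the instance id gives "one instance per batch" for free.
        var batchPrefix = GetBatchPrefix(blobUrl);

        var status = await client.GetStatusAsync(batchPrefix);
        if (status == null)
        {
            // No orchestration for this batch yet: start one with the triggering data.
            await client.StartNewAsync("EnsureAllFiles", batchPrefix, blobUrl);
        }
        else
        {
            // Already running: raise the external event the orchestration is waiting on.
            await client.RaiseEventAsync(batchPrefix, "newfile", blobUrl);
        }
    }

    // Assumes blob names of the form <customer>_<YYYYMMDD>_<HHMM>_<type>.csv.
    private static string GetBatchPrefix(string blobUrl)
    {
        var fileName = blobUrl.Substring(blobUrl.LastIndexOf('/') + 1);
        var parts = fileName.Split('_'); // cust1, 20171010, 1112, type1.csv
        return $"{parts[0]}_{parts[1]}_{parts[2]}";
    }
}
```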
In the `EnsureAllFiles` sub-orchestration, we look up which files we need for this customer (cust1) and check which of them have come through so far. As long as we do not have all the files we need, we loop within the orchestration, each time waiting for an external `newfile` event to be thrown to let us know a new file has come through and should be processed.
When we find we have all the files that constitute a "batch" for the customer, we call the `ValidateFileSet` activity function to process each file in the set and validate its structure according to our rules.
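A minimal sketch of what that orchestration loop can look like; the required-file count and input shape are illustrative assumptions, while `EnsureAllFiles`, `ValidateFileSet`, and the `newfile` event come from the flow above:

```csharp
using System.Collections.Generic;
using System.Threading.Tasks;
using Microsoft.Azure.WebJobs;

public static class EnsureAllFilesSketch
{
    [FunctionName("EnsureAllFiles")]
    public static async Task Run([OrchestrationTrigger] DurableOrchestrationContext context)
    {
        // The blob URL that started this batch (input shape is an assumption of this sketch).
        var receivedFiles = new HashSet<string> { context.GetInput<string>() };

        // Illustrative: in the real sample this would come from per-customer configuration.
        const int requiredFileCount = 3;

        // Loop until every file in the batch has arrived, each time waiting on the
        // external "newfile" event raised by the dispatcher when a new blob lands.
        while (receivedFiles.Count < requiredFileCount)
        {
            var newFileUrl = await context.WaitForExternalEvent<string>("newfile");
            receivedFiles.Add(newFileUrl);
        }

        // All files are present; validate the whole set in an activity function.
        await context.CallActivityAsync("ValidateFileSet", receivedFiles);
    }
}
```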
When validation completes successfully, all files from the batch are moved to a `valid-set` subfolder in the blob storage container. If validation fails (try removing a column in one of the lines in one of the files), the whole set gets moved to `invalid-set`.
Because Durable Functions persist their state, if you need to reset the execution because something goes wrong, it's not as simple as just re-running the function. To reset properly, you must:
- Delete the `DurableFunctionsHubHistory` table in the "General Purpose" Storage Account you created in step 1 above.
- Delete any files you uploaded to the `/inbound` directory of the blob storage container triggering the Functions.
Note: after doing these steps you'll have to wait a minute or so before running either of the Durable Function implementations as the storage table creation will error with 409 CONFLICT while deletion takes place.
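If you'd rather script that reset than click through Storage Explorer, here's a rough sketch with the WindowsAzure.Storage SDK; the connection-string names and the `cust1/inbound` layout follow the setup above, everything else is illustrative:

```csharp
using System;
using System.Linq;
using System.Threading.Tasks;
using Microsoft.WindowsAzure.Storage;
using Microsoft.WindowsAzure.Storage.Blob;

public static class ResetSketch
{
    public static async Task ResetDurableStateAsync()
    {
        // 1. Drop the Durable Functions history table from the general purpose account.
        var general = CloudStorageAccount.Parse(
            Environment.GetEnvironmentVariable("AzureWebJobsStorage"));
        var historyTable = general.CreateCloudTableClient()
            .GetTableReference("DurableFunctionsHubHistory");
        await historyTable.DeleteIfExistsAsync();

        // 2. Remove anything left under cust1/inbound in the customer blob account.
        var customer = CloudStorageAccount.Parse(
            Environment.GetEnvironmentVariable("CustomerBlobStorage"));
        var container = customer.CreateCloudBlobClient().GetContainerReference("cust1");

        // Single-segment listing keeps the sketch short; a real reset would loop
        // over the continuation token.
        var segment = await container.ListBlobsSegmentedAsync(
            "inbound/", true, BlobListingDetails.None, null, null, null, null);
        foreach (var blob in segment.Results.OfType<CloudBlockBlob>())
        {
            await blob.DeleteIfExistsAsync();
        }
    }
}
```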
- Determine the "batch prefix" of the file that was dropped. This consists of the customer name (cust1), and a datetime stamp in the format YYYYMMDD_HHMM, making the batch prefix for the first batch in
sampledata
defined ascust1_20171010_1112
- Check to see if we have all the necessary files in blob storage with this prefix.
- If we do, check to see if there's a lock entry in the `FileProcessingLocks` table of the General Purpose Storage Account containing this prefix. If so, bail. If not, create one, then call the `ValidateFunctionUrl` endpoint with the batch prefix as the payload (a sketch of this lock handshake follows this list).
- The Validate function gets the request and checks to see if the lock is marked as 'in progress'. If so, bail. If not, mark it as such and continue validating the files in the Blob Storage account which match the prefix passed in.
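A rough sketch of that lock handshake using the Azure Table Storage SDK; aside from `FileProcessingLocks` and `ValidateFunctionUrl`, the key layout and helper are illustrative assumptions:

```csharp
using System;
using System.Net.Http;
using System.Threading.Tasks;
using Microsoft.WindowsAzure.Storage;
using Microsoft.WindowsAzure.Storage.Table;

public static class LockSketch
{
    private static readonly HttpClient Http = new HttpClient();

    // Called once all files for a batch prefix appear to be present.
    public static async Task TryStartValidationAsync(string batchPrefix)
    {
        var table = CloudStorageAccount
            .Parse(Environment.GetEnvironmentVariable("AzureWebJobsStorage"))
            .CreateCloudTableClient()
            .GetTableReference("FileProcessingLocks");
        await table.CreateIfNotExistsAsync();

        // Is there already a lock entry for this prefix? If so, bail.
        var existing = await table.ExecuteAsync(
            TableOperation.Retrieve<DynamicTableEntity>(batchPrefix, batchPrefix));
        if (existing.Result != null)
        {
            return;
        }

        // Create the lock, then hand the batch off to the Validate endpoint.
        // Note: retrieve-then-insert isn't atomic; Insert throws 409 Conflict if
        // another instance wins the race, which a real implementation would catch.
        await table.ExecuteAsync(
            TableOperation.Insert(new DynamicTableEntity(batchPrefix, batchPrefix)));

        await Http.PostAsync(
            Environment.GetEnvironmentVariable("ValidateFunctionUrl"),
            new StringContent(batchPrefix));
    }
}
```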
When validation completes successfully, all files from the batch are moved to a `valid-set` subfolder in the blob storage container. If validation fails (try removing a column in one of the lines in one of the files), the whole set gets moved to `invalid-set`.
To reset this implementation:
- Delete the `FileProcessingLocks` table from the General Purpose Storage Account.
- Delete any files you uploaded to the `/inbound` directory of the blob storage container triggering the Functions.
Note: after doing these steps you'll have to wait a minute or so before re-running the implementation, as the storage table creation will error with 409 CONFLICT while the deletion takes place.
While not identically behaved, this repo also contains deployment scripts for two Logic App instances which perform roughly the same flow.
This Logic App gets Storage Events from Event Grid, pulls off the full prefix of the file (also containing the URL), and sends this on to...
This one receives events from the Processor and waits for 3 events containing the same prefix to arrive before sending the batch on to the next step (you can change this to whatever you want after deployment).
- The `400 BAD REQUEST` returned if errors are found in the set suffers from this bug on the Functions v2 runtime as of this writing.
- If you drop all the files in at once, there is a race condition when the events fired from Event Grid hit the top-level Orchestrator endpoint; it doesn't execute `StartNewAsync` fast enough, so instead of one instance per batch you'll end up with multiple instances for the same prefix (even though we want one instance per batch, acting like a singleton).