This repository has been archived by the owner on Sep 18, 2024. It is now read-only.

Increase the function timeout and process document chunks in parallel so that large documents finish within 10 minutes (the function timeout duration). #3

Merged
merged 1 commit into from Aug 26, 2024
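Summary of the approach: instead of embedding each batch inline as chunks are produced, the function now collects all chunks into fixed-size batches up front and fans the batches out concurrently, bounded by `MaxDegreeOfParallelism`. The sketch below isolates that pattern from the Functions plumbing; `ParallelBatchSketch`, `ProcessAllAsync`, and `processBatchAsync` are illustrative names, not part of this PR.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

// A minimal sketch of the pattern this PR applies: collect fixed-size
// batches first, then process them with a bounded degree of parallelism.
// processBatchAsync stands in for the real embedding + Cosmos DB work.
public static class ParallelBatchSketch
{
    private const int MaxBatchSize = 100;
    private const int MaxDegreeOfParallelism = 50;

    public static async Task ProcessAllAsync<T>(
        IReadOnlyList<T> chunks,
        Func<List<T>, CancellationToken, Task> processBatchAsync,
        CancellationToken cancellationToken = default)
    {
        // Collect fixed-size batches (the PR builds listOfBatches the same way).
        var batches = new List<List<T>>();
        for (int i = 0; i < chunks.Count; i += MaxBatchSize)
        {
            int size = Math.Min(MaxBatchSize, chunks.Count - i);
            var batch = new List<T>(size);
            for (int j = i; j < i + size; j++)
            {
                batch.Add(chunks[j]);
            }
            batches.Add(batch);
        }

        // Fan the batches out; Parallel.ForEachAsync (.NET 6+) caps the number
        // of in-flight batches instead of starting one task per batch.
        var options = new ParallelOptions
        {
            MaxDegreeOfParallelism = MaxDegreeOfParallelism,
            CancellationToken = cancellationToken,
        };
        await Parallel.ForEachAsync(batches, options, async (batch, ct) =>
        {
            await processBatchAsync(batch, ct);
        });
    }
}
```

With `MaxBatchSize = 100` and `MaxDegreeOfParallelism = 50`, up to 5,000 chunks can be in flight at once, which is why the same PR also raises the Azure OpenAI deployment capacity below.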
38 changes: 28 additions & 10 deletions DocumentVectorPipelineFunctions/BlobTriggerFunction.cs
@@ -24,9 +24,11 @@ public class BlobTriggerFunction(
     private static readonly int DefaultDimensions = 1536;
 
     private const int MaxRetryCount = 100;
-    private const int RetryDelay = 10 * 1000; // 100 seconds
+    private const int RetryDelay = 10 * 1000; // 10 seconds
 
-    private const int MaxBatchSize = 10;
+    private const int MaxDegreeOfParallelism = 50;
+
+    private const int MaxBatchSize = 100;
     private int embeddingDimensions = DefaultDimensions;
 
     [Function("BlobTriggerFunction")]
@@ -66,32 +68,48 @@ private async Task HandleBlobCreateEventAsync(BlobClient blobClient)
         var result = operation.Value;
         this._logger.LogInformation("Extracted content from '{name}', # pages {pageCount}", blobClient.Name, result.Pages.Count);
 
+        var textChunks = TextChunker.FixedSizeChunking(result);
+
+        var listOfBatches = new List<List<TextChunk>>();
+
         int totalChunksCount = 0;
         var batchChunkTexts = new List<TextChunk>(MaxBatchSize);
-        foreach (var chunk in TextChunker.FixedSizeChunking(result))
+        for (int i = 0; i <= textChunks.Count(); i++)
         {
-            batchChunkTexts.Add(chunk);
+            if (i == textChunks.Count())
+            {
+                if (batchChunkTexts.Count() > 0)
+                {
+                    listOfBatches.Add(new List<TextChunk>(batchChunkTexts));
+                }
+                batchChunkTexts.Clear();
+
+                break;
+            }
+
+            batchChunkTexts.Add(textChunks.ElementAt(i));
             totalChunksCount++;
 
             if (batchChunkTexts.Count >= MaxBatchSize)
             {
-                await this.ProcessCurrentBatchAsync(blobClient, cosmosDBClientWrapper, batchChunkTexts);
+                listOfBatches.Add(new List<TextChunk>(batchChunkTexts));
                 batchChunkTexts.Clear();
             }
         }
 
-        // Process any remaining documents in last batch
-        if (batchChunkTexts.Count > 0)
+        this._logger.LogInformation("Processing list of batches in parallel, total batches: {listSize}, chunks count: {chunksCount}", listOfBatches.Count(), totalChunksCount);
+        await Parallel.ForEachAsync(listOfBatches, new ParallelOptions { MaxDegreeOfParallelism = MaxDegreeOfParallelism }, async (batchChunkText, cancellationToken) =>
         {
-            await this.ProcessCurrentBatchAsync(blobClient, cosmosDBClientWrapper, batchChunkTexts);
-        }
+            this._logger.LogInformation("Processing batch of size: {batchSize}", batchChunkText.Count());
+            await this.ProcessCurrentBatchAsync(blobClient, cosmosDBClientWrapper, batchChunkText);
+        });
 
         this._logger.LogInformation("Finished processing blob {name}, total chunks processed {count}.", blobClient.Name, totalChunksCount);
     }
 
     private async Task ProcessCurrentBatchAsync(BlobClient blobClient, CosmosDBClientWrapper cosmosDBClientWrapper, List<TextChunk> batchChunkTexts)
     {
-        this._logger.LogInformation("Generating embeddings for : '{count}'.", batchChunkTexts.Count());
+        this._logger.LogInformation("Generating embeddings for batch of size: '{size}'.", batchChunkTexts.Count());
         var embeddings = await this.GenerateEmbeddingsWithRetryAsync(batchChunkTexts);
 
         this._logger.LogInformation("Creating Cosmos DB documents for batch of size {count}", batchChunkTexts.Count);
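One assumption worth flagging about the batching loop above: if `TextChunker.FixedSizeChunking` returns a lazily evaluated `IEnumerable<TextChunk>`, each `Count()` and `ElementAt(i)` call re-enumerates the sequence. On .NET 6+ the same batching can be done in a single pass with `Enumerable.Chunk` — a sketch under that assumption, with `BuildBatches` as a hypothetical helper:

```csharp
using System.Collections.Generic;
using System.Linq;

// Assumes the chunker may return a lazy IEnumerable<T>; materialize once,
// then let Enumerable.Chunk (.NET 6+) build the fixed-size batches that
// the merged loop assembles by hand.
static List<List<T>> BuildBatches<T>(IEnumerable<T> chunks, int maxBatchSize, out int totalChunksCount)
{
    var materialized = chunks.ToList();   // single enumeration
    totalChunksCount = materialized.Count;
    return materialized
        .Chunk(maxBatchSize)              // arrays of up to maxBatchSize items
        .Select(batch => batch.ToList())  // match the List<List<TextChunk>> shape in the diff
        .ToList();
}
```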
4 changes: 4 additions & 0 deletions deployment/functionapp.bicep
@@ -211,6 +211,10 @@ resource funcApp 'Microsoft.Web/sites@2023-12-01' = {
           name: 'AzureOpenAIModelDimensions'
           value: modelDimensions
         }
+        {
+          name: 'AzureFunctionsJobHost__functionTimeout'
+          value: '00:10:00'
+        }
       ]
     }
   }
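For context on the new app setting: `AzureFunctionsJobHost__functionTimeout` is the app-settings form of the host.json `functionTimeout` value (the double underscore maps into the JSON hierarchy), and 00:10:00 is the maximum allowed on the Consumption plan. Setting it through app settings is equivalent to shipping a host.json like this sketch:

```json
{
  "version": "2.0",
  "functionTimeout": "00:10:00"
}
```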
2 changes: 1 addition & 1 deletion deployment/main.bicepparam
@@ -45,7 +45,7 @@ param open_ai_deployments = [
     name: modelDeployment
     sku: {
       name: 'Standard'
-      capacity: 50
+      capacity: 100
     }
     model: {
       name: modelDeployment
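On the capacity bump: for Standard Azure OpenAI deployments, `capacity` is measured in units of 1,000 tokens per minute, so going from 50 to 100 roughly doubles the deployment's quota from 50K to 100K TPM. That headroom matters here because up to 50 batches of 100 chunks can now request embeddings concurrently; without it, the 429 retry path (10-second delay, up to 100 attempts) would dominate processing time.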