Skip to content

Commit

Permalink
First submit all discovered data before waiting for results
Browse files Browse the repository at this point in the history
  • Loading branch information
Fried Hoeben committed Nov 25, 2024
1 parent 195e10d commit e12b38b
Showing 1 changed file with 71 additions and 24 deletions.
95 changes: 71 additions & 24 deletions lansweeper/aws/integration-lambda/lansweeper_integration.js
Original file line number Diff line number Diff line change
Expand Up @@ -70,21 +70,17 @@ class LansweeperIntegration {
if (selectedInstallations.length > 1) {
console.log('Will process the following installations: %j', selectedInstallations.map(i => i.name));
}
for (const installation of selectedInstallations) {
const installationResult = await this.processSite(siteId,
networkedAssetsOnly,
generateLabels,
installation,
installationNames,
assetTypes);
result.uploadCounts[siteName] = result.uploadCounts[siteName] || {};
result.uploadCounts[siteName][installation.name] = installationResult.uploadCount;
if (installationResult.errors) {
result.errorCounts[siteName] = result.errorCounts[siteName] || {};
result.errorCounts[siteName][installation.name] = installationResult.errors.length;
result.errors[siteName] = result.errors[siteName] || {};
result.errors[siteName][installation.name] = installationResult.errors;
}
const resultDownloadFunctions =
await this.enqueueMutationsForInstallations(selectedInstallations,
siteId,
networkedAssetsOnly,
generateLabels,
installationNames,
assetTypes);
if (resultDownloadFunctions.size > 0) {
console.log('Assets from all installations uploaded');
await this.downloadAndReduceSiteResults(resultDownloadFunctions, result, siteName);
console.log('All results gathered');
}
}
}
Expand All @@ -109,17 +105,68 @@ class LansweeperIntegration {
return this.resultHelper.cleanupResult(result);
}

async enqueueMutationsForInstallations(selectedInstallations, siteId, networkedAssetsOnly, generateLabels, installationNames, assetTypes) {
const resultDownloadFunctions = new Map();
for (const installation of selectedInstallations) {
const resultDownloadFunction =
await this.enqueueMutationsForInstallation(siteId,
networkedAssetsOnly,
generateLabels,
installation,
installationNames,
assetTypes);
resultDownloadFunctions.set(installation, resultDownloadFunction);
}
return resultDownloadFunctions;
}

async downloadAndReduceSiteResults(resultDownloadFunctions, result, siteName) {
const downloadCalls =
Array.from(resultDownloadFunctions,
async ([installation, resultDownloadFunction]) => {
const installationResult = await resultDownloadFunction.call();
return await this.mergeInstallationResult(result, siteName, installation, installationResult);
});
console.log('Waiting for all asyncQuery results to become available');
await Promise.all(downloadCalls);
}

async mergeInstallationResult(result, siteName, installation, installationResult) {
result.uploadCounts[siteName] = result.uploadCounts[siteName] || {};
result.uploadCounts[siteName][installation.name] = installationResult.uploadCount;
if (installationResult.errors) {
result.errorCounts[siteName] = result.errorCounts[siteName] || {};
result.errorCounts[siteName][installation.name] = installationResult.errors.length;
result.errors[siteName] = result.errors[siteName] || {};
result.errors[siteName][installation.name] = installationResult.errors;
}
}

async processSite(siteId, networkedAssetsOnly, generateLabels, installation, installationNames, assetTypes) {
const resultDownloadFunction = await this.enqueueMutationsForInstallation(siteId, networkedAssetsOnly, generateLabels, installation, installationNames, assetTypes);
return await resultDownloadFunction.call();
}

async enqueueMutationsForInstallation(siteId, networkedAssetsOnly, generateLabels, installation, installationNames, assetTypes) {
const siteName = await this.lansweeperClient.getSiteName(siteId);
const installationName = installation.name;
console.log(`processing site ${siteName}, installation ${installationName}. NetworkedAssetsOnly: ${networkedAssetsOnly}.${generateLabels ? ' Using asset name as label.' : ''}`);
const itemsHandler = async items => await this.sendAssetsTo4me(items, networkedAssetsOnly, generateLabels, installationName, installationNames);
const sendResults = await this.lansweeperClient.getAssetsPaged(siteId, this.assetSeenCutOffDate(), itemsHandler, networkedAssetsOnly, installation.id, assetTypes);
const jsonResults = await this.downloadResults(sendResults.map(r => r.mutationResult));
const overallResult = this.reduceResults(sendResults, jsonResults);
console.log(`processed site ${siteName}, installation ${installationName}. Upload count: ${overallResult.uploadCount}, error count: ${overallResult.errors ? overallResult.errors.length : 0}`);
const sendResults = await this.startInstallationSync(siteId, networkedAssetsOnly, generateLabels, installation, installationName, installationNames, assetTypes);
console.log(`Uploaded all assets for site ${siteName}, installation ${installationName}.`);

// function to download all results for this installation and merge to single result
return async () => {
const jsonResults = await this.downloadResults(installationName, sendResults.map(r => r.mutationResult));
const overallResult = this.reduceResults(sendResults, jsonResults);
console.log(`processed site ${siteName}, installation ${installationName}. Upload count: ${overallResult.uploadCount}, error count: ${overallResult.errors ? overallResult.errors.length : 0}`);

return overallResult;
return overallResult;
};
}

async startInstallationSync(siteId, networkedAssetsOnly, generateLabels, installation, installationName, installationNames, assetTypes) {
const itemsHandler = async items => await this.sendAssetsTo4me(items, networkedAssetsOnly, generateLabels, installationName, installationNames);
return await this.lansweeperClient.getAssetsPaged(siteId, this.assetSeenCutOffDate(), itemsHandler, networkedAssetsOnly, installation.id, assetTypes);
}

async sendAssetsTo4me(assets, networkedAssetsOnly = false, generateLabels = false, installation = null, installations = []) {
Expand Down Expand Up @@ -169,12 +216,12 @@ class LansweeperIntegration {
return new Date(new TimeHelper().getMsSinceEpoch() - LansweeperIntegration.LAST_SEEN_DAYS * 24 * 60 * 60 * 1000);
}

async downloadResults(mutationResultsToRetrieve) {
async downloadResults(installationName, mutationResultsToRetrieve) {
const jsonResults = new Map();
if (mutationResultsToRetrieve.length === 0) {
console.log('No asynchronous queries results to retrieve');
console.log(`No asynchronous queries results to retrieve for ${installationName}`);
} else {
console.log('Downloading all asynchronous query results');
console.log(`Downloading ${mutationResultsToRetrieve.length} asynchronous query results for ${installationName}`);
const jsonRetrievalCalls = mutationResultsToRetrieve
.filter(r => !!r)
.map(async r => jsonResults.set(r, await this.downloadResult(r)));
Expand Down

0 comments on commit e12b38b

Please sign in to comment.