Skip to content

Commit

Permalink
Fix provider status tracking bugs (#112)
Browse files Browse the repository at this point in the history
* Add timeout to grpc calls

* Move snapshot saving logic to providerStatusProvider

* Cap available quantities to 0 to prevent negative numbers

* Cleanup

* Rename function

* Use bulk create for gpus&cpus

* Fetch status of recently online providers first
  • Loading branch information
Redm4x authored Feb 28, 2024
1 parent 66f2870 commit 8f6ab9e
Show file tree
Hide file tree
Showing 4 changed files with 300 additions and 276 deletions.
228 changes: 146 additions & 82 deletions indexer/src/providers/providerStatusProvider.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,15 @@
import https from "https";
import axios from "axios";
import semver from "semver";
import { Provider } from "@shared/dbSchemas/akash";
import { Provider, ProviderSnapshotNode, ProviderSnapshotNodeCPU, ProviderSnapshotNodeGPU } from "@shared/dbSchemas/akash";
import { asyncify, eachLimit } from "async";
import { ProviderSnapshot } from "@src/../../shared/dbSchemas/akash/providerSnapshot";
import { fetchAndSaveProviderStats as grpcFetchAndSaveProviderStats } from "./statusEndpointHandlers/grpc";
import { fetchAndSaveProviderStats as restFetchAndSaveProviderStats } from "./statusEndpointHandlers/rest";
import { sequelize } from "@src/db/dbConnection";
import { toUTC } from "@src/shared/utils/date";
import { ProviderStatusInfo, ProviderVersionEndpointResponseType } from "./statusEndpointHandlers/types";
import { isSameDay } from "date-fns";
import { fetchProviderStatusFromGRPC } from "./statusEndpointHandlers/grpc";
import { fetchProviderStatusFromREST } from "./statusEndpointHandlers/rest";

const ConcurrentStatusCall = 10;
const StatusCallTimeout = 10_000; // 10 seconds
Expand All @@ -16,7 +20,10 @@ export async function syncProvidersInfo() {
deletedHeight: null
},
include: [{ model: ProviderSnapshot, as: "lastSnapshot" }],
order: [["isOnline", "DESC"]]
order: [
["isOnline", "DESC"],
["uptime30d", "DESC"]
]
});

const httpsAgent = new https.Agent({
Expand All @@ -28,103 +35,160 @@ export async function syncProvidersInfo() {
providers,
ConcurrentStatusCall,
asyncify(async (provider: Provider) => {
let providerStatus: ProviderStatusInfo | null = null;
let errorMessage: string | null = null;
let akashVersion: string | null = null;
let cosmosVersion: string | null = null;

try {
const versionResponse = await axios.get<ProviderVersionEndpointResponseType>(provider.hostUri + "/version", {
httpsAgent: httpsAgent,
timeout: StatusCallTimeout
});

const akashVersion = semver.valid(versionResponse.data.akash.version);
const cosmosVersion = semver.valid(
akashVersion = semver.valid(versionResponse.data.akash.version);
cosmosVersion = semver.valid(
"cosmosSdkVersion" in versionResponse.data.akash ? versionResponse.data.akash.cosmosSdkVersion : versionResponse.data.akash.cosmos_sdk_version
);

if (akashVersion && semver.gte(akashVersion, "0.5.0-0")) {
await grpcFetchAndSaveProviderStats(provider, cosmosVersion, akashVersion, StatusCallTimeout);
providerStatus = await fetchProviderStatusFromGRPC(provider, StatusCallTimeout);
} else {
await restFetchAndSaveProviderStats(provider, cosmosVersion, akashVersion, StatusCallTimeout);
providerStatus = await fetchProviderStatusFromREST(provider, StatusCallTimeout);
}
} catch (err) {
const checkDate = new Date();
const errorMessage = err?.message?.toString() ?? err?.toString();
errorMessage = err?.message?.toString() ?? err?.toString();
}

await Provider.update(
{
isOnline: false,
lastCheckDate: checkDate,
error: errorMessage,
akashVersion: null,
cosmosSdkVersion: null,
deploymentCount: null,
leaseCount: null,
activeCPU: null,
activeGPU: null,
activeMemory: null,
activeStorage: null,
pendingCPU: null,
pendingGPU: null,
pendingMemory: null,
pendingStorage: null,
availableCPU: null,
availableGPU: null,
availableMemory: null,
availableStorage: null
},
{
where: { owner: provider.owner }
}
);
await saveProviderStatus(provider, providerStatus, akashVersion, cosmosVersion, errorMessage);

await ProviderSnapshot.create({
owner: provider.owner,
isOnline: false,
error: errorMessage,
checkDate: checkDate
});
} finally {
doneCount++;
console.log("Fetched provider info: " + doneCount + " / " + providers.length);
}
doneCount++;
console.log("Fetched provider info: " + doneCount + " / " + providers.length);
})
);

console.log("Finished refreshing provider infos");
}

type ProviderVersionEndpointResponseType =
| {
akash: { version: string; commit: string; buildTags: string; go: string; cosmosSdkVersion: string };
kube: {
major: string;
minor: string;
gitVersion: string;
gitCommit: string;
gitTreeState: string;
buildDate: string;
goVersion: string;
compiler: string;
platform: string;
};
async function saveProviderStatus(
provider: Provider,
providerStatus: ProviderStatusInfo | null,
akashVersion: string | null,
cosmosVersion: string | null,
error: string | null
) {
await sequelize.transaction(async (t) => {
const checkDate = toUTC(new Date());

const createdSnapshot = await ProviderSnapshot.create(
{
owner: provider.owner,
isOnline: !!providerStatus,
isLastOfDay: true,
error: error,
checkDate: checkDate,
deploymentCount: providerStatus?.resources.deploymentCount,
leaseCount: providerStatus?.resources.leaseCount,
activeCPU: providerStatus?.resources.activeCPU,
activeGPU: providerStatus?.resources.activeGPU,
activeMemory: providerStatus?.resources.activeMemory,
activeStorage: providerStatus?.resources.activeStorage,
pendingCPU: providerStatus?.resources.pendingCPU,
pendingGPU: providerStatus?.resources.pendingGPU,
pendingMemory: providerStatus?.resources.pendingMemory,
pendingStorage: providerStatus?.resources.pendingStorage,
availableCPU: providerStatus?.resources.availableCPU,
availableGPU: providerStatus?.resources.availableGPU,
availableMemory: providerStatus?.resources.availableMemory,
availableStorage: providerStatus?.resources.availableStorage
},
{ transaction: t }
);

if (provider.lastSnapshot && isSameDay(provider.lastSnapshot.checkDate, checkDate)) {
await ProviderSnapshot.update(
{
isLastOfDay: false
},
{
where: { id: provider.lastSnapshot.id },
transaction: t
}
);
}
| {
akash: {
name: string;
server_name: string;
version: string;
commit: string;
build_tags: string;
go: string;
cosmos_sdk_version: string;
};
kube: {
major: string;
minor: string;
gitVersion: string;
gitCommit: string;
gitTreeState: string;
buildDate: string;
goVersion: string;
compiler: string;
platform: string;
};
};

await Provider.update(
{
lastSnapshotId: createdSnapshot.id,
isOnline: !!providerStatus,
error: error,
lastCheckDate: checkDate,
cosmosSdkVersion: cosmosVersion,
akashVersion: akashVersion,
deploymentCount: providerStatus?.resources.deploymentCount,
leaseCount: providerStatus?.resources.leaseCount,
activeCPU: providerStatus?.resources.activeCPU,
activeGPU: providerStatus?.resources.activeGPU,
activeMemory: providerStatus?.resources.activeMemory,
activeStorage: providerStatus?.resources.activeStorage,
pendingCPU: providerStatus?.resources.pendingCPU,
pendingGPU: providerStatus?.resources.pendingGPU,
pendingMemory: providerStatus?.resources.pendingMemory,
pendingStorage: providerStatus?.resources.pendingStorage,
availableCPU: providerStatus?.resources.availableCPU,
availableGPU: providerStatus?.resources.availableGPU,
availableMemory: providerStatus?.resources.availableMemory,
availableStorage: providerStatus?.resources.availableStorage
},
{
where: { owner: provider.owner },
transaction: t
}
);

if (providerStatus) {
for (const node of providerStatus.nodes) {
const providerSnapshotNode = await ProviderSnapshotNode.create(
{
snapshotId: createdSnapshot.id,
name: node.name,
cpuAllocatable: node.cpuAllocatable,
cpuAllocated: node.cpuAllocated,
memoryAllocatable: node.memoryAllocatable,
memoryAllocated: node.memoryAllocated,
ephemeralStorageAllocatable: node.ephemeralStorageAllocatable,
ephemeralStorageAllocated: node.ephemeralStorageAllocated,
capabilitiesStorageHDD: node.capabilitiesStorageHDD,
capabilitiesStorageSSD: node.capabilitiesStorageSSD,
capabilitiesStorageNVME: node.capabilitiesStorageNVME,
gpuAllocatable: node.gpuAllocatable,
gpuAllocated: node.gpuAllocated
},
{ transaction: t }
);

await ProviderSnapshotNodeCPU.bulkCreate(
node.cpus.map((cpuInfo) => ({
snapshotNodeId: providerSnapshotNode.id,
vendor: cpuInfo.vendor,
model: cpuInfo.model,
vcores: cpuInfo.vcores
})),
{ transaction: t }
);

await ProviderSnapshotNodeGPU.bulkCreate(
node.gpus.map((gpuInfo) => ({
snapshotNodeId: providerSnapshotNode.id,
vendor: gpuInfo.vendor,
name: gpuInfo.name,
modelId: gpuInfo.modelId,
interface: gpuInfo.interface,
memorySize: gpuInfo.memorySize
})),
{ transaction: t }
);
}
}
});
}
Loading

0 comments on commit 8f6ab9e

Please sign in to comment.