feat: weighted gateway peers for data retrievals
  * add 'peer' label to Prometheus data retrieval metrics
  * add lib helper 'random-weighted-choices' for returning a weighted
    random list
  * add retry mechanism, retrying over random and unique gateway peers
    (a condensed sketch of the flow follows below)
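
A condensed, self-contained sketch of the mechanism this commit introduces: keep a weight per peer, select several unique peers biased by weight, try them in order, and nudge the weight up on success and down on failure. The names tryPeers, pickWeighted, and fetchFrom are illustrative only, and the selection function is a simplified stand-in for the temperature-based random-weighted-choices helper added in this commit.

type WeightedPeer = { id: string; weight: number };

// two hypothetical peers, both starting at the neutral weight of 50
const peers: WeightedPeer[] = [
  { id: 'https://peer-a.example', weight: 50 },
  { id: 'https://peer-b.example', weight: 50 },
];

// simplified weighted sampling without replacement (exponential-key trick);
// the real helper below uses a temperature/urgency scheme instead
function pickWeighted(table: WeightedPeer[], count: number): string[] {
  return table
    .map((p) => ({ id: p.id, key: -Math.log(Math.random()) / p.weight }))
    .sort((a, b) => a.key - b.key)
    .slice(0, count)
    .map((p) => p.id);
}

async function tryPeers(
  fetchFrom: (peer: string) => Promise<string>,
): Promise<string> {
  for (const peer of pickWeighted(peers, peers.length)) {
    const entry = peers.find((p) => p.id === peer);
    try {
      const data = await fetchFrom(peer);
      if (entry) entry.weight = Math.min(entry.weight + 0.1, 100); // warm on success
      return data;
    } catch {
      if (entry) entry.weight = Math.max(entry.weight - 0.1, 1); // cool on failure
    }
  }
  throw new Error('Failed to fetch contiguous data from any peer');
}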
hlolli committed Jan 27, 2025
1 parent 2325880 commit 97be845
Showing 3 changed files with 190 additions and 57 deletions.
146 changes: 92 additions & 54 deletions src/data/ar-io-data-source.ts
@@ -17,6 +17,10 @@
*/
import { default as axios, AxiosResponse } from 'axios';
import winston from 'winston';
import {
WeightedElement,
randomWeightedChoices,
} from '../lib/random-weighted-choices.js';
import { AoARIORead } from '@ar.io/sdk';
import { randomInt } from 'node:crypto';

@@ -47,6 +51,8 @@ export class ArIODataSource implements ContiguousDataSource {
peers: Record<string, string> = {};
private intervalId?: NodeJS.Timeout;

protected weightedPeers: WeightedElement<string>[] = [];

constructor({
log,
arIO,
@@ -121,6 +127,15 @@
peers: Object.keys(peers),
});
this.peers = peers;
this.weightedPeers = Object.keys(peers).map((id) => {
const previousWeight =
this.weightedPeers.find((peer) => peer.id === id)?.weight ?? undefined;
return {
id,
// the weight scale is somewhat arbitrary: values range from 0 to 100, with 50 as the neutral default
weight: previousWeight === undefined ? 50 : previousWeight,
};
});
}

selectPeer(): string {
@@ -136,6 +151,43 @@
return this.peers[keys[randomIndex]];
}

selectRandomWeightedPeers(count: number): string[] {
const log = this.log.child({ method: 'selectRandomWeightedPeers' });

if (this.weightedPeers.length === 0) {
log.warn('No weighted peers available');
throw new Error('No weighted peers available');
}

return randomWeightedChoices<string>({ table: this.weightedPeers, count });
}

handlePeerSuccess(peer: string): void {
metrics.getDataStreamSuccessesTotal.inc({
class: this.constructor.name,
peer,
});
// warm the succeeding peer
this.weightedPeers.forEach((weightedPeer) => {
if (weightedPeer.id === peer) {
weightedPeer.weight = Math.min(weightedPeer.weight + 0.1, 100);
}
});
}

handlePeerFailure(peer: string): void {
metrics.getDataStreamErrorsTotal.inc({
class: this.constructor.name,
peer,
});
// cool the failing peer
this.weightedPeers.forEach((weightedPeer) => {
if (weightedPeer.id === peer) {
weightedPeer.weight = Math.max(weightedPeer.weight - 0.1, 1);
}
});
}

private async request({
peerAddress,
id,
@@ -167,15 +219,20 @@
id,
requestAttributes,
region,
retryCount,
}: {
id: string;
requestAttributes?: RequestAttributes;
region?: Region;
retryCount?: number;
}): Promise<ContiguousData> {
const log = this.log.child({ method: 'getData' });
const totalRetryCount =
retryCount ?? Math.max(Object.keys(this.peers).length, 1);

log.debug('Fetching contiguous data from ArIO peer', {
id,
totalRetryCount,
});

if (requestAttributes !== undefined) {
@@ -185,86 +242,67 @@
}
}

let selectedPeer = this.selectPeer();
const randomPeers = this.selectRandomWeightedPeers(totalRetryCount);

const requestAttributesHeaders =
generateRequestAttributes(requestAttributes);

try {
const response = await this.request({
peerAddress: selectedPeer,
id,
headers: {
...(requestAttributesHeaders?.headers || {}),
...(region
? {
Range: `bytes=${region.offset}-${region.offset + region.size - 1}`,
}
: {}),
},
});

const parsedRequestAttributes = parseRequestAttributesHeaders({
headers: response.headers as { [key: string]: string },
currentHops: requestAttributesHeaders?.attributes.hops,
});

return this.parseResponse(response, parsedRequestAttributes);
} catch (error: any) {
metrics.getDataErrorsTotal.inc({
class: 'ArIODataSource',
});
log.error('Failed to fetch contiguous data from first random ArIO peer', {
message: error.message,
stack: error.stack,
});

for (const currentPeer of randomPeers) {
try {
selectedPeer = this.selectPeer();
const response = await this.request({
peerAddress: selectedPeer,
peerAddress: currentPeer,
id,
headers: requestAttributesHeaders?.headers || {},
headers: {
...(requestAttributesHeaders?.headers || {}),
...(region
? {
Range: `bytes=${region.offset}-${region.offset + region.size - 1}`,
}
: {}),
},
});

const parsedRequestAttributes = parseRequestAttributesHeaders({
headers: response.headers as { [key: string]: string },
currentHops: requestAttributesHeaders?.attributes.hops,
});

return this.parseResponse(response, parsedRequestAttributes);
return this.parseResponse({
response,
requestAttributes: parsedRequestAttributes,
peer: currentPeer,
});
} catch (error: any) {
metrics.getDataErrorsTotal.inc({
class: this.constructor.name,
class: 'ArIODataSource',
});
log.error('Failed to fetch contiguous data from ArIO peer', {
currentPeer,
message: error.message,
stack: error.stack,
});
log.error(
'Failed to fetch contiguous data from second random ArIO peer',
{
message: error.message,
stack: error.stack,
},
);
throw new Error('Failed to fetch contiguous data from ArIO peers');
}
}

throw new Error('Failed to fetch contiguous data from any peer');
}

private parseResponse(
response: AxiosResponse,
requestAttributes: RequestAttributes,
): ContiguousData {
private parseResponse({
response,
requestAttributes,
peer,
}: {
response: AxiosResponse;
requestAttributes: RequestAttributes;
peer: string;
}): ContiguousData {
const stream = response.data;

stream.on('error', () => {
metrics.getDataStreamErrorsTotal.inc({
class: this.constructor.name,
});
this.handlePeerFailure(peer);
});

stream.on('end', () => {
metrics.getDataStreamSuccessesTotal.inc({
class: this.constructor.name,
});
this.handlePeerSuccess(peer);
});

return {
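
For scale: given the ±0.1 steps in handlePeerSuccess and handlePeerFailure above, a peer starting at the neutral weight of 50 needs roughly 500 consecutive successful streams to reach the cap of 100, and about 490 consecutive stream failures to hit the floor of 1.
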
95 changes: 95 additions & 0 deletions src/lib/random-weighted-choices.ts
@@ -0,0 +1,95 @@
/**
* AR.IO Gateway
* Copyright (C) 2022-2023 Permanent Data Solutions, Inc. All Rights Reserved.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/

// note: this file is based on the library random-weighted-choice
// but it differs in that it allows for multiple choices to be made at once

export type WeightedElement<T> = {
id: T;
weight: number;
};

type RandomWeightedChoicesArgs<T> = {
table: WeightedElement<T>[];
count?: number; // Number of IDs to return, default is 1
temperature?: number; // Default is 50
randomFunction?: () => number; // Default is Math.random
influence?: number; // Default is 2
};

export const randomWeightedChoices = <T>({
table,
count = 1,
temperature = 50,
randomFunction = Math.random,
influence = 2,
}: RandomWeightedChoicesArgs<T>): T[] => {
const T = (temperature - 50) / 50;
const nb = table.length;
if (!nb) {
return [];
}

const total = table.reduce(
(previousTotal, element) => previousTotal + element.weight,
0,
);

const avg = total / nb;

// Compute amplified urgencies (depending on temperature)
const urgencies: Record<string, number> = {};
const urgencySum = table.reduce((previousSum, element) => {
const { id, weight } = element;
let urgency = weight + T * influence * (avg - weight);
if (urgency < 0) urgency = 0;
urgencies[id as string] = (urgencies[id as string] || 0) + urgency;
return previousSum + urgency;
}, 0);

let currentUrgency = 0;
const cumulatedUrgencies: Record<string, number> = {};
Object.keys(urgencies).forEach((id) => {
currentUrgency += urgencies[id];
cumulatedUrgencies[id] = currentUrgency;
});

if (urgencySum <= 0) {
return []; // No weight given
}

// Choose
const results: T[] = [];
for (let i = 0; i < count; i++) {
const choice = randomFunction() * urgencySum;
const ids = Object.keys(cumulatedUrgencies).filter(
// prevent duplicates
(id) => !results.includes(id as unknown as T),
);
for (let j = 0; j < ids.length; j++) {
const id = ids[j];
const urgency = cumulatedUrgencies[id];
if (choice <= urgency) {
results.push(id as unknown as T);
break;
}
}
}

return results;
};
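
For illustration, a minimal usage sketch of the helper above; the peer IDs and weights are invented, the import path assumes a sibling module, and temperature, influence, and randomFunction are left at their defaults:

import { randomWeightedChoices } from './random-weighted-choices.js';

// three hypothetical peers; a higher weight makes a peer more likely to be picked
const table = [
  { id: 'peer-a', weight: 80 },
  { id: 'peer-b', weight: 50 },
  { id: 'peer-c', weight: 10 },
];

// returns up to two distinct peer IDs, biased toward 'peer-a'
const picked = randomWeightedChoices<string>({ table, count: 2 });
console.log(picked); // e.g. [ 'peer-a', 'peer-b' ]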
6 changes: 3 additions & 3 deletions src/metrics.ts
@@ -263,19 +263,19 @@ export const arnsResolutionTime = new promClient.Summary({
export const getDataErrorsTotal = new promClient.Counter({
name: 'get_data_errors_total',
help: 'Count of requests errors',
labelNames: ['class'],
labelNames: ['class', 'peer'],
});

export const getDataStreamErrorsTotal = new promClient.Counter({
name: 'get_data_stream_errors_total',
help: 'Count of data stream errors',
labelNames: ['class'],
labelNames: ['class', 'peer'],
});

export const getDataStreamSuccessesTotal = new promClient.Counter({
name: 'get_data_stream_successes_total',
help: 'Count of data stream successes',
labelNames: ['class'],
labelNames: ['class', 'peer'],
});

//
