Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Updated import and improved error handling for ARPALAZIO #1106

Merged
2 commits merged on Apr 11, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
130 changes: 61 additions & 69 deletions src/adapters/arpalazio.js
Original file line number Diff line number Diff line change
@@ -1,16 +1,12 @@
'use strict';

import { REQUEST_TIMEOUT } from '../lib/constants.js';
import {
FetchError,
DATA_URL_ERROR,
DATA_PARSE_ERROR,
} from '../lib/errors.js';
import { FetchError, DATA_PARSE_ERROR } from '../lib/errors.js';
import log from '../lib/logger.js';
import { acceptableParameters, convertUnits } from '../lib/utils.js';
import got from 'got';
import sj from 'scramjet';
import cheerio from 'cheerio';
import { load } from 'cheerio';
import difference from 'lodash/difference.js';
import { DateTime } from 'luxon';

Expand All @@ -35,62 +31,59 @@ const hourlyParameters = difference(

export const name = 'arpalazio';


export async function fetchData (source, cb) {
export async function fetchData(source, cb) {
try {
const stream = await fetchStream(source, cb);
const measurements = await stream.toArray();
cb(null, { name: stream.name, measurements });
} catch (e) {
cb({ message: `fetchData error: ${e.message}` });
log.error(`fetchData error: ${e.message}`);
cb(e);
}
}


async function fetchStream(source, cb) {

const body = await client({ url: source.url, responseType: 'text' });

let $;
async function fetchStream(source) {
try {
$ = cheerio.load(body);
} catch (e) {
throw new FetchError(DATA_PARSE_ERROR, source, e);
const body = await client({
url: source.url,
responseType: 'text',
});
let $ = load(body);
const provinces = $('#provincia option')
.filter((i, el) => Number($(el).attr('value')) >= 0)
.map((i, el) => ({
id: $(el).attr('value'),
name: $(el).text(),
}))
.get();

const out = new MultiStream();
for (const province of provinces) {
const provinceHourlyURL = `${baseUrl}${provinceQueryPath}?provincia=${province.id}&dati=${hourlyAvgParam}`;
const provinceDailyURL = `${baseUrl}${provinceQueryPath}?provincia=${province.id}&dati=${dailyAvgParam}`;

out.add(
await handleProvince(
province.name,
provinceHourlyURL,
hourlyAvgPeriod,
source
)
);
out.add(
await handleProvince(
province.name,
provinceDailyURL,
dailyAvgPeriod,
source
)
);
}

return out.mux();
} catch (error) {
throw new FetchError(DATA_PARSE_ERROR, source, error);
}

const provinces = $('#provincia option')
.filter(function (i, el) {
return Number($(this).attr('value')) >= 0;
})
.map(function (i, el) {
return { id: $(this).attr('value'), name: $(this).text() };
})
.get();

const out = new MultiStream();
provinces.forEach(async (province) => {
const provinceHourlyURL = `${baseUrl}${provinceQueryPath}?provincia=${province.id}&dati=${hourlyAvgParam}`;
const provinceDailyURL = `${baseUrl}${provinceQueryPath}?provincia=${province.id}&dati=${dailyAvgParam}`;

out.add(
await handleProvince(
province.name,
provinceHourlyURL,
hourlyAvgPeriod,
source
)
);
out.add(
await handleProvince(
province.name,
provinceDailyURL,
dailyAvgPeriod,
source
)
);
});

return out.mux();
}

const handleProvince = async function (
Expand All @@ -99,30 +92,29 @@ const handleProvince = async function (
averagingPeriod,
source
) {

const body = await client({ url, responseType: 'text' });

const $ = cheerio.load(body);
const pollutantURLs = $('a')
.map(function () {
const pollutant = $(this).text().toLowerCase().replace('.', '');
const currentParameters = getParameters(averagingPeriod);
if (currentParameters.indexOf(pollutant) >= 0) {
const href = $(this).attr('href');
return `${baseUrl}${href}`;
} else {
return null;
}
})
.get();
const $ = load(body);
const pollutantURLs = $('a')
.map(function () {
const pollutant = $(this).text().toLowerCase().replace('.', '');
const currentParameters = getParameters(averagingPeriod);
if (currentParameters.indexOf(pollutant) >= 0) {
const href = $(this).attr('href');
return `${baseUrl}${href}`;
} else {
return null;
}
})
.get();

const arrayOfPromises = pollutantURLs.map((dataUrl) =>
getStream(name, dataUrl, averagingPeriod, source, url)
);

return new MultiStream(
await Promise.all(arrayOfPromises).catch((err) => {
log.verbose(`Promise error ${err}`);
log.error(`Promise error ${err}`);
return arrayOfPromises;
})
).mux();
Expand Down Expand Up @@ -151,7 +143,7 @@ export const getStream = function (
/[\w]{2}_([\w.]{2,})_([\d]{4})(?:_gg)?.txt/
);
if (!match || match.length < 2) {
log.verbose(`Failed to match url ${url}`);
log.error(`Failed to match url ${url}`);
}
const parameter = match[1].toLowerCase().replace('.', '');
const year = match[2];
Expand All @@ -161,7 +153,7 @@ export const getStream = function (
const fewDaysAgo = +parseFloat(
DateTime.local().setZone(timezone).minus({ days: 4 }).ordinal
);
log.verbose(`Fetching data from ${url}`);
log.debug(`Fetching data from ${url}`);

const stations = {};
return StringStream.from(getter.stream(url))
Expand Down
Loading