Clone the airbyte repository
+git clone git@github.com:airbytehq/airbyte.git
+
We will roughly follow the Exchange Rates connector tutorial and the Python CDK docs.
+The cookiecutter-ish connector dev template is started like so:
+cd airbyte-integrations/connector-templates/generator
+./generate.sh
+
It will ask for a connector type (Python HTTP API in our case) and a name. npm must be available.
Airbyte connector acceptance tests require Python >= 3.9.
+As a script
+# from airbyte-integrations/connectors/source-<name>
+python main.py spec
+python main.py check --config secrets/config.json
+python main.py discover --config secrets/config.json
+python main.py read --config secrets/config.json --catalog sample_files/configured_catalog.json
+
As Airbyte would run it
+# First build the container
+docker build . -t airbyte/source-<name>:dev
+
+# Then use the following commands to run it
+docker run --rm airbyte/source-<name>:dev spec
+docker run --rm -v $(pwd)/secrets:/secrets airbyte/source-<name>:dev check --config /secrets/config.json
+docker run --rm -v $(pwd)/secrets:/secrets airbyte/source-<name>:dev discover --config /secrets/config.json
+docker run --rm -v $(pwd)/secrets:/secrets -v $(pwd)/sample_files:/sample_files airbyte/source-<name>:dev read --config /secrets/config.json --catalog /sample_files/configured_catalog.json
+
Then proceed to edit airbyte-integrations/connectors/source-datadis/source_datadis/spec.yaml
with the fields you want to show in the config; in our case, username and password.
We also added the datadis PyPI dependency to our connector's setup.py.
+run with python main.py read --config secrets/config.json --catalog sample_files/configured_catalog.json
First, the spec.yaml defines the configuration page and exactly what will be available via config[''] in source.py, nothing more, nothing less. It is filled in by secrets/config.json.
What's more relevant is the configured_catalog.json, which defines how data records are delivered and the settings page of the source.
You'll need a configured_catalog.json specifying the output schema. If you make it match the API response, we won't need to do any parsing.
+{
+ "streams": [{
+ "stream": {
+ "name": "get_consumption_data",
+ "source_defined_cursor": false,
+ "json_schema": {
+ "$schema": "http://json-schema.org/draft-07/schema#",
+ "type": "object",
+ "properties": {
+ "cups": {
+ "type": "string"
+ },
+ "date": {
+ "type": "string",
+ "format": "date"
+ },
+ "time": {
+ "type": "string",
+ "airbyte_type": "time_without_timezone"
+ },
+ "consumptionKWh": {
+ "type": "number"
+ },
+ "obtainMethod": {
+ "type": "string"
+ }
+ }
+ },
+ "supported_sync_modes": ["full_refresh"]
+ },
+ "sync_mode": "full_refresh",
+ "destination_sync_mode": "append"
+ }]
+}
+
The params are set in the source settings, which is inconvenient because it means creating N sources, one per combination of params. So, you may be wondering: but startDate will change! The Airbyte way of handling queries for historical data is to configure an incremental read; that way the params would change accordingly.
+Also, we will need to query many CUPS. Maybe datadis allows an array as the cups param; it does refer to them in the plural. Another option might be asyncio-ing the requests, but Airbyte doesn't seem to be designed that way. One source/connection per CUPS at the UI level seems like a bad idea. Another option is to programmatically call/create the Airbyte tasks, given a bunch of secrets (see the slicing sketch below).
+Airbyte facilitates pagination and other goodies, but datadis doesn't offer pagination on the non-aggregated data :/
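One way to handle both the historical months and the multiple CUPS without multiplying sources is to slice the stream. The sketch below is illustrative and untested: datadis_slices, the month values and the CUPS value are made up, and in the real connector this logic would live in the CDK's stream_slices hook of the stream shown in source.py below, with each yielded slice consumed by request_params.

import itertools
from typing import Any, Iterable, Mapping


def datadis_slices(cups_list: Iterable[str], months: Iterable[str]) -> Iterable[Mapping[str, Any]]:
    """Yield one request 'slice' per (cups, month) pair.

    In an Airbyte stream this would be stream_slices(); each yielded dict is passed
    to request_params() as stream_slice, replacing the hardcoded startDate/endDate
    and the single cups used in source.py below.
    """
    for cups, month in itertools.product(cups_list, months):
        yield {"cups": cups, "startDate": month, "endDate": month}


if __name__ == "__main__":
    # Illustrative values only
    for s in datadis_slices(["ES0000000000000000XX"], ["2023/01", "2023/02"]):
        print(s)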
+source.py
import asyncio
from abc import ABC
from typing import Any, Iterable, Mapping, MutableMapping

import requests
from airbyte_cdk.sources.streams.http import HttpStream
from datadis import get_token  # async token helper from the datadis PyPI package


class DatadisStream(HttpStream, ABC):
+ # TODO: Fill in the url base. Required.
+ url_base = "https://datadis.es/api-private/api/"
+
+ max_retries = 0
+
+ # default strategy reads one record and then the rest, but datadis doesn't allow repeated requests with the same params :unamused:
+ availability_strategy = None
+
+ def __init__(self, config: Mapping[str, Any], **kwargs):
+ super().__init__()
+ self.username = config['username']
+ self.password = config['password']
+ self.cups = config['cups']
+ self.token = None
+
+ def path(
+ self, stream_state: Mapping[str, Any] = None, stream_slice: Mapping[str, Any] = None, next_page_token: Mapping[str, Any] = None
+ ) -> str:
+ return "get-consumption-data"
+
+ def request_headers(
+ self, stream_state: Mapping[str, Any], stream_slice: Mapping[str, Any] = None, next_page_token: Mapping[str, Any] = None
+ ) -> Mapping[str, Any]:
+        # The API requires a Bearer token in the Authorization header, so we fetch it lazily here
+ if not self.token:
+ username = self.username
+ password = self.password
+ self.token = asyncio.run(get_token(username, password))
+ print('refreshed token')
+
+ return {"Authorization": "Bearer "+self.token}
+
+ def request_params(
+        self, stream_state: Mapping[str, Any], stream_slice: Mapping[str, Any] = None, next_page_token: Mapping[str, Any] = None
+ ) -> MutableMapping[str, Any]:
+ """
+ Override this method to define any query parameters to be set. Remove this method if you don't need to define request params.
+ Usually contains common params e.g. pagination size etc.
+
+ We hardcoded the startDate-endDate but it would have to be set in the secrets/config.json and incrementaled instead of full-refresh as this is.
+ """
+ return {'cups': self.cups, 'distributorCode':2, 'measurementType':0, 'pointType':5, 'startDate': '2022/11', 'endDate': '2023/03', }
+
+ def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]:
+ """
+ Override this method to define how a response is parsed.
+ :return an iterable containing each record in the response
+
+        if the json matches the catalog, you don't need any transformation. If datadis supported pagination you could yield results I guess?
+ """
+
+ return response.json()
+
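For completeness, the generated SourceDatadis class (not shown above) mostly wires the streams and the connection check. Below is a rough, untested sketch of what it could look like, assuming the async get_token helper from the datadis PyPI package; the exact generated code may differ.

import asyncio
from typing import Any, List, Mapping, Tuple

from airbyte_cdk.sources import AbstractSource
from airbyte_cdk.sources.streams import Stream
from datadis import get_token  # async helper from the datadis PyPI package


class SourceDatadis(AbstractSource):
    def check_connection(self, logger, config: Mapping[str, Any]) -> Tuple[bool, Any]:
        # If we can obtain a token, the credentials are valid.
        try:
            asyncio.run(get_token(config["username"], config["password"]))
            return True, None
        except Exception as error:
            return False, error

    def streams(self, config: Mapping[str, Any]) -> List[Stream]:
        # DatadisStream is the stream class defined in source.py above
        return [DatadisStream(config=config)]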
If everything goes well you'll get something like this:
+{"type": "LOG", "log": {"level": "INFO", "message": "Starting syncing SourceDatadis"}}
+{"type": "LOG", "log": {"level": "INFO", "message": "Syncing stream: get_consumption_data "}}
+refreshed token
+Requested!
+Request: /api-private/api/get-consumption-data?cups=XXX&distributorCode=2&measurementType=0&pointType=5&startDate=2022%2F11&endDate=2023%2F03
+{"type": "RECORD", "record": {"stream": "get_consumption_data", "data": {"cups": "XXXX", "date": "2023/03/01", "time": "01:00", "consumptionKWh": 0.076, "obtainMethod": "Real"}, "emitted_at": 1680281671795}}
+[...]
+{"type": "RECORD", "record": {"stream": "get_consumption_data", "data": {"cups": "XXX", "date": "2023/03/30", "time": "22:00", "consumptionKWh": 0.0, "obtainMethod": "Real"}, "emitted_at": 1680281672153}}
+{"type": "RECORD", "record": {"stream": "get_consumption_data", "data": {"cups": "XXX", "date": "2023/03/30", "time": "23:00", "consumptionKWh": 0.0, "obtainMethod": "Real"}, "emitted_at": 1680281672154}}
+{"type": "RECORD", "record": {"stream": "get_consumption_data", "data": {"cups": "XXX", "date": "2023/03/30", "time": "24:00", "consumptionKWh": 0.0, "obtainMethod": "Real"}, "emitted_at": 1680281672154}}
+{"type": "LOG", "log": {"level": "INFO", "message": "Read 719 records from get_consumption_data stream"}}
+{"type": "LOG", "log": {"level": "INFO", "message": "Finished syncing get_consumption_data"}}
+{"type": "LOG", "log": {"level": "INFO", "message": "SourceDatadis runtimes:\nSyncing stream get_consumption_data 0:00:05.670986"}}
+{"type": "LOG", "log": {"level": "INFO", "message": "Finished syncing SourceDatadis"}}
+
The records are correctly read and parsed. Then, theoretically, we should implement another connector for the write side, or rather use the existing Postgres destination connector and save this stream to a table with a few clicks.
+We would probably parse the date-time into a proper datetime and drop the obtainMethod, or apply more transforms (a rough sketch follows).
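As an illustration only (field names follow the records above; the timezone handling and the output shape are assumptions), such a per-record transform could look like this:

from datetime import datetime, timedelta
from zoneinfo import ZoneInfo

MADRID = ZoneInfo("Europe/Madrid")


def transform_record(record: dict) -> dict:
    """Combine date + time into an aware datetime string and drop obtainMethod."""
    day = datetime.strptime(record["date"], "%Y/%m/%d")
    # datadis labels the last hour of the day as "24:00", which plain datetime rejects,
    # so we add the hours to midnight instead of parsing the time directly
    hours = int(record["time"].split(":")[0])
    timestamp = (day + timedelta(hours=hours)).replace(tzinfo=MADRID)
    return {
        "cups": record["cups"],
        "timestamp": timestamp.isoformat(),
        "consumptionKWh": record["consumptionKWh"],
    }


if __name__ == "__main__":
    print(transform_record({"cups": "XXX", "date": "2023/03/01", "time": "01:00",
                            "consumptionKWh": 0.076, "obtainMethod": "Real"}))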
+Then any other type of source could implement its own source connector and plug into our cedata-api destination connector, which we would still have to implement, but which would then be decoupled from any source imaginable.
+Gotchas: the datadis API doesn't like repeated requests, so the availability strategy and retries have to be disabled.
+We didn't test it here using the Docker invocation. Also, to get this into the official Airbyte catalog, the whole PR-acceptance flow needs to be followed. On a self-hosted instance, the sources API URL is configurable and could point to a personal GitHub fork of Airbyte and work.
A bit of the timezone party, after wrestling with it for a while.
+Mantras: a Timestamp is a picture of a clock. You don't want a picture of a clock.
About timestamptz: contrary to what one would think when reading "timestamp with time zone", this Postgres datatype does not store a timezone. It is a display "binary flag", a helper for insertion and display that converts from your configured time zone (or the server's) to a unix timestamp.
+Regarding what we do in the data team, it matches quite closely what is said here:
+If we write [citation needed] it means we are still analysing it.
show time zone is Europe/Madrid. Therefore timestamptz values are displayed in Europe/Madrid.
Options:
+We choose option 1: every datetime in the DB as timestamptz, and when you insert you have to do it with the timezone specified. In general we don't trust the server's or client's show time zone; [citation needed, because maybe we could accept naïve aggregations if we trusted it].
When we are using timescaledb, we aggregate with the time_bucket function so that the query planner knows how to access the chunks correctly. Otherwise, we aggregate with date_trunc('day', some_timestamptz, 'Europe/Madrid').
Dates can be naïve (the concept of an aware date does not exist in Postgres), but they will be in local time. If possible and it makes sense, keep the midnight timestamptz [citation needed].
+If we receive a timestamp, we immediately convert it to timestamptz with at time zone.
If we receive a date, we first convert it to timestamp.
+With the client timezone set to 'Europe/Madrid', this gives:
+select
+ '2021-01-01'::date as adate,
+ '2021-01-01'::date at time zone 'Europe/Madrid', -- timestamp 😲 ❌
+ '2021-01-01'::date::timestamp at time zone 'Europe/Madrid' -- timestamptz 👍
+
| adate | timezone | timezone |
|---|---|---|
| 2021-01-01 | 2021-01-01 00:00:00 | 2021-01-01 00:00:00+01 |
We cast it to timestamp and convert it to the appropriate timezone before inserting it as timestamptz:
+ CASE
+ WHEN estiu=1 AND (sistema = 'PEN' or sistema = 'BAL')
+ THEN data::timestamp AT TIME ZONE 'CEST'
+ WHEN estiu=0 AND (sistema = 'PEN' or sistema = 'BAL')
+ THEN data::timestamp AT TIME ZONE 'CET'
+ WHEN estiu=1 AND sistema = 'CAN'
+ THEN data::timestamp AT TIME ZONE 'WETDST'
+ WHEN estiu=0 AND sistema = 'CAN'
+ THEN data::timestamp AT TIME ZONE 'WET'
+ END AS end_hour_aware
+
+Ideally we would add a column with the timezone in Postgres syntax ('Europe/Madrid', 'Atlantic/Canary') so that later we can do:
+select
+end_hour_aware,
+end_hour_aware at time zone timezone as end_hour_local,
+date_trunc('day', end_hour_aware, timezone) as day_local
+from (
+ values
+ ('2021-01-01 10:00:00'::timestamp at time zone 'Europe/Madrid', 'Europe/Madrid'),
+ ('2021-01-01 10:00:00'::timestamp at time zone 'Atlantic/Canary', 'Atlantic/Canary')
+) as foo(end_hour_aware, timezone);
+
| end_hour_aware | end_hour_local | day_local |
|---|---|---|
| 2021-01-01 10:00:00+01 | 2021-01-01 10:00:00 | 2021-01-01 00:00:00+01 |
| 2021-01-01 11:00:00+01 | 2021-01-01 10:00:00 | 2021-01-01 01:00:00+01 |
You can see the different cases:
+select
+end_hour_aware at time zone 'UTC' as end_hour_naif_utc_in_db, -- db *always* stores naïf unix timestamps, utc, even if datatype is timestamptz
+end_hour_aware as end_hour_aware_at_configured_timezone, -- depends on show time zone; of your client/server
+end_hour_aware at time zone timezone as end_hour_local, -- timestamp naïf (can't be otherwise once we localize)
+date_trunc('day', end_hour_aware, timezone) as midnight_local, -- midnight local seen by `show time zone;`, it's timestamptz, hence automatically converted for display. really unix_timestamp in db
+date_trunc('day', end_hour_aware, timezone)::date as day_local,
+date_trunc('day', end_hour_aware)::date as day_naif_local_and_wrong, -- ❌ implicit conversion to `show time zone`
+time_bucket('1 day', end_hour_aware, timezone) as day_bucket
+from (
+ values
+ ('2021-01-01 20:00:00'::timestamp at time zone 'Europe/Madrid', 'Europe/Madrid'),
+ ('2021-01-01 20:00:00'::timestamp at time zone 'Atlantic/Canary', 'Atlantic/Canary'),
+ ('2021-01-01 20:00:00'::timestamp at time zone 'PST', 'PST')
+) as foo(end_hour_aware, timezone);
+
| end_hour_naif_utc_in_db | end_hour_aware_at_configured_timezone | end_hour_local | midnight_local | day_local | day_naif_local_and_wrong | day_bucket |
|---|---|---|---|---|---|---|
| 2021-01-01 19:00:00 | 2021-01-01 20:00:00+01 | 2021-01-01 20:00:00 | 2021-01-01 00:00:00+01 | 2021-01-01 | 2021-01-01 | 2021-01-01 00:00:00+01 |
| 2021-01-01 20:00:00 | 2021-01-01 21:00:00+01 | 2021-01-01 20:00:00 | 2021-01-01 01:00:00+01 | 2021-01-01 | 2021-01-01 | 2021-01-01 01:00:00+01 |
| 2021-01-02 04:00:00 | 2021-01-02 05:00:00+01 | 2021-01-01 20:00:00 | 2021-01-01 09:00:00+01 | 2021-01-01 | 2021-01-02 | 2021-01-01 09:00:00+01 |
Calendar aggregations always use whichever time zone is relevant. A daily aggregation is always tied to a specific timezone, because a day is only defined within a timezone; otherwise we would be talking about 24h aggregations, which we would do in UTC.
+select
+    -- date_trunc uses the configured time zone by default
+    date_trunc('day', '2021-01-01 01:00:00+05:00'::timestamptz) as date_trunc_local,
+    date_trunc('day', '2021-01-01 01:00:00+05:00'::timestamptz, 'Europe/Madrid') as date_trunc_local_explicit,
+    -- time_bucket uses UTC by default
+    time_bucket('1 day', '2021-01-01 01:00:00+05:00'::timestamptz) as time_bucket_utc,
+    time_bucket('1 day', '2021-01-01 01:00:00+05:00'::timestamptz, 'Europe/Madrid') as time_bucket_local;
+
| date_trunc_local | date_trunc_local_explicit | time_bucket_utc | time_bucket_local |
|---|---|---|---|
| 2020-12-31 00:00:00+01 | 2020-12-31 00:00:00+01 | 2020-12-31 01:00:00+01 | 2020-12-31 00:00:00+01 |
In general we would do everything in the Spanish state time zone, but it depends on the use case (see Exceptions to the rule).
+If we need to convert to a date, we have to shift it to the relevant time zone before converting; since we don't have a time_bucket(timestamptz)->date function (nor a date_trunc one), before casting to date we must convert it to a naïve timestamp in the relevant timezone so that Postgres doesn't apply whatever time zone we happen to have configured.
+select '2022-12-31 23:00:00+00'::timestamptz,
+'2022-12-31 23:00:00+00'::timestamptz at time zone 'Europe/Madrid',
+'2022-12-31 23:00:00'::timestamp at time zone 'Europe/Madrid' as naif,
+date_trunc('day', '2022-12-31 23:00:00+00'::timestamptz, 'Europe/Madrid'),
+date_trunc('day', '2022-12-31 23:00:00+00'::timestamptz, 'Europe/Madrid')::date as incorrect_dt_based_on_config,
+(date_trunc('day', '2022-12-31 23:00:00+00'::timestamptz, 'Europe/Madrid') at time zone 'Europe/Madrid')::date as correct_dt,
+(time_bucket('1 day', '2022-12-31 23:00:00+00'::timestamptz, 'Europe/Madrid') at time zone 'Europe/Madrid'),
+(time_bucket('1 day', '2022-12-31 23:00:00+00'::timestamptz, 'Europe/Madrid'))::date as incorrect_tb_based_on_config,
+(time_bucket('1 day', '2022-12-31 23:00:00+00'::timestamptz, 'Europe/Madrid') at time zone 'Europe/Madrid')::date as correct
+
| timestamptz | timezone | naif | date_trunc | incorrect_dt_based_on_config | correct_dt | timezone | incorrect_tb_based_on_config | correct |
|---|---|---|---|---|---|---|---|---|
| 2022-12-31 23:00:00+00 | 2023-01-01 00:00:00 | 2022-12-31 22:00:00+00 | 2022-12-31 23:00:00+00 | 2022-12-31 | 2023-01-01 | 2023-01-01 00:00:00 | 2022-12-31 | 2023-01-01 |
For the case we are currently handling, demand forecasting, we use the local day (a picture of a clock) because what interests us is not comparing time intervals, but the behaviour of Mondays, of weekends, etc. The hours to group, the days to group, are cultural, even though they actually represent different bundles of universal hours [citation needed].
+In general, the Don't do this compilation is quite good.
+timestamptz is not supported everywhere; there are ORMs that don't handle it well.
Everyone has to be aware that they must insert with the timezone made explicit, or else make sure that the server's configured timezone is what they assume it is. Since the latter is a pitfall, it is better to always convert to timestamptz right away and explicitly.
+This implies that everyone knows that plain timestamps are semantically naïve. TODO: elaborate on alternative ways of doing inserts to the one proposed in Don't do this.
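As a minimal illustration of inserting with the timezone made explicit from application code (psycopg2, the connection string and the table/column names are assumptions for the example, not our documented stack):

from datetime import datetime
from zoneinfo import ZoneInfo

import psycopg2

# Build an aware datetime so the driver sends the offset itself,
# instead of relying on the session's `show time zone`.
aware = datetime(2021, 1, 1, 10, 0, tzinfo=ZoneInfo("Europe/Madrid"))

with psycopg2.connect("dbname=dades") as conn, conn.cursor() as cur:
    cur.execute(
        "INSERT INTO some_schema.some_table (measured_at) VALUES (%s)",
        (aware,),  # sent as '2021-01-01 10:00:00+01', stored as the unix instant in timestamptz
    )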
+Unless you will never, ever have other timezones... but we don't recommend it, because then what happens, happens.
Currently we have duplicated transformation processes, with no version control and no maintenance, scattered across Superset and Redash queries.
+timescale
+timescale continuous aggregates have many limitations
+dbt
+dbt is gaining the upper hand.
+We need to establish a data management standard at SomEnergia
+schema_data ware
+Technical Story:
+One of the motivations for dbt is reducing code duplication for the aggregations (daily, weekly, monthly, quarterly, yearly...); how can we do this in dbt?
+As-is, we need one model per aggregation per final table, which does not spare us the code duplication because each aggregation needs its own model.
+[example | description | pointer to more information | …]
+[example | description | pointer to more information | …]
+[example | description | pointer to more information | …]
+Technical Story:
+Juanpe mentioned Apache Kafka to me. Right now we are focused on batch-processing data pipelines. Do we also want to support event-driven data pipelines? If so, how and why?
+[Describe the context and problem statement, e.g., in free form using two to three sentences. You may want to articulate the problem in form of a question.]
+Chosen option: "[option 1]", because [justification. e.g., only option, which meets k.o. criterion decision driver | which resolves force force | … | comes out best (see below)].
+[example | description | pointer to more information | …]
+[example | description | pointer to more information | …]
+[example | description | pointer to more information | …]
+Technical Story:
+OLAP cubes?
+2. datasets based on dbt models
+To be revisited in the future
+[example | description | pointer to more information | …]
+Many people in the dbt community argue against OLAP cubes and in favour of datasets
+Technical Story:
+To build the derived indicators, it suits us to have wide data where each column is an indicator and the first row is the create date.
+We chose 1. dbt pivot because it solves the issue in pure SQL and views.
+[example | description | pointer to more information | …]
+[example | description | pointer to more information | …]
+[ADR-0005](0005-example.md)
Technical Story: [description | ticket/issue URL]
+[Describe the context and problem statement, e.g., in free form using two to three sentences. You may want to articulate the problem in form of a question.]
+Chosen option: "[option 1]", because [justification. e.g., only option, which meets k.o. criterion decision driver | which resolves force force | … | comes out best (see below)].
+[example | description | pointer to more information | …]
+[example | description | pointer to more information | …]
+[example | description | pointer to more information | …]
+More details on variations of the main workflow to come
tl;dr (aka lots of text)
+git merge YOUR_BRANCH
+
+python scripts/csv_to_sqltable.py --csvpath "datasources/erppeek/erppeek_kpis_description.csv" --dbapi "postgresql://somenergia:PASSWORD@puppis.somenergia.lan:5432/dades" --schema prod_operational --table erppeek_kpis_description --ifexists append --truncate
+
+git push
+
+# Run Airflow DAG
+
+dbt run --target prod -m +kpis_row+
+
Airflow is on Continuous Delivery: the main branch will be automatically pulled into production as soon as a task is run.
git pull
+git merge main
+git checkout main
+git merge YOUR_BRANCH
+
When you're ready, push to production
+git push
+
+From your local machine, launch the script to update the KPIs table from the CSV.
+This process overwrites the table.
+Given that dbt views depend on the table, we can't drop it. Therefore we truncate and append.
+python scripts/csv_to_sqltable.py --csvpath "datasources/erppeek/erppeek_kpis_description.csv" --dbapi "postgresql://somenergia:PASSWORD@puppis.somenergia.lan:5432/dades" --schema prod_operational --table erppeek_kpis_description --ifexists append --truncate
+
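For reference, a rough sketch of what a script like csv_to_sqltable.py presumably does with --ifexists append --truncate (pandas and SQLAlchemy assumed; the real script may differ): truncate the table instead of dropping it, so the dependent dbt views stay valid, then append the CSV rows.

import pandas as pd
from sqlalchemy import create_engine, text


def csv_to_sqltable(csvpath: str, dbapi: str, schema: str, table: str, truncate: bool = True) -> None:
    df = pd.read_csv(csvpath)
    engine = create_engine(dbapi)
    with engine.begin() as conn:
        if truncate:
            # TRUNCATE keeps the table (and the dbt views that depend on it) in place
            conn.execute(text(f"TRUNCATE TABLE {schema}.{table}"))
        df.to_sql(table, conn, schema=schema, if_exists="append", index=False)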
+Airflow runs daily tasks reading the kpis table and querying the ERP via erppeek. You can wait for it to run or run it manually. dbt already selects the newest run when publishing the kpis to the datamart.
+From your local machine, run the dbt workflow targeting the production environment.
+dbt run --target prod -m +kpis_wide+
+
or
+dbt run --target prod
+