Commit b590ce5

[pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

pre-commit-ci[bot] committed Nov 22, 2021
1 parent 7d985bf commit b590ce5
Showing 12 changed files with 146 additions and 131 deletions.
2 changes: 1 addition & 1 deletion .dockerignore
@@ -6,4 +6,4 @@
 literature
 logs
 nbs
-.git
+.git
20 changes: 10 additions & 10 deletions .pre-commit-config.yaml
@@ -19,22 +19,22 @@ repos:
       - id: pydocstyle
         args:
           [
-          --convention=google,
-          "--add-ignore=D200,D202,D210,D212,D415",
-          "satip",
+            --convention=google,
+            "--add-ignore=D200,D202,D210,D212,D415",
+            "satip",
           ]
   - repo: https://github.com/PyCQA/flake8
     rev: 4.0.1
     hooks:
       - id: flake8
         args:
           [
-          --max-line-length,
-          "100",
-          --extend-ignore=E203,
-          --per-file-ignores,
-          "__init__.py:F401",
-          "satip",
+            --max-line-length,
+            "100",
+            --extend-ignore=E203,
+            --per-file-ignores,
+            "__init__.py:F401",
+            "satip",
           ]
   - repo: https://github.com/PyCQA/isort
     rev: 5.10.1
@@ -52,4 +52,4 @@ repos:
     rev: v2.4.1
     hooks:
       - id: prettier
-        types: [yaml]
+        types: [yaml]
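
Note: each entry in a hook's `args` list is passed to the tool as one argv token (pre-commit appends the staged file names after them), which is why values containing commas or colons are quoted in the YAML above. As a minimal sketch, the flake8 hook configured here amounts to roughly the following invocation, assuming flake8 is installed locally (illustration only, not part of this commit):

    import subprocess

    # Each list element below is exactly one argv token, mirroring the YAML args.
    subprocess.run(
        [
            "flake8",
            "--max-line-length",
            "100",
            "--extend-ignore=E203",
            "--per-file-ignores",
            "__init__.py:F401",
            "satip",
        ],
        check=False,  # flake8 exits non-zero when it reports violations
    )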
5 changes: 3 additions & 2 deletions satip/compression.py
@@ -1,7 +1,8 @@
-import xarray as xr
-import numpy as np
 from typing import Union
+
+import numpy as np
+import xarray as xr


 class Compressor:
     def __init__(
23 changes: 12 additions & 11 deletions satip/download.py
@@ -6,22 +6,23 @@
 #
 ############

-import fsspec
-import requests.exceptions
-import yaml
-from satip import eumetsat
-import pandas as pd
-import os
-import math
 import logging
-import subprocess
+import math
 import multiprocessing
-from itertools import repeat
+import os
+import subprocess
 import time
+from datetime import datetime, timedelta
+from itertools import repeat
+from typing import Callable, List, Optional, Tuple

+import fsspec
 import numpy as np
-from typing import Optional, List, Tuple, Callable
+import pandas as pd
+import requests.exceptions
+import yaml

-from datetime import datetime, timedelta
+from satip import eumetsat
+
 _LOG = logging.getLogger("satip.download")
 _LOG.setLevel(logging.INFO)
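
The reshuffling above is isort's layout as applied in this commit: within each section, plain `import` statements come before `from` imports; sections run standard library, then third-party, then first-party, each alphabetized and separated by one blank line. A minimal illustration of the same convention (not satip code):

    import logging
    import os
    from datetime import datetime, timedelta

    import numpy as np
    import pandas as pd

    from satip import eumetsat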
13 changes: 6 additions & 7 deletions satip/eumetsat.py
@@ -1,16 +1,15 @@
-import pandas as pd
-
-from typing import Union, List
-import datetime
-import zipfile
 import copy
+import datetime
 import os
-from io import BytesIO
 import re
 import urllib
+import zipfile
+from io import BytesIO
+from typing import List, Union

-from requests.auth import HTTPBasicAuth
+import pandas as pd
 import requests
+from requests.auth import HTTPBasicAuth

 from satip import utils

8 changes: 4 additions & 4 deletions satip/geospatial.py
@@ -1,6 +1,6 @@
 import datetime
 from numbers import Number
-from typing import Tuple, List
+from typing import List, Tuple

 import numpy as np
 import pyproj
@@ -30,7 +30,7 @@ class Transformers:
     """

     def __init__(self):
-        """ Init """
+        """Init"""
         self._lat_lon_to_osgb = None
         self.make_transformers()

@@ -44,7 +44,7 @@ def make_transformers(self):

     @property
     def lat_lon_to_osgb(self):
-        """ lat-lon to OSGB property """
+        """lat-lon to OSGB property"""
         return self._lat_lon_to_osgb


@@ -53,7 +53,7 @@ def lat_lon_to_osgb(self):


 def download_grids():
-    """ The transformer grid sometimes need updating """
+    """The transformer grid sometimes need updating"""
     pyproj.transformer.TransformerGroup(crs_from=WGS84, crs_to=OSGB).download_grids(verbose=True)

     transformers.make_transformers()
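
For context, the `lat_lon_to_osgb` property touched above caches a pyproj transformer that `make_transformers()` builds once. A self-contained sketch of that pattern, under assumed CRS codes (WGS84 = EPSG:4326, OSGB = EPSG:27700; satip's own constants may differ):

    import pyproj

    WGS84, OSGB = 4326, 27700


    class Transformers:
        """Builds the lat-lon to OSGB transformer once and exposes it as a property."""

        def __init__(self):
            """Init"""
            self._lat_lon_to_osgb = None
            self.make_transformers()

        def make_transformers(self):
            """(Re)build the transformer, e.g. after grids are re-downloaded."""
            self._lat_lon_to_osgb = pyproj.Transformer.from_crs(
                crs_from=WGS84, crs_to=OSGB, always_xy=True
            )

        @property
        def lat_lon_to_osgb(self):
            """lat-lon to OSGB property"""
            return self._lat_lon_to_osgb


    transformers = Transformers()
    # always_xy=True means (lon, lat) in, (easting, northing) out:
    x, y = transformers.lat_lon_to_osgb.transform(-1.25, 51.75)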
68 changes: 36 additions & 32 deletions satip/intermediate.py
@@ -1,25 +1,25 @@
-from satip.utils import (
-    load_native_to_dataset,
-    save_dataset_to_zarr,
-    check_if_timestep_exists,
-)
-from satip.eumetsat import eumetsat_filename_to_datetime, eumetsat_cloud_name_to_datetime
-import os
-import pandas as pd
-from pathlib import Path
 import multiprocessing
+import os
 from itertools import repeat
+from pathlib import Path

+import pandas as pd
 import xarray as xr
 from tqdm import tqdm

+from satip.eumetsat import eumetsat_cloud_name_to_datetime, eumetsat_filename_to_datetime
+from satip.utils import check_if_timestep_exists, load_native_to_dataset, save_dataset_to_zarr


-def split_per_month(directory: str,
-                    zarr_path: str,
-                    hrv_zarr_path: str,
-                    region: str,
-                    temp_directory: str = "/mnt/ramdisk/",
-                    spatial_chunk_size: int = 256,
-                    temporal_chunk_size: int = 1, ):
+def split_per_month(
+    directory: str,
+    zarr_path: str,
+    hrv_zarr_path: str,
+    region: str,
+    temp_directory: str = "/mnt/ramdisk/",
+    spatial_chunk_size: int = 256,
+    temporal_chunk_size: int = 1,
+):
     """
     Splits the Zarr creation into multiple, month-long Zarr files for parallel writing
@@ -47,26 +47,28 @@
         for month in month_directories:
             if not os.path.isdir(os.path.join(directory, year, month)):
                 continue
-            month_directory = os.path.join(directory, year.split('/')[0], month.split('/')[0])
+            month_directory = os.path.join(directory, year.split("/")[0], month.split("/")[0])
             month_zarr_path = zarr_path + f"_{year.split('/')[0]}_{month.split('/')[0]}.zarr"
-            hrv_month_zarr_path = hrv_zarr_path + f"_{year.split('/')[0]}" \
-                                  f"_{month.split('/')[0]}.zarr"
+            hrv_month_zarr_path = (
+                hrv_zarr_path + f"_{year.split('/')[0]}" f"_{month.split('/')[0]}.zarr"
+            )
             dirs.append(month_directory)
             zarrs.append(month_zarr_path)
             hrv_zarrs.append(hrv_month_zarr_path)
             zarr_exists = os.path.exists(month_zarr_path)
             if not zarr_exists:
                 # Inital zarr path before then appending
                 compressed_native_files = list(Path(month_directory).rglob("*.bz2"))
-                dataset, hrv_dataset = load_native_to_dataset(compressed_native_files[0],
-                                                              temp_directory,
-                                                              region)
+                dataset, hrv_dataset = load_native_to_dataset(
+                    compressed_native_files[0], temp_directory, region
+                )
                 save_dataset_to_zarr(dataset, zarr_path=month_zarr_path, zarr_mode="w")
                 save_dataset_to_zarr(hrv_dataset, zarr_path=hrv_month_zarr_path, zarr_mode="w")
     print(dirs)
     print(zarrs)
     pool = multiprocessing.Pool(processes=16)
-    for _ in tqdm(pool.imap_unordered(
+    for _ in tqdm(
+        pool.imap_unordered(
             wrapper,
             zip(
                 dirs,
@@ -75,17 +77,18 @@
                 repeat(temp_directory),
                 repeat(region),
                 repeat(spatial_chunk_size),
-                repeat(temporal_chunk_size)
-            ),
-    )):
+                repeat(temporal_chunk_size),
+            ),
+        )
+    ):
         print("Month done")


 def wrapper(args):
     dirs, zarrs, hrv_zarrs, temp_directory, region, spatial_chunk_size, temporal_chunk_size = args
-    create_or_update_zarr_with_native_files(dirs, zarrs, hrv_zarrs, temp_directory, region,
-                                            spatial_chunk_size,
-                                            temporal_chunk_size)
+    create_or_update_zarr_with_native_files(
+        dirs, zarrs, hrv_zarrs, temp_directory, region, spatial_chunk_size, temporal_chunk_size
+    )


 def create_or_update_zarr_with_native_files(
@@ -136,15 +139,15 @@ def create_or_update_zarr_with_native_files(
             x_size_per_chunk=spatial_chunk_size,
             y_size_per_chunk=spatial_chunk_size,
             timesteps_per_chunk=temporal_chunk_size,
-            channel_chunk_size=11
+            channel_chunk_size=11,
         )
         save_dataset_to_zarr(
             hrv_dataset,
             zarr_path=hrv_zarr_path,
             x_size_per_chunk=spatial_chunk_size,
             y_size_per_chunk=spatial_chunk_size,
             timesteps_per_chunk=temporal_chunk_size,
-            channel_chunk_size=1
+            channel_chunk_size=1,
         )
     except Exception as e:
         print(f"Failed with: {e}")
@@ -155,9 +158,10 @@ def create_or_update_zarr_with_native_files(


 def pool_init(q):
-    global processed_queue # make queue global in workers
+    global processed_queue  # make queue global in workers
     processed_queue = q

+
 def native_wrapper(filename_and_area):
     filename, area = filename_and_area
     processed_queue.put(load_native_to_dataset(filename, area))
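
The last hunks touch satip's multiprocessing plumbing: constant arguments are broadcast to workers with itertools.repeat and re-packed by wrapper(), since Pool.imap_unordered passes a single argument per task, while pool_init() installs a queue as a global in each worker. A self-contained sketch of both patterns, with a toy worker standing in for satip's (illustration only):

    import multiprocessing
    from itertools import repeat


    def pool_init(q):
        global processed_queue  # make queue global in workers
        processed_queue = q


    def worker(args):
        item, scale = args  # unpack the zipped tuple, as wrapper() does above
        processed_queue.put(item * scale)


    if __name__ == "__main__":
        queue = multiprocessing.Manager().Queue()  # picklable, unlike a bare Queue
        with multiprocessing.Pool(
            processes=4, initializer=pool_init, initargs=(queue,)
        ) as pool:
            for _ in pool.imap_unordered(worker, zip(range(8), repeat(10))):
                pass  # results arrive in completion order
        while not queue.empty():
            print(queue.get())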