Skip to content

Commit

Permalink
LIU-390: Update based on linter feedback
Browse files Browse the repository at this point in the history
  • Loading branch information
myxie committed Jun 20, 2024
1 parent 9563f59 commit 227ef27
Show file tree
Hide file tree
Showing 6 changed files with 81 additions and 110 deletions.
10 changes: 5 additions & 5 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -27,14 +27,14 @@ install: ## Install the project in dev mode.
.PHONY: fmt
fmt: ## Format code using black & isort.
$(ENV_PREFIX)isort daliuge_plasma_components/
$(ENV_PREFIX)black -l 79 daliuge_plasma_components/
$(ENV_PREFIX)black -l 79 tests/
$(ENV_PREFIX)black -l 90 daliuge_plasma_components/
$(ENV_PREFIX)black -l 90 tests/

.PHONY: lint
lint: ## Run pep8, black, mypy linters.
$(ENV_PREFIX)flake8 daliuge_plasma_components/
$(ENV_PREFIX)black -l 79 --check daliuge_plasma_components/
$(ENV_PREFIX)black -l 79 --check tests/
$(ENV_PREFIX)flake8 --ignore=E501 daliuge_plasma_components/
$(ENV_PREFIX)black -l 90 --check daliuge_plasma_components/
$(ENV_PREFIX)black -l 90 --check tests/
$(ENV_PREFIX)mypy --ignore-missing-imports daliuge_plasma_components/

.PHONY: test
Expand Down
4 changes: 1 addition & 3 deletions daliuge_plasma_components/__init__.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
__package__ = "daliuge_plasma_components"
# The following imports are the binding to the DALiuGE system
from dlg import droputils, utils

# extend the following as required
from .data import PlasmaDROP, PlasmaFlightDROP

__all__ = ["PlasmaDROP", "PlasmaFlightDROP"]
__all__ = ["PlasmaDROP", "PlasmaFlightDROP"]
4 changes: 0 additions & 4 deletions daliuge_plasma_components/__main__.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
# __main__ is not required for DALiuGE components.
import argparse # pragma: no cover

from . import MyAppDROP # pragma: no cover


def main() -> None: # pragma: no cover
"""
Expand Down Expand Up @@ -52,8 +50,6 @@ def main() -> None: # pragma: no cover
print("Verbose mode is on.")

print("Executing main function")
comp = MyAppDROP()
print(comp.run())
print("End of main function")


Expand Down
8 changes: 3 additions & 5 deletions daliuge_plasma_components/apps.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,10 @@
Originally in daliuge/daliuge-engine/apps/plasmaflight.py
"""

import hashlib
from io import BytesIO
import logging
from io import BytesIO
from typing import Optional

import pyarrow
import pyarrow.flight as paf
import pyarrow.plasma as plasma

Expand Down Expand Up @@ -100,7 +98,7 @@ def seal(self, object_id: plasma.ObjectID):
self.plasma_client.seal(object_id)

def put_raw_buffer(self, data: memoryview, object_id: plasma.ObjectID):
"""Puts """
"""Puts"""
self.plasma_client.put_raw_buffer(data, object_id)

def get_buffer(
Expand Down Expand Up @@ -139,7 +137,7 @@ def exists(self, object_id: plasma.ObjectID, owner: Optional[str] = None) -> boo
f"{self._scheme}://{owner}", **self._connection_args
)
try:
info = client.get_flight_info(
client.get_flight_info(
paf.FlightDescriptor.for_path(
object_id.binary().hex().encode("utf-8")
)
Expand Down
137 changes: 48 additions & 89 deletions daliuge_plasma_components/data.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,22 +26,21 @@
Originally in daliuge/daliuge-engine/data/drops/plasma.py
"""

import logging
import pyarrow
import binascii
import io
import logging
from typing import Optional, Union
from overrides import overrides

import binascii
import os
from typing import Optional

import numpy as np
import pyarrow
from dlg.data.drops.data_base import DataDROP
from dlg.data.io import DataIO
from dlg.meta import dlg_bool_param, dlg_string_param
from overrides import overrides
from pyarrow import plasma as plasma

from dlg.data.drops.data_base import DataDROP
from dlg.meta import dlg_string_param, dlg_bool_param
from daliuge_plasma_components.apps import PlasmaFlightClient
from dlg.data.io import DataIO

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -73,7 +72,7 @@ def __init__(
self._plasma_path = plasma_path
self._object_id = object_id
self._reader = None
self._writer = None
self._writer = io.BytesIO()
# treat sizes <1 as None
self._expected_size = (
expected_size if expected_size and expected_size > 0 else None
Expand Down Expand Up @@ -112,35 +111,28 @@ def _write(self, data, **kwargs) -> int:
If use_staging is True, any number of writes may occur with a small performance penalty.
"""
# NOTE: data must be a collection of bytes for len to represent the buffer bytesize
assert isinstance(
data, Union[memoryview, bytes, bytearray, pyarrow.Buffer].__args__
)
# assert isinstance(
# data, Union[memoryview, bytes, bytearray, pyarrow.Buffer].__bytes__()
# )
databytes = data.nbytes if isinstance(data, memoryview) else len(data)

if self._use_staging:
if not self._writer:
# write into a resizable staging buffer
self._writer = io.BytesIO()
else:
if not self._writer:
# write directly into fixed size plasma buffer
self._buffer_size = (
self._expected_size
if self._expected_size is not None
else databytes
)
plasma_buffer = self._desc.create(self._object_id, self._buffer_size)
self._writer = pyarrow.FixedSizeBufferWriter(plasma_buffer)
if self._writer.tell() + databytes > self._buffer_size:
raise IOError(
"".join(
[
f"attempted to write {self._writer.tell() + databytes} ",
f"bytes to plasma buffer of size {self._buffer_size}, ",
"consider using staging or expected_size argument",
]
)
if not self._use_staging:
# write directly into fixed size plasma buffer
self._buffer_size = (
self._expected_size if self._expected_size is not None else databytes
)
plasma_buffer = self._desc.create(self._object_id, self._buffer_size)
self._writer = pyarrow.FixedSizeBufferWriter(plasma_buffer)
if self._writer.tell() + databytes > self._buffer_size:
raise IOError(
"".join(
[
f"attempted to write {self._writer.tell() + databytes} ",
f"bytes to plasma buffer of size {self._buffer_size}, ",
"consider using staging or expected_size argument",
]
)
)

self._writer.write(data)
return len(data)
Expand Down Expand Up @@ -183,7 +175,7 @@ def __init__(
self._plasma_path = plasma_path
self._flight_path = flight_path
self._reader = None
self._writer = None
self._writer = io.BytesIO()
# treat sizes <1 as None
self._expected_size = (
expected_size if expected_size and expected_size > 0 else None
Expand Down Expand Up @@ -217,26 +209,22 @@ def _read(self, count, **kwargs):
def _write(self, data, **kwargs) -> int:

# NOTE: data must be a collection of bytes for len to represent the buffer bytesize
assert isinstance(
data, Union[memoryview, bytes, bytearray, pyarrow.Buffer].__args__
)
# assert isinstance(
# data, Union[memoryview, bytes, bytearray, pyarrow.Buffer].__args__
# )
databytes = data.nbytes if isinstance(data, memoryview) else len(data)
if not self._writer:
if self._use_staging:
# stream into resizeable buffer
logger.warning(
"Using dynamically sized Plasma buffer. Performance may be reduced."
)
self._writer = io.BytesIO()
else:
# write directly to fixed size plasma buffer
self._buffer_size = (
self._expected_size
if self._expected_size is not None
else databytes
)
plasma_buffer = self._desc.create(self._object_id, self._buffer_size)
self._writer = pyarrow.FixedSizeBufferWriter(plasma_buffer)
if self._use_staging:
# stream into resizeable buffer
logger.warning(
"Using dynamically sized Plasma buffer. Performance may be reduced."
)
else:
# write directly to fixed size plasma buffer
self._buffer_size = (
self._expected_size if self._expected_size is not None else databytes
)
plasma_buffer = self._desc.create(self._object_id, self._buffer_size)
self._writer = pyarrow.FixedSizeBufferWriter(plasma_buffer)
self._writer.write(data)
return len(data)

Expand All @@ -255,27 +243,6 @@ def delete(self):
@overrides
def buffer(self) -> memoryview:
return self._desc.get_buffer(self._object_id, self._flight_path)
#
# ICRAR - International Centre for Radio Astronomy Research
# (c) UWA - The University of Western Australia
# Copyright by UWA (in the framework of the ICRAR)
# All rights reserved
#
# This library is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License as published by the Free Software Foundation; either
# version 2.1 of the License, or (at your option) any later version.
#
# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with this library; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston,
# MA 02111-1307 USA
#


##
Expand Down Expand Up @@ -306,9 +273,7 @@ def initialize(self, **kwargs):
self.plasma_path = os.path.expandvars(self.plasma_path)
if self.object_id is None:
self.object_id = (
np.random.bytes(20)
if len(self.uid) != 20
else self.uid.encode("ascii")
np.random.bytes(20) if len(self.uid) != 20 else self.uid.encode("ascii")
)
elif isinstance(self.object_id, str):
self.object_id = self.object_id.encode("ascii")
Expand All @@ -323,9 +288,7 @@ def getIO(self):

@property
def dataURL(self) -> str:
return "plasma://%s" % (
binascii.hexlify(self.object_id).decode("ascii")
)
return "plasma://%s" % (binascii.hexlify(self.object_id).decode("ascii"))


##
Expand Down Expand Up @@ -358,9 +321,7 @@ def initialize(self, **kwargs):
self.plasma_path = os.path.expandvars(self.plasma_path)
if self.object_id is None:
self.object_id = (
np.random.bytes(20)
if len(self.uid) != 20
else self.uid.encode("ascii")
np.random.bytes(20) if len(self.uid) != 20 else self.uid.encode("ascii")
)
elif isinstance(self.object_id, str):
self.object_id = self.object_id.encode("ascii")
Expand All @@ -376,6 +337,4 @@ def getIO(self):

@property
def dataURL(self) -> str:
return "plasmaflight://%s" % (
binascii.hexlify(self.object_id).decode("ascii")
)
return "plasmaflight://%s" % (binascii.hexlify(self.object_id).decode("ascii"))
28 changes: 24 additions & 4 deletions tests/test_components.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,9 @@

import pytest
import unittest
import logging
import os

given = pytest.mark.parametrize

from dlg.apps.app_base import BarrierAppDROP
Expand Down Expand Up @@ -58,6 +60,7 @@ def run(self):
outputDrop = self.outputs[0]
outputDrop.write(str(crcSum).encode("utf8"))


class TestDROP(unittest.TestCase):
"""
===============================================================
Expand All @@ -68,6 +71,7 @@ class TestDROP(unittest.TestCase):
to pick up the work to transition from plasma to an alternative.
===============================================================
"""

def setUp(self):
"""
library-specific setup
Expand Down Expand Up @@ -131,46 +135,62 @@ def test_write_plasmaDROP(self):
"""
Test a PlasmaDROP and a simple AppDROP (for checksum calculation)
"""
store = None
try:
store = subprocess.Popen(
["plasma_store", "-m", "100000000", "-s", "/tmp/plasma"]
)
self._test_write_withDropType(PlasmaDROP)
except FileNotFoundError:
logging.info(f"plasma_store not found when running test.")
finally:
store.terminate()
if store:
store.terminate()

def test_dynamic_write_plasmaDROP(self):
"""
Test a PlasmaDROP and a simple AppDROP (for checksum calculation)
"""
store = None
try:
store = subprocess.Popen(
["plasma_store", "-m", "100000000", "-s", "/tmp/plasma"]
)
self._test_dynamic_write_withDropType(PlasmaDROP)
except FileNotFoundError:
logging.info(f"plasma_store not found when running test.")
finally:
store.terminate()
if store:
store.terminate()

def test_write_plasmaFlightDROP(self):
"""
Test a PlasmaFlightDROP and a simple AppDROP (for checksum calculation)
"""
store = None
try:
store = subprocess.Popen(
["plasma_store", "-m", "100000000", "-s", "/tmp/plasma"]
)
self._test_write_withDropType(PlasmaFlightDROP)
except FileNotFoundError:
logging.info(f"plasma_store not found when running test.")
finally:
store.terminate()
if store:
store.terminate()

def test_dynamic_write_plasmaFlightDROP(self):
"""
Test a PlasmaFlightDROP and a simple AppDROP (for checksum calculation)
"""
store = None
try:
store = subprocess.Popen(
["plasma_store", "-m", "100000000", "-s", "/tmp/plasma"]
)
self._test_dynamic_write_withDropType(PlasmaFlightDROP)
except FileNotFoundError:
logging.info(f"plasma_store not found when running test.")
finally:
store.terminate()
if store:
store.terminate()

0 comments on commit 227ef27

Please sign in to comment.