LIU-390: Move plasma drop implementation to this repository. #1

Merged: 4 commits, Jun 20, 2024
17 changes: 0 additions & 17 deletions .github/workflows/main.yml
@@ -64,20 +64,3 @@ jobs:
# password: ${{ secrets.TEST_PYPI }}
# repository_url: https://test.pypi.org/legacy/

tests_mac:
needs: linter
strategy:
fail-fast: false
matrix:
python-version: [3.8]
os: [macos-latest]
runs-on: ${{ matrix.os }}
steps:
- uses: actions/checkout@v2
- uses: actions/setup-python@v2
with:
python-version: ${{ matrix.python-version }}
- name: Install project
run: make install
- name: Run tests
run: make test
10 changes: 5 additions & 5 deletions Makefile
@@ -27,14 +27,14 @@ install: ## Install the project in dev mode.
.PHONY: fmt
fmt: ## Format code using black & isort.
$(ENV_PREFIX)isort daliuge_plasma_components/
$(ENV_PREFIX)black -l 79 daliuge_plasma_components/
$(ENV_PREFIX)black -l 79 tests/
$(ENV_PREFIX)black -l 90 daliuge_plasma_components/
$(ENV_PREFIX)black -l 90 tests/

.PHONY: lint
lint: ## Run pep8, black, mypy linters.
$(ENV_PREFIX)flake8 daliuge_plasma_components/
$(ENV_PREFIX)black -l 79 --check daliuge_plasma_components/
$(ENV_PREFIX)black -l 79 --check tests/
$(ENV_PREFIX)flake8 --ignore=E501 daliuge_plasma_components/
$(ENV_PREFIX)black -l 90 --check daliuge_plasma_components/
$(ENV_PREFIX)black -l 90 --check tests/
$(ENV_PREFIX)mypy --ignore-missing-imports daliuge_plasma_components/

.PHONY: test
8 changes: 2 additions & 6 deletions README.md
@@ -5,7 +5,7 @@
[![Code style: black](https://img.shields.io/badge/code%20style-black-000000.svg)](https://github.com/psf/black)


Awesome daliuge_plasma_components created by ICRAR
`daliuge_plasma_components` created by ICRAR

## Installation

@@ -17,11 +17,7 @@ There are multiple options for the installation, depending on how you are intend
```bash
pip install daliuge_plasma_components
```
This will only work after releasing the project to PyPi.

question (documentation): Clarify whether the project has been released to PyPI.

The line about the PyPI release has been removed. Has the project been released to PyPI, making this line obsolete?

### Engine in Docker container

question (documentation): Explain the removal of Docker installation instructions.

The Docker installation instructions have been removed. Are there alternative instructions provided elsewhere, or is Docker no longer supported?

```bash
docker exec -t daliuge-engine bash -c 'pip install --prefix=$DLG_ROOT/code daliuge_plasma_components'
```

## Usage
For example the MyComponent component will be available to the engine when you specify
```
7 changes: 2 additions & 5 deletions daliuge_plasma_components/__init__.py
@@ -1,9 +1,6 @@
__package__ = "daliuge_plasma_components"
# The following imports are the binding to the DALiuGE system
from dlg import droputils, utils

# extend the following as required
from .apps import MyAppDROP
from .data import MyDataDROP
from .data import PlasmaDROP, PlasmaFlightDROP

__all__ = ["MyAppDROP", "MyDataDROP"]
__all__ = ["PlasmaDROP", "PlasmaFlightDROP"]
4 changes: 0 additions & 4 deletions daliuge_plasma_components/__main__.py
@@ -1,8 +1,6 @@
# __main__ is not required for DALiuGE components.
import argparse # pragma: no cover

from . import MyAppDROP # pragma: no cover


def main() -> None: # pragma: no cover
"""
@@ -52,8 +50,6 @@ def main() -> None: # pragma: no cover
print("Verbose mode is on.")

print("Executing main function")
comp = MyAppDROP()
print(comp.run())
print("End of main function")


196 changes: 134 additions & 62 deletions daliuge_plasma_components/apps.py
@@ -1,76 +1,148 @@
"""
daliuge_plasma_components appComponent module.

This is the module of daliuge_plasma_components containing DALiuGE application components.
Here you put your main application classes and objects.
#
# ICRAR - International Centre for Radio Astronomy Research
# (c) UWA - The University of Western Australia, 2015
# Copyright by UWA (in the framework of the ICRAR)
# All rights reserved
#
# This library is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License as published by the Free Software Foundation; either
# version 2.1 of the License, or (at your option) any later version.
#
# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with this library; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston,
# MA 02111-1307 USA
#

Typically a component project will contain multiple components and will
then result in a single EAGLE palette.
"""
Plasma Flight Client implementation

Be creative! do whatever you need to do!
Originally in daliuge/daliuge-engine/apps/plasmaflight.py
"""

import logging
import pickle

from dlg.drop import BarrierAppDROP, BranchAppDrop
from dlg.meta import (
dlg_batch_input,
dlg_batch_output,
dlg_bool_param,
dlg_component,
dlg_float_param,
dlg_int_param,
dlg_streaming_input,
dlg_string_param,
)
from io import BytesIO
from typing import Optional

import pyarrow.flight as paf
import pyarrow.plasma as plasma

logger = logging.getLogger(__name__)

##
# @brief MyApp
# @details Template app for demonstration only!
# Replace the documentation with whatever you want/need to show in the DALiuGE
# workflow editor. The appclass parameter should contain the relative Pythonpath
# to import MyApp.
#
# @par EAGLE_START
# @param category PythonApp
# @param[in] param/appclass Application Class/daliuge_plasma_components.MyApp/String/readonly/
# \~English Import direction for application class
# @param[in] param/dummy Dummy parameter/ /String/readwrite/
# \~English Dummy modifyable parameter
# @param[in] port/dummy Dummy in/float/
# \~English Dummy input port
# @param[out] port/dummy Dummy out/float/
# \~English Dummy output port
# @par EAGLE_END

# Application components can inherit from BarrierAppDROP or BranchAppDrop.
# It is also possible to inherit directly from the AbstractDROP class. Please
# refer to the Developer Guide for more information.


class MyAppDROP(BarrierAppDROP):
"""A template BarrierAppDrop that doesn't do anything at all
Add your functionality in the run method and optional additional
methods.

class PlasmaFlightClient:
"""

issue (complexity): Consider breaking down the new functionality into smaller, more manageable pieces.

The new code introduces significant complexity compared to the original. Here are some points to consider:

  1. Increased Code Size: The new code is significantly longer, making it more complex to read and understand.
  2. Additional Dependencies: The new code introduces several new dependencies (pyarrow, pyarrow.flight, pyarrow.plasma, hashlib, BytesIO, and Optional), which increases the complexity of the codebase.
  3. New Functionality: The new code adds a lot of new functionality related to Plasma and Flight clients, which adds to the cognitive load required to understand the code.
  4. Error Handling: The new code includes more error handling, which adds to the complexity.
  5. Logging: The new code includes more logging statements, which, while useful for debugging, add to the overall complexity.
  6. Documentation: The new code has more extensive documentation, which adds to the length and complexity of the file.

To maintain simplicity while adding new functionality, consider breaking down the new functionality into smaller, more manageable pieces. For example, you could introduce a separate PlasmaClient class to handle Plasma-related operations, keeping it separate from the MyAppDROP class. This approach keeps the code more modular and easier to manage.
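
One possible shape for that split, as a minimal sketch rather than the PR's actual design: local-store operations live in one class and the remote lookup is injected as a callable, so the caching logic of `get_buffer` can be read and tested on its own. `LocalStore`, `CachingClient`, and `RemoteFetch` are hypothetical names, and a plain dict stands in for the plasma store so the example runs without pyarrow.

```python
from typing import Callable, Dict, Optional

# Hypothetical type: fetches an object's bytes from a remote owner.
RemoteFetch = Callable[[bytes, str], bytes]


class LocalStore:
    """Stand-in for the local plasma store (a dict here, not pyarrow.plasma)."""

    def __init__(self) -> None:
        self._objects: Dict[bytes, bytes] = {}

    def contains(self, object_id: bytes) -> bool:
        return object_id in self._objects

    def get(self, object_id: bytes) -> memoryview:
        return memoryview(self._objects[object_id])

    def put(self, data: bytes, object_id: bytes) -> None:
        self._objects[object_id] = bytes(data)


class CachingClient:
    """Combines a local store with a remote fetcher, mirroring get_buffer."""

    def __init__(self, store: LocalStore, fetch_remote: RemoteFetch) -> None:
        self._store = store
        self._fetch_remote = fetch_remote

    def get_buffer(self, object_id: bytes, owner: Optional[str] = None) -> memoryview:
        # Serve from the local store when the object is already cached.
        if self._store.contains(object_id):
            return self._store.get(object_id)
        if owner is None:
            raise KeyError("ObjectID not found", object_id)
        # Otherwise fetch from the owner and cache the result locally.
        data = self._fetch_remote(object_id, owner)
        self._store.put(data, object_id)
        return self._store.get(object_id)
```

In the real module the injected callable would wrap the Flight lookup done by `get_flight`, and `LocalStore` would delegate to the plasma client; both substitutions are assumptions of this sketch.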

Client for accessing plasma-backed arrow flight data server.
"""

compontent_meta = dlg_component(
"MyApp",
"My Application",
[dlg_batch_input("binary/*", [])],
[dlg_batch_output("binary/*", [])],
[dlg_streaming_input("binary/*")],
)
def __init__(
self,
socket: str,
scheme: str = "grpc+tcp",
connection_args: Optional[dict] = None,
):
"""
Args:
socket (str): The socket of the local plasma store
scheme (str, optional): [description]. Defaults to "grpc+tcp".
connection_args (dict, optional): [description]. Defaults to {}.
"""
self.plasma_client: plasma.PlasmaClient = plasma.connect(socket)
self._scheme = scheme

issue (bug_risk): Handle potential connection errors.

Consider adding error handling for the connection to the plasma store. This will help in diagnosing issues if the connection fails.
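
A minimal sketch of what that could look like, under stated assumptions: `connect_or_raise` is a hypothetical helper, the connect function is passed in so the example runs without pyarrow, and `Exception` is caught broadly because the exact exception type raised by `plasma.connect` is not confirmed here.

```python
import logging
from typing import Any, Callable

logger = logging.getLogger(__name__)


def connect_or_raise(connect: Callable[[str], Any], socket: str) -> Any:
    """Call connect(socket); on failure, log and re-raise with the socket path.

    `connect` is injected for illustration. In the PR it would be
    `plasma.connect` (an assumption of this sketch).
    """
    try:
        return connect(socket)
    except Exception as exc:  # broad on purpose: exception type is unconfirmed
        logger.error("Could not connect to plasma store at %s: %s", socket, exc)
        raise ConnectionError(
            f"Cannot connect to plasma store at {socket!r}"
        ) from exc
```

In `__init__` this could be used as `self.plasma_client = connect_or_raise(plasma.connect, socket)`, so a failed connection reports which socket was tried while keeping the original exception as the cause.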

self._connection_args = {} if connection_args is None else connection_args

def list_flights(self, location: str):
"""
Retrieves a list of flights
"""
flight_client = paf.FlightClient(
f"{self._scheme}://{location}", **self._connection_args
)
return flight_client.list_flights()

def get_flight(
self, object_id: plasma.ObjectID, location: str
) -> paf.FlightStreamReader:
"""
Retrieves a flight object stream
"""
descriptor = paf.FlightDescriptor.for_path(
object_id.binary().hex().encode("utf-8")
)

logger.debug(
f"connecting to {self._scheme}://{location} with descriptor {descriptor}"
)
flight_client = paf.FlightClient(
f"{self._scheme}://{location}", **self._connection_args
)
info = flight_client.get_flight_info(descriptor)
for endpoint in info.endpoints:
logger.debug("using endpoint locations %s", endpoint.locations)
return flight_client.do_get(endpoint.ticket)

def create(self, object_id: plasma.ObjectID, size: int) -> plasma.PlasmaBuffer:
"""
Creates an empty plasma buffer
"""
return self.plasma_client.create(object_id, size)

sleepTime = dlg_float_param("sleep time", 0)
def seal(self, object_id: plasma.ObjectID):
"""Seals the plasma buffer marking it as readonly"""
self.plasma_client.seal(object_id)

def initialize(self, **kwargs):
super(MyAppDROP, self).initialize(**kwargs)
def put_raw_buffer(self, data: memoryview, object_id: plasma.ObjectID):
"""Puts"""
self.plasma_client.put_raw_buffer(data, object_id)

def get_buffer(
self, object_id: plasma.ObjectID, owner: Optional[str] = None
) -> memoryview:
"""
Gets the plasma object from the local store if it's available,
otherwise queries the plasmaflight owner for the object.
"""
logger.debug("PlasmaFlightClient Get %s", object_id)
if self.plasma_client.contains(object_id):
# first check if the local store contains the object
[buf] = self.plasma_client.get_buffers([object_id])
return memoryview(buf)
elif owner is not None:
# fetch from the specified owner
reader = self.get_flight(object_id, owner)
table = reader.read_all()
output = BytesIO(table.column(0)[0].as_py()).getbuffer()
# cache buffer in local plasma
self.put_raw_buffer(output, object_id)
return output
else:
raise KeyError("ObjectID not found", object_id)

def run(self):
def exists(self, object_id: plasma.ObjectID, owner: Optional[str] = None) -> bool:
"""
The run method is mandatory for DALiuGE application components.
Returns true if the remote plasmaflight server contains the plasma object.
"""
return f"Hello from {self.__class__.__name__}"
# check cache
if self.plasma_client.contains(object_id):
return True
# check remote
if owner is not None:
client = paf.FlightClient(
f"{self._scheme}://{owner}", **self._connection_args
)
try:
client.get_flight_info(
paf.FlightDescriptor.for_path(
object_id.binary().hex().encode("utf-8")
)
)
return True
except paf.FlightError:
return False
return False