-
-
Notifications
You must be signed in to change notification settings - Fork 0
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
LIU-390: Move plasma drop implementation to this repository. #1
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -5,7 +5,7 @@ | |
[![Code style: black](https://img.shields.io/badge/code%20style-black-000000.svg)](https://github.com/psf/black) | ||
|
||
|
||
Awesome daliuge_plasma_components created by ICRAR | ||
`daliuge_plasma_components` created by ICRAR | ||
|
||
## Installation | ||
|
||
|
@@ -17,11 +17,7 @@ There are multiple options for the installation, depending on how you are intend | |
```bash | ||
pip install daliuge_plasma_components | ||
``` | ||
This will only work after releasing the project to PyPi. | ||
### Engine in Docker container | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. question (documentation): Explain the removal of Docker installation instructions. The Docker installation instructions have been removed. Are there alternative instructions provided elsewhere, or is Docker no longer supported? |
||
```bash | ||
docker exec -t daliuge-engine bash -c 'pip install --prefix=$DLG_ROOT/code daliuge_plasma_components' | ||
``` | ||
|
||
## Usage | ||
For example the MyComponent component will be available to the engine when you specify | ||
``` | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,9 +1,6 @@ | ||
__package__ = "daliuge_plasma_components" | ||
# The following imports are the binding to the DALiuGE system | ||
from dlg import droputils, utils | ||
|
||
# extend the following as required | ||
from .apps import MyAppDROP | ||
from .data import MyDataDROP | ||
from .data import PlasmaDROP, PlasmaFlightDROP | ||
|
||
__all__ = ["MyAppDROP", "MyDataDROP"] | ||
__all__ = ["PlasmaDROP", "PlasmaFlightDROP"] |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,76 +1,148 @@ | ||
""" | ||
daliuge_plasma_components appComponent module. | ||
|
||
This is the module of daliuge_plasma_components containing DALiuGE application components. | ||
Here you put your main application classes and objects. | ||
# | ||
# ICRAR - International Centre for Radio Astronomy Research | ||
# (c) UWA - The University of Western Australia, 2015 | ||
# Copyright by UWA (in the framework of the ICRAR) | ||
# All rights reserved | ||
# | ||
# This library is free software; you can redistribute it and/or | ||
# modify it under the terms of the GNU Lesser General Public | ||
# License as published by the Free Software Foundation; either | ||
# version 2.1 of the License, or (at your option) any later version. | ||
# | ||
# This library is distributed in the hope that it will be useful, | ||
# but WITHOUT ANY WARRANTY; without even the implied warranty of | ||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU | ||
# Lesser General Public License for more details. | ||
# | ||
# You should have received a copy of the GNU Lesser General Public | ||
# License along with this library; if not, write to the Free Software | ||
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, | ||
# MA 02111-1307 USA | ||
# | ||
|
||
Typically a component project will contain multiple components and will | ||
then result in a single EAGLE palette. | ||
""" | ||
Plasma Flight Client implementation | ||
|
||
Be creative! do whatever you need to do! | ||
Originally in daliuge/daliuge-engine/apps/plasmaflight.py | ||
""" | ||
|
||
import logging | ||
import pickle | ||
|
||
from dlg.drop import BarrierAppDROP, BranchAppDrop | ||
from dlg.meta import ( | ||
dlg_batch_input, | ||
dlg_batch_output, | ||
dlg_bool_param, | ||
dlg_component, | ||
dlg_float_param, | ||
dlg_int_param, | ||
dlg_streaming_input, | ||
dlg_string_param, | ||
) | ||
from io import BytesIO | ||
from typing import Optional | ||
|
||
import pyarrow.flight as paf | ||
import pyarrow.plasma as plasma | ||
|
||
logger = logging.getLogger(__name__) | ||
|
||
## | ||
# @brief MyApp | ||
# @details Template app for demonstration only! | ||
# Replace the documentation with whatever you want/need to show in the DALiuGE | ||
# workflow editor. The appclass parameter should contain the relative Pythonpath | ||
# to import MyApp. | ||
# | ||
# @par EAGLE_START | ||
# @param category PythonApp | ||
# @param[in] param/appclass Application Class/daliuge_plasma_components.MyApp/String/readonly/ | ||
# \~English Import direction for application class | ||
# @param[in] param/dummy Dummy parameter/ /String/readwrite/ | ||
# \~English Dummy modifyable parameter | ||
# @param[in] port/dummy Dummy in/float/ | ||
# \~English Dummy input port | ||
# @param[out] port/dummy Dummy out/float/ | ||
# \~English Dummy output port | ||
# @par EAGLE_END | ||
|
||
# Application components can inherit from BarrierAppDROP or BranchAppDrop. | ||
# It is also possible to inherit directly from the AbstractDROP class. Please | ||
# refer to the Developer Guide for more information. | ||
|
||
|
||
class MyAppDROP(BarrierAppDROP): | ||
"""A template BarrierAppDrop that doesn't do anything at all | ||
Add your functionality in the run method and optional additional | ||
methods. | ||
|
||
class PlasmaFlightClient: | ||
""" | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. issue (complexity): Consider breaking down the new functionality into smaller, more manageable pieces. The new code introduces significant complexity compared to the original. Here are some points to consider:
To maintain simplicity while adding new functionality, consider breaking down the new functionality into smaller, more manageable pieces. For example, you could introduce a separate |
||
Client for accessing plasma-backed arrow flight data server. | ||
""" | ||
|
||
compontent_meta = dlg_component( | ||
"MyApp", | ||
"My Application", | ||
[dlg_batch_input("binary/*", [])], | ||
[dlg_batch_output("binary/*", [])], | ||
[dlg_streaming_input("binary/*")], | ||
) | ||
def __init__( | ||
self, | ||
socket: str, | ||
scheme: str = "grpc+tcp", | ||
connection_args: Optional[dict] = None, | ||
): | ||
""" | ||
Args: | ||
socket (str): The socket of the local plasma store | ||
scheme (str, optional): [description]. Defaults to "grpc+tcp". | ||
connection_args (dict, optional): [description]. Defaults to {}. | ||
""" | ||
self.plasma_client: plasma.PlasmaClient = plasma.connect(socket) | ||
self._scheme = scheme | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. issue (bug_risk): Handle potential connection errors. Consider adding error handling for the connection to the plasma store. This will help in diagnosing issues if the connection fails. |
||
self._connection_args = {} if connection_args is None else connection_args | ||
|
||
def list_flights(self, location: str): | ||
""" | ||
Retrieves a list of flights | ||
""" | ||
flight_client = paf.FlightClient( | ||
f"{self._scheme}://{location}", **self._connection_args | ||
) | ||
return flight_client.list_flights() | ||
|
||
def get_flight( | ||
self, object_id: plasma.ObjectID, location: str | ||
) -> paf.FlightStreamReader: | ||
""" | ||
Retreives an flight object stream | ||
""" | ||
descriptor = paf.FlightDescriptor.for_path( | ||
object_id.binary().hex().encode("utf-8") | ||
) | ||
|
||
logger.debug( | ||
f"connecting to {self._scheme}://{location} with descriptor {descriptor}" | ||
) | ||
flight_client = paf.FlightClient( | ||
f"{self._scheme}://{location}", **self._connection_args | ||
) | ||
info = flight_client.get_flight_info(descriptor) | ||
for endpoint in info.endpoints: | ||
logger.debug("using endpoint locations %s", endpoint.locations) | ||
return flight_client.do_get(endpoint.ticket) | ||
|
||
def create(self, object_id: plasma.ObjectID, size: int) -> plasma.PlasmaBuffer: | ||
""" | ||
Creates an empty plasma buffer | ||
""" | ||
return self.plasma_client.create(object_id, size) | ||
|
||
sleepTime = dlg_float_param("sleep time", 0) | ||
def seal(self, object_id: plasma.ObjectID): | ||
"""Seals the plasma buffer marking it as readonly""" | ||
self.plasma_client.seal(object_id) | ||
|
||
def initialize(self, **kwargs): | ||
super(MyAppDROP, self).initialize(**kwargs) | ||
def put_raw_buffer(self, data: memoryview, object_id: plasma.ObjectID): | ||
"""Puts""" | ||
self.plasma_client.put_raw_buffer(data, object_id) | ||
|
||
def get_buffer( | ||
self, object_id: plasma.ObjectID, owner: Optional[str] = None | ||
) -> memoryview: | ||
""" | ||
Gets the plasma object from the local store if it's available, | ||
otherwise queries the plasmaflight owner for the object. | ||
""" | ||
logger.debug("PlasmaFlightClient Get %s", object_id) | ||
if self.plasma_client.contains(object_id): | ||
# first check if the local store contains the object | ||
[buf] = self.plasma_client.get_buffers([object_id]) | ||
return memoryview(buf) | ||
elif owner is not None: | ||
# fetch from the specified owner | ||
reader = self.get_flight(object_id, owner) | ||
table = reader.read_all() | ||
output = BytesIO(table.column(0)[0].as_py()).getbuffer() | ||
# cache buffer in local plasma | ||
self.put_raw_buffer(output, object_id) | ||
return output | ||
else: | ||
raise KeyError("ObjectID not found", object_id) | ||
|
||
def run(self): | ||
def exists(self, object_id: plasma.ObjectID, owner: Optional[str] = None) -> bool: | ||
""" | ||
The run method is mandatory for DALiuGE application components. | ||
Returns true if the remote plasmaflight server contains the plasma object. | ||
""" | ||
return f"Hello from {self.__class__.__name__}" | ||
# check cache | ||
if self.plasma_client.contains(object_id): | ||
return True | ||
# check remote | ||
if owner is not None: | ||
client = paf.FlightClient( | ||
f"{self._scheme}://{owner}", **self._connection_args | ||
) | ||
try: | ||
client.get_flight_info( | ||
paf.FlightDescriptor.for_path( | ||
object_id.binary().hex().encode("utf-8") | ||
) | ||
) | ||
return True | ||
except paf.FlightError: | ||
return False | ||
return False |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
question (documentation): Clarify if the project has been released to PyPi.
The line about PyPi release has been removed. Has the project been released to PyPi, making this line obsolete?