Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Handle Fink livestreams #24

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@
'tomtoolkit~=2.12.0',
'elasticsearch-dsl>=7.3,<7.5',
'markdown',
'fink-client'
],
extras_require={
'test': [
Expand Down
213 changes: 207 additions & 6 deletions tom_fink/fink.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,15 +23,16 @@
import numpy as np
from astropy.time import Time

from fink_client.consumer import AlertConsumer

FINK_URL = "https://fink-portal.org"
COLUMNS = 'i:candid,d:rf_snia_vs_nonia,i:ra,i:dec,i:jd,i:magpsf,i:objectId,d:cdsxmatch'


class FinkQueryForm(GenericQueryForm):
""" Class to organise the Query Form for Fink.
""" Class to organise the Query Form for Fink API.

It currently contains forms for
* ObjectId search
It currently contains all Fink REST API features, except bayestar.

"""
help_objectid = """
Expand Down Expand Up @@ -178,15 +179,100 @@ def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)


class FinkDESCQueryForm(GenericQueryForm):
""" Class to organise the Query Form for Fink livestreams.
"""
help_topic = """
Enter a valid Fink topic to query:
"""
topic = forms.CharField(
required=False,
label='topic',
widget=forms.TextInput(
attrs={'placeholder': 'enter a Fink topic'}
),
help_text=md.markdown(
help_topic
),
)

help_username = """
Enter your username
"""
username = forms.CharField(
required=False,
label='username',
help_text=md.markdown(
help_username
),
widget=forms.TextInput(
attrs={
'username'
}
)
)

help_groupid = """
Enter your group_id
"""
groupid = forms.CharField(
required=False,
label='groupid',
help_text=md.markdown(
help_groupid
),
widget=forms.TextInput(
attrs={
'groupid'
}
)
)

help_servers = """
Enter the server names
"""
servers = forms.CharField(
required=False,
label='servers',
help_text=md.markdown(
help_servers
),
widget=forms.TextInput(
attrs={
'servers'
}
)
)

help_timeout = """
Enter the timeout (seconds)
"""
timeout = forms.CharField(
required=False,
label='timeout',
help_text=md.markdown(
help_timeout
),
widget=forms.TextInput(
attrs={
'timeout'
}
)
)

def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)


class FinkBroker(GenericBroker):
"""
The ``FinkBroker`` is the interface to the Fink alert broker.
The ``FinkBroker`` is the interface to the Fink REST API

For information regarding Fink and its available
filters for querying, please see http://134.158.75.151:24000/api
filters for querying, please see https://fink-portal.org/api
"""

name = 'Fink'
name = 'FinkAPI'
form = FinkQueryForm

def fetch_alerts(self, parameters: dict) -> iter:
Expand Down Expand Up @@ -381,3 +467,118 @@ def to_generic_alert(self, alert):
mag=alert['i:magpsf'],
score=alert['d:rf_snia_vs_nonia']
)


class FinkDESC(GenericBroker):
"""
The ``FinkDESC`` is the interface to the Fink livestream

github.com/astrolabsoftware/fink-client/blob/master/docs/livestream_manual.md
"""

name = 'FinkDESC'
form = FinkDESCQueryForm

def consume(self, consumer, timeout):
""" Wrapper around the fink-client consumer
"""
topic, alert, key = consumer.poll(timeout)
return topic, alert, key

def fetch_alerts(self, parameters: dict) -> iter:
""" Call the Fink Livestream based on parameters from the Query Form.

Parameters
----------
parameters: dict
Dictionary that contains query parameters defined in the Form
Example: {
'topic': 'toto',
'servers': '127.0.0.1,'
'username': 'toto',
'groupid': 'toto-fink',
'timeout': 5.0,
'broker': 'FinkDesc',
}

Returns
----------
out: iter
Iterable on alert data (list of dictionary). Alert data is in
the form {column name: value}.
"""
myconfig = {
'username': parameters['username'],
'bootstrap.servers': parameters['servers'],
'group_id': parameters['groupid']
}
topics = [parameters['topic']]

# Define the consumer
consumer = AlertConsumer(topics, myconfig)

# Consume
topic, alert, key = self.consume(consumer, parameters['timeout'])

if topic is not None:
return iter([alert])
return iter([])

def fetch_alert(self, id: str):
""" Call the Fink livestream based on parameters from the Query Form.

Parameters
----------
parameters: str
"""
pass

def process_reduced_data(self, target, alert=None):
pass

def to_target(self, alert: dict) -> Target:
""" Redirect query result to a Target

Parameters
----------
alert: dict
GenericAlert instance

"""
target = Target.objects.create(
name=alert.name,
type='SIDEREAL',
ra=alert.ra,
dec=alert.dec,
)
return target

def to_generic_alert(self, alert):
""" Extract relevant parameters from the Fink livestream to the TOM


Notes
----------
1. ``GenericAlert`` would have to be expanded
2. `alert` is a nested dictionary. See fink-client (consumer).

Parameters
----------
alert: dict
Dictionary containing alert data: {column name: value}. See
`self.fetch_alerts` for more information.

Returns
----------
out: GenericAlert
Alert columns to be displayed on the TOM interface
"""
return GenericAlert(
timestamp=alert['candidate']['jd'],
id=alert['candid'],
name=alert['objectId'],
ra=alert['candidate']['ra'],
dec=alert['candidate']['dec'],
mag=alert['candidate']['magpsf'],
score=alert['rfscore']
)