-
Notifications
You must be signed in to change notification settings - Fork 8
Using Stream2segment in your Python code
This notebook assumes you have already run the download routine (s2s download -c <config_file.yaml>
on the terminal) and you desire to run custom processing on the downloaded segments
- Introduction
- Writing a custom processing routine with
imap
- Iterating over selected segments
- Accessing to the db (low level approach)
Stream2segment has two main functions for working with downloaded data, process
and imap
:
from stream2segment.process import process, imap
# you can type help(process) or help(imap) for documentation
Both functions run custom code implemented in a so-called processing function iteratively on a selection of downloaded waveform segments:
-
imap
yields the output of the processing function for each selected segment -
process
writes the output of the processing function for each selected segment in a row of a chosen tabular file (HDF or CSV)
In this notebook, we will show how to work on downloaded segments via imap
. An example of process
is found in the Python module paramtable.py
available in the same directory of this Notebook, if the latter has been created with the command s2s init
Note: The main goal of paramtable.py
is to exemplify the case where a processing routine is run with a more "low level" approach from the terminal, without Jupyter: the module can be opened and modified easily following the documentation therein, and eventually run as script from the terminal:
python paramtable.py
In general, and especially for big data processing, this approach is the recommended way to process segments, as it avoids the unnecessary overhead of a Jupyter Notebook, which is designed for other purposes. In addition, imap
and process
have several options that are not designed to be used within a Notebook (e.g., run with parallel sub-processes for faster execution, show progress bar with the % of task done and the estimated reamining time)
Regardless of your environment or design choice, a processing routine always involves the same operations:
- Connect to a database
- Select which segments to fetch
- Run a processing function on the selected segments
DISCLAIMER: Pay attention when typing database URLs with passwords: do not commit or distribute them, try to avoid to type them on the terminal (or otherwise delete the command history)
In most cases, the database where the data has been downloaded and ready needs a simple string URL. For simplicity, a database URL is usually extracted from the download configuration file:
import yaml
# uncomment the line below using an existing file on your OS:
# with open('replace_with_your_download_config_path.yaml', 'r') as fpt:
# dburl = yaml.safe_load(fpt)['dburl']
In this Notebook we will use the default example database (2 segments) available in the same directory of this file after executing the command s2s init
. If you will have problem later accessing the database, check the current working directory and be sure the database is in it. A database URL can be written according to the RFC-1738 syntax:
import os
dbfile = os.path.join(os.getcwd(), 'example.db.sqlite')
assert os.path.isfile(dbfile), "Db file not found, please execute this notebook from within the db the directory"
dburl = 'sqlite:///' + dbfile
# print(dburl)
The selection of suitable segments is performed by creating a dict
mapping one or more Segment attributes to a selection expression for that attribute (for details on the segment object and its attributes, see the segment object)
# create the selection dict. This dict select a single segment (id=2) for illustrative purposes:
segments_selection = {
'has_valid_data': 'true',
'maxgap_numsamples': '[-0.5, 0.5]',
'event_distance_deg': '[70, 80]'
# other optional attributes (see cheatsheet below for details):
# missing_data_sec: '<120'
# missing_data_ratio: '<0.5'
# id: '<300'
# event.time: "(2014-01-01T00:00:00, 2014-12-31T23:59:59)"
# event.latitude: "[24, 70]"
# event.longitude: "[-11, 24]"
}
A processing function has signature (arguments) (segment, config)
where the first argument is a segment object and the second is a custom dictionary of argument that can be passed to imap
and process
(in this example, we will not provide any config, thus the second argument will not be used).
The function has no limitation on what can be implemented, you should only avoid to return the whole segment object as it might be detached (see next section for details), i.e. its attributes not accessible outside the processing function. However, you can safely return any of the segment attributes separately
def my_processing_function(segment, config):
"""simple processing function. Take the segment stream and remove its instrumental response"""
# Get ObsPy Trace object. If the waveform has no gapos/overlaps, the trace is the only element
# of the segment stream object (otherwise the stream will have several traces):
trace = segment.stream()[0]
# remove the instrumental response of the Trace:
# get ObsPy Inventory object:
inventory = segment.inventory()
# remove the response:
trace_remresp = trace.remove_response(inventory) # see caveat below
# return the segment.id, the event magnitude, the original trace and the trace with response removed
return segment.id, segment.event.magnitude, segment.stream()[0], trace_remresp
Any exception raised in the processing function (or any function called in it) will interrupt the whole processing routine with one special case: raising stream2segment.process.SkipSegment
will cause the execution to resume from the next segment.
SkipSegment
is raised by default when the segment waveform data or inventory are malformed (i.e., when segment.stream()
and segment.inventory()
fail), but can be used also to skip a segment programmatically, e.g.:
from stream2segment.process import SkipSegment
def my_processing_function_raising(segment, config):
print('what')
if segment.sample_rate < 30:
raise SkipSegment("segment sample rate too low")
# ... implement your code here
If a log file is given to imap
or process
(not shown here), then all segment skipped will be logged to file with their id and error message for later inspection
We can now illustrate a usage of, e.g., imap
. In this case, the output of our processing function is simply printed
from stream2segment.process import imap
for (segment_id, mag, trace, trace_remresp) in imap(my_processing_function, dburl, segments_selection):
print()
print('Segment Id: %d (event magnitude: %.1f)' % (segment_id, mag))
print('Segment trace (first three points):')
print(' - Counts units (no response removed): %s' % trace.data[:3])
print(' - Physical units (response removed): %s' % trace_remresp.data[:3])
Segment Id: 2 (event magnitude: 8.1)
Segment trace (first three points):
- Counts units (no response removed): [-1.42699395e-06 -1.43603990e-06 -1.42210201e-06]
- Physical units (response removed): [-1.42699395e-06 -1.43603990e-06 -1.42210201e-06]
Caveat: As you can see, the returned trace expected in count units is actually the same as the trace with response removed. This happened because many Stream
and Trace
methods (check ObsPy documentation in case of doubts) modify the objects in-place, i.e. they permanently modify the returned value of segment.stream()
(which is cached in the segment object for performance reasons).
A solution is to call segment.stream(reload=True)
that forces the complete reload of the stream from the database, or simply work on copies of those objects (which is generally faster), e.g.:
def my_processing_function(segment, config):
"""simple processing function. Take the segment stream and remove its instrumental response"""
# Get ObsPy Trace object and make a COPY of IT:
trace = segment.stream()[0].copy()
# now, segment.stream()[0] and trace are two different objects.
# By just running the same code as in the previous processing function
# we will remove the response to `trace` and preserve `segment.stream()`
# (but you can do also the other way around, depending on your needs)
inventory = segment.inventory()
trace_remresp = trace.remove_response(inventory) # see caveat below
return segment.id, segment.event.magnitude, segment.stream()[0], trace_remresp
# And now we get the expected result:
for (segment_id, mag, trace, trace_remresp) in imap(my_processing_function, dburl, segments_selection):
print()
print('Segment Id: %d (event magnitude: %.1f)' % (segment_id, mag))
print('Segment trace (first three points):')
print(' - Counts units (no response removed): %s' % trace.data[:3])
print(' - Physical units (response removed): %s' % trace_remresp.data[:3])
Segment Id: 2 (event magnitude: 8.1)
Segment trace (first three points):
- Counts units (no response removed): [-314 -280 -251]
- Physical units (response removed): [-1.42699395e-06 -1.43603990e-06 -1.42210201e-06]
Although builtin functions imap
and process
should fit most of the user needs, if you need even more customization, a more low-level approach consists of simply work iteratively on the selected segments via get_segments
. This is basically what the two builtin functions do under the hood with a given segments selection:
from stream2segment.process import get_segments
for seg in get_segments(dburl, segments_selection):
# do your work here... In this case, with the first segment `seg` just loaded, simply exit the loop:
break
get_segments
(and consequently, imap
and process
using it) opens a database session, yields selected segments and closes the session afterwards. A database session is an object that establishes all conversations with the database and represents a "holding zone" for all the objects which you’ve loaded or associated with it during its lifespan.
Closing a session is recommended after you finished your work as it releases memory on the computer and (if the db is remote) on the server, avoiding potential problems. get_segments
, imap
and process
do this automatically. Note that after a session is closed, all segment objects are detached from the database, which means we can not access anymore all of its properties, but only those previously loaded. E.g., accessing the segment related objects (e.g. the event object) outside the for loop, raises an error:
try:
seg.event
except Exception as exc:
print('ERROR: ' + str(exc))
ERROR: Parent instance <Segment at 0x1303d7460> is not bound to a Session; lazy load operation of attribute 'event' cannot proceed (Background on this error at: http://sqlalche.me/e/13/bhk3)
In very specific cases where you want to keep the segments and all related objects accessible (i.e. attached to a session) also outside a get_segments
for-loop, you can then call get_segments
with a session object instead of a db url.
Using the session
object is in general for users experienced with the DB library employed by stream2segment.
For these cases, we will show hereafter some properties of the Segment
object in more details
from stream2segment.process import get_session, Segment
session = get_session(dburl)
# query a random segment with a dummy filter just for illustrative purposes:
seg = session.query(Segment).filter(Segment.id < 10).first()
With the session still open, we can access some of the segments related objects (this will issue a query to the database):
evt = seg.event
print(str(evt))
sta = seg.station
print(str(sta))
Event
attributes (16 of 16 loaded):
event_id: 20170908_0 (str, 16 characters, showing first 10 only)
time: 2017-09-08 04:49:21.200000 (datetime)
latitude: 15.02 (float)
longitude: -93.81 (float)
depth_km: 72.0 (float)
author: EMSC (str)
catalog: EMSC-RTS (str)
contributor: EMSC (str)
contributor_id: 616600 (str)
mag_type: mw (str)
magnitude: 8.1 (float)
mag_author: EMSC (str)
event_location_name: OFFSHORE C (str, 24 characters, showing first 10 only)
event_type: None (NoneType)
webservice_id: 1 (int)
id: 1 (int)
related_objects (0 of 2 loaded):
webservice
segments
Station
attributes (11 of 11 loaded):
network: GE (str)
station: RUE (str)
latitude: 52.4759 (float)
longitude: 13.78 (float)
elevation: 40.0 (float)
site_name: None (NoneType)
start_time: 2012-03-21 10:00:00 (datetime)
end_time: None (NoneType)
inventory_xml: b'\x1f\x8b\x08\x00\xa4\x99\x1b\\\x02\xff' (bytes, 44710 elements, showing first 10 only)
datacenter_id: 1 (int)
id: 2 (int)
related_objects (0 of 3 loaded):
datacenter
channels
segments
Note that both objects have, among their related objects, a so-called back-reference segments
. This is a list-like object of Segment
s (among which the original seg
object) and not a single entity because of a so-called "many-to-one" relationship: one segment is always related to one event/station, whereas one event generates many segments at different station, and one station generates many segments for different events.
This extremely powerful feature connecting several entities effortless is a natural consequence of being backed by a relational database and it would be nearly impossible to get with a classical file system storage. Neverthless, it should be used with care with massive downloads, as one might load in the session
huge amount of data, causing memory overflows. A solution might be to call from times to times session.expunge_all()
which remove all object instances from the session (possibly freeing memory) or load only each object id, deferring the load of all other attributes from the database upon access, e.g.:
from sqlalchemy.orm import load_only
evt = seg.event
# load event related segments (*risk of memory overflow: low):
segments = evt.segments.options(load_only(Segment.id)).all()
cha = seg.channel
# load channel related segments (*risk of memory overflow: medium):
segments = cha.segments.options(load_only(Segment.id)).all()
sta = seg.station
# load station related segments (*risk of memory overflow: high):
segments = sta.segments.options(load_only(Segment.id)).all()
dct = seg.datacenter
# load data center (e.g. IRIS) related segments (*risk of memory overflow: very high):
segments = dct.segments.options(load_only(Segment.id)).all()
* The levels of risk reported are just heuristically estimated and have to be considered reliable only relative to each other (an event has almost certainly less related segments than a channel, which has almost certainly less related segments than a station, and so on)
In any case, for really memory consuming or long tasks, as already recommended, consider moving the Notebook code into a custom Python module and execute the module as a script or via use the command s2s process
# finally close session
from stream2segment.process import close_session
close_session(session)
# If close_session returns True, the session and the database connection are now closed.
# If you want to close the session only (freeing memory but keeping the session reusable later):
close_session(session, dispose_engine=False)
True
This program is released under the GNU GENERAL PUBLIC LICENSE Version 3
-
Research article: Riccardo Zaccarelli, Dino Bindi, Angelo Strollo, Javier Quinteros and Fabrice Cotton. Stream2segment: An Open‐Source Tool for Downloading, Processing, and Visualizing Massive Event‐Based Seismic Waveform Datasets. Seismological Research Letters (2019) https://doi.org/10.1785/0220180314
-
Software: Zaccarelli, Riccardo (2018): Stream2segment: a tool to download, process and visualize event-based seismic waveform data. V. 2.7.3. GFZ Data Services. http://doi.org/10.5880/GFZ.2.4.2019.002