"""
Listener to an event stream.
"""
import time
import logging
import copy
from typing import Union, Callable
from obspy import UTCDateTime, Catalog
from obspy.core.event import Event
from rt_eqcorrscan.event_trigger.listener import (
_Listener, event_time, EventInfo)
from rt_eqcorrscan.database.database_manager import (
TemplateBank, remove_unreferenced)
Logger = logging.getLogger(__name__)
def filter_events(
    events: Union[list, Catalog],
    min_stations: int,
    auto_picks: bool,
    auto_event: bool,
    event_type: Union[list, str],
    **kwargs,
) -> list:
    """
    Filter events from a catalog based on some quality attributes.

    Parameters
    ----------
    events:
        Events to apply filter to.
    min_stations:
        Minimum number of stations for event to be kept.
    auto_picks:
        Whether to keep automatic picks or not.
    auto_event:
        Whether to keep automatic events or not.
    event_type:
        Event types to keep.

    Returns
    -------
    Events that pass the criteria
    """
    # Work on copies: _qc_event removes picks in-place and we must not
    # mutate the caller's events.
    if isinstance(events, Catalog):
        events_out = copy.deepcopy(events.events)
    else:
        events_out = copy.deepcopy(events)
    _events_out = []
    for ev in events_out:
        ev, keep = _qc_event(
            ev, min_stations=min_stations, auto_picks=auto_picks,
            auto_event=auto_event, event_type=event_type)
        if keep:
            _events_out.append(ev)
    return _events_out
def _qc_event(
    event: Event,
    min_stations: int = None,
    auto_picks: bool = True,
    auto_event: bool = True,
    event_type: Union[list, str] = None,
) -> tuple:
    """
    QC an individual event - removes picks in place.

    Parameters
    ----------
    event:
        Event to check - its picks may be edited in place.
    min_stations:
        Minimum number of stations picked for the event to be kept.
        If None, no station-count limit is applied.
    auto_picks:
        Whether to keep automatic picks or not.
    auto_event:
        Whether to keep automatic events or not.
    event_type:
        Event types to keep - a single type or a list of types.

    Returns
    -------
    tuple of (event: Event, keep: bool)
    """
    if event_type is not None and isinstance(event_type, str):
        event_type = [event_type]
    if event_type is not None and event.event_type not in event_type:
        return event, False
    elif not auto_event:
        if "manual" not in [ori.evaluation_mode for ori in event.origins]:
            return event, False
    if not auto_picks:
        pick_ids_to_remove = [
            p.resource_id for p in event.picks
            if p.evaluation_mode == "automatic"
        ]
        # remove arrivals and amplitudes and station_magnitudes
        event.picks = [
            p for p in event.picks if p.resource_id not in pick_ids_to_remove
        ]
        event = remove_unreferenced(event)[0]
    stations = {p.waveform_id.station_code for p in event.picks}
    # min_stations defaults to None - guard the comparison, otherwise
    # calling with the default raises TypeError (None < int).
    if min_stations is not None and len(stations) < min_stations:
        return event, False
    return event, True
# TODO: back-fill method - cope with updates to events.
class CatalogListener(_Listener):
    """
    Client query class for obspy clients with a `get_events` service.

    Parameters
    ----------
    client:
        Client to query - must have at least a `get_events` method.
    catalog:
        Catalog of past events - can be empty. Any new events will be compared
        to this catalog and only added to the template bank if they are not
        in the original catalog.
    catalog_lookup_kwargs:
        Dictionary of keyword arguments for `client.get_events`.
    template_bank:
        A Template database - new events will be added to this database.
    interval:
        Interval for querying the client in seconds. Note that rapid queries
        may not be more efficient, and will almost certainly piss off your
        provider.
    keep:
        Time in seconds to keep events for in the catalog in memory. Will not
        remove old events on disk. Use to reduce memory consumption.
    waveform_client
        Client with at least a `get_waveforms` and `get_waveforms_bulk` method.
        If this is None (default) then the `client` will be used.
    """
    busy = False
    # Number of seconds prior to `now` used for testing.
    _test_start_step = 0
    # Multiplier for query intervals, used for synthesising previous
    # sequences, not general purpose.
    _speed_up = 1

    def __init__(
        self,
        client,
        template_bank: TemplateBank,
        catalog: Catalog = None,
        catalog_lookup_kwargs: dict = None,
        interval: float = 10,
        keep: float = 86400,
        waveform_client=None,
    ):
        self.client = client
        self.waveform_client = waveform_client or client
        if catalog is None:
            catalog = Catalog()
        # old_events holds (resource-id-string, event-time) records.
        self.set_old_events(
            [EventInfo(ev.resource_id.id, event_time(ev)) for ev in catalog])
        self.template_bank = template_bank
        self.catalog_lookup_kwargs = catalog_lookup_kwargs or dict()
        self.interval = interval
        self.keep = keep
        self.threads = []
        self.triggered_events = Catalog()
        self.busy = False
        self.previous_time = UTCDateTime.now()

    def __repr__(self):
        """
        ..rubric:: Example

        >>> from obspy.clients.fdsn import Client
        >>> listener = CatalogListener(
        ...     client=Client("GEONET"), catalog=Catalog(),
        ...     catalog_lookup_kwargs=dict(
        ...         latitude=-45, longitude=175, maxradius=2),
        ...     template_bank=TemplateBank('.'))
        >>> print(listener) # doctest: +NORMALIZE_WHITESPACE
        CatalogListener(client=Client(http://service.geonet.org.nz),\
        catalog=Catalog(0 events), interval=10, **kwargs)
        """
        print_str = (
            "CatalogListener(client=Client({0}), catalog=Catalog({1} events), "
            "interval={2}, **kwargs)".format(
                self.client.base_url, len(self.old_events), self.interval))
        return print_str

    @property
    def sleep_interval(self):
        # Query interval scaled by the speed-up used for replaying past
        # sequences.
        return self.interval / self._speed_up

    def run(
        self,
        make_templates: bool = True,
        template_kwargs: dict = None,
        min_stations: int = 0,
        auto_event: bool = True,
        auto_picks: bool = True,
        event_type: Union[list, str] = None,
        filter_func: Callable = None,
        starttime: UTCDateTime = None,
        **filter_kwargs,
    ) -> None:
        """
        Run the listener. New events will be added to the template_bank.

        Parameters
        ----------
        make_templates:
            Whether to add new templates to the database (True) or not.
        template_kwargs:
            Dictionary of keyword arguments for making templates, requires
            at-least: lowcut, highcut, samp_rate, filt_order, prepick, length,
            swin.
        min_stations:
            Minimum number of stations for an event to be added to the
            TemplateBank
        auto_event:
            If True, both automatic and manually reviewed events will be
            included. If False, only manually reviewed events will be included
        auto_picks:
            If True, both automatic and manual picks will be included. If False
            only manually reviewed picks will be included. Note that this is
            done **before** counting the number of stations.
        event_type
            List of event types to keep.
        filter_func
            Function used for filtering. If left as none, this will use the
            `catalog_listener.filter_events` function.
        starttime
            When to start to get events from, defaults to the last time
            the listener was run.
        filter_kwargs:
            If the `filter_func` has changed then this should be the
            additional kwargs for the user-defined filter_func.
        """
        self.busy = True
        if starttime is None:
            self.previous_time -= self._test_start_step
        else:
            self.previous_time = starttime
        if filter_func is None:
            # Default to the module-level filter, as documented above.
            filter_func = filter_events
        template_kwargs = template_kwargs or dict()
        if 'save_raw' not in template_kwargs.keys():
            # Unless someone really wants this, we don't need the raw for
            # anything else.
            template_kwargs['save_raw'] = False
        loop_duration = 0  # Timer for loop, used in synthesising speed-ups
        while self.busy:
            tic = time.time()  # Timer for loop, for synthesising speed-ups
            if self._test_start_step > 0:
                # Still processing past data
                self._test_start_step -= loop_duration * self._speed_up
                # Account for UTCDateTime.now() already including loop_
                # duration once.
                self._test_start_step += loop_duration
            elif self._test_start_step < 0:
                # We have gone into the future!
                raise NotImplementedError(
                    "Trying to access future data: spoilers not allowed")
            now = UTCDateTime.now() - self._test_start_step
            # Remove old events from cache
            self._remove_old_events(now)
            Logger.debug("Checking for new events between {0} and {1}".format(
                self.previous_time, now))
            try:
                # Check for new events - add in a pad of five times the
                # checking interval to ensure that slow-to-update events are
                # included.
                new_events = self.client.get_events(
                    starttime=self.previous_time - (5 * self.interval),
                    endtime=now, **self.catalog_lookup_kwargs)
            except Exception as e:
                # e.args may be empty, or its first element may not be a
                # string - guard before searching for the "no data" marker.
                if e.args and "No data available for request" in str(
                        e.args[0]):
                    Logger.debug("No new data")
                else:  # pragma: no cover
                    Logger.error(
                        "Could not download data between {0} and {1}".format(
                            self.previous_time, now))
                    Logger.error(e)
                time.sleep(self.sleep_interval)
                toc = time.time()  # Timer for loop
                loop_duration = toc - tic
                continue
            if new_events is not None:
                new_events = filter_func(
                    new_events, min_stations=min_stations,
                    auto_picks=auto_picks, auto_event=auto_event,
                    event_type=event_type, **filter_kwargs)
                old_event_ids = [tup[0] for tup in self.old_events]
                # Compare id *strings*: old_events stores plain strings, and
                # a ResourceIdentifier object never compares equal to one,
                # which previously defeated this de-duplication.
                new_events = Catalog(
                    [ev for ev in new_events
                     if ev.resource_id.id not in old_event_ids])
                Logger.info("{0} new events between {1} and {2}".format(
                    len(new_events), self.previous_time, now))
                if len(new_events) > 0:
                    Logger.info("Adding {0} new events to the database".format(
                        len(new_events)))
                    for event in new_events:
                        # Events without an origin or a magnitude are only
                        # skipped for logging purposes.
                        try:
                            origin = (
                                event.preferred_origin() or event.origins[0])
                        except IndexError:
                            continue
                        try:
                            magnitude = (
                                event.preferred_magnitude() or
                                event.magnitudes[0])
                        except IndexError:
                            continue
                        Logger.info(
                            "Event {0}: M {1:.1f}, lat: {2:.2f}, "
                            "long: {3:.2f}, depth: {4:.2f}km".format(
                                event.resource_id.id, magnitude.mag,
                                origin.latitude, origin.longitude,
                                origin.depth / 1000.))
                    event_info = [
                        EventInfo(ev.resource_id.id, event_time(ev))
                        for ev in new_events]
                    if make_templates:
                        self.template_bank.make_templates(
                            new_events, client=self.waveform_client,
                            **template_kwargs)
                    else:
                        self.template_bank.put_events(new_events)
                    # Putting the events in the bank clears the catalog.
                    Logger.info(
                        f"Putting events into old_events: \n{event_info}")
                    # Try looping... extend seems unstable?
                    for ev_info in event_info:
                        self.append(ev_info)
                    # self.extend(event_info)
                    Logger.debug("Old events current state: {0}".format(
                        self.old_events))
            self.previous_time = now
            # Sleep in steps to make death responsive
            _sleep_step = min(10.0, self.sleep_interval)
            _slept = 0.0
            while _slept < self.sleep_interval:
                _tic = time.time()
                # Need to sleep to allow other threads to run
                time.sleep(_sleep_step)
                if not self.busy:
                    break
                _toc = time.time()
                _slept += _toc - _tic
            toc = time.time()  # Timer for loop, for synthesising speed-ups
            loop_duration = toc - tic
        return
if __name__ == "__main__":
    # Run this module's doctests when executed directly.
    import doctest

    doctest.testmod()