"""
Overarching tool for listening to FDSN earthquake feeds and triggering real-time detection from them.
"""
import logging
import time
import os
import signal
import subprocess
from copy import deepcopy
from typing import Callable, Union, List
from multiprocessing import cpu_count
from obspy import UTCDateTime, Catalog
from obspy.core.event import Event, Magnitude
from obsplus.events import get_events
from eqcorrscan.core.match_filter import read_tribe
from rt_eqcorrscan.database.database_manager import (
TemplateBank, check_tribe_quality)
from rt_eqcorrscan.event_trigger.listener import _Listener
from rt_eqcorrscan.config import Config
from rt_eqcorrscan.reactor.scaling_relations import get_scaling_relation
Logger = logging.getLogger(__name__)


def _get_triggered_working_dir(
triggering_event_id: str,
exist_ok: bool = True
) -> str:
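    """
    Make (if needed) and return the working directory for a triggering event.

    The directory is created beneath the current working directory and is
    named after the triggering event id.
    """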
working_dir = os.path.join(
os.path.abspath(os.getcwd()), triggering_event_id)
os.makedirs(working_dir, exist_ok=exist_ok)
return working_dir


class Reactor(object):
"""
    A class to listen to a client and start up a real-time instance.

    The real-time instance is triggered by the listener when set conditions
    are met. Appropriate templates are extracted from the template database
    on disk and used as a Tribe for the real-time detection.

    Once triggered, the listener keeps listening, but will not trigger in the
    same region again while the real-time detector is running. The real-time
    detector must either be stopped manually, or will stop itself when its
    rate or maximum-duration conditions are met.

Parameters
----------
client
An obspy or obsplus client that supports event and station queries.
    listener
        Listener for checking current earthquake activity.
    trigger_func
        A function that, given a catalog, returns a catalog of events that
        exceed some trigger parameters. See note below.
template_database
A template database to be used to generate tribes for real-time
matched-filter detection.
config
Configuration for RT-EQcorrscan.

    Notes
    -----
    `trigger_func` must only take one argument, a catalog of events. To
    achieve this you should generate a partial function from your trigger
    function. For example, using the provided rate-and-magnitude triggering
    function:

    >>> from rt_eqcorrscan.event_trigger import magnitude_rate_trigger_func
    >>> from functools import partial
    >>> trigger_func = partial(
    ...     magnitude_rate_trigger_func, magnitude_threshold=4,
    ...     rate_threshold=20, rate_bin=0.5)
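
    A minimal construction sketch; ``client``, ``listener``, ``template_bank``
    and ``config`` are assumed to have been set up already from the relevant
    rt_eqcorrscan modules:

    >>> reactor = Reactor(
    ...     client=client, listener=listener, trigger_func=trigger_func,
    ...     template_database=template_bank, config=config)  # doctest: +SKIP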
"""
_triggered_events = []
_running_templates = dict()
    # Template event ids keyed by triggering-event-id
_running_regions = dict() # Regions keyed by triggering-event-id
max_station_distance = 1000
n_stations = 10
sleep_step = 15
# Fudge factors for past sequence simulation
_speed_up = 1
_test_start_step = 0.0
# Maximum processors dedicated to one detection routine.
_max_detect_cores = 12
# The processes that are detecting away!
detecting_processes = dict()
# TODO: Build in an AWS spin-up functionality - would need some communication...
def __init__(
self,
client,
listener: _Listener,
trigger_func: Callable,
template_database: TemplateBank,
config: Config,
listener_starttime: UTCDateTime = None,
):
self.client = client
self.listener = listener
self.trigger_func = trigger_func
self.template_database = template_database
self.config = config
self._listener_kwargs = dict(
min_stations=config.database_manager.min_stations,
template_kwargs=config.template,
starttime=listener_starttime)
self.notifier = config.notifier.notifier
# Time-keepers
self._run_start = None
self._running = False
self.up_time = 0
        signal.signal(signal.SIGINT, self._handle_interrupt)
        signal.signal(signal.SIGTERM, self._handle_interrupt)
@property
def available_cores(self) -> int:
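        """Cores available for processing, keeping one spare."""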
return max(1, cpu_count() - 1)
@property
def running_template_ids(self) -> set:
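        """Set of template event-ids across all running tribes."""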
_ids = set()
for value in self._running_templates.values():
_ids.update(value)
return _ids
def get_up_time(self):
return self._up_time
def set_up_time(self, now):
if self._run_start is not None:
self._up_time = now - self._run_start
else:
self._up_time = 0
up_time = property(get_up_time, set_up_time)

    def run(self, max_run_length: float = None) -> None:
"""
        Run all the processes.

Parameters
----------
max_run_length
Maximum run length in seconds, if None, will run indefinitely.
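
        A hedged usage sketch; ``reactor`` is assumed to be a configured
        instance, and this call blocks until stopped:

        >>> reactor.run(max_run_length=86400)  # doctest: +SKIP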
"""
        # Get the original keep value - we will overwrite this temporarily
listener_keep = deepcopy(self.listener.keep)
if self._listener_kwargs.get("starttime"):
# Set to keep at least until the starttime
_keep_len = (UTCDateTime.now() -
self._listener_kwargs["starttime"]) + 3600
self.listener.keep = max(_keep_len, listener_keep)
Logger.info(f"Setting listener keep to {self.listener.keep}")
        # Get and re-set old_events to ensure both threads share the same memory space
old_events = self.listener.old_events
self.listener.old_events = old_events
# Run the listener!
self.listener.background_run(**self._listener_kwargs)
self._run_start = UTCDateTime.now()
        # Query the catalog in the listener every so often and check for triggers
self._running = True
first_iteration = True
while self._running:
old_events = deepcopy(self.listener.old_events)
Logger.info(f"Old events from the listener has {len(old_events)} events")
# Get these locally to avoid accessing shared memory multiple times
if len(old_events) > 0:
working_ids = [_[0] for _ in old_events]
working_cat = self.template_database.get_events(
eventid=working_ids)
if len(working_ids) and not len(working_cat):
Logger.warning("Error getting events from database, getting individually")
working_cat = Catalog()
for working_id in working_ids:
try:
working_cat += self.template_database.get_events(
eventid=working_id)
except Exception as e:
Logger.error(f"Could not read {working_id} due to {e}")
continue
else:
working_cat = []
Logger.info("Currently analysing a catalog of {0} events".format(
len(working_cat)))
self.process_new_events(new_events=working_cat)
Logger.debug("Finished processing new events")
self.set_up_time(UTCDateTime.now())
Logger.debug(f"Up-time: {self.up_time}")
if max_run_length is not None and self.up_time >= max_run_length:
Logger.critical("Max run length reached. Stopping.")
self.stop()
break
Logger.debug(f"Sleeping for {self.sleep_step} seconds")
time.sleep(self.sleep_step)
Logger.debug("Waking up")
if first_iteration and len(working_cat):
# Revert keep to original value
Logger.info(f"Reverting keep to {listener_keep}")
self.listener.keep = listener_keep
first_iteration = False

    def check_running_tribes(self) -> None:
"""
        Check on the state of running tribes.

        Because this process starts detector subprocesses, they need to be
        stopped by this process when they "complete". Completion is recorded
        by the real-time tribe writing a `.stopfile` in its working directory.
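
        As a sketch, a detector subprocess can flag completion by touching
        that file in its working directory (the real-time tribe does this
        itself; ``working_dir`` here is illustrative):

        >>> open(os.path.join(working_dir, ".stopfile"), "w").close()  # doctest: +SKIP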
"""
for trigger_event in self._triggered_events:
trigger_event_id = trigger_event.resource_id.id.split('/')[-1]
working_dir = _get_triggered_working_dir(
trigger_event_id, exist_ok=True)
if os.path.isfile(f"{working_dir}/.stopfile"):
self.stop_tribe(trigger_event_id)

    def process_new_events(self, new_events: Union[Catalog, List[Event]]) -> None:
"""
Process any new events in the system.

        Check whether new events belong in one of the already running tribes
        and add them if so. Check all other events for possible triggers and
        spin up a detector instance for each trigger.

Parameters
----------
new_events
            Catalog of new events to be assessed.
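
        A hedged sketch; ``reactor`` and ``catalog`` (an obspy Catalog of
        new events) are assumed to exist:

        >>> reactor.process_new_events(new_events=catalog)  # doctest: +SKIP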
"""
# Convert to catalog
if isinstance(new_events, list):
new_events = Catalog(new_events)
for triggering_event_id, tribe_region in self._running_regions.items():
try:
add_events = get_events(new_events, **tribe_region)
except Exception:
# This only occurs when there are no events in the region
# and is fixed by PR #177 on Obsplus.
add_events = Catalog()
# Don't trigger on events now running in another tribe.
new_events.events = [e for e in new_events
if e not in add_events]
# TODO: Implement region growth based on new events added.
added_ids = {e.resource_id.id for e in add_events}.difference(
self.running_template_ids)
if added_ids:
tribe = self.template_database.get_templates(
eventid=added_ids)
tribe = check_tribe_quality(
tribe,
min_stations=self.config.rt_match_filter.min_stations,
**self.config.template)
if len(tribe) > 0:
Logger.info(f"Adding {len(tribe)} events to {triggering_event_id}")
template_dir = os.path.join(
_get_triggered_working_dir(triggering_event_id),
"new_templates")
if not os.path.isdir(template_dir):
os.makedirs(template_dir)
for template in tribe:
template.write(filename=os.path.join(
template_dir, template.name))
Logger.info(f"Written new templates to {template_dir}")
self._running_templates[triggering_event_id].update(
added_ids)
trigger_events = self.trigger_func(new_events)
        # Sanitize trigger events - make sure that multiple events that would
        # otherwise run together do not all trigger; sort by magnitude so the
        # largest spins up first
for trigger_event in trigger_events:
# Make sure they all have a magnitude
if len(trigger_event.magnitudes) == 0 or trigger_event.magnitudes[0] is None:
trigger_event.magnitudes = [Magnitude(mag=-999)]
trigger_events.events.sort(
key=lambda e: (e.preferred_magnitude() or e.magnitudes[0]).mag,
reverse=True)
for trigger_event in trigger_events:
if trigger_event in self._triggered_events:
continue
if trigger_event.resource_id.id in self.running_template_ids:
Logger.info(
f"Not spinning up {trigger_event}: it is already running")
continue
Logger.warning(
"Listener triggered by event {0}".format(trigger_event))
# Send this as an email
self.notifier.notify(
content=f"Listener triggered by event {trigger_event}")
if len(self._running_regions) >= self.available_cores:
Logger.error("No more available processors")
continue
self._triggered_events.append(trigger_event)
self.spin_up(trigger_event)

    def spin_up(self, triggering_event: Event) -> None:
"""
        Run the reactor's response function as a subprocess.

Parameters
----------
triggering_event
            Event that triggered this run - needs to have at least an origin.
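
        A hedged sketch; assumes a configured ``reactor`` and an obspy
        ``Event`` with at least an origin:

        >>> reactor.spin_up(triggering_event=event)  # doctest: +SKIP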
"""
triggering_event_id = triggering_event.resource_id.id.split('/')[-1]
region = estimate_region(
triggering_event,
multiplier=self.config.reactor.scaling_multiplier or 1.0,
min_length=self.config.reactor.minimum_lookup_radius or 50.0)
if region is None:
return
region.update(
{"starttime": self.config.database_manager.lookup_starttime})
Logger.info("Getting templates within {0}".format(region))
df = self.template_database.get_event_summary(**region)
        event_ids = set(df["event_id"])
        event_ids = event_ids.difference(self.running_template_ids)
        Logger.debug(f"event-ids in region: {event_ids}")
if len(event_ids) == 0:
Logger.warning(f"Found no events in region: {region} - no detection to run.")
return
tribe = self.template_database.get_templates(eventid=event_ids)
Logger.info(f"Found {len(tribe)} templates")
if len(tribe) == 0:
Logger.info("No templates, not running")
return
working_dir = _get_triggered_working_dir(
triggering_event_id, exist_ok=True)
tribe.write(os.path.join(working_dir, "tribe.tgz"))
self.config.write(
os.path.join(working_dir, 'rt_eqcorrscan_config.yml'))
triggering_event.write(
os.path.join(working_dir, 'triggering_event.xml'), format="QUAKEML")
script_path = os.path.join(
os.path.dirname(os.path.abspath(__file__)), "spin_up.py")
_call = ["python", script_path, "-w", working_dir,
"-n", str(min(self.available_cores, self._max_detect_cores)),
"-s", str(self._speed_up),
"-o", str(self._test_start_step)]
Logger.info("Running `{call}`".format(call=" ".join(_call)))
proc = subprocess.Popen(_call)
self.detecting_processes.update({triggering_event_id: proc})
self._running_regions.update({triggering_event_id: region})
self._running_templates.update(
{triggering_event_id:
{t.event.resource_id.id for t in tribe}})
Logger.info("Started detector subprocess - continuing listening")

    def stop_tribe(self, triggering_event_id: str = None) -> None:
"""
        Stop a specific tribe.

Parameters
----------
triggering_event_id
The id of the triggering event for which a tribe is running.
"""
if triggering_event_id is None:
return self.stop()
        proc = self.detecting_processes.pop(triggering_event_id, None)
        if proc is not None:
            proc.kill()
        self._running_templates.pop(triggering_event_id, None)
        # Release the region so that future events there can trigger again
        self._running_regions.pop(triggering_event_id, None)

    def _handle_interrupt(self, signum, frame) -> None:
Logger.critical(f"Received signal: {signum}")
self.notifier.notify(
content=f"CRITICAL: Stopping reactor after interupt: {signum}")
self.stop()

    def stop(self, raise_exit: bool = True) -> None:
"""
        Stop all the processes.

        Parameters
        ----------
        raise_exit
            Whether to exit the Python interpreter or not.
"""
Logger.critical("Stopping the reactor")
self._running = False
Logger.critical("Stopping the listener")
self.listener.background_stop()
Logger.critical("Stopped the listener")
triggering_event_ids = list(self.detecting_processes.keys())
for event_id in triggering_event_ids:
Logger.critical(f"Stopping tribe running for {event_id}")
self.stop_tribe(event_id)
Logger.critical("Stopped")
if raise_exit:
raise SystemExit


def estimate_region(
event: Event,
min_length: float = 50.,
scaling_relation: Union[str, Callable] = 'default',
multiplier: float = 1.25,
) -> Union[dict, None]:
"""
    Estimate the region to find templates within given a triggering event.

Parameters
----------
    event
        The event that triggered this function.
    min_length
        Minimum diameter in km of the circle around the triggering event
        within which templates will be sought.
    scaling_relation
        Name of a registered scaling-relationship, or a Callable that takes
        the earthquake magnitude as its only argument and returns a length
        in km.
    multiplier
        Fudge factor to scale the scaling-relation length up by a constant.

    Returns
    -------
    Dictionary keyed by "latitude", "longitude" and "maxradius", or None if
    the triggering event has no origin.

    Notes
    -----
    The circle diameter is `scaling_relation(magnitude) * multiplier` km,
    with a floor of `min_length`; `maxradius` is half of this, converted
    to degrees.
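
    A minimal sketch with a synthetic event (values are illustrative only):

    >>> from obspy.core.event import Event, Origin, Magnitude
    >>> eq = Event(
    ...     origins=[Origin(latitude=-42.0, longitude=173.0)],
    ...     magnitudes=[Magnitude(mag=6.0)])
    >>> region = estimate_region(eq)  # doctest: +SKIP
    >>> sorted(region.keys())  # doctest: +SKIP
    ['latitude', 'longitude', 'maxradius']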
"""
from obspy.geodetics import kilometer2degrees
try:
origin = event.preferred_origin() or event.origins[0]
except IndexError:
Logger.error("Triggering event has no origin, not using.")
return None
try:
magnitude = event.preferred_magnitude() or event.magnitudes[0]
except IndexError:
Logger.warning("Triggering event has no magnitude, using minimum "
"length or {0}".format(min_length))
magnitude = None
if magnitude:
if not callable(scaling_relation):
scaling_relation = get_scaling_relation(scaling_relation)
length = scaling_relation(magnitude.mag)
length *= multiplier
else:
length = min_length
if length <= min_length:
length = min_length
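    # `length` is a diameter in km; convert to degrees, then halve for a radius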
length = kilometer2degrees(length)
length /= 2.
return {
"latitude": origin.latitude, "longitude": origin.longitude,
"maxradius": length}


if __name__ == "__main__":
import doctest
doctest.testmod()