"""
Classes for real-time matched-filter detection of earthquakes.
"""
import shutil
import time
import traceback
import os
import logging
import copy
import numpy
import gc
import glob
import subprocess
import numpy as np
# Memory tracking for debugging
# import psutil
# from pympler import summary, muppy
from typing import Union, List, Iterable
from obspy import Stream, UTCDateTime, Inventory, Trace
from obsplus import WaveBank
from matplotlib.figure import Figure
from multiprocessing import Lock
from eqcorrscan import Tribe, Template, Party, Detection
from eqcorrscan.utils.pre_processing import _prep_data_for_correlation
from rt_eqcorrscan.streaming.streaming import _StreamingClient
from rt_eqcorrscan.event_trigger.triggers import average_rate
from rt_eqcorrscan.config.mailer import Notifier
Logger = logging.getLogger(__name__)
[docs]class RealTimeTribe(Tribe):
sleep_step = 1.0
plotter = None
plotting_exclude_channels = [
"EHE", "EHN", "EH1", "EH2", "HHE", "HHN", "HH1", "HH2"]
"""
Real-Time tribe for real-time matched-filter detection.
Parameters
----------
name
Tribe identifier - used to define save path.
tribe
Tribe of templates to use for detection.
inventory
Inventory of stations used for detection.
rt_client
Real-Time Client for streaming data.
detect_interval
Frequency to conduct detection. Must be less than buffer_capacity.
plot
Whether to generate the real-time bokeh plot
plot_options
Plotting options parsed to `rt_eqcorrscan.plotting.plot_buffer`
wavebank
WaveBank to save data to. Used for backfilling by RealTimeTribe.
Set to `None` to not use a WaveBank.
sleep_step
Default sleep-step in seconds while waiting for data. Defaults to 1.0
plotting_exclude_channels
Channels to exclude from plotting
"""
# Management of detection multi-processing
process_cores = 2
_parallel_processing = True # This seems unstable for subprocessing.
max_correlation_cores = None
# Thread management
lock = Lock() # Lock for access to internals
_running = False
_detecting_thread = None
_backfillers = dict() # Backfill subprocesses
_backfill_tribe = Tribe() # Tribe of as-yet unused templates for backfilling
_last_backfill_start = UTCDateTime.now() # Time of last backfill run - update on run
_number_of_backfillers = 0 # Book-keeping of backfiller processes.
_clean_backfillers = False # If false will leave temporary backfiller dirs
busy = False
_speed_up = 1.0 # For simulated runs - do not change for real-time!
_stream_end = UTCDateTime(1970, 1, 1) # End of real-time data - will be
# updated in first loop. Used for keeping track of when templates are relative
# to the data received.
_spoilers = True # If the reactor gets ahead of the rt_match_filter (in
# simulations) then templates from the future might be read in. Set this to
# False to disallow templates from the future
_max_wait_length = 60.
_fig = None # Cache figures to save memory
_template_dir = "new_templates" # Where new templates should be.
_min_run_length = 24 * 3600 # Minimum run-length in seconds.
# Usurped by max_run_length, used to set a threshold for rate calculation.
# WaveBank management
wavebank_lock = Lock()
has_wavebank = False
def __init__(
self,
name: str = None,
tribe: Tribe = None,
inventory: Inventory = None,
rt_client: _StreamingClient = None,
detect_interval: float = 60.,
backfill_interval: float = 600.,
plot: bool = True,
plot_options: dict = None,
wavebank: Union[str, WaveBank] = WaveBank("Streaming_WaveBank"),
notifer: Notifier = Notifier()
) -> None:
super().__init__(templates=tribe.templates)
self.rt_client = rt_client
assert (self.rt_client.buffer_capacity >= max(
[template.process_length for template in self.templates]))
assert (self.rt_client.buffer_capacity >= detect_interval)
self.name = name or "RealTimeTribe"
self.inventory = inventory
self.party = Party()
self.detect_interval = detect_interval
self.backfill_interval = backfill_interval
self.plot = plot
self.notifier = notifer
self.plot_options = {}
if plot_options is not None:
self.plot_length = plot_options.get("plot_length", 300)
self.plot_options.update({
key: value for key, value in plot_options.items()
if key != "plot_length"})
self.detections = []
# Wavebank status to avoid accessing the underlying, lockable, wavebank
if isinstance(wavebank, str):
wavebank = WaveBank(wavebank)
self.__wavebank = wavebank
if wavebank:
self.has_wavebank = True
self._wavebank_warned = False # Reduce duplicate warnings
def __repr__(self):
"""
Print information about the tribe.
.. rubric:: Example
>>> from rt_eqcorrscan.streaming.clients.seedlink import RealTimeClient
>>> rt_client = RealTimeClient(server_url="geofon.gfz-potsdam.de")
>>> tribe = RealTimeTribe(
... tribe=Tribe([Template(name='a', process_length=60)]),
... rt_client=rt_client)
>>> print(tribe) # doctest: +NORMALIZE_WHITESPACE
Real-Time Tribe of 1 templates on client:
Seed-link client at geofon.gfz-potsdam.de, status: Stopped, \
buffer capacity: 600.0s
Current Buffer:
Buffer(0 traces, maxlen=600.0)
"""
return 'Real-Time Tribe of {0} templates on client:\n{1}'.format(
self.__len__(), self.rt_client)
@property
def template_seed_ids(self) -> set:
""" Channel-ids used in the templates. """
return set(tr.id for template in self.templates for tr in template.st)
@property
def used_seed_ids(self) -> set:
""" Channel-ids in the inventory. """
if self.inventory is None:
return set()
return set("{net}.{sta}.{loc}.{chan}".format(
net=net.code, sta=sta.code, loc=chan.location_code, chan=chan.code)
for net in self.inventory for sta in net for chan in sta)
@property
def expected_seed_ids(self) -> set:
""" ids of channels to be used for detection. """
if self.inventory is None or len(self.inventory) == 0:
return self.template_seed_ids
return self.template_seed_ids.intersection(self.used_seed_ids)
@property
def used_stations(self) -> set:
""" Set of station names used in detection. """
if self.inventory is None:
return set()
return {sta.code for net in self.inventory for sta in net}
@property
def minimum_data_for_detection(self) -> float:
""" Get the minimum required data length (in seconds) for detection. """
return max(template.process_length for template in self.templates)
@property
def running_templates(self) -> set:
"""
Get a set of the names of the running templates.
Note that names are not guaranteed to be unique.
"""
return {t.name for t in self.templates}
@property
def wavebank(self):
return self.__wavebank
@wavebank.setter
def wavebank(self, wavebank: WaveBank):
self.__wavebank = wavebank
if wavebank:
self.has_wavebank = True
else:
self.has_wavebank = False
def _ensure_templates_have_enough_stations(self, min_stations):
""" Remove templates that don't have enough stations. """
self.templates = [
t for t in self.templates
if len({tr.stats.station for tr in t.st}.intersection(
self.used_stations)) >= min_stations]
def _remove_old_detections(self, endtime: UTCDateTime) -> None:
""" Remove detections older than keep duration. Works in-place. """
# Use a copy to avoid changing list while iterating
for d in copy.copy(self.detections):
if d.detect_time <= endtime:
self.detections.remove(d)
def _remove_unused_backfillers(
self,
trig_int: float,
hypocentral_separation: float,
earliest_detection_time: UTCDateTime,
detect_directory: str,
save_waveforms: bool,
plot_detections: bool,
**kwargs
):
""" Expire unused back fill processes to release resources. """
active_backfillers = dict()
for backfiller_name, backfill_process in self._backfillers.items():
if backfill_process.poll() is not None:
Logger.info(f"Handling detections from {backfiller_name}")
self._backfiller_return(
backfiller_name=backfiller_name,
trig_int=trig_int,
hypocentral_separation=hypocentral_separation,
earliest_detection_time=earliest_detection_time,
detect_directory=detect_directory,
save_waveforms=save_waveforms,
plot_detections=plot_detections)
Logger.info(f"Cleaning backfiller {backfiller_name}")
if os.path.isdir(backfiller_name):
if self._clean_backfillers:
shutil.rmtree(backfiller_name)
else:
Logger.warning(f"Did not find backfiller temp dir {backfiller_name}")
else:
active_backfillers.update({backfiller_name: backfill_process})
Logger.debug(f"There are {len(active_backfillers)} backfillers currently active")
self._backfillers = active_backfillers
def _access_wavebank(
self,
method: str,
timeout: float = None,
*args, **kwargs
):
"""
Thread and process safe access to the wavebank.
Multiple processes cannot access the underlying HDF5 file at the same
time. This method waits until access to the HDF5 file is available and
Parameters
----------
method
Method of wavebank to call
timeout
Maximum time to try to get access to the file
args
Arguments passed to method
kwargs
Keyword arguments passed to method
Returns
-------
Whatever should be returned by the method.
"""
if not self.has_wavebank:
if not self._wavebank_warned:
Logger.error("No wavebank attached to streamer")
return None
timer, wait_step = 0.0, 0.5
Logger.debug("Getting wavebank lock")
with self.wavebank_lock:
try:
func = self.wavebank.__getattribute__(method)
except AttributeError:
Logger.error(f"No wavebank method named {method}")
return None
# Attempt to access the underlying wavebank
out = None
while timer < timeout:
Logger.debug(f"Trying to call {method} on wavebank")
tic = time.time()
try:
out = func(*args, **kwargs)
break
except (IOError, OSError) as e:
Logger.warning(f"Call to {method} failed due to {e}")
time.sleep(wait_step)
toc = time.time()
timer += toc - tic
else:
Logger.error(
f"Waited {timer} s and could not access the wavebank "
f"due to {e}")
return out
[docs] def get_wavebank_stream(self, bulk: List[tuple]) -> Stream:
""" processsafe get-waveforms-bulk call """
st = self._access_wavebank(
method="get_waveforms_bulk", timeout=120., bulk=bulk)
return st
[docs] def get_wavebank_files(self, bulk: List[tuple]) -> List[str]:
""" processsafe way to get the file paths meeting bulk criteria """
paths = []
for _bulk in bulk:
index = self._access_wavebank(
method="read_index", timeout=120, network=_bulk[0],
station=_bulk[1], location=_bulk[2], channel=_bulk[3],
starttime=_bulk[4], endtime=_bulk[5])
files = (str(self.wavebank.bank_path) + os.sep + index.path).unique()
paths.extend(list(files))
return paths
def _backfiller_return(
self,
backfiller_name: str,
trig_int: float,
hypocentral_separation: float,
earliest_detection_time: UTCDateTime,
detect_directory: str,
save_waveforms: bool,
plot_detections: bool,
):
"""
Handle finalisation of backfillers.
Parameters
----------
backfiller_name
"""
if os.path.isfile(f"{backfiller_name}/party.tgz"):
party = Party().read(f"{backfiller_name}/party.tgz")
else:
Logger.info("No party written by backfiller - no detections")
return
Logger.info(f"Read backfiller party of {len(party)} detections")
if len(party) == 0:
return
Logger.info(f"Backfill handling: Trying to get lock - Lock status: {self.lock}")
with self.lock:
Logger.info(f"Backfill handling: Lock acquired - Lock status: {self.lock}")
self._handle_detections(
party, trig_int=trig_int,
hypocentral_separation=hypocentral_separation,
earliest_detection_time=earliest_detection_time,
detect_directory=detect_directory,
save_waveforms=save_waveforms,
plot_detections=plot_detections, st=None, skip_existing=False,
backfill_dir=backfiller_name)
self._remove_old_detections(earliest_detection_time)
Logger.info("Party now contains {0} detections".format(
len(self.detections)))
Logger.info(f"Backfill handling: Lock released - Lock status {self.lock}")
return
def _handle_detections(
self,
new_party: Party,
trig_int: float,
hypocentral_separation: float,
earliest_detection_time: UTCDateTime,
detect_directory: str,
save_waveforms: bool,
plot_detections: bool,
st: Stream = None,
skip_existing: bool = True,
backfill_dir: str = None,
**kwargs
) -> None:
"""
Handle new detections - do all the additional post-detection processing
Parameters
----------
new_party
The party of new detections
trig_int
Minimum inter-detection time in seconds.
hypocentral_separation
Maximum inter-event distance in km to consider detections as being
duplicates.
earliest_detection_time
Earliest (oldest) detection-time to be kept.
detect_directory
The head directory to write to - will create
"{detect_directory}/{year}/{julian day}" directories
save_waveforms
Whether to save the waveform for the detected event or not
plot_detections
Whether to plot the detection waveform or not
st
The stream the detection was made in - required for save_waveform
and plot_detection.
skip_existing
Whether to skip detections already written to disk.
backfill_dir
Location of backfiller if these detections have come from a backfiller.
"""
_detected_templates = [f.template.name for f in self.party]
for family in new_party:
if family is None:
continue
for d in family:
d._calculate_event(template=family.template)
Logger.debug(f"New detection at {d.detect_time}")
# Cope with no picks and hence no origins - these events have to be removed
family.detections = [d for d in family if len(d.event.origins)]
if family.template.name not in _detected_templates:
self.party.families.append(family)
else:
self.party.select(family.template.name).detections.extend(
family.detections)
Logger.info("Removing duplicate detections")
Logger.info(f"Party contained {len(self.party)} before decluster")
if len(self.party) > 0:
# TODO: Need to remove detections from disk that are removed in decluster
self.party.decluster(
trig_int=trig_int, timing="origin", metric="cor_sum",
hypocentral_separation=hypocentral_separation)
Logger.info("Completed decluster")
Logger.info(f"Party contains {len(self.party)} after decluster")
Logger.info("Writing detections to disk")
# Cope with not being given a stream
read_st = False
if st is None and backfill_dir is None:
read_st = True
# TODO: Need a better way to keep track of written detections - unique keys for detections?
# TODO: This is slow, and for Kaikoura, this is what stops it from running in real time
for family in self.party:
for detection in family:
# TODO: this check doesn't necassarily work well - detections may be the same physical detection, but different Detection objects
if detection in self.detections:
continue
detect_file_base = _detection_filename(
detection=detection, detect_directory=detect_directory)
_filename = f"{detect_file_base}.xml"
if os.path.isfile(f"{detect_file_base}.xml") and skip_existing:
Logger.info(f"{_filename} exists, skipping")
continue
Logger.debug(f"Writing detection: {detection.detect_time}")
# TODO: Do not do this, let some other process work on making the waveforms.
if read_st:
max_shift = (
max(tr.stats.endtime for tr in family.template.st) -
min(tr.stats.starttime for tr in family.template.st))
bulk = [
(tr.stats.network,
tr.stats.station,
tr.stats.location,
tr.stats.channel,
(detection.detect_time - 5),
(detection.detect_time + max_shift + 5))
for tr in family.template.st]
st = self.wavebank.get_waveforms_bulk(bulk)
st_read = True
self._fig = _write_detection(
detection=detection,
detect_file_base=detect_file_base,
save_waveform=save_waveforms,
plot_detection=plot_detections, stream=st,
fig=self._fig, backfill_dir=backfill_dir,
detect_dir=detect_directory)
Logger.info("Expiring old detections")
# Empty self.detections
self.detections.clear()
for family in self.party:
Logger.debug(f"Checking for {family.template.name}")
family.detections = [
d for d in family.detections if d.detect_time >= earliest_detection_time]
Logger.debug(f"Appending {len(family)} detections")
for detection in family:
# Need to append rather than create a new object
self.detections.append(detection)
return
def _plot(self) -> None: # pragma: no cover
""" Plot the data as it comes in. """
from rt_eqcorrscan.plotting.plot_buffer import EQcorrscanPlot
self._wait()
plot_options = copy.deepcopy(self.plot_options)
update_interval = plot_options.pop("update_interval", 100.)
plot_height = plot_options.pop("plot_height", 800)
plot_width = plot_options.pop("plot_width", 1500)
offline = plot_options.pop("offline", False)
self.plotter = EQcorrscanPlot(
rt_client=self.rt_client, plot_length=self.plot_length,
tribe=self, inventory=self.inventory,
detections=self.detections,
exclude_channels=self.plotting_exclude_channels,
update_interval=update_interval, plot_height=plot_height,
plot_width=plot_width, offline=offline,
**plot_options)
self.plotter.background_run()
def _wait(self, wait: float = None, detection_kwargs: dict = None) -> None:
""" Wait for `wait` seconds, or until all channels are available. """
if wait is not None and wait <= 0:
Logger.info(f"No fucking about - get back! (wait: {wait}")
return
Logger.info("Waiting for data.")
max_wait = min(self._max_wait_length, self.rt_client.buffer_capacity)
wait_length = 0.
while True:
tic = time.time()
if detection_kwargs:
# Check on backfillers
self._remove_unused_backfillers(**detection_kwargs)
if UTCDateTime.now() - self._last_backfill_start >= (self.backfill_interval / self._speed_up) and \
len(self._backfill_tribe) and detection_kwargs:
Logger.info(f"Starting backfilling with {len(self._backfill_tribe)} templates")
self.backfill(templates=self._backfill_tribe.copy(), **detection_kwargs)
# Empty the tribe
self._backfill_tribe.templates = []
self._last_backfill_start = UTCDateTime.now()
# Wait until we have some data
Logger.debug(
"Waiting for data, currently have {0} channels of {1} "
"expected channels".format(
len(self.rt_client.buffer_ids),
len(self.expected_seed_ids)))
if detection_kwargs:
self._add_templates_from_disk(
min_stations=detection_kwargs.get("min_station", None))
else:
new_tribe = self._read_templates_from_disk()
if len(new_tribe) > 0:
self.templates.extend(new_tribe.templates)
# Only sleep if this ran faster than sleep step
iter_time = time.time() - tic
sleep_step = self.sleep_step - iter_time
Logger.debug(f"Iteration of wait took {iter_time}s. Sleeping for {sleep_step}s")
if sleep_step > 0:
time.sleep(sleep_step)
toc_sleep = time.time()
wait_length += (toc_sleep - tic)
if wait is None:
if len(self.rt_client.buffer_ids) >= len(self.expected_seed_ids):
break
if wait_length >= max_wait:
Logger.warning(
"Starting operation without the full dataset")
break
elif wait_length >= wait:
break
pass
return
def _start_streaming(self):
if not self.rt_client.started:
self.rt_client.start()
if self.rt_client.can_add_streams:
for tr_id in self.expected_seed_ids:
self.rt_client.select_stream(
net=tr_id.split('.')[0], station=tr_id.split('.')[1],
selector=tr_id.split('.')[3])
else:
Logger.warning("Client already in streaming mode,"
" cannot add channels")
if not self.rt_client.streaming:
self.rt_client.background_run()
Logger.info("Started real-time streaming")
else:
Logger.info("Real-time streaming already running")
def _runtime_check(self, run_start, max_run_length):
run_time = UTCDateTime.now() - run_start
if max_run_length is None:
Logger.info(
f"Run time: {run_time:.2f}s, no maximum run length")
else:
Logger.info(
f"Run time: {run_time:.2f}s, max_run_length: {max_run_length:.2f}s")
if run_time > max_run_length:
Logger.critical("Hit maximum run time, stopping.")
self.stop()
return False
return True
[docs] def run(
self,
threshold: float,
threshold_type: str,
trig_int: float,
hypocentral_separation: float = None,
min_stations: int = None,
keep_detections: float = 86400,
detect_directory: str = "{name}/detections",
plot_detections: bool = True,
save_waveforms: bool = True,
max_run_length: float = None,
minimum_rate: float = None,
backfill_to: UTCDateTime = None,
backfill_client=None,
**kwargs
) -> Party:
"""
Run the RealTimeTribe detection.
Detections will be added to a party and returned when the detection
is done. Detections will be stored in memory for up to `keep_detections`
seconds. Detections will also be written to individual files in the
`detect_directory`.
Parameters
----------
threshold
Threshold for detection
threshold_type
Type of threshold to use. See
`eqcorrscan.core.match_filter.Tribe.detect` for options.
trig_int
Minimum inter-detection time in seconds.
hypocentral_separation
Maximum inter-event distance in km to consider detections as being
duplicates.
min_stations
Minimum number of stations required to make a detection.
keep_detections
Duration to store detection in memory for in seconds.
detect_directory
Relative path to directory for detections. This directory will be
created if it doesn't exist - tribe name will be appended to this
string to give the directory name.
plot_detections
Whether to plot detections or not - plots will be saved to the
`detect_directory` as png images.
save_waveforms
Whether to save waveforms of detections or not - waveforms
will be saved in the `detect_directory` as miniseed files.
max_run_length
Maximum detection run time in seconds. Default is to run
indefinitely.
minimum_rate
Stopping criteria: if the detection rate drops below this the
detector will stop. If set to None, then the detector will run
until `max_run_length`. Units: events per day
backfill_to
Time to backfill the data buffer to.
backfill_client
Client to use to backfill the data buffer.
Returns
-------
The party created - will not contain detections expired by
`keep_detections` threshold.
"""
# Update backfill start time
self._last_backfill_start = UTCDateTime.now()
restart_interval = 600.0
# Squash duplicate channels to avoid excessive channels
self.templates = [squash_duplicates(t) for t in self.templates]
# Reshape the templates first
if len(self.templates) > 0:
self.templates = reshape_templates(
templates=self.templates, used_seed_ids=self.expected_seed_ids)
else:
Logger.error("No templates, will not run")
return Party()
# Remove templates that do not have enough stations in common with the
# inventory.
min_stations = min_stations or 0
n = len(self.templates)
self._ensure_templates_have_enough_stations(min_stations=min_stations)
n -= len(self.templates)
Logger.info(
f"{n} templates were removed because they did not share enough "
f"stations with the inventory. {len(self.templates)} will be used")
if len(self.templates) == 0:
Logger.critical("No templates remain, not running")
return Party()
# Fix unsupported args
try:
if kwargs.pop("plot"):
Logger.info("EQcorrscan plotting disabled")
except KeyError:
pass
_cores = kwargs.get("cores", None)
if _cores:
self.max_correlation_cores = min(
kwargs.pop("cores"), self.max_correlation_cores)
run_start = UTCDateTime.now()
detection_iteration = 0 # Counter for number of detection loops run
if not self.busy:
self.busy = True
detect_directory = detect_directory.format(name=self.name)
if not os.path.isdir(detect_directory):
os.makedirs(detect_directory)
# Get this locally before streaming starts
buffer_capacity = self.rt_client.buffer_capacity
# Start the streamer
self._start_streaming()
Logger.info("Detection will use the following data: {0}".format(
self.expected_seed_ids))
if backfill_client and backfill_to:
backfill = Stream()
self._wait()
_buffer = self.rt_client.stream
for tr_id in self.expected_seed_ids:
try:
tr_in_buffer = _buffer.select(id=tr_id)[0]
except IndexError:
continue
endtime = tr_in_buffer.stats.starttime
if endtime - backfill_to > buffer_capacity:
Logger.info("Truncating backfill to buffer length")
starttime = endtime - buffer_capacity
else:
starttime = backfill_to
if starttime > endtime:
continue
try:
tr = backfill_client.get_waveforms(
*tr_id.split('.'), starttime=starttime,
endtime=endtime).merge()[0]
except Exception as e:
Logger.error("Could not back fill due to: {0}".format(e))
Logger.error(f"The request was: {tr_id.split('.')},"
f" starttime={starttime}, endtime={endtime}")
continue
Logger.debug("Downloaded backfill: {0}".format(tr))
backfill += tr
for tr in backfill:
# Get the lock!
Logger.debug(f"Adding {tr.id} to the buffer")
self.rt_client.on_data(tr)
Logger.debug("Stream in buffer is now: {0}".format(
self.rt_client.stream))
if self.plot: # pragma: no cover
# Set up plotting thread
self._plot()
Logger.info("Plotting thread started")
if not self.rt_client.buffer_length >= self.minimum_data_for_detection:
sleep_step = (
self.minimum_data_for_detection -
self.rt_client.buffer_length + 5) / self._speed_up
Logger.info("Sleeping for {0:.2f}s while accumulating data".format(
sleep_step))
self._wait(sleep_step)
first_data = min([tr.stats.starttime
for tr in self.rt_client.stream.merge()])
detection_kwargs = dict(
threshold=threshold, threshold_type=threshold_type,
trig_int=trig_int, hypocentral_separation=hypocentral_separation,
keep_detections=keep_detections, detect_directory=detect_directory,
plot_detections=plot_detections, save_waveforms=save_waveforms,
maximum_backfill=first_data, endtime=None,
min_stations=min_stations, earliest_detection_time=None)
long_runs, long_run_time = 0, 0 # Keep track of over-running loops
try:
while self.busy:
try:
self._running = True # Lock tribe
start_time = UTCDateTime.now()
st = self.rt_client.stream.split().merge()
# Warn if data are gappy
gappy = False
for tr in st:
if np.ma.is_masked(tr.data):
gappy = True
gaps = tr.split().get_gaps()
Logger.warning(f"Masked data found on {tr.id}. Gaps: {gaps}")
if gappy:
st = st.merge() # Re-merge after gap checking
if self.has_wavebank:
st = _check_stream_is_int(st)
try:
self._access_wavebank(
method="put_waveforms", timeout=10.,
stream=st)
except Exception as e:
Logger.error(
f"Could not write to wavebank due to {e}")
last_data_received = self.rt_client.last_data
# Split to remove trailing mask
if len(st) == 0:
Logger.warning("No data")
continue
elif last_data_received is None:
Logger.warning("Streamer incorrectly reported None for last data received, "
"setting to stream end")
last_data_received = max(tr.stats.endtime for tr in st)
Logger.info(
f"Streaming Client last received data at "
f"{last_data_received}")
self._stream_end = max(tr.stats.endtime for tr in st)
min_stream_end = min(tr.stats.endtime for tr in st)
# Update detection kwargs endtime to end of current data - no need to backfill beyond that
detection_kwargs["endtime"] = self._stream_end
Logger.info(
f"Real-time client provided data: \n{st.__str__(extended=True)}")
# Cope with data that doesn't come
if start_time - last_data_received > restart_interval:
Logger.warning(
"The streaming client has not given any new data for "
f"{restart_interval} seconds. Restarting Streaming client")
Logger.info(f"start_time: {start_time}, last_data_received: "
f"{last_data_received}, stream_end: {self._stream_end}")
Logger.info("Stopping streamer")
self.rt_client.background_stop()
self.rt_client.stop()
# Get a clean instance just in case
Logger.info("Starting streamer")
self._start_streaming()
Logger.info("Streamer started")
st = self.rt_client.stream.split().merge() # Get data again.
Logger.info("Streaming client seems healthy")
# Remove any data that shouldn't be there - sometimes GeoNet's
# Seedlink client gives old data.
Logger.info(
f"Trimming between {self._stream_end - (buffer_capacity + 20.0)} "
f"and {self._stream_end}")
st.trim(
starttime=self._stream_end - (buffer_capacity + 20.0),
endtime=self._stream_end)
if detection_iteration > 0:
# For the first run we want to detect in everything we have.
# Otherwise trim so that all channels have at-least minimum data for detection
st.trim(
starttime=min_stream_end - self.minimum_data_for_detection,
endtime=self._stream_end)
Logger.info("Trimmed data")
if len(st) == 0:
Logger.warning("No data")
continue
# Remove short channels
st.traces = [
tr for tr in st
if _numpy_len(tr.data) >= (
.8 * self.minimum_data_for_detection)]
if len(st) == 0:
Logger.error("Insufficient data after trimming, accumulating")
continue
Logger.info("Starting detection run")
# merge again - checking length can split the data?
st = st.merge()
Logger.info("Using data: \n{0}".format(
st.__str__(extended=True)))
try:
Logger.debug("Currently have {0} templates in tribe".format(
len(self)))
new_party = self.detect(
stream=st, plot=False, threshold=threshold,
threshold_type=threshold_type, trig_int=trig_int,
xcorr_func="fftw", concurrency="concurrent",
cores=self.max_correlation_cores,
process_cores=self.process_cores,
parallel_process=self._parallel_processing,
ignore_bad_data=True, copy_data=False,
**kwargs)
Logger.info("Completed detection")
except Exception as e: # pragma: no cover
Logger.error(e)
Logger.error(traceback.format_exc())
if "Cannot allocate memory" in str(e):
Logger.error("Out of memory, stopping this detector")
self.stop()
break
if not self._runtime_check(
run_start=run_start, max_run_length=max_run_length):
break
Logger.info(
"Waiting for {0:.2f}s and hoping this gets "
"better".format(self.detect_interval))
time.sleep(self.detect_interval)
continue
Logger.info(f"Trying to get lock - Lock status: {self.lock}")
detection_kwargs.update(
dict(earliest_detection_time=self._stream_end - keep_detections))
with self.lock:
Logger.info(f"Lock acquired - Lock status: {self.lock}")
if len(new_party) > 0:
self._handle_detections(
new_party, st=st,
**detection_kwargs)
self._remove_old_detections(
self._stream_end - keep_detections)
Logger.info("Party now contains {0} detections".format(
len(self.detections)))
Logger.info(f"Lock released - Lock status {self.lock}")
self._running = False # Release lock
run_time = (UTCDateTime.now() - start_time) * self._speed_up # Work in fake time
Logger.info("Detection took {0:.2f}s".format(run_time))
if self.detect_interval <= run_time:
long_run_time += run_time
long_runs += 1
if long_runs > 10:
long_run_time /= long_runs
# NEVER EXCEED THE BUFFER LENGTH!!!!
new_detect_interval = min(self.rt_client.buffer_length - 10, long_run_time + 10)
Logger.warning(
"detect_interval {0:.2f} shorter than run-time for 10 occasions"
"{1:.2f}, increasing detect_interval to {2:.2f}".format(
self.detect_interval, run_time, new_detect_interval))
self.detect_interval = new_detect_interval
long_runs, long_run_time = 0, 0 # Reset counters
Logger.info("Iteration {0} took {1:.2f}s total".format(
detection_iteration, run_time))
Logger.info("Waiting {0:.2f}s until next run".format(
self.detect_interval - run_time))
detection_iteration += 1
self._wait(
wait=(self.detect_interval - run_time) / self._speed_up, # Convert to real-time
detection_kwargs=detection_kwargs)
if not self._runtime_check(
run_start=run_start, max_run_length=max_run_length):
self.stop()
break
if minimum_rate and UTCDateTime.now() > run_start + self._min_run_length:
_rate = average_rate(
self.detections,
starttime=max(
self._stream_end - keep_detections, first_data),
endtime=self._stream_end)
Logger.info(f"Average rate:\t{_rate}, "
f"minimum rate:\t{minimum_rate}")
if _rate < minimum_rate:
Logger.critical(
"Rate ({0:.2f}) has dropped below minimum rate, "
"stopping.".format(_rate))
self.stop()
break
# Logger.info("Enforcing garbage collection")
# gc.collect()
# Memory output
# Logger.info("Working out memory use")
# sum1 = summary.summarize(muppy.get_objects())
# for line in summary.format_(sum1):
# Logger.info(line)
# Total memory used for process according to psutil
# total_memory_mb = psutil.Process(os.getpid()).memory_info().rss / 1024 ** 2
# Logger.info(f"Total memory used by {os.getpid()}: {total_memory_mb:.4f} MB")
except Exception as e:
# Error locally and mail this in!
Logger.critical(f"Uncaught error: {e}")
Logger.error(traceback.format_exc())
message = f"""\
Uncaught error: {e}
Traceback:
{traceback.format_exc()}
"""
self.notifier.notify(content=message)
if not self._runtime_check(
run_start=run_start, max_run_length=max_run_length):
break
finally:
Logger.critical("Stopping")
self.stop()
return self.party
def _read_templates_from_disk(self):
template_files = glob.glob(f"{self._template_dir}/*")
if len(template_files) == 0:
return []
Logger.debug(f"Checking for events in {self._template_dir}")
new_tribe = Tribe()
for template_file in template_files:
if os.path.isdir(template_file):
# Can happen if the archive hasn't finished being created
continue
Logger.debug(f"Reading from {template_file}")
try:
template = Template().read(template_file)
except Exception as e:
Logger.error(f"Could not read {template_file} due to {e}")
continue
template_endtime = max(tr.stats.endtime for tr in template.st)
if template_endtime > self._stream_end:
msg = (f"Template {template_file} ends at {template_endtime}, "
f"after now: {self._stream_end}")
if self._spoilers:
Logger.warning(msg)
else:
Logger.debug(msg)
continue # Skip and do not remove template, we will get it later
# If we got to here we can add the template to the tribe and remove the file.
new_tribe += template
if os.path.isfile(template_file):
os.remove(template_file) # Remove file once done with it.
new_tribe.templates = [t for t in new_tribe
if t.name not in self.running_templates]
if len(new_tribe):
Logger.info(f"Read in {len(new_tribe)} new templates from disk")
return new_tribe
def _add_templates_from_disk(
self,
min_stations: int,
):
new_tribe = self._read_templates_from_disk()
if len(new_tribe) == 0:
return
Logger.info(
f"Adding {len(new_tribe)} templates to already running tribe.")
self.add_templates(new_tribe, min_stations=min_stations)
[docs] def add_templates(
self,
templates: Union[List[Template], Tribe],
min_stations: int = None,
) -> set:
"""
Add templates to the tribe.
This method will run the new templates back in time, then append the
templates to the already running tribe.
Parameters
----------
templates
New templates to add to the tribe.
min_stations:
Minimum number of stations required to make a detection.
Returns
-------
Complete set of template names after addition
"""
while self._running:
Logger.info("Waiting for access to tribe in add_templates")
time.sleep(1) # Wait until the lock is released
if isinstance(templates, list):
new_tribe = Tribe(templates)
else:
new_tribe = templates
# Squash duplicate channels to avoid excessive channels
new_tribe.templates = [squash_duplicates(t) for t in new_tribe.templates]
# Reshape
new_tribe.templates = reshape_templates(
templates=new_tribe.templates, used_seed_ids=self.expected_seed_ids)
# Remove templates that do not have enough stations.
min_stations = min_stations or 0
n = len(new_tribe)
new_tribe.templates = [
t for t in new_tribe.templates
if len({tr.stats.station for tr in t.st}.intersection(
self.used_stations)) >= min_stations]
n -= len(new_tribe)
Logger.info(
f"{n} templates were removed because they did not have enough "
f"stations. {len(new_tribe)} will be added to the running tribe.")
self.templates.extend(new_tribe.templates)
# Add templates to backfill set.
self._backfill_tribe.templates.extend(new_tribe.templates)
return set(t.name for t in self.templates)
[docs] def backfill(
self,
templates: Union[List[Template], Tribe],
threshold: float,
threshold_type: str,
trig_int: float,
maximum_backfill: Union[float, UTCDateTime] = None,
endtime: UTCDateTime = None,
plot_detections: bool = False,
**kwargs
) -> None:
"""
Backfill using data from rt_client's wavebank.
This method will run the new templates through old data and record
detections in the real-time-tribe.
Parameters
----------
templates
New templates to add to the tribe.
threshold
Threshold for detection
threshold_type
Type of threshold to use. See
`eqcorrscan.core.match_filter.Tribe.detect` for options.
trig_int
Minimum inter-detection time in seconds.
maximum_backfill
Time in seconds to backfill to - if this is larger than the
difference between the time now and the time that the tribe
started, then it will backfill to when the tribe started.
endtime
Time to stop the backfill, if None will run to now.
"""
# Get the stream - Only let the main process get the Stream
Logger.info("Acquiring stream from wavebank")
endtime = endtime or UTCDateTime.now()
if maximum_backfill is not None:
if isinstance(maximum_backfill, (float, int)):
starttime = endtime - maximum_backfill
elif isinstance(maximum_backfill, UTCDateTime):
starttime = maximum_backfill
else:
Logger.warning(
f"maximum_backfill is {type(maximum_backfill)}, not float "
"or UTCDateTime, starting from 0")
starttime = UTCDateTime(0)
else:
starttime = UTCDateTime(0)
Logger.info(f"Backfilling between {starttime} and {endtime}")
if starttime >= endtime or not self.has_wavebank:
Logger.info("No data meets backfill needs. Returning")
return
if self.expected_seed_ids and len(self.expected_seed_ids) > 0:
bulk = []
for chan in self.expected_seed_ids:
query = chan.split('.')
query.extend([starttime, endtime])
bulk.append(tuple(query))
else:
Logger.warning("No expected seed ids")
return
if len(bulk) == 0:
Logger.warning("No bulk")
return
Logger.debug(f"Getting stations for backfill: {bulk}")
# st = self.get_wavebank_stream(bulk)
st_files = self.get_wavebank_files(bulk)
Logger.info(f"Concatenating {len(st_files)} stream files for backfill")
self._number_of_backfillers += 1
backfiller_name = f"Backfiller_{self._number_of_backfillers}"
# Make working directory and write files.
if not os.path.isdir(backfiller_name):
os.makedirs(backfiller_name, exist_ok=True)
# Just copy all the files to a streams folder and use a LocalClient for backfiller
os.makedirs(f"{backfiller_name}/streams")
for st_file in st_files:
st_file_new_path = st_file.split(str(self.wavebank.bank_path))[-1]
st_file_new_path = f"{backfiller_name}/streams/{st_file_new_path}"
os.makedirs(os.path.split(st_file_new_path)[0], exist_ok=True)
# shutil.copyfile(st_file, st_file_new_path)
os.link(st_file, st_file_new_path) # Link, rather than copy
# st.write(f"{backfiller_name}/stream.ms", format="MSEED")
# with open(f"{backfiller_name}/stream.ms", "wb") as fout:
# for st_file in st_files:
# with open(st_file, "rb") as fin:
# fout.write(fin.read())
if isinstance(templates, Tribe):
tribe = templates
else:
tribe = Tribe(templates)
tribe.write(f"{backfiller_name}/tribe.tgz")
del st_files
# Force garbage collection before creating new process
gc.collect()
working_dir = os.path.abspath(backfiller_name)
# Start backfiller subprocess
script_path = os.path.join(
os.path.dirname(os.path.abspath(__file__)), "reactor", "backfill.py")
_call = [
"python", script_path,
"-w", working_dir,
"-m", str(self.minimum_data_for_detection),
"-t", str(threshold),
"-T", threshold_type,
"-i", str(trig_int),
"-c", str(self.max_correlation_cores),
"--starttime", str(starttime),
"--endtime", str(endtime),
"-P", # Enable parallel processing
]
if plot_detections:
_call.append("--plot")
_call.append("-s") # Add on the list of expected seed ids
_call.extend(self.expected_seed_ids)
Logger.info("Running `{call}`".format(call=" ".join(_call)))
proc = subprocess.Popen(_call)
self._backfillers.update({backfiller_name: proc})
Logger.info("Backfill process started, returning")
return
[docs] def stop(self, write_stopfile: bool = False) -> None:
"""
Stop the real-time system.
Parameters
----------
write_stopfile:
Used to write a one-line file telling listening systems that
this has stopped. Used by the Reactor.
"""
if self.plotter is not None: # pragma: no cover
self.plotter.background_stop()
self.rt_client.background_stop()
self.busy = False
self._running = False
if self._detecting_thread is not None:
self._detecting_thread.join()
# Kill all the backfillers
for backfiller in self._backfillers.values():
backfiller.kill()
if self._clean_backfillers:
for backfiller_name in self._backfillers.keys():
if os.path.isdir(backfiller_name):
shutil.rmtree(backfiller_name)
if write_stopfile:
with open(".stopfile", "a") as f:
f.write(f"{self.name}\n")
[docs]def reshape_templates(
templates: List[Template],
used_seed_ids: Iterable[str]
) -> List[Template]:
"""
Reshape templates to have the full set of required channels (and no more).
This is done within matched-filter as well as here - we do it here so that
the templates are not reshaped every iteration.
Parameters
----------
templates
Templates to be reshaped - the templates are changed in-place
used_seed_ids
Seed-ids used for detection (network.station.location.channel)
Returns
-------
Templates filled for detection.
"""
template_streams = [t.st for t in templates]
template_names = [t.name for t in templates]
samp_rate = template_streams[0][0].stats.sampling_rate
process_len = max(t.process_length for t in templates)
# Make a dummy stream with all the used seed ids
stream = Stream()
for seed_id in used_seed_ids:
net, sta, loc, chan = seed_id.split('.')
tr = Trace(header=dict(
network=net, station=sta, location=loc, channel=chan,
sampling_rate=samp_rate),
data=numpy.empty(int(process_len * samp_rate)))
stream += tr
_, template_streams, template_names = _prep_data_for_correlation(
stream=stream, templates=template_streams,
template_names=template_names, force_stream_epoch=False)
templates_back = []
for template_st, template_name in zip(template_streams, template_names):
original = [t for t in templates if t.name == template_name]
assert len(original) == 1
original = original[0]
original.st = template_st
templates_back.append(original)
return templates_back
[docs]def squash_duplicates(template: Template):
"""
Remove duplicate channels in templates.
This happens when there are duplicate picks, and it fucks shit up.
More explicitly (less?): when there are duplicate picks, duplicate
channels appear in the template, which are then extended to all
templates meaning that many copies of a template are run, resulting
in excessive detection bias on one channel, and very expensive templates
without good reason.
"""
from collections import Counter
# Check if there are duplicate channels first
seed_ids = Counter(tr.id for tr in template.st)
if seed_ids.most_common(1)[0][1] == 1:
# Do nothing, no duplicates
return template
unique_template_st = Stream()
unique_event_picks = []
for seed_id, repeats in seed_ids.most_common():
# TODO: this restricts to only picks on that seed id - but we could just have matched picks on station
seed_id_picks = [p for p in template.event.picks
if p.waveform_id.get_seed_string() == seed_id
and p.phase_hint[0] in "PS"]
if repeats == 1:
unique_template_st += template.st.select(id=seed_id)
unique_event_picks.append(seed_id_picks[0])
continue
# Now we get to doing something - get the stream and the picks
repeat_stream = template.st.select(id=seed_id)
# Get the unique start-times
unique_traces = {tr.stats.starttime.datetime: tr
for tr in repeat_stream.traces}
if len(unique_traces) == 1:
unique_template_st += unique_traces.popitem()[1]
unique_event_picks.append(seed_id_picks[0])
continue
# If there are more than one unique start-time then we need to
# find the appropriate pick for each
unique_picks = {f"{p.phase_hint}_{p.time}": p for p in seed_id_picks}
for pick in unique_picks.values():
expected_starttime = pick.time - template.prepick
tr = unique_traces.get(expected_starttime.datetime, None)
if tr:
unique_event_picks.append(pick)
unique_template_st += tr
else:
Logger.debug(f"No trace for pick at {pick.time}")
template.st = unique_template_st
template.event.picks = unique_event_picks
return template
def _detection_filename(
detection: Detection,
detect_directory: str,
) -> str:
_path = os.path.join(
detect_directory, detection.detect_time.strftime("%Y"),
detection.detect_time.strftime("%j"))
if not os.path.isdir(_path):
os.makedirs(_path)
_filename = os.path.join(
_path, detection.detect_time.strftime("%Y%m%dT%H%M%S"))
return _filename
def _write_detection(
detection: Detection,
detect_file_base: str,
save_waveform: bool,
plot_detection: bool,
stream: Stream,
fig=None,
backfill_dir: str = None,
detect_dir: str = None
) -> Figure:
"""
Handle detection writing including writing streams and figures.
Parameters
----------
detection
The Detection to write
detect_file_base
File to write to (without extension)
save_waveform
Whether to save the waveform for the detected event or not
plot_detection
Whether to plot the detection waveform or not
stream
The stream the detection was made in - required for save_waveform and
plot_detection.
fig
A figure object to reuse.
backfill_dir:
Backfill directory - set if the detections have already been written
to this dir and just need to be copied.
detect_dir
Detection directory - only used to manipulate backfillers.
Returns
-------
An empty figure object to be reused if a figure was created, or the figure
passed to it.
"""
from rt_eqcorrscan.plotting.plot_event import plot_event
if backfill_dir:
backfill_file_base = (
f"{backfill_dir}/detections/{detect_file_base.split(detect_dir)[-1]}")
Logger.info(f"Looking for detections in {backfill_file_base}.*")
backfill_dets = glob.glob(f"{backfill_file_base}.*")
Logger.info(f"Copying {len(backfill_dets)} to main detections")
for f in backfill_dets:
ext = os.path.splitext(f)[-1]
shutil.copyfile(f, f"{detect_file_base}{ext}")
Logger.info(f"Copied {f} to {detect_file_base}{ext}")
return fig
try:
detection.event.write(f"{detect_file_base}.xml", format="QUAKEML")
except Exception as e:
Logger.error(f"Could not write event file due to {e}")
detection.event.picks.sort(key=lambda p: p.time)
st = stream.slice(
detection.event.picks[0].time - 10,
detection.event.picks[-1].time + 20).copy()
if plot_detection:
# Make plot
fig = plot_event(fig=fig, event=detection.event, st=st,
length=90, show=False)
try:
fig.savefig(f"{detect_file_base}.png")
except Exception as e:
Logger.error(f"Could not write plot due to {e}")
fig.clf()
if save_waveform:
st = _check_stream_is_int(st)
try:
st.write(f"{detect_file_base}.ms", format="MSEED")
except Exception as e:
Logger.error(f"Could not write stream due to {e}")
return fig
def _check_stream_is_int(st):
st = st.split()
for tr in st:
# Ensure data are int32, see https://github.com/obspy/obspy/issues/2683
if tr.data.dtype == numpy.int32 and \
tr.data.dtype.type != numpy.int32:
tr.data = tr.data.astype(numpy.int32, subok=False)
if tr.data.dtype.type == numpy.intc:
tr.data = tr.data.astype(numpy.int32, subok=False)
return st
def _numpy_len(arr: Union[numpy.ndarray, numpy.ma.MaskedArray]) -> int:
"""
Convenience function to return the length of a numpy array.
If arr is a masked array will return the count of the non-masked elements.
Parameters
----------
arr
Array to get the length of - must be 1D
Returns
-------
Length of non-masked elements
"""
assert arr.ndim == 1, "Only supports 1D arrays."
if numpy.ma.is_masked(arr):
return arr.count()
return arr.shape[0]
if __name__ == "__main__":
import doctest
doctest.testmod()