"""
Classes for real-time matched-filter detection of earthquakes.
"""
import time
import traceback
import os
import logging
import copy
import numpy
import gc
import glob
# from pympler import summary, muppy
from typing import Union, List, Iterable
from obspy import Stream, UTCDateTime, Inventory, Trace
from obsplus import WaveBank
from matplotlib.figure import Figure
from multiprocessing import Process, Lock
from eqcorrscan import Tribe, Template, Party, Detection
from eqcorrscan.utils.pre_processing import _prep_data_for_correlation
from rt_eqcorrscan.streaming.streaming import _StreamingClient
from rt_eqcorrscan.event_trigger.triggers import average_rate
Logger = logging.getLogger(__name__)
[docs]class RealTimeTribe(Tribe):
sleep_step = 1.0
plotter = None
plotting_exclude_channels = [
"EHE", "EHN", "EH1", "EH2", "HHE", "HHN", "HH1", "HH2"]
"""
Real-Time tribe for real-time matched-filter detection.
Parameters
----------
name
Tribe identifier - used to define save path.
tribe
Tribe of templates to use for detection.
inventory
Inventory of stations used for detection.
rt_client
Real-Time Client for streaming data.
detect_interval
Frequency to conduct detection. Must be less than buffer_capacity.
plot
Whether to generate the real-time bokeh plot
plot_options
Plotting options parsed to `rt_eqcorrscan.plotting.plot_buffer`
wavebank
WaveBank to save data to. Used for backfilling by RealTimeTribe.
Set to `None` to not use a WaveBank.
sleep_step
Default sleep-step in seconds while waiting for data. Defaults to 1.0
plotting_exclude_channels
Channels to exclude from plotting
"""
# Management of detection multi-processing
process_cores = 2
_parallel_processing = True # This seems unstable for subprocessing.
max_correlation_cores = None
# Thread management
lock = Lock() # Lock for access to internals
_running = False
_detecting_thread = None
_backfillers = [] # Backfill processes
_number_of_backfillers = 0 # Book-keeping of backfiller processes.
busy = False
_speed_up = 1.0 # For simulated runs - do not change for real-time!
_max_wait_length = 60.
_fig = None # Cache figures to save memory
_template_dir = "new_templates" # Where new templates should be.
_min_run_length = 24 * 3600 # Minimum run-length in seconds.
# Usurped by max_run_length, used to set a threshold for rate calculation.
# WaveBank management
wavebank_lock = Lock()
has_wavebank = False
def __init__(
self,
name: str = None,
tribe: Tribe = None,
inventory: Inventory = None,
rt_client: _StreamingClient = None,
detect_interval: float = 60.,
plot: bool = True,
plot_options: dict = None,
wavebank: Union[str, WaveBank] = WaveBank("Streaming_WaveBank"),
) -> None:
super().__init__(templates=tribe.templates)
self.rt_client = rt_client
assert (self.rt_client.buffer_capacity >= max(
[template.process_length for template in self.templates]))
assert (self.rt_client.buffer_capacity >= detect_interval)
self.name = name or "RealTimeTribe"
self.inventory = inventory
self.party = Party()
self.detect_interval = detect_interval
self.plot = plot
self.plot_options = {}
if plot_options is not None:
self.plot_length = plot_options.get("plot_length", 300)
self.plot_options.update({
key: value for key, value in plot_options.items()
if key != "plot_length"})
self.detections = []
# Wavebank status to avoid accessing the underlying, lockable, wavebank
if isinstance(wavebank, str):
wavebank = WaveBank(wavebank)
self.__wavebank = wavebank
if wavebank:
self.has_wavebank = True
self._wavebank_warned = False # Reduce duplicate warnings
def __repr__(self):
"""
Print information about the tribe.
.. rubric:: Example
>>> from rt_eqcorrscan.streaming.clients.seedlink import RealTimeClient
>>> rt_client = RealTimeClient(server_url="geofon.gfz-potsdam.de")
>>> tribe = RealTimeTribe(
... tribe=Tribe([Template(name='a', process_length=60)]),
... rt_client=rt_client)
>>> print(tribe) # doctest: +NORMALIZE_WHITESPACE
Real-Time Tribe of 1 templates on client:
Seed-link client at geofon.gfz-potsdam.de, status: Stopped, \
buffer capacity: 600.0s
Current Buffer:
Buffer(0 traces, maxlen=600.0)
"""
return 'Real-Time Tribe of {0} templates on client:\n{1}'.format(
self.__len__(), self.rt_client)
@property
def template_seed_ids(self) -> set:
""" Channel-ids used in the templates. """
return set(tr.id for template in self.templates for tr in template.st)
@property
def used_seed_ids(self) -> set:
""" Channel-ids in the inventory. """
if self.inventory is None:
return set()
return set("{net}.{sta}.{loc}.{chan}".format(
net=net.code, sta=sta.code, loc=chan.location_code, chan=chan.code)
for net in self.inventory for sta in net for chan in sta)
@property
def expected_seed_ids(self) -> set:
""" ids of channels to be used for detection. """
if self.inventory is None or len(self.inventory) == 0:
return self.template_seed_ids
return self.template_seed_ids.intersection(self.used_seed_ids)
@property
def used_stations(self) -> set:
""" Set of station names used in detection. """
if self.inventory is None:
return set()
return {sta.code for net in self.inventory for sta in net}
@property
def minimum_data_for_detection(self) -> float:
""" Get the minimum required data length (in seconds) for detection. """
return max(template.process_length for template in self.templates)
@property
def running_templates(self) -> set:
"""
Get a set of the names of the running templates.
Note that names are not guaranteed to be unique.
"""
return {t.name for t in self.templates}
@property
def wavebank(self):
return self.__wavebank
@wavebank.setter
def wavebank(self, wavebank: WaveBank):
self.__wavebank = wavebank
if wavebank:
self.has_wavebank = True
else:
self.has_wavebank = False
def _ensure_templates_have_enough_stations(self, min_stations):
""" Remove templates that don't have enough stations. """
self.templates = [
t for t in self.templates
if len({tr.stats.station for tr in t.st}.intersection(
self.used_stations)) >= min_stations]
def _remove_old_detections(self, endtime: UTCDateTime) -> None:
""" Remove detections older than keep duration. Works in-place. """
# Use a copy to avoid changing list while iterating
for d in copy.copy(self.detections):
if d.detect_time <= endtime:
self.detections.remove(d)
def _remove_unused_backfillers(self):
""" Expire unused back fill processes to release resources. """
active_backfillers = []
for backfill_process in self._backfillers:
if not backfill_process.is_alive():
backfill_process.join()
backfill_process.close()
else:
active_backfillers.append(backfill_process)
self._backfillers = active_backfillers
def _access_wavebank(
self,
method: str,
timeout: float = None,
*args, **kwargs
):
"""
Thread and process safe access to the wavebank.
Multiple processes cannot access the underlying HDF5 file at the same
time. This method waits until access to the HDF5 file is available and
Parameters
----------
method
Method of wavebank to call
timeout
Maximum time to try to get access to the file
args
Arguments passed to method
kwargs
Keyword arguments passed to method
Returns
-------
Whatever should be returned by the method.
"""
if not self.has_wavebank:
if not self._wavebank_warned:
Logger.error("No wavebank attached to streamer")
return None
timer, wait_step = 0.0, 0.5
with self.wavebank_lock:
try:
func = self.wavebank.__getattribute__(method)
except AttributeError:
Logger.error(f"No wavebank method named {method}")
return None
# Attempt to access the underlying wavebank
out = None
while timer < timeout:
tic = time.time()
try:
out = func(*args, **kwargs)
break
except (IOError, OSError) as e:
time.sleep(wait_step)
toc = time.time()
timer += toc - tic
else:
Logger.error(
f"Waited {timer} s and could not access the wavebank "
f"due to {e}")
return out
[docs] def get_wavebank_stream(self, bulk: List[tuple]) -> Stream:
""" processsafe get-waveforms-bulk call """
st = self._access_wavebank(
method="get_waveforms_bulk", timeout=120., bulk=bulk)
return st
def _handle_detections(
self,
new_party: Party,
trig_int: float,
hypocentral_separation: float,
endtime: UTCDateTime,
detect_directory: str,
save_waveforms: bool,
plot_detections: bool,
st: Stream,
) -> None:
"""
Handle new detections - do all the additional post-detection processing
Parameters
----------
new_party
The party of new detections
trig_int
Minimum inter-detection time in seconds.
hypocentral_separation
Maximum inter-event distance in km to consider detections as being
duplicates.
endtime
Last detection-time to be kept.
detect_directory
The head directory to write to - will create
"{detect_directory}/{year}/{julian day}" directories
save_waveforms
Whether to save the waveform for the detected event or not
plot_detections
Whether to plot the detection waveform or not
st
The stream the detection was made in - required for save_waveform
and plot_detection.
"""
for family in new_party:
if family is None:
continue
for d in family:
d._calculate_event(template=family.template)
self.party += family
Logger.info("Removing duplicate detections")
if len(self.party) > 0:
self.party.decluster(
trig_int=trig_int, timing="origin", metric="cor_sum",
hypocentral_separation=hypocentral_separation)
Logger.info("Completed decluster")
Logger.info("Writing detections to disk")
for family in self.party:
for detection in family:
if detection in self.detections:
continue
Logger.info(f"Writing detection: {detection.detect_time}")
self._fig = _write_detection(
detection=detection,
detect_directory=detect_directory,
save_waveform=save_waveforms,
plot_detection=plot_detections, stream=st,
fig=self._fig)
Logger.info("Expiring old detections")
for family in self.party:
Logger.debug(f"Checking for {family.template.name}")
family.detections = [
d for d in family.detections if d.detect_time >= endtime]
Logger.debug(f"Appending {len(family)} detections")
for detection in family:
# Need to append rather than create a new object
self.detections.append(detection)
return
def _plot(self) -> None: # pragma: no cover
""" Plot the data as it comes in. """
from rt_eqcorrscan.plotting.plot_buffer import EQcorrscanPlot
self._wait()
plot_options = copy.deepcopy(self.plot_options)
update_interval = plot_options.pop("update_interval", 100.)
plot_height = plot_options.pop("plot_height", 800)
plot_width = plot_options.pop("plot_width", 1500)
offline = plot_options.pop("offline", False)
self.plotter = EQcorrscanPlot(
rt_client=self.rt_client, plot_length=self.plot_length,
tribe=self, inventory=self.inventory,
detections=self.detections,
exclude_channels=self.plotting_exclude_channels,
update_interval=update_interval, plot_height=plot_height,
plot_width=plot_width, offline=offline,
**plot_options)
self.plotter.background_run()
def _wait(self, wait: float = None, detection_kwargs: dict = None) -> None:
""" Wait for `wait` seconds, or until all channels are available. """
Logger.info("Waiting for data.")
max_wait = min(self._max_wait_length, self.rt_client.buffer_capacity)
wait_length = 0.
while True:
tic = time.time()
# Check on backfillers
self._remove_unused_backfillers()
# Wait until we have some data
Logger.debug(
"Waiting for data, currently have {0} channels of {1} "
"expected channels".format(
len(self.rt_client.buffer_ids),
len(self.expected_seed_ids)))
if detection_kwargs:
self._add_templates_from_disk(**detection_kwargs)
else:
new_tribe = self._read_templates_from_disk()
if len(new_tribe) > 0:
self.templates.extend(new_tribe.templates)
time.sleep(self.sleep_step)
toc = time.time()
wait_length += (toc - tic)
if wait is None:
if len(self.rt_client.buffer_ids) >= len(self.expected_seed_ids):
break
if wait_length >= max_wait:
Logger.warning(
"Starting operation without the full dataset")
break
elif wait_length >= wait:
break
pass
return
def _start_streaming(self):
if not self.rt_client.started:
self.rt_client.start()
if self.rt_client.can_add_streams:
for tr_id in self.expected_seed_ids:
self.rt_client.select_stream(
net=tr_id.split('.')[0], station=tr_id.split('.')[1],
selector=tr_id.split('.')[3])
else:
Logger.warning("Client already in streaming mode,"
" cannot add channels")
if not self.rt_client.streaming:
self.rt_client.background_run()
Logger.info("Started real-time streaming")
else:
Logger.info("Real-time streaming already running")
def _runtime_check(self, run_start, max_run_length):
run_time = UTCDateTime.now() - run_start
Logger.info(
f"Run time: {run_time:.2f}s, max_run_length: {max_run_length:.2f}s")
if max_run_length and run_time > max_run_length:
Logger.critical("Hit maximum run time, stopping.")
self.stop()
return False
return True
[docs] def run(
self,
threshold: float,
threshold_type: str,
trig_int: float,
hypocentral_separation: float = None,
min_stations: int = None,
keep_detections: float = 86400,
detect_directory: str = "{name}/detections",
plot_detections: bool = True,
save_waveforms: bool = True,
max_run_length: float = None,
minimum_rate: float = None,
backfill_to: UTCDateTime = None,
backfill_client=None,
**kwargs
) -> Party:
"""
Run the RealTimeTribe detection.
Detections will be added to a party and returned when the detection
is done. Detections will be stored in memory for up to `keep_detections`
seconds. Detections will also be written to individual files in the
`detect_directory`.
Parameters
----------
threshold
Threshold for detection
threshold_type
Type of threshold to use. See
`eqcorrscan.core.match_filter.Tribe.detect` for options.
trig_int
Minimum inter-detection time in seconds.
hypocentral_separation
Maximum inter-event distance in km to consider detections as being
duplicates.
min_stations
Minimum number of stations required to make a detection.
keep_detections
Duration to store detection in memory for in seconds.
detect_directory
Relative path to directory for detections. This directory will be
created if it doesn't exist - tribe name will be appended to this
string to give the directory name.
plot_detections
Whether to plot detections or not - plots will be saved to the
`detect_directory` as png images.
save_waveforms
Whether to save waveforms of detections or not - waveforms
will be saved in the `detect_directory` as miniseed files.
max_run_length
Maximum detection run time in seconds. Default is to run
indefinitely.
minimum_rate
Stopping criteria: if the detection rate drops below this the
detector will stop. If set to None, then the detector will run
until `max_run_length`. Units: events per day
backfill_to
Time to backfill the data buffer to.
backfill_client
Client to use to backfill the data buffer.
Returns
-------
The party created - will not contain detections expired by
`keep_detections` threshold.
"""
restart_interval = 600.0
# Squash duplicate channels to avoid excessive channels
self.templates = [squash_duplicates(t) for t in self.templates]
# Reshape the templates first
if len(self.templates) > 0:
self.templates = reshape_templates(
templates=self.templates, used_seed_ids=self.expected_seed_ids)
else:
Logger.error("No templates, will not run")
return
# Remove templates that do not have enough stations in common with the
# inventory.
min_stations = min_stations or 0
n = len(self.templates)
self._ensure_templates_have_enough_stations(min_stations=min_stations)
n -= len(self.templates)
Logger.info(
f"{n} templates were removed because they did not share enough "
f"stations with the inventory. {len(self.templates)} will be used")
if len(self.templates) == 0:
Logger.critical("No templates remain, not running")
return Party()
# Fix unsupported args
try:
if kwargs.pop("plot"):
Logger.info("EQcorrscan plotting disabled")
except KeyError:
pass
_cores = kwargs.get("cores", None)
if _cores:
self.max_correlation_cores = min(
kwargs.pop("cores"), self.max_correlation_cores)
run_start = UTCDateTime.now()
detection_iteration = 0 # Counter for number of detection loops run
if not self.busy:
self.busy = True
detect_directory = detect_directory.format(name=self.name)
if not os.path.isdir(detect_directory):
os.makedirs(detect_directory)
# Get this locally before streaming starts
buffer_capacity = self.rt_client.buffer_capacity
# Start the streamer
self._start_streaming()
Logger.info("Detection will use the following data: {0}".format(
self.expected_seed_ids))
if backfill_client and backfill_to:
backfill = Stream()
self._wait()
_buffer = self.rt_client.stream
for tr_id in self.expected_seed_ids:
try:
tr_in_buffer = _buffer.select(id=tr_id)[0]
except IndexError:
continue
endtime = tr_in_buffer.stats.starttime
if endtime - backfill_to > buffer_capacity:
Logger.info("Truncating backfill to buffer length")
starttime = endtime - buffer_capacity
else:
starttime = backfill_to
try:
tr = backfill_client.get_waveforms(
*tr_id.split('.'), starttime=starttime,
endtime=endtime).merge()[0]
except Exception as e:
Logger.error("Could not back fill due to: {0}".format(e))
continue
Logger.debug("Downloaded backfill: {0}".format(tr))
backfill += tr
for tr in backfill:
# Get the lock!
Logger.debug(f"Adding {tr.id} to the buffer")
self.rt_client.on_data(tr)
Logger.debug("Stream in buffer is now: {0}".format(
self.rt_client.stream))
if self.plot: # pragma: no cover
# Set up plotting thread
self._plot()
Logger.info("Plotting thread started")
if not self.rt_client.buffer_length >= self.minimum_data_for_detection:
sleep_step = (
self.minimum_data_for_detection -
self.rt_client.buffer_length + 5) / self._speed_up
Logger.info("Sleeping for {0:.2f}s while accumulating data".format(
sleep_step))
self._wait(sleep_step)
first_data = min([tr.stats.starttime
for tr in self.rt_client.stream.merge()])
detection_kwargs = dict(
threshold=threshold, threshold_type=threshold_type,
trig_int=trig_int, hypocentral_separation=hypocentral_separation,
keep_detections=keep_detections, detect_directory=detect_directory,
plot_detections=plot_detections, save_waveforms=save_waveforms,
maximum_backfill=first_data, endtime=None,
min_stations=min_stations)
try:
while self.busy:
try:
self._running = True # Lock tribe
start_time = UTCDateTime.now()
st = self.rt_client.stream.split().merge()
if self.has_wavebank:
st = _check_stream_is_int(st)
try:
self._access_wavebank(
method="put_waveforms", timeout=120.,
stream=st)
except Exception as e:
Logger.error(
f"Could not write to wavebank due to {e}")
last_data_received = self.rt_client.last_data
# Split to remove trailing mask
if len(st) == 0:
Logger.warning("No data")
continue
Logger.info(
f"Streaming Client last received data at "
f"{self.rt_client.last_data}")
stream_end = max(tr.stats.endtime for tr in st)
min_stream_end = min(tr.stats.endtime for tr in st)
Logger.info(
f"Real-time client provided data: \n{st.__str__(extended=True)}")
# Cope with data that doesn't come
if start_time - last_data_received > restart_interval:
Logger.warning(
"The streaming client has not given any new data for "
f"{restart_interval} seconds. Restarting Streaming client")
Logger.info(f"start_time: {start_time}, last_data_received: {last_data_received}, stream_end: {stream_end}")
Logger.info("Stopping streamer")
self.rt_client.background_stop()
self.rt_client.stop()
# Get a clean instance just in case
Logger.info("Starting streamer")
self._start_streaming()
Logger.info("Streamer started")
st = self.rt_client.stream.split().merge() # Get data again.
Logger.info("Streaming client seems healthy")
# Remove any data that shouldn't be there - sometimes GeoNet's
# Seedlink client gives old data.
Logger.info(
f"Trimming between {stream_end - (buffer_capacity + 20.0)} "
f"and {stream_end}")
st.trim(
starttime=stream_end - (buffer_capacity + 20.0),
endtime=stream_end)
if detection_iteration > 0:
# For the first run we want to detect in everything we have.
# Otherwise trim so that all channels have at-least minimum data for detection
st.trim(
starttime=min_stream_end - self.minimum_data_for_detection,
endtime=stream_end)
Logger.info("Trimmed data")
if len(st) == 0:
Logger.warning("No data")
continue
# Remove short channels
st.traces = [
tr for tr in st
if _numpy_len(tr.data) >= (
.8 * self.minimum_data_for_detection)]
Logger.info("Starting detection run")
Logger.info("Using data: \n{0}".format(
st.__str__(extended=True)))
try:
Logger.debug("Currently have {0} templates in tribe".format(
len(self)))
new_party = self.detect(
stream=st, plot=False, threshold=threshold,
threshold_type=threshold_type, trig_int=trig_int,
xcorr_func="fftw", concurrency="concurrent",
cores=self.max_correlation_cores,
process_cores=self.process_cores,
parallel_process=self._parallel_processing,
ignore_bad_data=True, copy_data=False,
**kwargs)
Logger.info("Completed detection")
except Exception as e: # pragma: no cover
Logger.error(e)
Logger.error(traceback.format_exc())
if "Cannot allocate memory" in str(e):
Logger.error("Out of memory, stopping this detector")
self.stop()
break
if not self._runtime_check(
run_start=run_start, max_run_length=max_run_length):
break
Logger.info(
"Waiting for {0:.2f}s and hoping this gets "
"better".format(self.detect_interval))
time.sleep(self.detect_interval)
continue
Logger.info(f"Trying to get lock - lock status: {self.lock}")
with self.lock:
if len(new_party) > 0:
self._handle_detections(
new_party, trig_int=trig_int,
hypocentral_separation=hypocentral_separation,
endtime=stream_end - keep_detections,
detect_directory=detect_directory,
save_waveforms=save_waveforms,
plot_detections=plot_detections, st=st)
self._remove_old_detections(
stream_end - keep_detections)
Logger.info("Party now contains {0} detections".format(
len(self.detections)))
self._running = False # Release lock
run_time = UTCDateTime.now() - start_time
Logger.info("Detection took {0:.2f}s".format(run_time))
if self.detect_interval <= run_time:
Logger.warning(
"detect_interval {0:.2f} shorter than run-time "
"{1:.2f}, increasing detect_interval to {2:.2f}".format(
self.detect_interval, run_time, run_time + 10))
self.detect_interval = run_time + 10
Logger.info("Iteration {0} took {1:.2f}s total".format(
detection_iteration, run_time))
Logger.info("Waiting {0:.2f}s until next run".format(
self.detect_interval - run_time))
detection_iteration += 1
self._wait(
wait=(self.detect_interval - run_time) / self._speed_up,
detection_kwargs=detection_kwargs)
if not self._runtime_check(
run_start=run_start, max_run_length=max_run_length):
break
if minimum_rate and UTCDateTime.now() > run_start + self._min_run_length:
_rate = average_rate(
self.detections,
starttime=max(
stream_end - keep_detections, first_data),
endtime=stream_end)
if _rate < minimum_rate:
Logger.critical(
"Rate ({0:.2f}) has dropped below minimum rate, "
"stopping.".format(_rate))
self.stop()
break
gc.collect()
# Memory output
# sum1 = summary.summarize(muppy.get_objects())
# summary.print_(sum1)
except Exception as e:
Logger.critical(f"Uncaught error: {e}")
Logger.error(traceback.format_exc())
if not self._runtime_check(
run_start=run_start, max_run_length=max_run_length):
break
finally:
Logger.critical("Stopping")
self.stop()
return self.party
def _read_templates_from_disk(self):
template_files = glob.glob(f"{self._template_dir}/*")
if len(template_files) == 0:
return []
Logger.info(f"Checking for events in {self._template_dir}")
new_tribe = Tribe()
for template_file in template_files:
Logger.debug(f"Reading from {template_file}")
try:
new_tribe += Template().read(template_file)
except Exception as e:
Logger.error(f"Could not read {template_file} due to {e}")
os.remove(template_file) # Remove file once done with it.
new_tribe.templates = [t for t in new_tribe
if t.name not in self.running_templates]
Logger.info(f"Read in {len(new_tribe)} new templates from disk")
return new_tribe
def _add_templates_from_disk(
self,
threshold: float,
threshold_type: str,
trig_int: float,
min_stations: int,
keep_detections: float = 86400,
detect_directory: str = "{name}/detections",
plot_detections: bool = True,
save_waveforms: bool = True,
maximum_backfill: Union[float, UTCDateTime] = None,
endtime: UTCDateTime = None,
**kwargs
):
new_tribe = self._read_templates_from_disk()
if len(new_tribe) == 0:
return
Logger.info(
f"Adding {len(new_tribe)} templates to already running tribe.")
self.add_templates(
new_tribe, threshold=threshold, threshold_type=threshold_type,
trig_int=trig_int, min_stations=min_stations,
keep_detections=keep_detections, detect_directory=detect_directory,
plot_detections=plot_detections, save_waveforms=save_waveforms,
maximum_backfill=maximum_backfill, endtime=endtime, **kwargs)
[docs] def add_templates(
self,
templates: Union[List[Template], Tribe],
threshold: float,
threshold_type: str,
trig_int: float,
min_stations: int = None,
keep_detections: float = 86400,
detect_directory: str = "{name}/detections",
plot_detections: bool = True,
save_waveforms: bool = True,
maximum_backfill: Union[float, UTCDateTime] = None,
endtime: UTCDateTime = None,
**kwargs
) -> set:
"""
Add templates to the tribe.
This method will run the new templates back in time, then append the
templates to the already running tribe.
# TODO - make standard parameters and wrap methods to decorate them
Parameters
----------
templates
New templates to add to the tribe.
threshold
Threshold for detection
threshold_type
Type of threshold to use. See
`eqcorrscan.core.match_filter.Tribe.detect` for options.
trig_int
Minimum inter-detection time in seconds.
min_stations:
Minimum number of stations required to make a detection.
keep_detections
Duration to store detection in memory for in seconds.
detect_directory
Relative path to directory for detections. This directory will be
created if it doesn't exist - tribe name will be appended to this
string to give the directory name.
plot_detections
Whether to plot detections or not - plots will be saved to the
`detect_directory` as png images.
save_waveforms
Whether to save waveforms of detections or not - waveforms
will be saved in the `detect_directory` as miniseed files.
maximum_backfill
Time in seconds to backfill to - if this is larger than the
difference between the time now and the time that the tribe
started, then it will backfill to when the tribe started.
endtime
Time to stop the backfill, if None will run to now.
Returns
-------
Complete set of template names after addition
"""
while self._running:
Logger.info("Waiting for access to tribe in add_templates")
time.sleep(1) # Wait until the lock is released
if isinstance(templates, list):
new_tribe = Tribe(templates)
else:
new_tribe = templates
# Squash duplicate channels to avoid excessive channels
new_tribe.templates = [squash_duplicates(t) for t in new_tribe.templates]
# Reshape
new_tribe.templates = reshape_templates(
templates=new_tribe.templates, used_seed_ids=self.expected_seed_ids)
# Remove templates that do not have enough stations.
min_stations = min_stations or 0
n = len(new_tribe)
new_tribe.templates = [
t for t in new_tribe.templates
if len({tr.stats.station for tr in t.st}.intersection(
self.used_stations)) >= min_stations]
n -= len(new_tribe)
Logger.info(
f"{n} templates were removed because they did not have enough "
f"stations. {len(new_tribe)} will be added to the running tribe.")
self.templates.extend(new_tribe.templates)
self.backfill(
templates=new_tribe, threshold=threshold,
threshold_type=threshold_type, trig_int=trig_int,
keep_detections=keep_detections, detect_directory=detect_directory,
plot_detections=plot_detections, save_waveforms=save_waveforms,
maximum_backfill=maximum_backfill, endtime=endtime, **kwargs)
return set(t.name for t in self.templates)
[docs] def backfill(
self,
templates: Union[List[Template], Tribe],
threshold: float,
threshold_type: str,
trig_int: float,
hypocentral_separation: float = None,
keep_detections: float = 86400,
detect_directory: str = "{name}/detections",
plot_detections: bool = True,
save_waveforms: bool = True,
maximum_backfill: Union[float, UTCDateTime] = None,
endtime: UTCDateTime = None,
**kwargs
) -> None:
"""
Backfill using data from rt_client's wavebank.
This method will run the new templates through old data and record
detections in the real-time-tribe.
Parameters
----------
templates
New templates to add to the tribe.
threshold
Threshold for detection
threshold_type
Type of threshold to use. See
`eqcorrscan.core.match_filter.Tribe.detect` for options.
trig_int
Minimum inter-detection time in seconds.
hypocentral_separation
Maximum inter-event distance in km to consider detections as being
duplicates.
keep_detections
Duration to store detection in memory for in seconds.
detect_directory
Relative path to directory for detections. This directory will be
created if it doesn't exist - tribe name will be appended to this
string to give the directory name.
plot_detections
Whether to plot detections or not - plots will be saved to the
`detect_directory` as png images.
save_waveforms
Whether to save waveforms of detections or not - waveforms
will be saved in the `detect_directory` as miniseed files.
maximum_backfill
Time in seconds to backfill to - if this is larger than the
difference between the time now and the time that the tribe
started, then it will backfill to when the tribe started.
endtime
Time to stop the backfill, if None will run to now.
"""
# Get the stream - Only let the main process get the Stream
Logger.info("Acquiring stream from wavebank")
endtime = endtime or UTCDateTime.now()
if maximum_backfill is not None:
if isinstance(maximum_backfill, (float, int)):
starttime = endtime - maximum_backfill
elif isinstance(maximum_backfill, UTCDateTime):
starttime = maximum_backfill
else:
Logger.warning(
f"maximum_backfill is {type(maximum_backfill)}, not float "
"or UTCDateTime, starting from 0")
starttime = UTCDateTime(0)
else:
starttime = UTCDateTime(0)
Logger.info(f"Backfilling between {starttime} and {endtime}")
if starttime >= endtime or not self.has_wavebank:
return
if self.expected_seed_ids and len(self.expected_seed_ids) > 0:
bulk = []
for chan in self.expected_seed_ids:
query = chan.split('.')
query.extend([starttime, endtime])
bulk.append(tuple(query))
else:
Logger.warning("No expected seed ids")
return
if len(bulk) == 0:
Logger.warning("No bulk")
return
Logger.info(f"Getting stations for backfill: {bulk}")
st = self.get_wavebank_stream(bulk)
self._number_of_backfillers += 1
backfill_process = Process(
target=self._backfill,
args=(templates, st, threshold, threshold_type, trig_int,
hypocentral_separation, keep_detections, detect_directory,
plot_detections, save_waveforms, endtime),
kwargs=kwargs, name=f"Backfiller_{self._number_of_backfillers}")
backfill_process.start()
self._backfillers.append(backfill_process)
Logger.info("Backfill process started, returning")
return
def _backfill(
self,
templates: Union[List[Template], Tribe],
st: Stream,
threshold: float,
threshold_type: str,
trig_int: float,
hypocentral_separation: float = None,
keep_detections: float = 86400,
detect_directory: str = "{name}/detections",
plot_detections: bool = True,
save_waveforms: bool = True,
endtime: UTCDateTime = None,
**kwargs
) -> None:
""" Background backfill method """
if isinstance(templates, Tribe):
new_tribe = templates
else:
new_tribe = Tribe(templates)
# Squash duplicate channels to avoid excessive channels
new_tribe.templates = [squash_duplicates(t) for t in new_tribe.templates]
# Reshape
new_tribe.templates = reshape_templates(
templates=new_tribe.templates, used_seed_ids=self.expected_seed_ids)
Logger.info(f"Backfilling with {len(new_tribe)} templates")
Logger.debug("Additional templates to be run: \n{0} "
"templates".format(len(new_tribe)))
Logger.info("Starting backfill detection run with:")
Logger.info(st.__str__(extended=True))
# Break into chunks so that detections can be handled as they happen
starttime, endtime = (
min(tr.stats.starttime for tr in st),
max(tr.stats.endtime for tr in st))
# Send off twice the minimum data to allow for overlaps.
_starttime, _endtime = (
starttime, starttime + 2 * self.minimum_data_for_detection)
while _endtime < endtime + self.minimum_data_for_detection:
st_chunk = st.slice(_starttime, _endtime)
try:
new_party = new_tribe.detect(
stream=st_chunk, plot=False, threshold=threshold,
threshold_type=threshold_type, trig_int=trig_int,
xcorr_func="fftw", concurrency="concurrent",
cores=self.max_correlation_cores,
parallel_process=self._parallel_processing,
process_cores=self.process_cores, copy_data=False,
**kwargs)
except Exception as e:
Logger.error(e)
_starttime += self.minimum_data_for_detection
_endtime += self.minimum_data_for_detection
continue
detect_directory = detect_directory.format(name=self.name)
Logger.info(
f"Backfill detection between {_starttime} and {_endtime} "
f"completed - handling detections")
if len(new_party) > 0:
Logger.info(f"Lock status: {self.lock}")
with self.lock: # The only time the state is altered
Logger.info(f"Lock status: {self.lock}")
self._handle_detections(
new_party=new_party, detect_directory=detect_directory,
endtime=endtime - keep_detections,
plot_detections=plot_detections,
save_waveforms=save_waveforms, st=st, trig_int=trig_int,
hypocentral_separation=hypocentral_separation)
_starttime += self.minimum_data_for_detection
_endtime += self.minimum_data_for_detection
return
[docs] def stop(self, write_stopfile: bool = False) -> None:
"""
Stop the real-time system.
Parameters
----------
write_stopfile:
Used to write a one-line file telling listening systems that
this has stopped. Used by the Reactor.
"""
if self.plotter is not None: # pragma: no cover
self.plotter.background_stop()
self.rt_client.background_stop()
self.busy = False
self._running = False
if self._detecting_thread is not None:
self._detecting_thread.join()
if write_stopfile:
with open(".stopfile", "a") as f:
f.write(f"{self.name}\n")
[docs]def reshape_templates(
templates: List[Template],
used_seed_ids: Iterable[str]
) -> List[Template]:
"""
Reshape templates to have the full set of required channels (and no more).
This is done within matched-filter as well as here - we do it here so that
the templates are not reshaped every iteration.
Parameters
----------
templates
Templates to be reshaped - the templates are changed in-place
used_seed_ids
Seed-ids used for detection (network.station.location.channel)
Returns
-------
Templates filled for detection.
"""
template_streams = [t.st for t in templates]
template_names = [t.name for t in templates]
samp_rate = template_streams[0][0].stats.sampling_rate
process_len = max(t.process_length for t in templates)
# Make a dummy stream with all the used seed ids
stream = Stream()
for seed_id in used_seed_ids:
net, sta, loc, chan = seed_id.split('.')
tr = Trace(header=dict(
network=net, station=sta, location=loc, channel=chan,
sampling_rate=samp_rate),
data=numpy.empty(int(process_len * samp_rate)))
stream += tr
_, template_streams, template_names = _prep_data_for_correlation(
stream=stream, templates=template_streams,
template_names=template_names, force_stream_epoch=False)
templates_back = []
for template_st, template_name in zip(template_streams, template_names):
original = [t for t in templates if t.name == template_name]
assert len(original) == 1
original = original[0]
original.st = template_st
templates_back.append(original)
return templates_back
[docs]def squash_duplicates(template: Template):
"""
Remove duplicate channels in templates.
This happens when there are duplicate picks, and it fucks shit up.
More explicitly (less?): when there are duplicate picks, duplicate
channels appear in the template, which are then extended to all
templates meaning that many copies of a template are run, resulting
in excessive detection bias on one channel, and very expensive templates
without good reason.
"""
from collections import Counter
# Check if there are duplicate channels first
seed_ids = Counter(tr.id for tr in template.st)
if seed_ids.most_common(1)[0][1] == 1:
# Do nothing, no duplicates
return template
unique_template_st = Stream()
unique_event_picks = []
for seed_id, repeats in seed_ids.most_common():
seed_id_picks = [p for p in template.event.picks
if p.waveform_id.get_seed_string() == seed_id
and p.phase_hint[0] in "PS"]
if repeats == 1:
unique_template_st += template.st.select(id=seed_id)
unique_event_picks.append(seed_id_picks[0])
continue
# Now we get to doing something - get the stream and the picks
repeat_stream = template.st.select(id=seed_id)
# Get the unique start-times
unique_traces = {tr.stats.starttime.datetime: tr
for tr in repeat_stream.traces}
if len(unique_traces) == 1:
unique_template_st += unique_traces.popitem()[1]
unique_event_picks.append(seed_id_picks[0])
continue
# If there are more than one unique start-time then we need to
# find the appropriate pick for each
unique_picks = {f"{p.phase_hint}_{p.time}": p for p in seed_id_picks}
for pick in unique_picks.values():
expected_starttime = pick.time - template.prepick
tr = unique_traces.get(expected_starttime.datetime, None)
if tr:
unique_event_picks.append(pick)
unique_template_st += tr
else:
Logger.debug(f"No trace for pick at {pick.time}")
template.st = unique_template_st
template.event.picks = unique_event_picks
return template
def _write_detection(
detection: Detection,
detect_directory: str,
save_waveform: bool,
plot_detection: bool,
stream: Stream,
fig=None,
) -> Figure:
"""
Handle detection writing including writing streams and figures.
Parameters
----------
detection
The Detection to write
detect_directory
The head directory to write to - will create
"{detect_directory}/{year}/{julian day}" directories
save_waveform
Whether to save the waveform for the detected event or not
plot_detection
Whether to plot the detection waveform or not
stream
The stream the detection was made in - required for save_waveform and
plot_detection.
fig
A figure object to reuse.
Returns
-------
An empty figure object to be reused if a figure was created, or the figure
passed to it.
"""
from rt_eqcorrscan.plotting.plot_event import plot_event
_path = os.path.join(
detect_directory, detection.detect_time.strftime("%Y"),
detection.detect_time.strftime("%j"))
if not os.path.isdir(_path):
os.makedirs(_path)
_filename = os.path.join(
_path, detection.detect_time.strftime("%Y%m%dT%H%M%S"))
try:
detection.event.write(f"{_filename}.xml", format="QUAKEML")
except Exception as e:
Logger.error(f"Could not write event file due to {e}")
detection.event.picks.sort(key=lambda p: p.time)
st = stream.slice(
detection.event.picks[0].time - 10,
detection.event.picks[-1].time + 20).copy()
if plot_detection:
# Make plot
fig = plot_event(fig=fig, event=detection.event, st=st,
length=90, show=False)
try:
fig.savefig(f"{_filename}.png")
except Exception as e:
Logger.error(f"Could not write plot due to {e}")
fig.clf()
if save_waveform:
st = _check_stream_is_int(st)
try:
st.write(f"{_filename}.ms", format="MSEED")
except Exception as e:
Logger.error(f"Could not write stream due to {e}")
return fig
def _check_stream_is_int(st):
st = st.split()
for tr in st:
# Ensure data are int32, see https://github.com/obspy/obspy/issues/2683
if tr.data.dtype == numpy.int32 and \
tr.data.dtype.type != numpy.int32:
tr.data = tr.data.astype(numpy.int32, subok=False)
if tr.data.dtype.type == numpy.intc:
tr.data = tr.data.astype(numpy.int32, subok=False)
return st
def _numpy_len(arr: Union[numpy.ndarray, numpy.ma.MaskedArray]) -> int:
"""
Convenience function to return the length of a numpy array.
If arr is a masked array will return the count of the non-masked elements.
Parameters
----------
arr
Array to get the length of - must be 1D
Returns
-------
Length of non-masked elements
"""
assert arr.ndim == 1, "Only supports 1D arrays."
if numpy.ma.is_masked(arr):
return arr.count()
return arr.shape[0]
if __name__ == "__main__":
import doctest
doctest.testmod()