# Source code for rt_eqcorrscan.reactor.spin_up

#!/usr/bin/env python
"""
Functions for spinning up and running a real-time tribe based on trigger-events
"""

import os
import logging

from collections import Counter
from typing import List, Union

from obspy import read_events, Inventory, UTCDateTime, Stream
from obspy.core.event import Event
from obspy.clients.fdsn.client import FDSNNoDataException
from obspy.geodetics import locations2degrees, kilometer2degrees

from obsplus import WaveBank

from eqcorrscan import Tribe

from rt_eqcorrscan import read_config, RealTimeTribe
from rt_eqcorrscan.database.database_manager import check_tribe_quality
from rt_eqcorrscan.event_trigger.listener import event_time


# Module-level logger. Use the conventional getLogger(__name__) so records
# are attributed to the package-qualified module name; getLogger(__file__)
# would name the logger after the on-disk file path, which breaks the usual
# dotted-name logger hierarchy used by logging configuration.
Logger = logging.getLogger(__name__)


def _read_event_list(fname: str) -> List[str]:
    with open(fname, "r") as f:
        event_ids = [line.strip() for line in f]
    return event_ids


def run(
    working_dir: str,
    cores: int = 1,
    log_to_screen: bool = False,
    speed_up: float = 1.0,
    synthetic_time_offset: float = 0,
):
    """
    Spin up and run a real-time matched-filter tribe for a triggering event.

    Expects ``working_dir`` to contain ``rt_eqcorrscan_config.yml``,
    ``triggering_event.xml`` and ``tribe.tgz``. The process changes its
    working directory to ``working_dir`` and removes ``tribe.tgz`` once read.

    Parameters
    ----------
    working_dir:
        Directory containing the configuration, triggering event and tribe.
    cores:
        Maximum number of cores for correlation; may be capped further by
        the ``max_correlation_cores`` configuration value.
    log_to_screen:
        Whether to log to screen as well as to file (a log file is always
        written into ``working_dir``).
    speed_up:
        Speed-up multiplier for simulated (synthetic) runs; values > 1
        disable "spoilers" on the real-time tribe.
    synthetic_time_offset:
        Seconds subtracted from "now" when computing the backfill end-time,
        used for simulations.
    """
    os.chdir(working_dir)
    Logger.debug("Reading config")
    config = read_config('rt_eqcorrscan_config.yml')
    config.setup_logging(
        screen=log_to_screen, file=True,
        filename="{0}/rt_eqcorrscan_{1}.log".format(
            working_dir,
            os.path.split(working_dir)[-1]))
    # Enforce a local-wave-bank for the spun-up real-time instance
    config.rt_match_filter.local_wave_bank = "streaming_wavebank"

    triggering_event = read_events('triggering_event.xml')[0]
    Logger.debug(f"Triggered by {triggering_event}")
    # NOTE(review): min_stations may be None here; it is later used in a
    # ">=" comparison in the backfill branch - confirm the config always
    # provides it, otherwise that comparison raises TypeError.
    min_stations = config.rt_match_filter.get("min_stations", None)
    Logger.info("Reading the Tribe")
    tribe = Tribe().read("tribe.tgz")
    # Remove file to avoid re-reading it
    os.remove("tribe.tgz")

    Logger.info("Read in {0} templates".format(len(tribe)))
    if len(tribe) == 0:
        Logger.warning("No appropriate templates found")
        return
    Logger.info("Checking tribe quality: removing templates with "
                "fewer than {0} stations".format(min_stations))
    tribe = check_tribe_quality(
        tribe, min_stations=min_stations, **config.template)
    Logger.info("Tribe now contains {0} templates".format(len(tribe)))
    if len(tribe) == 0:
        # NOTE(review): other early exits use a bare "return"; this one
        # returns a 2-tuple. Callers appear to ignore the return value,
        # but the inconsistency is worth confirming.
        return None, None

    client = config.rt_match_filter.get_client()
    rt_client = config.streaming.get_streaming_client()

    # Select the closest, most-used stations around the trigger for
    # streaming and detection.
    inventory = get_inventory(
        client, tribe, triggering_event=triggering_event,
        max_distance=config.rt_match_filter.max_distance,
        n_stations=config.rt_match_filter.n_stations)
    if len(inventory) == 0:
        Logger.critical(
            f"No inventory within {config.rt_match_filter.max_distance}"
            f"km of the trigger matching your templates, not running")
        return None, None
    detect_interval = config.rt_match_filter.get(
        "detect_interval", 60)
    plot = config.rt_match_filter.get("plot", False)
    real_time_tribe = RealTimeTribe(
        tribe=tribe, inventory=inventory, rt_client=rt_client,
        detect_interval=detect_interval, plot=plot,
        plot_options=config.plot,
        backfill_interval=config.rt_match_filter.backfill_interval,
        name=triggering_event.resource_id.id.split('/')[-1],
        wavebank=config.rt_match_filter.local_wave_bank,
        # NOTE(review): "notifer" looks like a typo for "notifier" - confirm
        # against the RealTimeTribe signature before changing.
        notifer=config.notifier)
    real_time_tribe._speed_up = speed_up
    if speed_up > 1:
        Logger.warning(f"Speed-up of {speed_up}: disallowing spoilers.")
        real_time_tribe._spoilers = False
    if real_time_tribe.expected_seed_ids is None:
        Logger.error("No matching channels in inventory and templates")
        return

    # Disable parallel processing for subprocess
    real_time_tribe._parallel_processing = False
    # Set the maximum correlation core-count
    if config.rt_match_filter.get("max_correlation_cores", None):
        cores = min(cores, config.rt_match_filter.max_correlation_cores)
    real_time_tribe.max_correlation_cores = cores

    Logger.info("Created real-time tribe with inventory:\n{0}".format(
        inventory))

    # TODO: How will this work? Currently notifiers are not implemented
    # real_time_tribe.notifier = None

    # Backfill starts one template process-length before the trigger so the
    # triggering event itself can be detected.
    backfill_to = event_time(triggering_event) - config.template.process_len
    backfill_client = config.rt_match_filter.get_waveform_client()

    if backfill_client and real_time_tribe.wavebank:
        # Download the required data and write it to disk.
        endtime = UTCDateTime.now() - synthetic_time_offset  # Adjust for simulations
        Logger.info(
            f"Backfilling between {backfill_to} and {endtime}")
        st = Stream()
        # Request each channel individually; failures are logged and skipped
        # so one bad channel does not abort the whole backfill.
        for network in inventory:
            for station in network:
                for channel in station:
                    Logger.info(
                        f"Downloading for {network.code}.{station.code}."
                        f"{channel.location_code}.{channel.code}")
                    try:
                        st += backfill_client.get_waveforms(
                            network=network.code, station=station.code,
                            location=channel.location_code,
                            channel=channel.code,
                            starttime=backfill_to,
                            endtime=endtime)
                    except Exception as e:
                        Logger.error(e)
                        continue
        st = st.merge()
        Logger.info(f"Downloaded {len(st)} for backfill")
        if len(st) == 0:
            Logger.warning("No backfill available, skipping")
        else:
            st = st.split()  # Cannot write masked data
            real_time_tribe.wavebank.put_waveforms(st)
        # Only backfill with templates that share enough stations with the
        # downloaded data.
        # NOTE(review): when no data were downloaded this still calls
        # .backfill with an empty template list - confirm that is intended.
        backfill_stations = {tr.stats.station for tr in st}
        backfill_templates = [
            t for t in real_time_tribe.templates
            if len({tr.stats.station for tr in t.st}.intersection(
                backfill_stations)) >= min_stations]
        Logger.info("Computing backfill detections")
        real_time_tribe.backfill(
            templates=backfill_templates,
            threshold=config.rt_match_filter.threshold,
            threshold_type=config.rt_match_filter.threshold_type,
            trig_int=config.rt_match_filter.trig_int)

    Logger.info("Starting real-time run")

    # Blocks here until the run terminates (e.g. max_run_length reached).
    real_time_tribe.run(
        threshold=config.rt_match_filter.threshold,
        threshold_type=config.rt_match_filter.threshold_type,
        trig_int=config.rt_match_filter.trig_int,
        hypocentral_separation=config.rt_match_filter.hypocentral_separation,
        min_stations=min_stations,
        keep_detections=config.rt_match_filter.keep_detections,
        detect_directory="{name}/detections",
        plot_detections=config.rt_match_filter.plot_detections,
        save_waveforms=config.rt_match_filter.save_waveforms,
        max_run_length=config.rt_match_filter.max_run_length,
        minimum_rate=config.rt_match_filter.minimum_rate,
        backfill_to=backfill_to,
        backfill_client=backfill_client)


def get_inventory(
    client,
    tribe: Union[RealTimeTribe, Tribe],
    triggering_event: Event = None,
    location: dict = None,
    starttime: UTCDateTime = None,
    max_distance: float = 1000.,
    n_stations: int = 10,
    duration: float = 10,
    level: str = "channel",
    channel_list: Union[list, tuple] = ("EH?", "HH?"),
) -> Inventory:
    """
    Get a suitable inventory for a tribe - selects the most used, closest
    stations.

    Parameters
    ----------
    client:
        Obspy client with a get_stations service.
    tribe:
        Tribe or RealTimeTribe of templates to query for stations.
    triggering_event:
        Event with at least an origin to calculate distances from - if not
        specified will use `location`
    location:
        Dictionary with "latitude" and "longitude" keys - only used if
        `triggering event` is not specified.
    starttime:
        Start-time for station search - only used if `triggering_event` is
        not specified.
    max_distance:
        Maximum distance from `triggering_event.preferred_origin` or
        `location` to find stations. Units: km
    n_stations:
        Maximum number of stations to return
    duration:
        Duration stations must be active for. Units: days
    level:
        Level for inventory parsable by `client.get_stations`.
    channel_list
        List of channel-codes to be acquired.  If `None` then all channels
        will be searched.

    Returns
    -------
    Inventory of the most used, closest stations.
    """
    inv = Inventory(networks=[], source=None)
    if triggering_event is not None:
        try:
            origin = (
                triggering_event.preferred_origin() or
                triggering_event.origins[0])
        except IndexError:
            Logger.error("Triggering event has no origin")
            return inv
        lat = origin.latitude
        lon = origin.longitude
        _starttime = origin.time
    else:
        lat = location["latitude"]
        lon = location["longitude"]
        _starttime = starttime
    # Query per channel-code; missing data for one code should not abort
    # the others.
    for channel_str in channel_list or ["*"]:
        try:
            inv += client.get_stations(
                startbefore=_starttime,
                endafter=_starttime + (duration * 86400),
                channel=channel_str, latitude=lat, longitude=lon,
                maxradius=kilometer2degrees(max_distance),
                level=level)
        except FDSNNoDataException:
            continue
    if len(inv) == 0:
        return inv
    # Count how often each station is used by the tribe's template picks.
    station_count = Counter(
        [pick.waveform_id.station_code
         for template in tribe
         for pick in template.event.picks])
    # Calculate distances
    sta_dist = []
    for net in inv:
        for sta in net:
            dist = locations2degrees(
                lat1=lat, long1=lon, lat2=sta.latitude,
                long2=sta.longitude)
            sta_dist.append((sta.code, dist, station_count[sta.code]))
    # Sort by usage (descending) first, then by distance (ascending).
    sta_dist.sort(key=lambda _: (-_[2], _[1]))
    inv_out = inv.select(station=sta_dist[0][0])
    for sta in sta_dist[1:n_stations]:
        inv_out += inv.select(station=sta[0])
    return inv_out
if __name__ == "__main__":
    import argparse

    parser = argparse.ArgumentParser(
        description="Script to spin-up a real-time matched-filter instance")
    parser.add_argument(
        # required: run() calls os.chdir on this, so None would crash.
        "-w", "--working-dir", type=str, required=True,
        help="Working directory containing configuration file, list of "
             "templates and place to store temporary files")
    parser.add_argument(
        "-n", "--n-processors", type=int, default=1,
        help="Number of processors to use for detection")
    parser.add_argument(
        "-l", "--log-to-screen", action="store_true",
        help="Whether to log to screen or not, defaults to False")
    parser.add_argument(
        # default added: without it an omitted flag passes None to run(),
        # where "speed_up > 1" raises TypeError.
        "-s", "--speed-up", type=float, default=1.0,
        help="Speed-up multiplier for synthetic runs")
    parser.add_argument(
        "-o", "--offset", type=float, default=0.0,
        help="Synthetic time offset from now in seconds - used for "
             "simulation")
    args = parser.parse_args()
    run(working_dir=args.working_dir, cores=args.n_processors,
        log_to_screen=args.log_to_screen, speed_up=args.speed_up,
        synthetic_time_offset=args.offset)