# Source code for rt_eqcorrscan.database.database_manager

"""
Tools for managing a database of template information.
"""

import logging
import os
import threading

from pathlib import Path
from collections import Counter

from typing import Optional, Union, Callable, Iterable
from concurrent.futures import Executor
from functools import partial

from obsplus import EventBank
from obsplus.constants import (
    EVENT_NAME_STRUCTURE, EVENT_PATH_STRUCTURE, get_events_parameters)
from obsplus.utils.docs import compose_docstring
from obsplus.utils.bank import _get_path, _get_time_values
from obsplus.utils.events import _summarize_event
from obsplus.utils.time import _get_event_origin_time

from obspy import Catalog, Stream
from obspy.core.event import Event

from eqcorrscan.core.match_filter import Tribe, read_template, Template

Logger = logging.getLogger(__name__)

TEMPLATE_EXT = ".tgz"


def _lazy_template_read(path) -> Optional[Template]:
    """
    Read a template from disk, handling exceptions in template reading.

    Parameters
    ----------
    path
        Path to template file

    Returns
    -------
    The Template read from disk, or None if the file does not exist or
    cannot be read.
    """
    if not os.path.isfile(path):
        Logger.debug("{0} does not exist".format(path))
        return None
    try:
        return read_template(path)
    except Exception as e:
        # Log and swallow read failures so one bad file does not abort a
        # bulk read - callers filter out the None entries.
        Logger.error(e)
        return None


def _summarize_template(
    template: Template = None,
    event: Event = None,
    path: Optional[str] = None,
    name: Optional[str] = None,
    path_struct: Optional[str] = None,
    name_struct: Optional[str] = None,
) -> dict:
    """
    Function to extract info from templates for indexing.

    Parameters
    ----------
    template
        The template object
    event
        The template event, give either this or the template.
    path
        Path to the template file, if already known.
    name
        Name of the file, if already known.
    path_struct
        Directory structure to create, defaults to obsplus'
        EVENT_PATH_STRUCTURE.
    name_struct
        File-name structure, defaults to obsplus' EVENT_NAME_STRUCTURE.

    Returns
    -------
    dict of summary values including the "path" the template should be
    written to.

    Raises
    ------
    ValueError
        If neither template nor event is given.
    """
    # Explicit check rather than assert: asserts are stripped under -O and
    # _download_and_make_template catches ValueError from this function.
    if not (template or event):
        raise ValueError("Needs either template or event")
    if template:
        event = template.event
    res_id = str(event.resource_id)
    out = {
        "ext": TEMPLATE_EXT, "event_id": res_id, "event_id_short": res_id[-5:],
        "event_id_end": res_id.split('/')[-1]}
    t1 = _get_event_origin_time(event)
    out.update(_get_time_values(t1))
    # Fall back to the obsplus defaults when no structure is given.
    path_struct = path_struct or EVENT_PATH_STRUCTURE
    name_struct = name_struct or EVENT_NAME_STRUCTURE

    out.update(_get_path(out, path, name, path_struct, name_struct))
    return out


class _Result(object):
    """
     Thin imitation of concurrent.futures.Future

     .. rubric:: example
     >>> result = _Result(12)
     >>> print(result)
     _Result(12)
     >>> print(result.result())
     12
     """
    def __init__(self, result):
        self._result = result

    def __repr__(self):
        return "_Result({0})".format(self._result)

    def result(self):
        return self._result


class _SerialExecutor(Executor):
    """
    Simple interface to mirror concurrent.futures.Executor in serial.

    .. rubric:: Example
    >>> with _SerialExecutor() as executor:
    ...     result = executor.submit(pow, 2, 12)
    >>> print(result.result())
    4096
    >>> def square(a):
    ...     return a * a
    >>> with _SerialExecutor() as executor:
    ...     futures = executor.map(square, range(5))
    >>> results = list(futures)
    >>> print(results)
    [0, 1, 4, 9, 16]
    """
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)

    def map(
        self,
        fn: Callable,
        *iterables: Iterable,
        timeout=None,
        chunksize=1,
    ):
        """
        Map iterables to a function

        Parameters
        ----------
        fn
            callable
        iterables
            iterable of arguments for `fn`
        timeout
            Throw-away variable
        chunksize
            Throw-away variable

        Returns
        -------
        The result of mapping `fn` across `*iterables`
        """
        return map(fn, *iterables)

    def submit(self, fn: Callable, *args, **kwargs) -> _Result:
        """
        Run a single function.

        Parameters
        ----------
        fn
            Function to apply
        args
            Arguments for `fn`
        kwargs
            Key word arguments for `fn`

        Returns
        -------
        result with the result in the .result attribute
        """
        return _Result(fn(*args, **kwargs))


class TemplateBank(EventBank):
    # TODO: This should probably be it's own _Bank subclass with extra
    #  columns including if template exists and template channels - would
    #  allow for faster filtering
    """
    A database manager for templates. Based on obsplus EventBank.

    Parameters
    ----------
    base_path
        The path to the directory containing event files. If it does not
        exist an empty directory will be created.
    path_structure
        Define the directory structure used by the event bank. Characters
        are separated by /, regardless of operating system. The following
        words can be used in curly braces as data specific variables:
        year, month, day, julday, hour, minute, second, event_id,
        event_id_short. If no structure is provided it will be read from
        the index, if no index exists the default is {year}/{month}/{day}
    name_structure : str
        The same as path structure but for the event, template and waveform
        file names. Supports the same variables and a slash cannot be used
        in a file name on most operating systems. The default extension
        (.xml, .tgz, .ms) will be added for events, templates and waveforms
        respectively. The default is {time}_{event_id_short}.
    event_format
        The anticipated format of the event files. Any format supported by
        the obspy.read_events function is permitted.
    event_ext
        The extension on the event files. Can be used to avoid parsing
        non-event files.
    template_ext
        The extension on the template files. Can be used to avoid parsing
        non-template files.

    Notes
    -----
    Supports parallel execution of most methods using `concurrent.futures`
    executors. Set the `TemplateBank.executor` attribute to your required
    executor.
    """
    index_lock = threading.Lock()  # Lock access to index

    def __init__(
        self,
        base_path: Union[str, Path, "EventBank"] = ".",
        path_structure: Optional[str] = None,
        name_structure: Optional[str] = None,
        event_format="quakeml",
        event_ext=".xml",
        template_ext=".tgz",
        executor=None,
    ):
        """Initialize the bank."""
        super().__init__(
            base_path=base_path, path_structure=path_structure,
            name_structure=name_structure, format=event_format,
            ext=event_ext)
        self.template_ext = template_ext
        # Default to serial execution if no executor is given.
        self.executor = executor or _SerialExecutor()

    @compose_docstring(get_events_params=get_events_parameters)
    def get_templates(self, **kwargs) -> Tribe:
        """
        Get template waveforms from the database

        Supports passing an `concurrent.futures.Executor` using the
        `executor` keyword argument for parallel reading.

        {get_events_params}
        """
        with self.index_lock:
            paths = str(self.bank_path) + os.sep + self.read_index(
                columns=["path", "latitude", "longitude"], **kwargs).path
        # Template files live alongside event files, differing only in
        # extension.
        paths = [
            path.replace(self.ext, self.template_ext) for path in paths]
        future = self.executor.map(_lazy_template_read, paths)
        # _lazy_template_read returns None for missing/unreadable files.
        return Tribe([t for t in future if t is not None])

    def put_templates(
        self,
        templates: Union[list, Tribe],
        update_index: bool = True,
        write_events: bool = True,
    ) -> None:
        """
        Save templates to the database.

        Parameters
        ----------
        templates
            Templates to put into the database
        update_index
            Flag to indicate whether or not to update the event index
            after writing the new events.
        write_events
            Optionally write out the event file as well as the template.
        """
        for t in templates:
            assert isinstance(t, Template)
        if write_events:
            catalog = Catalog([t.event for t in templates])
            with self.index_lock:
                self.put_events(catalog, update_index=update_index)
        inner_put_template = partial(
            _put_template, path_structure=self.path_structure,
            template_name_structure=self.name_structure,
            bank_path=self.bank_path)
        with self.index_lock:
            # Consume the map to force writes to happen now.
            _ = [_ for _ in self.executor.map(
                inner_put_template, templates)]

    def make_templates(
        self,
        catalog: Catalog,
        stream: Stream = None,
        client=None,
        download_data_len: float = 90,
        save_raw: bool = True,
        update_index: bool = True,
        rebuild: bool = False,
        **kwargs,
    ) -> Tribe:
        """
        Make templates from data or client based on a given catalog.

        Templates will be put in the database. Requires either a stream or
        a suitable client with a get_waveforms method.

        Parameters
        ----------
        catalog
            Catalog of events to generate templates from.
        stream
            Optional: Stream encompassing the events in the catalog
        client
            Optional: Client with at-least a `get_waveforms` method,
            ideally the client should make the data for the events in
            catalog available.
        download_data_len
            If client is given this is the length of data to download. The
            raw continuous data will also be saved to disk to allow later
            processing if save_raw=True
        save_raw
            Whether to store raw data on disk as well - defaults to True.
        update_index
            Flag to indicate whether or not to update the event index
            after writing the new events.
        rebuild
            Flag to indicate whether templates already existing in the
            TemplateBank should be re-generated. `True` will overwrite
            existing templates.
        kwargs
            Keyword arguments supported by EQcorrscan's
            `Template.construct` method. Requires at least: lowcut,
            highcut, samp_rate, filt_order, prepick, length, swin

        Returns
        -------
        Tribe of templates
        """
        assert client or stream, "Needs either client or stream"
        if stream is not None:
            # NOTE(review): method name and keyword fixed for consistency
            # with _download_and_make_template - EQcorrscan's template
            # generation method is "from_meta_file" and takes `st`.
            tribe = Tribe().construct(
                method="from_meta_file", meta_file=catalog, st=stream,
                **kwargs)
        else:
            Logger.debug("Making templates")
            inner_download_and_make_template = partial(
                _download_and_make_template, client=client,
                download_data_len=download_data_len,
                path_structure=self.path_structure,
                bank_path=self.bank_path,
                template_name_structure=self.name_structure,
                save_raw=save_raw, rebuild=rebuild, **kwargs)
            template_iterable = self.executor.map(
                inner_download_and_make_template, catalog)
            # Failed template constructions come back as None.
            tribe = Tribe(
                [t for t in template_iterable if t is not None])
        Logger.info(f"Putting {len(tribe)} templates into database")
        self.put_templates(tribe, update_index=update_index)
        Logger.info("Finished putting templates into database.")
        return tribe
def _put_template(
    template: Template,
    path_structure: str,
    template_name_structure: str,
    bank_path: str,
) -> str:
    """
    Get path for template and write it.

    Parameters
    ----------
    template
        Template to write to disk.
    path_structure
        Bank directory structure for writing data.
    template_name_structure
        Naming structure for template files.
    bank_path
        Location of bank to write data to.

    Returns
    -------
    The path the template was written to.
    """
    path = _summarize_template(
        template=template, path_struct=path_structure,
        name_struct=template_name_structure)["path"]
    ppath = (Path(bank_path) / path).absolute()
    ppath.parent.mkdir(parents=True, exist_ok=True)
    output_path = str(ppath)
    template.write(output_path)
    # Issue with older EQcorrscan doubling up extension - rename the
    # doubled-extension file back to the expected path.
    if os.path.isfile(output_path + TEMPLATE_EXT):
        os.rename(output_path + TEMPLATE_EXT, output_path)
    return output_path


def _download_and_make_template(
    event: Event,
    client,
    download_data_len: float,
    path_structure: str,
    bank_path: str,
    template_name_structure: str,
    save_raw: bool,
    rebuild: bool,
    **kwargs,
) -> Template:
    """
    Make the template using downloaded data.

    Returns the Template, or None if data could not be downloaded or the
    template could not be constructed.
    """
    Logger.debug("Making template for event {0}".format(event.resource_id))
    if not rebuild:
        # If the template already exists on disk, read and return it
        # rather than re-downloading and re-processing.
        try:
            path = _summarize_template(
                event=event, path_struct=path_structure,
                name_struct=template_name_structure)["path"]
        except ValueError as e:
            Logger.error(f"Could not summarize event due to {e}")
            return None
        ppath = (Path(bank_path) / path).absolute()
        ppath.parent.mkdir(parents=True, exist_ok=True)
        output_path = str(ppath)
        if os.path.isfile(output_path):
            Logger.debug("Template exists and rebuild=False, skipping")
            return read_template(output_path)
    # Sanitize event - sometimes Arrivals are not linked to Picks.
    pick_dict = {p.resource_id.id: p for p in event.picks}
    for origin in event.origins:
        origin.arrivals = [
            arr for arr in origin.arrivals if arr.pick_id in pick_dict]
    # Download at least process_len of data if that exceeds the requested
    # download length.
    _process_len = kwargs.pop("process_len", download_data_len)
    if _process_len > download_data_len:
        Logger.info(
            "Downloading {0}s of data as required by process len".format(
                _process_len))
        download_data_len = _process_len
    st = _get_data_for_event(
        event=event, client=client,
        download_data_len=download_data_len,
        path_structure=path_structure, bank_path=bank_path,
        template_name_structure=template_name_structure,
        save_raw=save_raw)
    if st is None:
        return None
    Logger.debug("Downloaded {0} traces for event {1}".format(
        len(st), event.resource_id))
    try:
        tribe = Tribe().construct(
            method="from_meta_file", meta_file=Catalog([event]), st=st,
            process_len=download_data_len, **kwargs)
    except Exception as e:
        Logger.error(e)
        return None
    try:
        template = tribe[0]
        Logger.info("Made template: {0}".format(template))
    except IndexError as e:
        Logger.error(e)
        return None
    template.name = event.resource_id.id.split('/')[-1]
    # Edit comment to reflect new template_name
    for comment in template.event.comments:
        if comment.text and comment.text.startswith(
                "eqcorrscan_template_"):
            comment.text = "eqcorrscan_template_{0}".format(template.name)
    return template


def _get_data_for_event(
    event: Event,
    client,
    download_data_len: float,
    path_structure: str,
    bank_path: str,
    template_name_structure: str,
    save_raw: bool = True,
) -> Stream:
    """
    Get data for a given event.

    Parameters
    ----------
    event
        Event to download data for.
    client
        Optional: Client with at-least a `get_waveforms` method, ideally
        the client should make the data for the events in catalog
        available.
    download_data_len
        If client is given this is the length of data to download. The raw
        continuous data will also be saved to disk to allow later
        processing if save_raw=True
    path_structure
        Bank path structure for writing data
    bank_path
        Location of bank to write data to
    template_name_structure
        Naming structure for template files.
    save_raw
        Whether to store raw data on disk as well - defaults to True.

    Returns
    -------
    Stream as downloaded for the given event, or None on download/merge
    failure.
    """
    if len(event.picks) == 0:
        Logger.warning("Event has no picks, no template created")
        return Stream()
    # One bulk request per pick, padded around the pick time.
    bulk = [
        (p.waveform_id.network_code, p.waveform_id.station_code,
         p.waveform_id.location_code, p.waveform_id.channel_code,
         p.time - (.5 * download_data_len),
         p.time + (.6 * download_data_len))
        for p in event.picks]
    Logger.debug(bulk)
    try:
        st = client.get_waveforms_bulk(bulk)
    except Exception as e:
        # Fall back to requesting channels one at a time if the bulk
        # request fails.
        Logger.error(e)
        st = Stream()
        for channel in bulk:
            Logger.debug(
                "Downloading individual channel {0}".format(channel))
            try:
                st += client.get_waveforms(
                    network=channel[0], station=channel[1],
                    location=channel[2], channel=channel[3],
                    starttime=channel[4], endtime=channel[5])
            except Exception as e:
                Logger.error(e)
    # Trim to expected length
    try:
        st.merge()
    except Exception as e:
        Logger.error(e)
        return None
    # Cope with multiple picks on the same channel at different times.
    trimmed_stream = Stream()
    for pick in event.picks:
        try:
            tr = st.select(id=pick.waveform_id.get_seed_string())[0]
        except IndexError:
            Logger.warning("No data downloaded for {0}".format(
                pick.waveform_id.get_seed_string()))
            continue
        trimmed_stream += tr.slice(
            starttime=pick.time - (.45 * download_data_len),
            endtime=pick.time + (.55 * download_data_len)).copy()
    if len(trimmed_stream) == 0:
        Logger.error("No data downloaded, no template.")
        return None
    if save_raw:
        # Raw data is stored next to the event/template with a .ms
        # extension.
        path = _summarize_event(
            event=event, path_struct=path_structure,
            name_struct=template_name_structure)["path"]
        path, _ = os.path.splitext(path)
        path += ".ms"
        ppath = (Path(bank_path) / path).absolute()
        ppath.parent.mkdir(parents=True, exist_ok=True)
        trimmed_stream.split().write(str(ppath), format="MSEED")
        Logger.debug("Saved raw data to {0}".format(ppath))
    return trimmed_stream
def check_tribe_quality(
    tribe: Tribe,
    seed_ids: set = None,
    min_stations: int = None,
    lowcut: float = None,
    highcut: float = None,
    filt_order: int = None,
    samp_rate: float = None,
    process_len: float = None,
    *args, **kwargs
) -> Tribe:
    """
    Check that templates in the tribe have channels all the same length.

    Parameters
    ----------
    tribe
        A Tribe to check the quality of.
    seed_ids
        seed-ids of channels to be included in the templates - if None,
        then all channels will be included
    min_stations
        Minimum number of stations for a template to be included.
    lowcut
        Desired template processing lowcut in Hz, if None, will not check
    highcut
        Desired template processing highcut in Hz, if None, will not check
    filt_order
        Desired template filter order, if None, will not check
    samp_rate
        Desired template sampling rate in Hz, if None, will not check
    process_len
        Desired template processing length in s, if None, will not check

    Returns
    -------
    A filtered tribe.
    """
    processing_keys = dict(
        lowcut=lowcut, highcut=highcut, filt_order=filt_order,
        samp_rate=samp_rate, process_length=process_len)
    Logger.info(
        "Checking processing parameters: {0}".format(processing_keys))
    min_stations = min_stations or 0
    _templates = []
    # Perform length check: keep only traces with the modal npts when a
    # template has mixed-length channels.
    for template in tribe:
        counted_lengths = Counter([tr.stats.npts for tr in template.st])
        if len(counted_lengths) > 1:
            Logger.warning(
                "Multiple lengths found in template, using most common"
                " ({0})".format(counted_lengths.most_common(1)[0][0]))
            _template = template.copy()
            _template.st = Stream()
            for tr in template.st:
                if tr.stats.npts == counted_lengths.most_common(1)[0][0]:
                    _template.st += tr
            _templates.append(_template)
        else:
            _templates.append(template)
    templates = _templates
    # Check processing parameters - drop templates whose processing does
    # not match any requested value. The for/else keeps templates only
    # when no parameter mismatched.
    _templates = []
    for template in templates:
        for processing_key, processing_value in processing_keys.items():
            if processing_value and \
                    template.__dict__[processing_key] != processing_value:
                Logger.warning(
                    "Template {0}: {1} does not match {2}".format(
                        processing_key,
                        template.__dict__[processing_key],
                        processing_value))
                break
        else:
            _templates.append(template)
    templates = _templates
    # Perform station check - restrict traces to the allowed seed-ids.
    if seed_ids is None:
        seed_ids = {tr.id for template in tribe for tr in template.st}
    for template in templates:
        _st = Stream()
        for tr in template.st:
            if tr.id in seed_ids:
                _st += tr
        template.st = _st
    return Tribe([t for t in templates
                  if len({tr.stats.station for tr in t.st}) > min_stations])
def remove_unreferenced(catalog: Union[Catalog, Event]) -> Catalog:
    """
    Remove un-referenced arrivals, amplitudes and station_magnitudes.

    Parameters
    ----------
    catalog
        Catalog (or single Event) to clean.

    Returns
    -------
    A new Catalog of copied events with dangling references removed; the
    input is not modified.
    """
    if isinstance(catalog, Event):
        catalog = Catalog([catalog])
    catalog_out = Catalog()
    for _event in catalog:
        # Work on a copy so the input catalog is left untouched.
        event = _event.copy()
        pick_ids = {p.resource_id for p in event.picks}
        # Remove unreferenced arrivals
        for origin in event.origins:
            origin.arrivals = [
                arr for arr in origin.arrivals if arr.pick_id in pick_ids]
        # Remove unreferenced amplitudes
        event.amplitudes = [
            amp for amp in event.amplitudes if amp.pick_id in pick_ids]
        amplitude_ids = {a.resource_id for a in event.amplitudes}
        # Remove now unreferenced station magnitudes
        event.station_magnitudes = [
            sta_mag for sta_mag in event.station_magnitudes
            if sta_mag.amplitude_id in amplitude_ids]
        station_magnitude_ids = {
            sta_mag.resource_id for sta_mag in event.station_magnitudes}
        # Remove unreferenced station_magnitude_contributions
        for magnitude in event.magnitudes:
            magnitude.station_magnitude_contributions = [
                sta_mag_contrib for sta_mag_contrib
                in magnitude.station_magnitude_contributions
                if sta_mag_contrib.station_magnitude_id
                in station_magnitude_ids]
        catalog_out.append(event)
    return catalog_out
if __name__ == "__main__":
    # Run the module doctests (e.g. _Result and _SerialExecutor examples).
    import doctest

    doctest.testmod()