# Source code for rt_eqcorrscan.event_trigger.listener

"""
Listener ABC.
"""

import threading
import logging

import numpy as np
from obspy import UTCDateTime
from obspy.core.event import Event
from abc import ABC, abstractmethod
from collections import namedtuple
from typing import List, Union, Tuple

# Module-level logger, named after this module.
Logger = logging.getLogger(__name__)


# Record stored in a listener's cache of old events: an (event_id, event_time)
# pair, addressable by field name or by position.
EventInfo = namedtuple("EventInfo", ["event_id", "event_time"])


class _Listener(ABC):
    """
    Abstract base class for listener objects - anything to be used by the
    Reactor should fit in this scope.

    Subclasses must implement ``run``. The class keeps a cache of previously
    seen events (``old_events``) whose mutations are guarded by ``lock``, and
    can execute ``run`` in a background daemon thread.

    Notes
    -----
    ``threads``, ``_old_events`` and ``lock`` are defined at class level, so
    they are shared by every instance (and subclass) that does not assign its
    own. NOTE(review): subclasses presumably rebind these per instance -
    confirm before relying on per-instance isolation.
    """
    busy = False  # Set True by background_run and False by background_stop

    threads = []  # Threads started by background_run
    client = None  # background_run logs ``client.base_url``
    _old_events = []  # List of tuples of (event_id, event_time)
    keep = 86400  # Time in seconds to keep old events
    lock = threading.Lock()  # Lock for access to old_events

    @abstractmethod
    def run(self, *args, **kwargs):
        """ Run the listener """

    def get_old_events(self) -> List[EventInfo]:
        """
        Get the underlying list of old events.

        The lock is held only while the reference is read; the returned
        object is the cache list itself, not a copy, so callers that iterate
        it while other threads mutate the cache should copy it first.
        """
        with self.lock:
            return self._old_events

    def set_old_events(self, events: List[EventInfo]):
        """ Replace the cache of old events with `events` under the lock. """
        with self.lock:
            self._old_events = events

    old_events = property(fget=get_old_events, fset=set_old_events)

    def remove_old_event(self, event: EventInfo):
        """
        Remove the first cached entry equal to `event`, in place.

        Raises
        ------
        ValueError
            If `event` is not in the cache (raised by ``list.remove``).
        """
        with self.lock:  # Make threadsafe
            self._old_events.remove(event)

    def extend(self, events: Union[EventInfo, List[EventInfo]]):
        """ Threadsafe way to add events to the cache """
        if isinstance(events, EventInfo):
            # A single EventInfo is itself a tuple - wrap it so that its
            # fields are not added as separate cache entries.
            events = [events]
        with self.lock:
            self._old_events.extend(events)

    def append(self, other: EventInfo):
        """ Add a single EventInfo to the cache under the lock. """
        assert isinstance(other, EventInfo)
        with self.lock:
            self._old_events.append(other)

    def _remove_old_events(self, endtime: UTCDateTime) -> None:
        """
        Expire old events from the cache.

        Parameters
        ----------
        endtime
            The time to calculate time-difference relative to. Any events
            older than endtime - self.keep will be removed.
        """
        # Take one copy of the cache under the lock and derive both the age
        # and the expiry decision from it. Reading the live list several
        # times lets another thread change its length between reads, which
        # misaligns positions (or raises IndexError).
        with self.lock:
            snapshot = list(self._old_events)
        for old_event in snapshot:
            age = endtime - old_event[1]
            if age >= self.keep:
                Logger.info(f"Removing event {old_event} which is {age} old, older than {self.keep}")
                # Remove in-place from the shared list, without creating a
                # new list.
                self.remove_old_event(old_event)

    def background_run(self, *args, **kwargs):
        """
        Start ``run`` in a daemon thread and return immediately.

        Sets ``busy`` to True, starts a thread named "ListeningThread" that
        calls ``self.run(*args, **kwargs)``, records the thread in
        ``threads`` and logs the client's ``base_url``.
        """
        self.busy = True
        listening_thread = threading.Thread(
            target=self.run, args=args, kwargs=kwargs,
            name="ListeningThread")
        listening_thread.daemon = True
        listening_thread.start()
        self.threads.append(listening_thread)
        Logger.info("Started listening to {0}".format(self.client.base_url))

    def background_stop(self):
        """
        Set ``busy`` to False and wait for all recorded threads to finish.

        Each thread is first joined with a 10 second timeout; if it is still
        alive after that, this call blocks until the thread exits.
        NOTE(review): presumably ``run`` implementations poll ``busy`` to
        know when to stop - confirm in subclasses.
        """
        self.busy = False
        for thread in self.threads:
            thread.join(timeout=10)
            if thread.is_alive():
                # Didn't join within timeout...
                thread.join()


def event_time(event: Event) -> UTCDateTime:
    """
    Get the origin or first pick time of an event.

    The preferred origin is used if there is one, otherwise the first origin
    in ``event.origins``. If the event has no origin, the earliest pick time
    is returned; if it has no picks either, ``UTCDateTime(0)`` is returned.

    Parameters
    ----------
    event:
        Event to get a time for

    Returns
    -------
    Reference time for event.
    """
    try:
        origin = event.preferred_origin() or event.origins[0]
    except IndexError:
        # No preferred origin and an empty origins list.
        origin = None
    if origin is not None:
        return origin.time
    if not event.picks:
        return UTCDateTime(0)
    return min(p.time for p in event.picks)
# Run this module's doctests when it is executed as a script.
if __name__ == "__main__":
    import doctest

    doctest.testmod()