
"""
Classes for buffering seismic data in memory.

Author
    Calum J Chamberlain
License
    GPL v3.0
"""
import logging
import copy
import numpy as np

from typing import Union, List
from collections.abc import Sized

from obspy import Stream, Trace, UTCDateTime
from obspy.core.trace import Stats
from obspy.core.util import AttribDict

Logger = logging.getLogger(__name__)


class BufferStats(AttribDict):
    """
    Container for header attributes for TraceBuffer.

    This is very similar to obspy's Stats, but endtime is fixed and starttime
    is calculated from the other attributes.
    """
    readonly = ['starttime']
    defaults = {
        'sampling_rate': 1.0,
        'delta': 1.0,
        'starttime': UTCDateTime(0),
        'endtime': UTCDateTime(0),
        'npts': 0,
        'calib': 1.0,
        'network': '',
        'station': '',
        'location': '',
        'channel': '',
    }

    def __repr__(self):  # pragma: no cover
        return self.__str__()

    def __init__(self, header: dict = None):
        super().__init__(header)

    def __setitem__(self, key, value):
        # Keys which need to refresh derived values
        if key in ['delta', 'sampling_rate', 'endtime', 'npts']:
            # Ensure correct data type
            if key == 'delta':
                key = 'sampling_rate'
                try:
                    value = 1.0 / float(value)
                except ZeroDivisionError:
                    value = 0.0
            elif key == 'sampling_rate':
                value = float(value)
            elif key == 'endtime':
                value = UTCDateTime(value)
            elif key == 'npts':
                if not isinstance(value, int):
                    value = int(value)
            # Set the current key
            super().__setitem__(key, value)
            # Set derived value: delta
            try:
                delta = 1.0 / float(self.sampling_rate)
            except ZeroDivisionError:
                delta = 0
            self.__dict__['delta'] = delta
            # Set derived value: starttime
            if self.npts == 0:
                timediff = 0
            else:
                timediff = float(self.npts - 1) * delta
            self.__dict__['starttime'] = self.endtime - timediff
            return
        # Prevent a calibration factor of 0
        if key == 'calib' and value == 0:
            Logger.warning('Calibration factor set to 0.0!')
            return
        # All other keys
        if isinstance(value, dict):
            super().__setitem__(key, AttribDict(value))
        else:
            super().__setitem__(key, value)

    __setattr__ = __setitem__

    def __str__(self):  # pragma: no cover
        """
        Return a more readable string representation of the BufferStats
        object.
        """
        priorized_keys = ['network', 'station', 'location', 'channel',
                          'starttime', 'endtime', 'sampling_rate', 'delta',
                          'npts', 'calib']
        return self._pretty_str(priorized_keys)

    def _repr_pretty_(self, p, cycle):  # pragma: no cover
        p.text(str(self))
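

# Usage sketch (illustrative only: the helper name ``_bufferstats_example``
# and the example header values are hypothetical). It shows that BufferStats
# keeps ``endtime`` fixed and derives ``starttime`` from ``endtime``,
# ``npts`` and ``sampling_rate``, so changing ``npts`` moves ``starttime``
# rather than ``endtime``.
def _bufferstats_example():
    stats = BufferStats(dict(
        network="NZ", station="FOZ", location="10", channel="HHZ",
        sampling_rate=1.0, npts=10, endtime=UTCDateTime(2020, 1, 1)))
    # starttime is derived: endtime - (npts - 1) * delta = 9 s before endtime
    assert stats.starttime == UTCDateTime(2020, 1, 1) - 9
    # endtime stays fixed when npts changes; starttime is re-derived
    stats.npts = 5
    assert stats.starttime == UTCDateTime(2020, 1, 1) - 4
    return stats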


class TraceBuffer(object):
    """
    Container for Trace-like data.

    Note: does not include standard Trace methods, use TraceBuffer.trace.`meth`

    Parameters
    ----------
    data
        Iterable of data - will be made into a deque.
    header
        Standard header info for an Obspy Trace.
    maxlen
        Maximum length of trace in samples.

    Examples
    --------
    >>> from obspy import read
    >>> st = read()
    >>> trace_buffer = TraceBuffer(
    ...     data=st[0].data, header=st[0].stats, maxlen=100)
    >>> len(trace_buffer)
    100
    >>> trace_buffer.stats.npts
    100
    >>> trace_buffer.stats.endtime
    UTCDateTime(2009, 8, 24, 0, 20, 32, 990000)
    >>> trace_buffer.stats.starttime
    UTCDateTime(2009, 8, 24, 0, 20, 32)
    >>> assert np.all(trace_buffer.data.data == st[0].data[-100:])
    """
    def __init__(
        self,
        data: np.ndarray,
        header: Union[dict, Stats, BufferStats],
        maxlen: int
    ):
        # Take the right-most samples
        self.data = NumpyDeque(data, maxlen=maxlen)
        header = copy.deepcopy(header)
        # We need to make sure that starttime is correctly set
        self.stats = BufferStats(header)
        self.stats.npts = len(self.data.data)

    def __repr__(self) -> str:  # pragma: no cover
        return "TraceBuffer(data={0}, header={1}, maxlen={2})".format(
            self.data, self.stats, self.data.maxlen)

    def __len__(self) -> int:
        """ Length of buffer in samples. """
        return len(self.data)

    def __add__(self, other):
        new = self.copy()
        new.add_trace(other)
        return new

    def __iadd__(self, other):
        self.add_trace(other)
        return self

    @property
    def data_len(self) -> float:
        """ Length of buffer in seconds. """
        return self.__len__() * self.stats.delta

    def get_id(self) -> str:
        """
        Return a SEED compatible identifier of the trace.

        Returns
        -------
        The SEED ID of the trace.
        """
        seed_formatter = "{network}.{station}.{location}.{channel}"
        return seed_formatter.format(**self.stats)

    id = property(get_id)

    @id.setter
    def id(self, value):
        try:
            net, sta, loc, cha = value.split(".")
        except AttributeError:
            msg = ("Can only set a Trace's SEED ID from a string "
                   "(and not from {})").format(type(value))
            raise TypeError(msg)
        except ValueError:
            msg = "Not a valid SEED ID: '{}'".format(value)
            raise ValueError(msg)
        self.stats.network = net
        self.stats.station = sta
        self.stats.location = loc
        self.stats.channel = cha

    def add_trace(self, trace: Trace) -> None:
        """
        Add a Trace (or TraceBuffer) to this buffer.

        If overlaps occur, new data (from `trace`) are kept and old data
        (from self) are replaced.

        Parameters
        ----------
        trace
            New trace to add to the buffer - will be added in place.

        Examples
        --------
        >>> from obspy import Trace, UTCDateTime
        >>> import numpy as np
        >>> trace_buffer = TraceBuffer(
        ...     data=np.arange(10), header=dict(
        ...         station="bob", endtime=UTCDateTime(2018, 1, 1, 0, 0, 1),
        ...         delta=1.),
        ...     maxlen=15)
        >>> print(trace_buffer.data) # doctest: +NORMALIZE_WHITESPACE
        NumpyDeque(data=[-- -- -- -- -- 0 1 2 3 4 5 6 7 8 9], maxlen=15)
        >>> trace = Trace(
        ...     np.arange(7)[::-1], header=dict(
        ...         station="bob", starttime=UTCDateTime(2018, 1, 1), delta=1.))
        >>> trace_buffer.add_trace(trace)
        >>> print(trace_buffer.stats.endtime)
        2018-01-01T00:00:06.000000Z
        >>> print(trace_buffer.data) # doctest: +NORMALIZE_WHITESPACE
        NumpyDeque(data=[0 1 2 3 4 5 6 7 6 5 4 3 2 1 0], maxlen=15)

        Try adding a trace that is longer than the maxlen

        >>> trace = Trace(
        ...     np.arange(20), header=dict(
        ...         station="bob", starttime=trace_buffer.stats.endtime,
        ...         delta=1.))
        >>> print(trace.stats.endtime)
        2018-01-01T00:00:25.000000Z
        >>> trace_buffer.add_trace(trace)
        >>> print(trace_buffer.stats.endtime)
        2018-01-01T00:00:25.000000Z
        >>> print(trace_buffer.data) # doctest: +NORMALIZE_WHITESPACE
        NumpyDeque(data=[5 6 7 8 9 10 11 12 13 14 15 16 17 18 19], maxlen=15)

        Add a trace that starts after the current tracebuffer ends

        >>> trace = Trace(
        ...     np.arange(5), header=dict(
        ...         station="bob", starttime=trace_buffer.stats.endtime + 5,
        ...         delta=1.))
        >>> print(trace.stats.endtime)
        2018-01-01T00:00:34.000000Z
        >>> trace_buffer.add_trace(trace)
        >>> print(trace_buffer.stats.endtime)
        2018-01-01T00:00:34.000000Z
        >>> print(trace_buffer.data) # doctest: +NORMALIZE_WHITESPACE
        NumpyDeque(data=[15 16 17 18 19 -- -- -- -- -- 0 1 2 3 4], maxlen=15)

        Add a trace that starts one sample after the current trace ends

        >>> trace = Trace(
        ...     np.arange(5), header=dict(
        ...         station="bob",
        ...         starttime=trace_buffer.stats.endtime + trace_buffer.stats.delta,
        ...         delta=1.))
        >>> print(trace_buffer.stats.endtime)
        2018-01-01T00:00:34.000000Z
        >>> print(trace.stats.starttime)
        2018-01-01T00:00:35.000000Z
        >>> trace_buffer.add_trace(trace)
        >>> print(trace_buffer.data) # doctest: +NORMALIZE_WHITESPACE
        NumpyDeque(data=[-- -- -- -- -- 0 1 2 3 4 0 1 2 3 4], maxlen=15)
        """
        if isinstance(trace, TraceBuffer):
            trace = trace.trace
        # Check that stats match
        assert self.id == trace.id, "IDs {0} and {1} differ".format(
            self.id, trace.id)
        assert self.stats.sampling_rate == trace.stats.sampling_rate, (
            "Sampling rates {0} and {1} differ".format(
                self.stats.sampling_rate, trace.stats.sampling_rate))
        assert self.stats.calib == trace.stats.calib, (
            "Calibration factors {0} and {1} differ".format(
                self.stats.calib, trace.stats.calib))
        if len(trace) == 0:
            # Nothing to do with an empty trace.
            return
        # Remove data older than our minimum starttime - faster to slice
        # this off first.
        if trace.stats.starttime < self.stats.starttime:
            trace = trace.slice(starttime=self.stats.starttime)
        if len(trace) == 0:
            return
        # If data are newer in trace than in self.
        if trace.stats.endtime > self.stats.endtime:
            # If there is overlap
            if trace.stats.starttime <= self.stats.endtime:
                old_data = trace.slice(endtime=self.stats.endtime).data
                insert_start = -len(old_data)
                self.data.insert(old_data, insert_start)
                new_data = trace.slice(
                    starttime=self.stats.endtime + self.stats.delta).data
            # If there is a gap - defined as more than 1.5 samples to cope
            # with rounding errors in UTCDateTime.
            elif trace.stats.starttime >= self.stats.endtime + (
                    1.5 * self.stats.delta):
                new_data = np.empty(
                    trace.stats.npts + int(self.stats.sampling_rate * (
                        trace.stats.starttime - self.stats.endtime)),
                    dtype=trace.data.dtype)
                mask = np.ones_like(new_data)
                new_data[-trace.stats.npts:] = trace.data
                mask[-trace.stats.npts:] = 0
                new_data = np.ma.masked_array(new_data, mask=mask)
            # Otherwise just extend with the new data.
            else:
                new_data = trace.data
            self.data.extend(new_data)
            self.stats.endtime = trace.stats.endtime
        else:
            # No new times covered - insert old data into the array.
            insert_start = (
                trace.stats.starttime - self.stats.starttime
            ) * self.stats.sampling_rate
            # Cope with small shifts due to sampling time-stamp rounding
            assert abs(insert_start - round(insert_start)) < .1, (
                "Traces are not sampled at the same base time-stamp, "
                "{0} != {1}".format(round(insert_start), insert_start))
            self.data.insert(trace.data, int(round(insert_start)))
        self.stats.npts = len(self.data.data)

    @property
    def trace(self) -> Trace:
        """
        Get a static trace representation of the buffer.

        Returns
        -------
        A trace with the buffer's data and stats. If there are gaps in the
        buffer they will be masked.
        """
        return Trace(header=self.stats.__dict__, data=self.data.data.copy())

    def is_full(self, strict=False) -> bool:
        """
        Check whether the tracebuffer is full or not.

        If strict=False (default) then only the start and end of the buffer
        are checked. Otherwise (strict=True) the whole buffer must contain
        real data (i.e. the mask is all False).

        Parameters
        ----------
        strict
            Whether to check the whole deque (True), or just the start and
            end (False: default).
        """
        return self.data.is_full(strict=strict)

    def copy(self):
        """
        Generate a copy of the buffer.

        Returns
        -------
        A deepcopy of the tracebuffer.
        """
        return TraceBuffer(
            data=copy.deepcopy(self.data.data),
            header=copy.deepcopy(self.stats.__dict__),
            maxlen=copy.deepcopy(self.data.maxlen))
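

# Usage sketch (illustrative only: the helper name ``_tracebuffer_example``
# is hypothetical and not part of this module). It shows that the ``trace``
# property returns a static obspy Trace snapshot of the buffered samples,
# and that ``is_full`` reports whether the underlying deque holds real
# (unmasked) data.
def _tracebuffer_example():
    from obspy import read

    tr = read()[0]  # 3000 samples at 100 Hz
    trace_buffer = TraceBuffer(data=tr.data, header=tr.stats, maxlen=200)
    # Only the right-most 200 samples are kept, so the buffer starts full
    assert trace_buffer.is_full(strict=True)
    # A static snapshot that can be used with the normal Trace API
    snapshot = trace_buffer.trace
    assert snapshot.stats.npts == 200
    assert snapshot.stats.endtime == tr.stats.endtime
    return snapshot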


class NumpyDeque(object):
    """
    Simple implementation of necessary deque methods for 1D numpy arrays.

    Parameters
    ----------
    data
        Data to initialize the deque with.
    maxlen
        Maximum length of the deque.
    """
    def __init__(
        self,
        data: Union[list, np.ndarray, np.ma.MaskedArray],
        maxlen: int
    ):
        self._maxlen = maxlen
        self._data = np.empty(maxlen, dtype=type(data[0]))
        length = min(maxlen, len(data))
        if isinstance(data, np.ma.MaskedArray):
            mask_value = data.mask
            data = data.data
        else:
            mask_value = np.zeros(length)
        self._data[-length:] = data[-length:]
        self._mask = np.zeros_like(self._data, dtype=bool)
        self._mask[0:-length] = np.ones(maxlen - length)
        self._mask[-length:] = mask_value

    def __repr__(self):
        data_str = self.data.__str__()
        if data_str.startswith("[ "):
            data_str = data_str.replace("[ ", "[")
        return "NumpyDeque(data={0}, maxlen={1})".format(
            data_str, self.maxlen)

    def __len__(self):
        return sum(~self._mask)

    def __getitem__(self, item):
        return self.data.__getitem__(item)

    @property
    def maxlen(self) -> int:
        return self._maxlen

    @property
    def data(self) -> np.ndarray:
        if self._mask.sum() > 0:
            return np.ma.masked_array(self._data, mask=self._mask)
        return self._data

    def is_full(self, strict=False) -> bool:
        """
        Check whether the buffer is full.

        If strict=False (default) then only the start and end of the buffer
        are checked. Otherwise (strict=True) the whole buffer must contain
        real data (i.e. the mask is all False).

        Parameters
        ----------
        strict
            Whether to check the whole deque (True), or just the start and
            end (False: default).
        """
        if strict:
            return ~np.any(self._mask)
        return ~np.any([self._mask[0], self._mask[-1]])

    def extend(
        self,
        other: Union[list, np.ndarray, np.ma.MaskedArray]
    ) -> None:
        """
        Put new data onto the right of the deque.

        If the deque overflows maxlen the leftmost items will be removed.
        Works in place.

        Parameters
        ----------
        other
            The new data to extend the deque with.
        """
        other_length = len(other)
        if other_length == 0:
            # Nothing to see here, move along.
            return
        if isinstance(other, np.ma.MaskedArray):
            mask_value = other.mask
            other = other.data
        else:
            mask_value = np.zeros(other_length)
        if other_length >= self.maxlen:
            self._data[0:] = other[-self.maxlen:]
            self._mask[:] = mask_value[-self.maxlen:]
            return
        self._data[0:-other_length] = self._data[other_length:]
        self._mask[0:-other_length] = self._mask[other_length:]
        self._data[-other_length:] = other
        self._mask[-other_length:] = mask_value

    def extendleft(
        self,
        other: Union[list, np.ndarray, np.ma.MaskedArray]
    ) -> None:
        """
        Put new data onto the left of the deque.

        If the deque overflows maxlen the rightmost items will be removed.
        Works in place.

        Parameters
        ----------
        other
            The new data to extend the deque with.
        """
        other_length = len(other)
        if other_length == 0:
            # Nothing to see here, move along.
            return
        if isinstance(other, np.ma.MaskedArray):
            mask_value = other.mask
            other = other.data
        else:
            mask_value = np.zeros(other_length)
        if other_length >= self.maxlen:
            self._data[0:] = other[0:self.maxlen]
            self._mask[:] = mask_value[0:self.maxlen]
            return
        self._data[other_length:] = self._data[0:-other_length]
        self._mask[other_length:] = self._mask[0:-other_length]
        self._data[0:other_length] = other
        self._mask[0:other_length] = mask_value

    def append(self, other) -> None:
        """
        Append an element to the right of the deque.

        If the new deque overflows, the leftmost element will be removed.

        Parameters
        ----------
        other
            Single element to add to the deque.
        """
        if isinstance(other, Sized):
            raise TypeError("other must be a single item, use extend")
        self.extend([other])

    def appendleft(self, other) -> None:
        """
        Append an element to the left of the deque.

        If the new deque overflows, the rightmost element will be removed.

        Parameters
        ----------
        other
            Single element to add to the deque.
        """
        if isinstance(other, Sized):
            raise TypeError("other must be a single item, use extendleft")
        self.extendleft([other])

    def insert(
        self,
        other: Union[list, np.ndarray, np.ma.MaskedArray],
        index: int,
    ) -> None:
        """
        Insert elements between a stop and start point of the deque.

        If `other` is a masked array, only unmasked elements will be
        inserted into the deque. If `other` is longer than `self.maxlen`
        then it will be used to extend the deque.

        Parameters
        ----------
        other
            Elements to insert.
        index
            Start of the slice in the current deque.

        Examples
        --------
        >>> np_deque = NumpyDeque(data=[0, 1, 2], maxlen=5)
        >>> print(np_deque) # doctest: +NORMALIZE_WHITESPACE
        NumpyDeque(data=[-- -- 0 1 2], maxlen=5)

        Insert a single element list

        >>> np_deque.insert([6], 1)
        >>> print(np_deque) # doctest: +NORMALIZE_WHITESPACE
        NumpyDeque(data=[-- 6 0 1 2], maxlen=5)

        Insert a numpy array

        >>> np_deque.insert(np.array([11, 12]), 3)
        >>> print(np_deque) # doctest: +NORMALIZE_WHITESPACE
        NumpyDeque(data=[-- 6 0 11 12], maxlen=5)

        Insert a masked array - only the unmasked elements are used.

        >>> np_deque.insert(
        ...     np.ma.masked_array([99, 99], mask=[True, False]), 2)
        >>> print(np_deque) # doctest: +NORMALIZE_WHITESPACE
        NumpyDeque(data=[-- 6 0 99 12], maxlen=5)

        Insert an array longer than maxlen

        >>> np_deque.insert(np.arange(10), 0)
        >>> print(np_deque) # doctest: +NORMALIZE_WHITESPACE
        NumpyDeque(data=[5 6 7 8 9], maxlen=5)
        """
        if not isinstance(other, Sized):
            other = [other]
        stop = index + len(other)
        if len(other) > self.maxlen:
            Logger.warning(
                "Array longer than max-length, reverting to extend")
            self.extend(other)
        elif isinstance(other, np.ma.MaskedArray):
            # Only take the non-masked bits
            for i in range(len(other)):
                if not other.mask[i]:
                    self._data[index + i] = other.data[i]
                    self._mask[index + i] = 0
        else:
            if stop == 0:
                self._data[index:] = other
                self._mask[index:] = 0
            else:
                self._data[index:stop] = other
                self._mask[index:stop] = 0
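

# Usage sketch (illustrative only: the helper name ``_numpydeque_example``
# is hypothetical and not part of this module). It shows that extend and
# extendleft drop elements from the opposite end when the deque overflows
# maxlen, and that unfilled slots appear as masked values.
def _numpydeque_example():
    dq = NumpyDeque(data=[0, 1, 2], maxlen=5)
    # Unfilled slots on the left are masked: [-- -- 0 1 2]
    assert len(dq) == 3 and not dq.is_full()
    dq.extend([3, 4, 5])      # overflow drops the leftmost values
    assert list(dq.data) == [1, 2, 3, 4, 5]
    dq.extendleft([-1, 0])    # overflow drops the rightmost values
    assert list(dq.data) == [-1, 0, 1, 2, 3]
    dq.append(4)              # single-element convenience wrapper
    assert list(dq.data) == [0, 1, 2, 3, 4]
    return dq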


class Buffer(object):
    """
    Container for TraceBuffers.

    Parameters
    ----------
    traces
        Stream or list of TraceBuffers or Traces.
    maxlen
        Maximum length for TraceBuffers in seconds.

    Examples
    --------
    >>> from obspy import read
    >>> st = read()
    >>> print(st)
    3 Trace(s) in Stream:
    BW.RJOB..EHZ | 2009-08-24T00:20:03.000000Z - 2009-08-24T00:20:32.990000Z | 100.0 Hz, 3000 samples
    BW.RJOB..EHN | 2009-08-24T00:20:03.000000Z - 2009-08-24T00:20:32.990000Z | 100.0 Hz, 3000 samples
    BW.RJOB..EHE | 2009-08-24T00:20:03.000000Z - 2009-08-24T00:20:32.990000Z | 100.0 Hz, 3000 samples
    >>> buffer = Buffer(st, maxlen=10.)
    >>> print(buffer)
    Buffer(3 traces, maxlen=10.0)
    """
    def __init__(
        self,
        traces: Union[Stream, List[Union[Trace, TraceBuffer]]] = None,
        maxlen: float = None
    ):
        assert traces or maxlen, "Requires at least maxlen or traces."
        # Allow construction from maxlen alone
        traces = traces if traces is not None else []
        assert isinstance(traces, (Stream, list)), \
            "Must be either Stream or list"
        for tr in traces:
            assert isinstance(tr, (Trace, TraceBuffer)), \
                "Must be Trace or TraceBuffer"
        self.traces = []
        self._maxlen = maxlen or max(
            [tr.stats.npts * tr.stats.delta for tr in traces])
        for tr in traces:
            if isinstance(tr, Trace):
                self.traces.append(TraceBuffer(
                    data=tr.data, header=tr.stats,
                    maxlen=int(tr.stats.sampling_rate * self._maxlen)))
            elif isinstance(tr, TraceBuffer):
                self.traces.append(tr)
        self.sanitize_traces()

    def __repr__(self):
        return "Buffer({0} traces, maxlen={1})".format(
            self.__len__(), self.maxlen)

    def __iter__(self):
        return self.traces.__iter__()

    def __len__(self):
        return len(self.traces)

    def __add__(self, other: Union[Trace, Stream]):
        new_buffer = self.copy()
        new_buffer.add_stream(other)
        return new_buffer

    def __iadd__(self, other: Union[Trace, Stream]):
        self.add_stream(other)
        return self

    def copy(self):
        return Buffer(traces=[tr.trace for tr in self.traces],
                      maxlen=self.maxlen)

    @property
    def maxlen(self):
        return self._maxlen

    @maxlen.setter
    def maxlen(self, maxlen):
        self._maxlen = maxlen
        self.sanitize_traces()

    def sanitize_traces(self):
        """ Ensure all traces meet the maxlen criterion. """
        _traces = []
        for tr in self.traces:
            _maxlen = int(self.maxlen * tr.stats.sampling_rate)
            if not tr.data.maxlen == _maxlen:
                # Need to make a new tracebuffer with the correct maxlen
                _tr = tr.trace
                _traces.append(TraceBuffer(
                    data=_tr.data, header=_tr.stats, maxlen=_maxlen))
            else:
                _traces.append(tr)
        self.traces = _traces

    def add_stream(self, stream: Union[Trace, Stream]) -> None:
        """
        Add a stream or trace to the buffer.

        Note that if `stream` is a Buffer, the maxlen of the initial Buffer
        will be used.

        Parameters
        ----------
        stream
            The Trace, Stream or Buffer to add to this Buffer.
        """
        if isinstance(stream, Trace):
            stream = Stream([stream])
        elif isinstance(stream, Buffer):
            stream = stream.stream
        for tr in stream:
            traces_in_buffer = self.select(id=tr.id)
            if len(traces_in_buffer) > 0:
                # Append traces to the current buffer
                for trace_in_buffer in traces_in_buffer:
                    trace_in_buffer.add_trace(tr)
            else:
                # Add as a new trace to the buffer
                self.traces.append(TraceBuffer(
                    data=tr.data, header=tr.stats,
                    maxlen=int(self.maxlen * tr.stats.sampling_rate)))

    def select(self, id: str) -> List:
        """
        Select traces from the buffer based on SEED id.

        Parameters
        ----------
        id
            Standard four-part SEED id as
            {network}.{station}.{location}.{channel}

        Returns
        -------
        List of matching traces.
        """
        return [tr for tr in self.traces if tr.id == id]

    @property
    def stream(self) -> Stream:
        """
        Get a static Stream view of the buffer.

        Returns
        -------
        A Stream representing the current state of the Buffer.
        """
        return Stream([tr.trace for tr in self.traces])

    def is_full(self, strict=False) -> bool:
        """
        Check whether the buffer is full or not.

        If strict=False (default) then only the start and end of each trace
        buffer are checked. Otherwise (strict=True) the whole buffer must
        contain real data (i.e. the mask is all False).

        Parameters
        ----------
        strict
            Whether to check the whole deque (True), or just the start and
            end (False: default).
        """
        for tr in self.traces:
            if not tr.is_full(strict=strict):
                return False
        return True
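

# Usage sketch (illustrative only: the helper name ``_buffer_example`` is
# hypothetical and not part of this module). It shows that ``add_stream``
# extends the existing TraceBuffer for a channel that is already held, that
# ``select`` matches on SEED id, and that the ``stream`` property returns a
# static obspy Stream capped at ``maxlen`` seconds per channel.
def _buffer_example():
    from obspy import read

    st = read()  # 3 channels, 30 s at 100 Hz
    start = st[0].stats.starttime
    buffer = Buffer(st.slice(endtime=start + 10), maxlen=20.0)
    # Later data for the same channels extend the existing TraceBuffers
    buffer.add_stream(st.slice(starttime=start + 10))
    assert len(buffer) == 3
    ehz = buffer.select("BW.RJOB..EHZ")[0]
    assert abs(ehz.stats.endtime - st[0].stats.endtime) < ehz.stats.delta / 2
    # A static snapshot; each trace holds at most maxlen (20 s) of data
    snapshot = buffer.stream
    assert all(tr.stats.npts <= int(20.0 * 100.0) for tr in snapshot)
    return snapshot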


if __name__ == "__main__":
    import doctest

    doctest.testmod()