streaming

Streaming module of RT_EQcorrscan


streaming.buffers

Classes for buffering seismic data in memory.

Author

Calum J Chamberlain

License

GPL v3.0

class rt_eqcorrscan.streaming.buffers.Buffer(traces=None, maxlen=None)[source]

Container for TraceBuffers.

Parameters

Examples

>>> from obspy import read
>>> from rt_eqcorrscan.streaming.buffers import Buffer
>>> st = read()
>>> print(st)
3 Trace(s) in Stream:
BW.RJOB..EHZ | 2009-08-24T00:20:03.000000Z - 2009-08-24T00:20:32.990000Z | 100.0 Hz, 3000 samples
BW.RJOB..EHN | 2009-08-24T00:20:03.000000Z - 2009-08-24T00:20:32.990000Z | 100.0 Hz, 3000 samples
BW.RJOB..EHE | 2009-08-24T00:20:03.000000Z - 2009-08-24T00:20:32.990000Z | 100.0 Hz, 3000 samples
>>> buffer = Buffer(st, maxlen=10.)
>>> print(buffer)
Buffer(3 traces, maxlen=10.0)

add_stream(stream)[source]

Add a stream or trace to the buffer.

Note that if stream is a Buffer, the maxlen of the initial Buffer will be used.

Parameters

stream (Union[Trace, Stream]) – Stream or trace to add to the buffer.

Return type

None

is_full(strict=False)[source]

Check whether the buffer is full or not.

If strict=False (default) then only the start and end of the buffer are checked. Otherwise (strict=True) the whole buffer must contain real data (i.e. the mask is all False).

Parameters

strict – Whether to check the whole deque (True), or just the start and end (False: default).

Return type

bool

sanitize_traces()[source]

Ensure all traces meet the maxlen criterion.

select(id)[source]

Select traces from the buffer based on SEED id.

Parameters

id (str) – Standard four-part SEED id as {network}.{station}.{location}.{channel}

Return type

List of matching traces.
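
As an illustration of the four-part id format, the sketch below splits a SEED id into its components and filters a list of ids by exact match. It works on plain strings rather than traces; `split_seed_id`, `select_by_id` and the exact-match rule are assumptions made for this example, not the library's implementation.

```python
def split_seed_id(seed_id):
    """Split '{network}.{station}.{location}.{channel}' into its four parts."""
    parts = seed_id.split(".")
    if len(parts) != 4:
        raise ValueError(f"Expected four dot-separated parts, got {seed_id!r}")
    return tuple(parts)


def select_by_id(trace_ids, seed_id):
    """Hypothetical helper: return the ids that match seed_id exactly."""
    wanted = split_seed_id(seed_id)
    return [tid for tid in trace_ids if split_seed_id(tid) == wanted]


ids = ["BW.RJOB..EHZ", "BW.RJOB..EHN", "BW.RJOB..EHE"]
matches = select_by_id(ids, "BW.RJOB..EHZ")
```

Note that an empty location code still counts as one of the four parts, which is why the id contains two consecutive dots.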

property stream: Stream

Get a static Stream view of the buffer.

Return type

A stream representing the current state of the Buffer.

class rt_eqcorrscan.streaming.buffers.BufferStats(header=None)[source]

Container for header attributes for TraceBuffer.

This is very similar to obspy’s Stats, but endtime is fixed and starttime is calculated from other attributes.
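
A minimal sketch of how a start time can be derived from a fixed end time, assuming the first sample sits (npts - 1) sample intervals before the last (the same relation ObsPy's Stats uses in the other direction). `buffer_starttime` is a hypothetical helper, and the standard library's datetime stands in for UTCDateTime.

```python
from datetime import datetime, timedelta


def buffer_starttime(endtime, npts, delta):
    """Start time of a buffer whose end time is fixed.

    Assumes the first sample is (npts - 1) sample intervals before the last.
    """
    return endtime - timedelta(seconds=(npts - 1) * delta)


# Values taken from the TraceBuffer example: 100 samples at 100 Hz.
endtime = datetime(2009, 8, 24, 0, 20, 32, 990000)
starttime = buffer_starttime(endtime, npts=100, delta=0.01)
```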

class rt_eqcorrscan.streaming.buffers.NumpyDeque(data, maxlen)[source]

Simple implementation of necessary deque methods for 1D numpy arrays.

Parameters
  • data (Union[list, ndarray, MaskedArray]) – Data to initialize the deque with

  • maxlen (int) – Maximum length of the deque.

append(other)[source]

Append an element to the right of the deque.

If the new deque overflows, the leftmost element will be removed.

Parameters

other – Single element to add to the deque.

Return type

None

appendleft(other)[source]

Append an element to the left of the deque.

If the new deque overflows, the rightmost element will be removed.

Parameters

other – Single element to add to the deque.

Return type

None

extend(other)[source]

Put new data onto the right of the deque.

If the deque overflows maxlen the leftmost items will be removed. Works in place.

Parameters

other (Union[list, ndarray, MaskedArray]) – The new data to extend the deque with.

Return type

None
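
The right-hand overflow rule is the same one the standard library's collections.deque applies when it is given a maxlen. The sketch below uses it as an analogy only; it is not how NumpyDeque is implemented, and unlike NumpyDeque it does not pad unfilled slots with masked values.

```python
from collections import deque

buf = deque([0, 1, 2], maxlen=5)

# Seven items are now on offer to a five-slot deque: the two leftmost drop off.
buf.extend([3, 4, 5, 6])
after_extend = list(buf)

# A single append on a full deque pushes out one more item from the left.
buf.append(7)
after_append = list(buf)
```

The left-hand methods are deliberately left out: collections.deque.extendleft reverses the order of its argument, so it is not a safe stand-in for extendleft here.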

extendleft(other)[source]

Put new data onto the left of the deque.

If the deque overflows maxlen the rightmost items will be removed. Works in place.

Parameters

other (Union[list, ndarray, MaskedArray]) – The new data to extend the deque with.

Return type

None

insert(other, index)[source]

Insert elements into the deque, overwriting the existing values from index onwards.

If other is a masked array, only unmasked elements will be inserted into the deque.

If other is longer than self.maxlen then it will be used to extend the deque.

Parameters
  • other (Union[list, ndarray, MaskedArray]) – Elements to insert.

  • index (int) – Start of the slice in the current deque

Return type

None

Examples

>>> import numpy as np
>>> from rt_eqcorrscan.streaming.buffers import NumpyDeque
>>> np_deque = NumpyDeque(data=[0, 1, 2], maxlen=5)
>>> print(np_deque) 
NumpyDeque(data=[-- -- 0 1 2], maxlen=5)

Insert a single element list

>>> np_deque.insert([6], 1)
>>> print(np_deque) 
NumpyDeque(data=[-- 6 0 1 2], maxlen=5)

Insert a numpy array

>>> np_deque.insert(np.array([11, 12]), 3)
>>> print(np_deque) 
NumpyDeque(data=[-- 6 0 11 12], maxlen=5)

Insert a masked array - only the unmasked elements are used.

>>> np_deque.insert(
...     np.ma.masked_array([99, 99], mask=[True, False]), 2)
>>> print(np_deque) 
NumpyDeque(data=[-- 6 0 99 12], maxlen=5)

Insert an array longer than maxlen

>>> np_deque.insert(np.arange(10), 0)
>>> print(np_deque) 
NumpyDeque(data=[5 6 7 8 9], maxlen=5)
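
The insert rules shown in these examples can be sketched with a plain list, using None in place of a masked sample. This is a standard-library illustration under stated assumptions, not the NumpyDeque code: the `insert` function below is hypothetical, and raising when the slice would run past the end is an assumption, since that case is not documented here.

```python
def insert(buf, other, index, maxlen):
    """Overwrite buf in place from index; None marks a masked sample."""
    other = list(other)
    if len(other) > maxlen:
        # Too long to fit: behave like extend and keep the newest samples.
        buf[:] = (buf + other)[-maxlen:]
        return
    if index + len(other) > maxlen:
        # Assumption: the undocumented overrun case is treated as an error.
        raise IndexError("slice would run past the end of the deque")
    for offset, value in enumerate(other):
        if value is not None:  # masked samples leave the old value in place
            buf[index + offset] = value


buf = [None, None, 0, 1, 2]
insert(buf, [6], 1, maxlen=5)
insert(buf, [11, 12], 3, maxlen=5)
insert(buf, [None, 99], 2, maxlen=5)
state_before_overflow = list(buf)
insert(buf, range(10), 0, maxlen=5)
```

The four calls mirror the four documented inserts, so the intermediate and final states can be compared against the printed output.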

is_full(strict=False)[source]

Check whether the buffer is full.

If strict=False (default) then only the start and end of the buffer are checked. Otherwise (strict=True) the whole buffer must contain real data (i.e. the mask is all False).

Parameters

strict – Whether to check the whole deque (True), or just the start and end (False: default).

Return type

bool
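
The difference between the two modes can be sketched on a fixed-length list, with None standing in for a masked sample. The `is_full` function below is a hypothetical stand-alone version written for illustration; it assumes the list is already padded to the deque's full length.

```python
def is_full(buf, strict=False):
    """Return True if buf holds real data.

    strict=False checks only the first and last samples;
    strict=True requires every sample to be unmasked (not None).
    """
    if not buf:
        return False
    if strict:
        return all(value is not None for value in buf)
    return buf[0] is not None and buf[-1] is not None


gappy = [0, None, 2]      # real data at both ends, gap in the middle
padded = [None, 1, 2]     # still padded on the left
```

A buffer with an internal gap therefore counts as full by default, but not in strict mode.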

class rt_eqcorrscan.streaming.buffers.TraceBuffer(data, header, maxlen)[source]

Container for Trace-like data.

Note: does not include standard Trace methods; use TraceBuffer.trace.`meth` instead.

Parameters
  • data (ndarray) – Iterable of data - will be made into a deque.

  • header (Union[dict, Stats, BufferStats]) – Standard header info for an ObsPy Trace.

  • maxlen (int) – Maximum length of trace in samples.

Examples

>>> import numpy as np
>>> from obspy import read
>>> from rt_eqcorrscan.streaming.buffers import TraceBuffer
>>> st = read()
>>> trace_buffer = TraceBuffer(
...     data=st[0].data, header=st[0].stats, maxlen=100)
>>> len(trace_buffer)
100
>>> trace_buffer.stats.npts
100
>>> trace_buffer.stats.endtime
UTCDateTime(2009, 8, 24, 0, 20, 32, 990000)
>>> trace_buffer.stats.starttime
UTCDateTime(2009, 8, 24, 0, 20, 32)
>>> assert np.all(trace_buffer.data.data == st[0].data[-100:])

add_trace(trace)[source]

Add a trace to this TraceBuffer.

If overlaps occur, new data (from trace) are kept and old data (from self) are replaced.

Parameters

trace (Trace) – New trace to add to the buffer - will be added in place.

Return type

None

Examples

>>> from obspy import Trace, UTCDateTime
>>> import numpy as np
>>> from rt_eqcorrscan.streaming.buffers import TraceBuffer
>>> trace_buffer = TraceBuffer(
...     data=np.arange(10), header=dict(
...         station="bob", endtime=UTCDateTime(2018, 1, 1, 0, 0, 1),
...         delta=1.),
...     maxlen=15)
>>> print(trace_buffer.data) 
NumpyDeque(data=[-- -- -- -- -- 0 1 2 3 4 5 6 7 8 9], maxlen=15)
>>> trace = Trace(
...     np.arange(7)[::-1], header=dict(
...         station="bob", starttime=UTCDateTime(2018, 1, 1), delta=1.))
>>> trace_buffer.add_trace(trace)
>>> print(trace_buffer.stats.endtime)
2018-01-01T00:00:06.000000Z
>>> print(trace_buffer.data) 
NumpyDeque(data=[0 1 2 3 4 5 6 7 6 5 4 3 2 1 0], maxlen=15)
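
The overlap rule in the example above can be sketched with plain lists and integer times. This is a standard-library illustration, not the TraceBuffer implementation: the `add_trace` function below is hypothetical, None stands in for a masked sample, and every sample is placed strictly by its timestamp, which may not match the library's handling of every gap case.

```python
def add_trace(buf, buf_end, new, new_start, delta=1):
    """Merge new samples into a fixed-length buffer by timestamp.

    buf is a list whose length is the buffer's maxlen (None marks a gap)
    and buf_end is the time of its last sample.
    Returns (merged, merged_end).
    """
    maxlen = len(buf)
    new_end = new_start + (len(new) - 1) * delta
    merged_end = max(buf_end, new_end)
    merged_start = merged_end - (maxlen - 1) * delta
    merged = [None] * maxlen

    def place(samples, first_time):
        for i, value in enumerate(samples):
            idx = round((first_time + i * delta - merged_start) / delta)
            if value is not None and 0 <= idx < maxlen:
                merged[idx] = value

    place(buf, buf_end - (maxlen - 1) * delta)  # old data first
    place(new, new_start)                       # new data overwrites overlaps
    return merged, merged_end


# Same numbers as the example above: ten samples ending at t = 1 s,
# then seven new samples starting at t = 0 s.
old = [None] * 5 + list(range(10))
merged, merged_end = add_trace(
    old, buf_end=1, new=[6, 5, 4, 3, 2, 1, 0], new_start=0)
```

Writing the old data first and the new data second is what makes the new samples win wherever the two overlap.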

Try adding a trace that is longer than the maxlen

>>> trace = Trace(
...     np.arange(20), header=dict(
...         station="bob", starttime=trace_buffer.stats.endtime,
...         delta=1.))
>>> print(trace.stats.endtime)
2018-01-01T00:00:25.000000Z
>>> trace_buffer.add_trace(trace)
>>> print(trace_buffer.stats.endtime)
2018-01-01T00:00:25.000000Z
>>> print(trace_buffer.data) 
NumpyDeque(data=[5 6 7 8 9 10 11 12 13 14 15 16 17 18 19], maxlen=15)

Add a trace that starts after the current tracebuffer ends

>>> trace = Trace(
...     np.arange(5), header=dict(
...         station="bob", starttime=trace_buffer.stats.endtime + 5,
...         delta=1.))
>>> print(trace.stats.endtime)
2018-01-01T00:00:34.000000Z
>>> trace_buffer.add_trace(trace)
>>> print(trace_buffer.stats.endtime)
2018-01-01T00:00:34.000000Z
>>> print(trace_buffer.data) 
NumpyDeque(data=[15 16 17 18 19 -- -- -- -- -- 0 1 2 3 4], maxlen=15)

Add a trace that starts one sample after the current trace ends

>>> trace = Trace(
...     np.arange(5), header=dict(
...         station="bob",
...         starttime=trace_buffer.stats.endtime + trace_buffer.stats.delta,
...         delta=1.))
>>> print(trace_buffer.stats.endtime)
2018-01-01T00:00:34.000000Z
>>> print(trace.stats.starttime)
2018-01-01T00:00:35.000000Z
>>> trace_buffer.add_trace(trace)
>>> print(trace_buffer.data) 
NumpyDeque(data=[-- -- -- -- -- 0 1 2 3 4 0 1 2 3 4], maxlen=15)

copy()[source]

Generate a copy of the buffer.

Return type

A deepcopy of the tracebuffer.

property data_len: float

Length of buffer in seconds.

get_id()[source]

Return a SEED compatible identifier of the trace.

Return type

str

Returns

The SEED ID of the trace.

property id: str

Return a SEED compatible identifier of the trace.

Return type

The SEED ID of the trace.

is_full(strict=False)[source]

Check if the tracebuffer is full or not.

If strict=False (default) then only the start and end of the buffer are checked. Otherwise (strict=True) the whole buffer must contain real data (i.e. the mask is all False).

Parameters

strict – Whether to check the whole deque (True), or just the start and end (False: default).

Return type

bool

property trace: Trace

Get a static trace representation of the buffer.

Returns

A trace with the buffer’s data and stats. If there are gaps in the buffer they will be masked.


streaming.streaming

Functions and classes for handling streaming of data for real-time matched-filtering.

Author

Calum J Chamberlain

License

GPL v3.0


streaming.clients

streaming.clients.obsplus

Data handling to simulate a real-time client from old data via ObsPlus WaveBank for testing of real-time matched-filter detection.

Author

Calum J Chamberlain

License

GPL v3.0

class rt_eqcorrscan.streaming.clients.obsplus.RealTimeClient(server_url, starttime, client=None, query_interval=10.0, speed_up=1.0, buffer=None, buffer_capacity=600.0, pre_empt_data=True, pre_empt_len=6000.0, **kwargs)[source]

Simulation of a real-time client for past data. Used for testing.

Parameters
  • server_url (str) – The base-path for the ObsPlus wavebank.

  • client – Any client that supports waveform data queries.

  • starttime (UTCDateTime) – Starttime for client (in the past)

  • query_interval (float) – Interval in seconds to query the client for new data

  • speed_up (float) – Multiplier to run faster than real-time (real-time is 1.0).

  • buffer (Optional[Stream]) – Stream to buffer data into

  • buffer_capacity (float) – Length of buffer in seconds. Old data are removed in a FIFO style.
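
One way to picture speed_up is as a multiplier on elapsed wall-clock time. The sketch below assumes the simulated "now" is starttime plus the elapsed wall-clock seconds scaled by speed_up; `SimulatedClock` is a hypothetical helper written for this illustration and says nothing about how RealTimeClient tracks time internally.

```python
from datetime import datetime, timedelta


class SimulatedClock:
    """Hypothetical helper mapping wall-clock time onto archive time."""

    def __init__(self, starttime, speed_up=1.0, wall_start=0.0):
        self.starttime = starttime
        self.speed_up = speed_up
        self.wall_start = wall_start

    def now(self, wall_now):
        """Simulated time after (wall_now - wall_start) wall-clock seconds."""
        elapsed = (wall_now - self.wall_start) * self.speed_up
        return self.starttime + timedelta(seconds=elapsed)


clock = SimulatedClock(datetime(2018, 1, 1), speed_up=4.0)
simulated = clock.now(wall_now=10.0)  # ten wall-clock seconds later
```

In practice wall_start and wall_now would come from something like time.monotonic(); they are passed in explicitly here so the result is deterministic.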


streaming.clients.obspy

Data handling to simulate a real-time client from old data via ObsPy clients for testing of real-time matched-filter detection.

Author

Calum J Chamberlain

License

GPL v3.0

class rt_eqcorrscan.streaming.clients.obspy.RealTimeClient(server_url, starttime, client=None, client_type='FDSN', query_interval=10.0, speed_up=1.0, buffer=None, buffer_capacity=600.0, pre_empt_data=True, pre_empt_len=6000.0)[source]

Simulation of a real-time client for past data. Used for testing.

Parameters
  • server_url (str) – URL or mappable name of the client. If a Client is not provided, this should be the argument used to set up a client of type client_type.

  • client – Any client that supports waveform data queries.

  • starttime (UTCDateTime) – Starttime for client (in the past)

  • client_type (str) – ObsPy client type to start up; only used if client=None.

  • query_interval (float) – Interval in seconds to query the client for new data

  • speed_up (float) – Multiplier to run faster than real-time (real-time is 1.0).

  • buffer (Optional[Stream]) – Stream to buffer data into

  • buffer_capacity (float) – Length of buffer in seconds. Old data are removed in a FIFO style.

copy(empty_buffer=True)[source]

Generate a new, unconnected client.

Parameters

empty_buffer (bool) – Whether to start the new client with an empty buffer or not.

restart()[source]

Restart the streamer.

Return type

None

select_stream(net, station, selector)[source]

Select streams to “stream”.

Parameters
  • net – The network id

  • station – The station id

  • selector – A valid SEED ID channel selector, e.g. EHZ or EH?

Return type

None
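
In a channel selector such as EH?, the question mark stands for exactly one character. The sketch below uses the standard library's fnmatch to show which channel codes such a selector would pick out; `matching_channels` is a hypothetical helper, and using fnmatch to mimic selector matching is an assumption made for illustration.

```python
from fnmatch import fnmatchcase


def matching_channels(channels, selector):
    """Return the channel codes matched by selector ('?' = one character)."""
    return [code for code in channels if fnmatchcase(code, selector)]


channels = ["EHZ", "EHN", "EHE", "HHZ"]
single = matching_channels(channels, "EHZ")
wildcard = matching_channels(channels, "EH?")
```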

start()[source]

Dummy - client is always started.

Return type

None

stop()[source]

Stop the system.

Return type

None

class rt_eqcorrscan.streaming.clients.obspy.StreamClient(client, buffer_length=3600, min_buffer_fraction=0.25, speed_up=1.0)[source]

In-memory handling of a stream as a fake Client. Caches future data in memory and provides it via client-like get_waveforms requests.

Parameters
  • client – Client to get data from - can be anything with a .get_waveforms method

  • buffer_length (float) – Initial length of buffer in seconds

  • min_buffer_fraction (float) – Minimum buffer fraction to trigger a refresh of the buffer.
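
A minimal sketch of how buffer_length and min_buffer_fraction could combine into a refresh decision, assuming a refresh is due once the cached span drops strictly below that fraction of the target length. `needs_refresh` is a hypothetical helper, and the strict comparison is an assumption; the class may apply a different rule.

```python
def needs_refresh(buffered_seconds, buffer_length, min_buffer_fraction):
    """True when the cached span is below the minimum fraction of the target."""
    return buffered_seconds / buffer_length < min_buffer_fraction


# With the default buffer_length=3600 and min_buffer_fraction=0.25,
# the threshold sits at 900 s of cached data.
checks = [
    needs_refresh(seconds, buffer_length=3600, min_buffer_fraction=0.25)
    for seconds in (1800, 900, 899)
]
```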

background_stop()[source]

Stop the background process.

get_waveforms(network, station, location, channel, starttime, endtime, rm_selected=True)[source]

Parameters
  • network (str) – Network code to get data for

  • station (str) – Station code to get data for

  • location (str) – Location code to get data for

  • channel (str) – Channel code to get data for

  • starttime (UTCDateTime) – Starttime to select data for

  • endtime (UTCDateTime) – Endtime to select data for

  • rm_selected (bool) – Whether to remove the data from the buffer - defaults to True

Return type

Stream of selected data.
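
The effect of rm_selected can be sketched with a dictionary standing in for the in-memory buffer. `get_samples` is a hypothetical helper keyed on integer sample times; it only illustrates the select-then-remove idea and is not the StreamClient code.

```python
def get_samples(cache, starttime, endtime, rm_selected=True):
    """Return samples with starttime <= t <= endtime from cache.

    cache maps sample time to value. When rm_selected is True the
    returned samples are also deleted from cache.
    """
    selected = {t: v for t, v in cache.items() if starttime <= t <= endtime}
    if rm_selected:
        for t in selected:
            del cache[t]
    return selected


cache = {t: t * t for t in range(6)}
first = get_samples(cache, 1, 3)    # removes t = 1, 2, 3 from the cache
second = get_samples(cache, 1, 3)   # nothing left in that window
```

With rm_selected=True a repeated request for the same window comes back empty, so callers that need the data twice should keep the first result or pass rm_selected=False.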

get_waveforms_bulk(bulk, rm_selected=True)[source]

Parameters
  • bulk (Iterable) – Bulk of (network, station, location, channel, starttime, endtime) to request data for.

  • rm_selected (bool) – Whether to remove the selected data from the buffer. Default: True

Return type

Stream of selected data from the buffer.

initiate_buffer(seed_ids, starttime)[source]

Initial population of the buffer.

Parameters
  • seed_ids (Iterable[str]) – Iterable of SEED ids as network.station.location.channel

  • starttime (UTCDateTime) – Starttime to initialise buffer from

maintain_buffer()[source]

Maintain buffer length in the background.

on_terminate()[source]

Handle termination gracefully.

run()[source]

Maintain buffer length.

property stats: dict

Provide a copy of the stats.