streaming¶
Streaming module of RT_EQcorrscan
streaming.buffers¶
Classes for buffering seismic data in memory.
- Author
Calum J Chamberlain
- License
GPL v3.0
- class rt_eqcorrscan.streaming.buffers.Buffer(traces=None, maxlen=None)[source]¶
Container for TraceBuffers.
- Parameters
Examples
>>> from obspy import read
>>> st = read()
>>> print(st)
3 Trace(s) in Stream:
BW.RJOB..EHZ | 2009-08-24T00:20:03.000000Z - 2009-08-24T00:20:32.990000Z | 100.0 Hz, 3000 samples
BW.RJOB..EHN | 2009-08-24T00:20:03.000000Z - 2009-08-24T00:20:32.990000Z | 100.0 Hz, 3000 samples
BW.RJOB..EHE | 2009-08-24T00:20:03.000000Z - 2009-08-24T00:20:32.990000Z | 100.0 Hz, 3000 samples
>>> buffer = Buffer(st, maxlen=10.)
>>> print(buffer)
Buffer(3 traces, maxlen=10.0)
- add_stream(stream)[source]¶
Add a stream or trace to the buffer.
Note that if stream is a Buffer, the maxlen of the initial Buffer will be used.
- is_full(strict=False)[source]¶
Check whether the buffer is full or not.
If strict=False (default) then only the start and end of the buffer are checked. Otherwise (strict=True) the whole buffer must contain real data (i.e. the mask is all False).
- Parameters
strict – Whether to check the whole deque (True), or just the start and end (False: default).
- Return type
bool
- class rt_eqcorrscan.streaming.buffers.BufferStats(header=None)[source]¶
Container for header attributes for TraceBuffer.
This is very similar to obspy’s Stats, but endtime is fixed and starttime is calculated from other attributes.
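As a rough illustration of how a fixed endtime can determine starttime, the sketch below assumes the usual ObsPy relationship endtime = starttime + (npts - 1) * delta. The helper name starttime_from_endtime is hypothetical and uses the standard-library datetime in place of UTCDateTime; it is not part of rt_eqcorrscan.

```python
from datetime import datetime, timedelta


def starttime_from_endtime(endtime, npts, delta):
    """Hypothetical helper: derive starttime from a fixed endtime.

    Assumes endtime = starttime + (npts - 1) * delta, with delta in seconds.
    """
    # A trace with zero or one samples spans no time, so the number of
    # sample intervals never goes below zero.
    n_intervals = max(npts - 1, 0)
    return endtime - timedelta(seconds=n_intervals * delta)


# 100 samples at 100 Hz (delta = 0.01 s) ending at 00:20:32.99
start = starttime_from_endtime(
    datetime(2009, 8, 24, 0, 20, 32, 990000), npts=100, delta=0.01)
print(start)
```

With these values (the same endtime, sample count and sampling rate as the TraceBuffer example on this page) the computed start is 00:20:32, which agrees with the starttime shown there.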
- class rt_eqcorrscan.streaming.buffers.NumpyDeque(data, maxlen)[source]¶
Simple implementation of necessary deque methods for 1D numpy arrays.
- Parameters
- append(other)[source]¶
Append an element to the right of the deque.
If the new deque overflows, the leftmost element will be removed.
- Parameters
other – Single element to add to the deque.
- Return type
- appendleft(other)[source]¶
Append an element to the left of the deque.
If the new deque overflows, the rightmost element will be removed.
- Parameters
other – Single element to add to the deque.
- Return type
- extend(other)[source]¶
Put new data onto the right of the deque.
If the deque overflows maxlen the leftmost items will be removed. Works in place.
- extendleft(other)[source]¶
Put new data onto the left of the deque.
If the deque overflows maxlen the rightmost items will be removed. Works in place.
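The overflow rules for extend and extendleft can be sketched with plain Python lists. This is only an illustration: extend_right and extend_left are hypothetical names, they return new lists instead of working in place, and the assumption that extendleft keeps the order of the new items is not stated in the documentation above.

```python
def extend_right(data, other, maxlen):
    """Put `other` on the right; drop leftmost items on overflow."""
    combined = list(data) + list(other)
    # Keep only the newest maxlen items (assumes maxlen > 0).
    return combined[-maxlen:]


def extend_left(data, other, maxlen):
    """Put `other` on the left; drop rightmost items on overflow.

    Assumption: the order of `other` is preserved.
    """
    combined = list(other) + list(data)
    # Keep only the first maxlen items.
    return combined[:maxlen]


print(extend_right([0, 1, 2], [3, 4], maxlen=3))
print(extend_left([0, 1, 2], [3, 4], maxlen=3))
```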
- insert(other, index)[source]¶
Insert elements between a stop and start point of the deque.
If other is a masked array, only unmasked elements will be inserted into the deque.
If other is longer than self.maxlen then it will be used to extend the deque.
- Parameters
- Return type
Examples
>>> np_deque = NumpyDeque(data=[0, 1, 2], maxlen=5)
>>> print(np_deque)
NumpyDeque(data=[-- -- 0 1 2], maxlen=5)
Insert a single element list
>>> np_deque.insert([6], 1)
>>> print(np_deque)
NumpyDeque(data=[-- 6 0 1 2], maxlen=5)
Insert a numpy array
>>> np_deque.insert(np.array([11, 12]), 3)
>>> print(np_deque)
NumpyDeque(data=[-- 6 0 11 12], maxlen=5)
Insert a masked array - only the unmasked elements are used.
>>> np_deque.insert(
...     np.ma.masked_array([99, 99], mask=[True, False]), 2)
>>> print(np_deque)
NumpyDeque(data=[-- 6 0 99 12], maxlen=5)
Insert an array longer than maxlen
>>> np_deque.insert(np.arange(10), 0)
>>> print(np_deque)
NumpyDeque(data=[5 6 7 8 9], maxlen=5)
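The insert rules shown in these examples can be mimicked in plain Python, using None to stand in for a masked sample. This is a simplified sketch and not the NumpyDeque implementation: the function name insert_sketch is hypothetical, it returns a new list instead of modifying in place, and it assumes that items falling past the end of the deque are ignored.

```python
def insert_sketch(data, other, index, maxlen):
    """Hypothetical stand-in for NumpyDeque.insert using lists and None."""
    other = list(other)
    if len(other) > maxlen:
        # Too long to fit: treat it as an extend and keep the newest items.
        return (list(data) + other)[-maxlen:]
    out = list(data)
    for offset, value in enumerate(other):
        position = index + offset
        # Masked (None) items never overwrite existing data.
        if value is not None and position < len(out):
            out[position] = value
    return out


deque_data = [None, None, 0, 1, 2]
deque_data = insert_sketch(deque_data, [6], 1, maxlen=5)
deque_data = insert_sketch(deque_data, [11, 12], 3, maxlen=5)
deque_data = insert_sketch(deque_data, [None, 99], 2, maxlen=5)
print(deque_data)
```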
- is_full(strict=False)[source]¶
Check whether the buffer is full.
If strict=False (default) then only the start and end of the buffer are checked. Otherwise (strict=True) the whole buffer must contain real data (i.e. the mask is all False).
- Parameters
strict – Whether to check the whole deque (True), or just the start and end (False: default).
- Return type
bool
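To illustrate the difference between the strict and non-strict checks described for is_full, the sketch below uses a plain list with None marking a masked (missing) sample. The function is_full_sketch is a hypothetical stand-in and makes no claim about how NumpyDeque stores its mask.

```python
def is_full_sketch(data, strict=False):
    """Hypothetical fullness check on a list where None means masked."""
    if len(data) == 0:
        return False
    if strict:
        # Every sample must be real data.
        return all(value is not None for value in data)
    # Only the first and last samples need to be real data.
    return data[0] is not None and data[-1] is not None


gappy = [1, None, None, 4]
print(is_full_sketch(gappy))               # gaps in the middle are tolerated
print(is_full_sketch(gappy, strict=True))  # gaps anywhere fail the check
```

A buffer with a gap in the middle therefore counts as full under the default check but not under the strict one.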
- class rt_eqcorrscan.streaming.buffers.TraceBuffer(data, header, maxlen)[source]¶
Container for Trace-like data.
Note: this does not include the standard Trace methods; use TraceBuffer.trace.`meth` to access them.
- Parameters
Examples
>>> from obspy import read
>>> st = read()
>>> trace_buffer = TraceBuffer(
...     data=st[0].data, header=st[0].stats, maxlen=100)
>>> len(trace_buffer)
100
>>> trace_buffer.stats.npts
100
>>> trace_buffer.stats.endtime
UTCDateTime(2009, 8, 24, 0, 20, 32, 990000)
>>> trace_buffer.stats.starttime
UTCDateTime(2009, 8, 24, 0, 20, 32)
>>> assert np.all(trace_buffer.data.data == st[0].data[-100:])
- add_trace(trace)[source]¶
Add one TraceBuffer to another.
If overlaps occur, new data (from trace) are kept and old data (from self) are replaced.
- Parameters
trace (Trace) – New trace to add to the buffer - will be added in place.
- Return type
Examples
>>> from obspy import Trace, UTCDateTime
>>> import numpy as np
>>> trace_buffer = TraceBuffer(
...     data=np.arange(10), header=dict(
...         station="bob", endtime=UTCDateTime(2018, 1, 1, 0, 0, 1),
...         delta=1.),
...     maxlen=15)
>>> print(trace_buffer.data)
NumpyDeque(data=[-- -- -- -- -- 0 1 2 3 4 5 6 7 8 9], maxlen=15)
>>> trace = Trace(
...     np.arange(7)[::-1], header=dict(
...         station="bob", starttime=UTCDateTime(2018, 1, 1), delta=1.))
>>> trace_buffer.add_trace(trace)
>>> print(trace_buffer.stats.endtime)
2018-01-01T00:00:06.000000Z
>>> print(trace_buffer.data)
NumpyDeque(data=[0 1 2 3 4 5 6 7 6 5 4 3 2 1 0], maxlen=15)
Try adding a trace that is longer than the maxlen
>>> trace = Trace(
...     np.arange(20), header=dict(
...         station="bob", starttime=trace_buffer.stats.endtime,
...         delta=1.))
>>> print(trace.stats.endtime)
2018-01-01T00:00:25.000000Z
>>> trace_buffer.add_trace(trace)
>>> print(trace_buffer.stats.endtime)
2018-01-01T00:00:25.000000Z
>>> print(trace_buffer.data)
NumpyDeque(data=[5 6 7 8 9 10 11 12 13 14 15 16 17 18 19], maxlen=15)
Add a trace that starts after the current tracebuffer ends
>>> trace = Trace(
...     np.arange(5), header=dict(
...         station="bob", starttime=trace_buffer.stats.endtime + 5,
...         delta=1.))
>>> print(trace.stats.endtime)
2018-01-01T00:00:34.000000Z
>>> trace_buffer.add_trace(trace)
>>> print(trace_buffer.stats.endtime)
2018-01-01T00:00:34.000000Z
>>> print(trace_buffer.data)
NumpyDeque(data=[15 16 17 18 19 -- -- -- -- -- 0 1 2 3 4], maxlen=15)
Add a trace that starts one sample after the current trace ends
>>> trace = Trace(
...     np.arange(5), header=dict(
...         station="bob",
...         starttime=trace_buffer.stats.endtime + trace_buffer.stats.delta,
...         delta=1.))
>>> print(trace_buffer.stats.endtime)
2018-01-01T00:00:34.000000Z
>>> print(trace.stats.starttime)
2018-01-01T00:00:35.000000Z
>>> trace_buffer.add_trace(trace)
>>> print(trace_buffer.data)
NumpyDeque(data=[-- -- -- -- -- 0 1 2 3 4 0 1 2 3 4], maxlen=15)
- get_id()[source]¶
Return a SEED compatible identifier of the trace.
- Return type
- Returns
The SEED ID of the trace.
- property id: str¶
Return a SEED compatible identifier of the trace.
- Return type
The SEED ID of the trace.
- is_full(strict=False)[source]¶
Check if the tracebuffer is full or not.
If strict=False (default) then only the start and end of the buffer are checked. Otherwise (strict=True) the whole buffer must contain real data (i.e. the mask is all False).
- Parameters
strict – Whether to check the whole deque (True), or just the start and end (False: default).
- Return type
bool
streaming.streaming¶
Functions and classes for handling streaming of data for real-time matched-filtering.
- Author
Calum J Chamberlain
- License
GPL v3.0
streaming.clients¶
streaming.clients.obsplus¶
Data handling to simulate a real-time client from old data via ObsPlus WaveBank for testing of real-time matched-filter detection.
- Author
Calum J Chamberlain
- License
GPL v3.0
- class rt_eqcorrscan.streaming.clients.obsplus.RealTimeClient(server_url, starttime, client=None, query_interval=10.0, speed_up=1.0, buffer=None, buffer_capacity=600.0, pre_empt_data=True, pre_empt_len=6000.0, **kwargs)[source]¶
Simulation of a real-time client for past data. Used for testing.
- Parameters
server_url (str) – The base-path for the ObsPlus wavebank.
client – Any client that supports waveform data queries.
starttime (UTCDateTime) – Starttime for client (in the past).
query_interval (float) – Interval in seconds to query the client for new data.
speed_up (float) – Multiplier to run faster than real-time (real-time is 1.0).
buffer_capacity (float) – Length of buffer in seconds. Old data are removed in a FIFO style.
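How speed_up is applied internally is not described here. One plausible reading, sketched below, is that elapsed wall-clock time is scaled by speed_up and added to starttime to give the simulated "now". The function simulated_time is hypothetical and uses standard-library datetime objects in place of UTCDateTime.

```python
from datetime import datetime, timedelta


def simulated_time(starttime, elapsed_wall_seconds, speed_up=1.0):
    """Hypothetical clock for a simulated real-time client.

    Assumption: simulated time advances speed_up times faster than
    wall-clock time, starting from starttime.
    """
    return starttime + timedelta(seconds=elapsed_wall_seconds * speed_up)


# After 10 s of wall-clock time at double speed, 20 s of data time has passed.
now = simulated_time(datetime(2020, 1, 1), 10.0, speed_up=2.0)
print(now)
```

Under this reading, a query_interval of 10 s with speed_up=2.0 would cover 20 s of archived data per query.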
streaming.clients.obspy¶
Data handling to simulate a real-time client from old data via ObsPy clients for testing of real-time matched-filter detection.
- Author
Calum J Chamberlain
- License
GPL v3.0
- class rt_eqcorrscan.streaming.clients.obspy.RealTimeClient(server_url, starttime, client=None, client_type='FDSN', query_interval=10.0, speed_up=1.0, buffer=None, buffer_capacity=600.0, pre_empt_data=True, pre_empt_len=6000.0)[source]¶
Simulation of a real-time client for past data. Used for testing.
- Parameters
server_url (str) – URL or mappable name of the client. If not providing a Client, then this should be the argument used to set up a client of client_type.
client – Any client that supports waveform data queries.
starttime (UTCDateTime) – Starttime for client (in the past).
client_type (str) – Obspy client type to start up, only used if client=None.
query_interval (float) – Interval in seconds to query the client for new data.
speed_up (float) – Multiplier to run faster than real-time (real-time is 1.0).
buffer_capacity (float) – Length of buffer in seconds. Old data are removed in a FIFO style.
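The FIFO behaviour of a buffer whose capacity is given in seconds can be pictured with a standard-library deque. This sketch assumes the capacity is converted to a sample count using the sampling rate; make_fifo is a hypothetical helper, not part of the client.

```python
from collections import deque


def make_fifo(buffer_capacity, sampling_rate):
    """Hypothetical FIFO sized from a capacity in seconds.

    Assumption: number of samples = buffer_capacity * sampling_rate.
    """
    n_samples = int(round(buffer_capacity * sampling_rate))
    return deque(maxlen=n_samples)


# A 2 s buffer at 2 Hz holds 4 samples; older samples fall off the left.
fifo = make_fifo(buffer_capacity=2.0, sampling_rate=2.0)
fifo.extend(range(6))
print(list(fifo))
```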
- copy(empty_buffer=True)[source]¶
Generate a new, unconnected client.
- Parameters
empty_buffer (bool) – Whether to start the new client with an empty buffer or not.
- class rt_eqcorrscan.streaming.clients.obspy.StreamClient(client, buffer_length=3600, min_buffer_fraction=0.25, speed_up=1.0)[source]¶
In-memory handling of stream as fake Client. Cache future data in memory and provide via client-like get_waveforms requests.
- Parameters
- get_waveforms(network, station, location, channel, starttime, endtime, rm_selected=True)[source]¶
- Parameters
network (str) – Network code to get data for
station (str) – Station code to get data for
location (str) – Location code to get data for
channel (str) – Channel code to get data for
starttime (UTCDateTime) – Starttime to select data for
endtime (UTCDateTime) – Endtime to select data for
rm_selected (bool) – Whether to remove the data from the buffer - defaults to True
- Return type
Stream of selected data.
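The effect of rm_selected can be illustrated with a toy in-memory buffer of (time, value) pairs. This is a sketch under stated assumptions: select_samples is a hypothetical function, times are plain numbers, and the real StreamClient operates on ObsPy streams, not lists.

```python
def select_samples(buffer, starttime, endtime, rm_selected=True):
    """Hypothetical selection from a list of (time, value) pairs.

    Returns the samples with starttime <= time <= endtime. If rm_selected
    is True, those samples are also removed from `buffer` in place.
    """
    selected = [(t, v) for t, v in buffer if starttime <= t <= endtime]
    if rm_selected:
        # Keep only the samples that were not handed out.
        buffer[:] = [(t, v) for t, v in buffer
                     if not starttime <= t <= endtime]
    return selected


cache = [(0, "a"), (1, "b"), (2, "c"), (3, "d")]
picked = select_samples(cache, 1, 2)
print(picked, cache)
```

Removing data once it has been served keeps the cache from growing, at the cost that the same window cannot be requested twice.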
- initiate_buffer(seed_ids, starttime)[source]¶
Initial population of the buffer.
- Parameters
seed_ids (Iterable[str]) – Iterable of seed ids as network.station.location.channel
starttime (UTCDateTime) – Starttime to initialise buffer from
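Seed ids in the network.station.location.channel form map directly onto the four code arguments of get_waveforms. A minimal sketch of that split follows; split_seed_id is a hypothetical helper name.

```python
def split_seed_id(seed_id):
    """Split 'network.station.location.channel' into its four codes."""
    parts = seed_id.split(".")
    if len(parts) != 4:
        raise ValueError(
            f"Expected network.station.location.channel, got {seed_id!r}")
    network, station, location, channel = parts
    return network, station, location, channel


# The location code may be empty, as in the ObsPy example stream.
print(split_seed_id("BW.RJOB..EHZ"))
```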
streaming.clients.seedlink¶
Data handling for seedlink clients for real-time matched-filter detection.
- Author
Calum J Chamberlain
- License
GPL v3.0
- class rt_eqcorrscan.streaming.clients.seedlink.RealTimeClient(server_url, buffer=None, buffer_capacity=600.0)[source]¶
SeedLink client link for Real-Time Matched-Filtering.
- Parameters
- copy(empty_buffer=True)[source]¶
Generate a new, unconnected copy of the client.
- Parameters
empty_buffer (bool) – Whether to start the new client with an empty buffer or not.
- run()[source]¶
Start streaming data from the SeedLink server.
Streams need to be selected using select_stream() before this is called.
This method enters an infinite loop, calling the client’s callbacks when events occur.
This is an edited version of EasySeedlinkClient from ObsPy allowing for connection closure from another process.