# Source code for rt_eqcorrscan.streaming.clients.obspy
"""
Data handling to simulate a real-time client from old data via ObsPy clients
for testing of real-time matched-filter detection.
Author
Calum J Chamberlain
License
GPL v3.0
"""
import logging
import time
import copy
from numpy import random
import importlib
from obspy import Stream, UTCDateTime
from queue import Empty
from obsplus import WaveBank
from rt_eqcorrscan.streaming.streaming import _StreamingClient
Logger = logging.getLogger(__name__)
class RealTimeClient(_StreamingClient):
    """
    Simulation of a real-time client for past data. Used for testing.

    Parameters
    ----------
    server_url
        URL or mappable name of the client, if not providing a Client, then
        this should be the argument to set-up a client of `client_type`
    client
        Any client that supports waveform data queries.
    starttime
        Starttime for client (in the past)
    client_type
        Obspy client type to start-up, only used if `client=None`.
    query_interval
        Interval in seconds to query the client for new data
    speed_up
        Multiplier to run faster than real-time (real-time is 1.0).
    buffer
        Stream to buffer data into
    buffer_capacity
        Length of buffer in seconds. Old data are removed in a FIFO style.
    """
    # Base module path used to dynamically import the obspy client module,
    # e.g. "obspy.clients.fdsn" for client_type="FDSN".
    client_base = "obspy.clients"
    can_add_streams = True

    def __init__(
        self,
        server_url: str,
        starttime: UTCDateTime,
        client=None,
        client_type: str = "FDSN",
        query_interval: float = 10.,
        speed_up: float = 1.,
        buffer: Stream = None,
        buffer_capacity: float = 600.,
    ) -> None:
        if client is None:
            try:
                _client_module = importlib.import_module(
                    f"{self.client_base}.{client_type.lower()}")
                client = _client_module.Client(server_url)
            except Exception as e:
                Logger.error("Could not instantiate simulated client")
                raise e
        self.client = client
        super().__init__(
            server_url=self.client.base_url, buffer=buffer,
            buffer_capacity=buffer_capacity)
        self.starttime = starttime
        self.query_interval = query_interval
        self.speed_up = speed_up
        # List of get_waveforms keyword-argument dicts, one per selected
        # channel - populated by `select_stream`.
        self.bulk = []
        self.streaming = False
        Logger.info(
            "Instantiated simulated real-time client "
            "(starttime = {0}): {1}".format(self.starttime, self))

    def start(self) -> None:
        """ Dummy - client is always started. """
        self.started = True
        return

    def restart(self) -> None:
        """ Restart the streamer. """
        self.stop()
        self.start()

    def copy(self, empty_buffer: bool = True):
        """
        Make a copy of this client sharing the same underlying client.

        Parameters
        ----------
        empty_buffer
            If True (default) the copy starts with an empty buffer,
            otherwise the current buffered stream is passed on.
        """
        if empty_buffer:
            buffer = Stream()
        else:
            buffer = self.stream
        return RealTimeClient(
            server_url=self.client.base_url,
            client=self.client, starttime=self.starttime,
            query_interval=self.query_interval, speed_up=self.speed_up,
            buffer=buffer, buffer_capacity=self.buffer_capacity)

    def select_stream(self, net: str, station: str, selector: str) -> None:
        """
        Select streams to "stream".

        Parameters
        ----------
        net
            The network id
        station
            The station id
        selector
            a valid SEED ID channel selector, e.g. ``EHZ`` or ``EH?``
        """
        # start/endtime are filled in per-query in `run`.
        _bulk = {
            "network": net, "station": station, "location": "*",
            "channel": selector, "starttime": None, "endtime": None}
        if _bulk not in self.bulk:
            Logger.debug("Added {0} to streaming selection".format(_bulk))
            self.bulk.append(_bulk)

    def run(self) -> None:
        """
        Collection loop: repeatedly query the client for "new" data.

        Each pass queries the client for every selected channel between
        the previous query end and the simulated "now", pushes returned
        traces through `on_data` into the buffer, then sleeps for the
        remainder of `query_interval` (scaled by `speed_up`). Runs until
        `stop` is called or a kill message arrives on the killer queue.

        Raises
        ------
        RuntimeError
            If no streams have been selected via `select_stream`.
        """
        # Explicit raise rather than assert: assertions are stripped
        # under `python -O`.
        if not self.bulk:
            raise RuntimeError("Select a stream first")
        self.streaming = True
        now = copy.deepcopy(self.starttime)
        self.last_data = UTCDateTime.now()
        last_query_start = now - self.query_interval
        while not self._stop_called:
            # If this is running in a process then we need to check the queue
            try:
                kill = self._killer_queue.get(block=False)
            except Empty:
                kill = False
            Logger.info(f"Kill status: {kill}")
            if kill:
                Logger.warning("Termination called, stopping collect loop")
                self.on_terminate()
                break
            _query_start = UTCDateTime.now()
            st = Stream()
            query_passed = True
            for _bulk in self.bulk:
                # Random latency in [0, query_interval) simulates data
                # arriving at the source with a real-world delay.
                jitter = random.randint(int(self.query_interval))
                _bulk.update({
                    "starttime": last_query_start,
                    "endtime": now - jitter})
                Logger.debug("Querying client for {0}".format(_bulk))
                try:
                    st += self.client.get_waveforms(**_bulk)
                except Exception as e:
                    Logger.error("Failed (bulk={0})".format(_bulk))
                    Logger.error(e)
                    query_passed = False
                    continue
            for tr in st:
                self.on_data(tr)
            # Put the data in the buffer
            self._add_data_from_queue()
            _query_duration = UTCDateTime.now() - _query_start
            Logger.debug(
                "It took {0:.2f}s to query the database and sort data".format(
                    _query_duration))
            sleep_step = (
                self.query_interval - _query_duration) / self.speed_up
            if sleep_step > 0:
                Logger.debug("Waiting {0:.2f}s before next query".format(
                    sleep_step))
                time.sleep(sleep_step)
            # Advance the simulated clock by at least one query interval.
            now += max(self.query_interval, _query_duration)
            if query_passed:
                # Start the next window from the earliest endtime across
                # channels so the jittered windows leave no gaps; a failed
                # pass keeps the old start so the window is re-queried.
                last_query_start = min(_bulk["endtime"] for _bulk in self.bulk)
        self.streaming = False
        return

    def stop(self) -> None:
        """ Flag the collection loop to exit and mark the client stopped. """
        Logger.info("STOP!")
        self._stop_called, self.started = True, False
if __name__ == "__main__":
import doctest
logging.basicConfig(level="DEBUG")
doctest.testmod()