"""
Plotting for real-time seismic data.
"""
import numpy as np
import logging
import threading
import datetime as dt
import asyncio
from pyproj import Proj, Transformer
from bokeh.document import Document
from bokeh.plotting import figure
from bokeh.models import ColumnDataSource, HoverTool, Legend, WMTSTileSource
from bokeh.models.glyphs import MultiLine
from bokeh.models.formatters import DatetimeTickFormatter
from bokeh.layouts import gridplot, column
from bokeh.server.server import Server
from bokeh.application import Application
from bokeh.application.handlers.function import FunctionHandler
from functools import partial
from obspy import Inventory
from rt_eqcorrscan.rt_match_filter import RealTimeTribe
from rt_eqcorrscan.streaming.streaming import _StreamingClient
Logger = logging.getLogger(__name__)
[docs]class EQcorrscanPlot:
"""
Streaming bokeh plotting of waveforms.
Parameters
----------
rt_client
The real-time streaming client in use.
plot_length
Plot length in seconds
tribe
Tribe of templates used in real-time detection
inventory
Inventory of stations used - will be plotted on the map.
detections
List of `eqcorrscan.core.match_filter.Detection`
update_interval
Update frequency of plot in ms
plot_height
Plot height in screen units
plot_width
Plot width in screen units
exclude_channels
Iterable of channel codes to exclude from plotting
offline
Flag to set time-stamps to data time-stamps if True, else timestamps
will be real-time
"""
def __init__(
self,
rt_client: _StreamingClient,
plot_length: float,
tribe: RealTimeTribe,
inventory: Inventory,
detections: list,
update_interval: float = 100.,
plot_height: int = 800,
plot_width: int = 1500,
exclude_channels: iter = (),
offline: bool = False,
new_event_loop: bool = True,
**plot_data_options,
) -> None:
if new_event_loop:
# Set up a new event loop for the plots
asyncio.set_event_loop(asyncio.new_event_loop())
channels = [tr.id for tr in rt_client.stream
if tr.stats.channel not in exclude_channels]
Logger.debug("Plot will contain the following channels: {0}".format(
channels))
self.channels = sorted(channels)
self.tribe = tribe
self.plot_length = plot_length
self.inventory = inventory
self.detections = detections
self.hover = HoverTool(
tooltips=[
("UTCDateTime", "@time{%m/%d %H:%M:%S}"),
("Amplitude", "@data")],
formatters={'time': 'datetime'},
mode='vline')
self.map_hover = HoverTool(
tooltips=[
("Latitude", "@lats"),
("Longitude", "@lons"),
("ID", "@id")])
self.tools = "pan,wheel_zoom,reset"
self.plot_options = {
"width": int(2 * (plot_width / 3)),
"height": int((plot_height - 20) / len(channels)),
"tools": [self.hover], "x_axis_type": "datetime"}
self.map_options = {
"width": int(plot_width / 3), "height": plot_height,
"tools": [self.map_hover, self.tools]}
self.updateValue = True
Logger.info("Initializing plotter")
make_doc = partial(
define_plot, rt_client=rt_client, channels=channels,
tribe=self.tribe, inventory=self.inventory,
detections=self.detections, map_options=self.map_options,
plot_options=self.plot_options, plot_length=self.plot_length,
update_interval=update_interval, offline=offline,
**plot_data_options)
self.apps = {'/RT_EQcorrscan': Application(FunctionHandler(make_doc))}
self.server = Server(self.apps)
self.server.start()
Logger.info("Plotting started")
self.threads = []
[docs] def background_run(self):
""" run the plotting in a daemon thread. """
plotting_thread = threading.Thread(
target=self._bg_run, name="PlottingThread")
plotting_thread.daemon = True
plotting_thread.start()
self.threads.append(plotting_thread)
Logger.info("Started plotting")
def _bg_run(self):
print('Opening Bokeh application on http://localhost:5006/')
self.server.io_loop.add_callback(self.server.show, "/")
self.server.io_loop.start()
[docs] def background_stop(self):
""" Stop the background plotting thread. """
self.server.io_loop.stop()
for thread in self.threads:
thread.join()
[docs]def define_plot(
doc: Document,
rt_client: _StreamingClient,
channels: list,
tribe: RealTimeTribe,
inventory: Inventory,
detections: list,
map_options: dict,
plot_options: dict,
plot_length: float,
update_interval: int,
data_color: str = "grey",
lowcut: float = 1.0,
highcut: float = 10.0,
offline: bool = False,
):
"""
Set up a bokeh plot for real-time plotting.
Defines a moving data stream and a map.
Parameters
----------
doc
Bokeh document to edit - usually called as a partial
rt_client
RealTimeClient streaming data
channels
Channels to plot
tribe
Tribe to plot
inventory
Inventory to plot
detections
Detections to plot - should be a list that is updated in place.
map_options
Dictionary of options for the map
plot_options
Dictionary of options for plotting in general
plot_length
Length of data plot
update_interval
Update frequency in seconds
data_color
Colour to data stream
lowcut
Lowcut for filtering data stream
highcut
Highcut for filtering data stream
offline
Flag to set time-stamps to data time-stamps if True, else timestamps
will be real-time
"""
# Set up the data source
Logger.info("Getting stream to define plot")
stream = rt_client.stream.copy().split().detrend()
if lowcut and highcut:
stream.filter("bandpass", freqmin=lowcut, freqmax=highcut)
title = "Streaming data: {0}-{1} Hz bandpass".format(lowcut, highcut)
elif lowcut:
stream.filter("highpass", lowcut)
title = "Streaming data: {0} Hz highpass".format(lowcut)
elif highcut:
stream.filter("lowpass", highcut)
title = "Streaming data: {0} Hz lowpass".format(highcut)
else:
title = "Raw streaming data"
stream.merge()
Logger.info(f"Have the stream: \n{stream}")
template_lats, template_lons, template_alphas, template_ids = (
[], [], [], [])
for template in tribe:
try:
origin = (template.event.preferred_origin() or
template.event.origins[0])
except IndexError:
continue
template_lats.append(origin.latitude)
template_lons.append(origin.longitude % 360)
template_alphas.append(0)
template_ids.append(template.event.resource_id.id.split("/")[-1])
station_lats, station_lons, station_ids = ([], [], [])
for network in inventory:
for station in network:
station_lats.append(station.latitude)
station_lons.append(station.longitude % 360)
station_ids.append(station.code)
# Get plot bounds in web mercator
Logger.info("Defining map")
transformer = Transformer.from_crs(
"epsg:4326", "epsg:3857", always_xy=True)
try:
min_lat, min_lon, max_lat, max_lon = (
min(template_lats + station_lats),
min(template_lons + station_lons),
max(template_lats + station_lats),
max(template_lons + station_lons))
except ValueError as e:
Logger.error(e)
Logger.info("Setting map bounds to NZ")
min_lat, min_lon, max_lat, max_lon = (-47., 165., -34., 179.9)
Logger.info(f"Map bounds: {min_lon}, {min_lat} - {max_lon}, {max_lat}")
bottom_left = transformer.transform(min_lon, min_lat)
top_right = transformer.transform(max_lon, max_lat)
map_x_range = (bottom_left[0], top_right[0])
map_y_range = (bottom_left[1], top_right[1])
template_x, template_y = ([], [])
for lon, lat in zip(template_lons, template_lats):
_x, _y = transformer.transform(lon, lat)
template_x.append(_x)
template_y.append(_y)
station_x, station_y = ([], [])
for lon, lat in zip(station_lons, station_lats):
_x, _y = transformer.transform(lon, lat)
station_x.append(_x)
station_y.append(_y)
template_source = ColumnDataSource({
'y': template_y, 'x': template_x,
'lats': template_lats, 'lons': template_lons,
'template_alphas': template_alphas, 'id': template_ids})
station_source = ColumnDataSource({
'y': station_y, 'x': station_x,
'lats': station_lats, 'lons': station_lons, 'id': station_ids})
Logger.info("Allocated data sources")
trace_sources = {}
trace_data_range = {}
# Allocate empty arrays
for channel in channels:
tr = stream.select(id=channel)[0]
times = np.arange(
tr.stats.starttime.datetime,
(tr.stats.endtime + tr.stats.delta).datetime,
step=dt.timedelta(seconds=tr.stats.delta))
data = tr.data
trace_sources.update(
{channel: ColumnDataSource({'time': times, 'data': data})})
trace_data_range.update({channel: (data.min(), data.max())})
# Set up the map to go on the left side
Logger.info("Adding features to map")
map_plot = figure(
title="Template map", x_range=map_x_range, y_range=map_y_range,
x_axis_type="mercator", y_axis_type="mercator", **map_options)
url = 'http://a.basemaps.cartocdn.com/rastertiles/voyager/{Z}/{X}/{Y}.png'
attribution = "Tiles by Carto, under CC BY 3.0. Data by OSM, under ODbL"
map_plot.add_tile(WMTSTileSource(url=url, attribution=attribution))
map_plot.circle(
x="x", y="y", source=template_source, fill_color="firebrick",
line_color="grey", line_alpha=.2,
fill_alpha="template_alphas", size=10)
map_plot.triangle(
x="x", y="y", size=10, source=station_source, color="blue", alpha=1.0)
# Set up the trace plots
Logger.info("Setting up streaming plot")
trace_plots = []
if not offline:
now = dt.datetime.utcnow()
else:
now = max([tr.stats.endtime for tr in stream]).datetime
p1 = figure(
y_axis_location="right", title=title,
x_range=[now - dt.timedelta(seconds=plot_length), now],
height=int(plot_options["height"] * 1.2),
**{key: value for key, value in plot_options.items()
if key != "height"})
p1.yaxis.axis_label = None
p1.xaxis.axis_label = None
p1.min_border_bottom = 0
p1.min_border_top = 0
if len(channels) != 1:
p1.xaxis.major_label_text_font_size = '0pt'
p1_line = p1.line(
x="time", y='data', source=trace_sources[channels[0]],
color=data_color, line_width=1)
legend = Legend(items=[(channels[0], [p1_line])])
p1.add_layout(legend, 'right')
datetick_formatter = DatetimeTickFormatter(
days=["%m/%d"], months=["%m/%d"],
hours=["%m/%d %H:%M:%S"], minutes=["%m/%d %H:%M:%S"],
seconds=["%m/%d %H:%M:%S"], hourmin=["%m/%d %H:%M:%S"],
minsec=["%m/%d %H:%M:%S"])
p1.xaxis.formatter = datetick_formatter
# Add detection lines
Logger.info("Adding detection artists")
detection_source = _get_pick_times(detections, channels[0])
detection_source.update(
{"pick_values": [[
int(min(stream.select(id=channels[0])[0].data) * .9),
int(max(stream.select(id=channels[0])[0].data) * .9)]
for _ in detection_source['picks']]})
detection_sources = {channels[0]: ColumnDataSource(detection_source)}
detection_lines = MultiLine(
xs="picks", ys="pick_values", line_color="red", line_dash="dashed",
line_width=1)
p1.add_glyph(detection_sources[channels[0]], detection_lines)
trace_plots.append(p1)
if len(channels) > 1:
for i, channel in enumerate(channels[1:]):
p = figure(
x_range=p1.x_range,
y_axis_location="right", **plot_options)
p.yaxis.axis_label = None
p.xaxis.axis_label = None
p.min_border_bottom = 0
# p.min_border_top = 0
p_line = p.line(
x="time", y="data", source=trace_sources[channel],
color=data_color, line_width=1)
legend = Legend(items=[(channel, [p_line])])
p.add_layout(legend, 'right')
p.xaxis.formatter = datetick_formatter
# Add detection lines
detection_source = _get_pick_times(detections, channel)
detection_source.update(
{"pick_values": [[
int(min(stream.select(id=channel)[0].data) * .9),
int(max(stream.select(id=channel)[0].data) * .9)]
for _ in detection_source['picks']]})
detection_sources.update({
channel: ColumnDataSource(detection_source)})
detection_lines = MultiLine(
xs="picks", ys="pick_values", line_color="red",
line_dash="dashed", line_width=1)
p.add_glyph(detection_sources[channel], detection_lines)
trace_plots.append(p)
if i != len(channels) - 2:
p.xaxis.major_label_text_font_size = '0pt'
plots = gridplot([[map_plot, column(trace_plots)]])
previous_timestamps = {
channel: stream.select(id=channel)[0].stats.endtime
for channel in channels}
def update():
Logger.debug("Plot updating")
_stream = rt_client.stream.split().detrend()
if lowcut and highcut:
_stream.filter("bandpass", freqmin=lowcut, freqmax=highcut)
elif lowcut:
_stream.filter("highpass", lowcut)
elif highcut:
_stream.filter("lowpass", highcut)
_stream.merge()
for _i, _channel in enumerate(channels):
try:
_tr = _stream.select(id=_channel)[0]
except IndexError:
Logger.debug("No channel for {0}".format(_channel))
continue
new_samples = int(_tr.stats.sampling_rate * (
previous_timestamps[_channel] - _tr.stats.endtime))
if new_samples == 0:
Logger.debug("No new data for {0}".format(_channel))
continue
_new_data = _tr.slice(
starttime=previous_timestamps[_channel])
new_times = np.arange(
_new_data.stats.starttime.datetime,
(_tr.stats.endtime + _tr.stats.delta).datetime,
step=dt.timedelta(seconds=_tr.stats.delta))
new_data = {'time': new_times[1:], 'data': _new_data.data[1:]}
Logger.debug("Channl: {0}\tNew times: {1}\t New data: {2}".format(
_tr.id, new_data["time"].shape, new_data["data"].shape))
trace_sources[_channel].stream(
new_data=new_data,
rollover=int(plot_length * _tr.stats.sampling_rate))
new_picks = _get_pick_times(detections, _channel)
new_picks.update({
'pick_values': [
[int(np.nan_to_num(
trace_sources[_channel].data['data']).max() * .9),
int(np.nan_to_num(
trace_sources[_channel].data['data']).min() * .9)]
for _ in new_picks['picks']]})
detection_sources[_channel].data = new_picks
previous_timestamps.update({_channel: _tr.stats.endtime})
Logger.debug("New data plotted for {0}".format(_channel))
if not offline:
now = dt.datetime.utcnow()
else:
try:
now = max([tr.stats.endtime for tr in _stream]).datetime
except ValueError:
return
trace_plots[0].x_range.start = now - dt.timedelta(seconds=plot_length)
trace_plots[0].x_range.end = now
_update_template_alphas(
detections, tribe, decay=plot_length, now=now,
datastream=template_source)
Logger.info("Adding callback")
doc.add_periodic_callback(update, update_interval)
doc.title = "EQcorrscan Real-time plotter"
doc.add_root(plots)
Logger.info("Plot defined")
def _update_template_alphas(
detections: list,
tribe: RealTimeTribe,
decay: float,
now, datastream) -> None:
"""
Update the template location datastream.
Parameters
----------
detections
Detections to use to update the datastream
tribe
Templates used
decay
Colour decay length in seconds
now
Reference time-stamp
datastream
Data stream to update
"""
transformer = Transformer.from_crs(
"epsg:4326", "epsg:3857", always_xy=True)
template_lats, template_lons, template_alphas, template_ids = (
[], [], [], [])
template_x, template_y = ([], [])
for template in tribe:
try:
origin = (template.event.preferred_origin() or
template.event.origins[0])
except IndexError:
continue
template_lats.append(origin.latitude)
template_lons.append(origin.longitude)
template_ids.append(template.event.resource_id.id.split("/")[-1])
_x, _y = transformer.transform(origin.longitude, origin.latitude)
template_x.append(_x)
template_y.append(_y)
template_detections = [
d for d in detections if d.template_name == template.name]
if len(template_detections) == 0:
template_alphas.append(0)
else:
detect_time = min([d.detect_time for d in template_detections])
offset = (now - detect_time.datetime).total_seconds()
alpha = 1. - (offset / decay)
Logger.debug('Updating alpha to {0:.4f}'.format(alpha))
template_alphas.append(alpha)
datastream.data = {'y': template_y, 'x': template_x, 'lats': template_lats,
'lons': template_lons,
'template_alphas': template_alphas, 'id': template_ids}
return
def _get_pick_times(
detections: list,
seed_id: str,
ignore_channel: bool = True
) -> dict:
"""
Get new pick times from catalog for a given channel.
Parameters
----------
detections
List of detections
seed_id
The full Seed-id (net.sta.loc.chan) for extract picks for
ignore_channel
Whether to return all picks for a given sensor (e.g. HH*)
Returns
-------
Dictionary with one key ("picks") of the pick-times.
"""
picks = []
Logger.debug("Scanning {0} detections for new picks".format(
len(detections)))
net, sta, loc, chan = seed_id.split('.')
for detection in detections:
try:
if ignore_channel:
pick = [p for p in detection.event.picks
if p.waveform_id.network_code == net and
p.waveform_id.station_code == sta and
p.waveform_id.location_code == loc][0]
else:
pick = [p for p in detection.event.picks
if p.waveform_id.get_seed_string() == seed_id][0]
except IndexError:
pick = None
pass
if pick:
Logger.debug("Plotting pick on {0} at {1}".format(
seed_id, pick.time))
picks.append([pick.time.datetime, pick.time.datetime])
return {"picks": picks}
if __name__ == "__main__":
import doctest
doctest.testmod()