Hi all,
I’ve been trying to follow this guide, but have run into some issues and could really use some help.
The code is essentially the same, except for some minor changes to accomodate my DB.
from datetime import timedelta
from typing import List
import hvplot.streamz
import pandas as pd
import reactivex as rx
from reactivex import operators as ops
from streamz.dataframe import Random, DataFrame
from streamz import Stream
from influxdb_client import InfluxDBClient
from bokeh.models.formatters import DatetimeTickFormatter
def source_data(auto_refresh: int, query: str, sink: Stream):
rx \
.interval(period=timedelta(seconds=auto_refresh)) \
.pipe(ops.map(lambda start: f'from(bucket: "test") '
f'|> range(start: -{auto_refresh}s, stop: now()) '
f'{query}')) \
.pipe(ops.map(lambda query: client.query_api().query_data_frame(query, data_frame_index=['_time']))) \
.pipe(ops.map(lambda data_frame: data_frame.drop(columns=['result', 'table']))) \
.subscribe(on_next=lambda data_frame: sink.emit(data_frame), on_error=lambda error: print(error))
pass
token = "token"
org = "org"
client = InfluxDBClient(url='http://localhost:8086', token=token, org=org)
load_query = '|> filter(fn: (r) => r._measurement == "system") ' \
'|> filter(fn: (r) => r._field == "load1" or r._field == "load5" or r._field == "load15") ' \
'|> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")' \
'|> keep(columns: ["_time", "load1", "load5", "load15"])'
load_sink = Stream()
load_example = pd.DataFrame({'load1': [], 'load5': [], 'load15': []}, columns=['load1', 'load5', 'load15'])
load_df = DataFrame(load_sink, example=load_example)
source_data(auto_refresh=5, sink=load_sink, query=load_query)
# Time formatter
formatter = DatetimeTickFormatter(
microseconds = "%H:%M:%S",
milliseconds = "%H:%M:%S",
seconds = "%H:%M:%S",
minsec = "%H:%M:%S",
minutes = "%H:%M:%S",
hourmin = "%H:%M:%S",
hours="%H:%M:%S",
days="%H:%M:%S",
months="%H:%M:%S",
years="%H:%M:%S",
)
load_df.hvplot(width=450, backlog=50, title='CPU % usage', xlabel='Time', ylabel='%', xformatter=formatter)
I have Telegraf running and wish to graph ‘load1’, ‘load5’ and ‘load15’.
Issues
- .pipe(ops.map(lambda data_frame: data_frame.drop(columns=[‘result’, ‘table’]))) \ never recognises any columns to drop > gives error “[table]” not found in axis
- No data points from load variables show
My guess is that I’m not sourcing the data correctly somehow? Any help is appreciated, thanks!