Streaming data to Jupyter in real-time

Hi all,

I’ve been trying to follow this guide, but have run into some issues and could really use some help.

The code is essentially the same, except for some minor changes to accomodate my DB.

from datetime import timedelta
from typing import List

import hvplot.streamz
import pandas as pd
import reactivex as rx
from reactivex import operators as ops

from streamz.dataframe import Random, DataFrame
from streamz import Stream
from influxdb_client import InfluxDBClient

from bokeh.models.formatters import DatetimeTickFormatter

def source_data(auto_refresh: int, query: str, sink: Stream):
    rx \
        .interval(period=timedelta(seconds=auto_refresh)) \
        .pipe(ops.map(lambda start: f'from(bucket: "test") '
                                    f'|> range(start: -{auto_refresh}s, stop: now()) '
                                    f'{query}')) \
        .pipe(ops.map(lambda query: client.query_api().query_data_frame(query, data_frame_index=['_time']))) \
        .pipe(ops.map(lambda data_frame: data_frame.drop(columns=['result', 'table']))) \
        .subscribe(on_next=lambda data_frame: sink.emit(data_frame), on_error=lambda error: print(error))
    pass

token = "token"
org = "org"

client = InfluxDBClient(url='http://localhost:8086', token=token, org=org)

load_query = '|> filter(fn: (r) => r._measurement == "system") ' \
            '|> filter(fn: (r) => r._field == "load1" or r._field == "load5" or r._field == "load15") ' \
            '|> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")' \
            '|> keep(columns: ["_time", "load1", "load5", "load15"])'

load_sink = Stream()
load_example = pd.DataFrame({'load1': [], 'load5': [], 'load15': []}, columns=['load1', 'load5', 'load15'])
load_df = DataFrame(load_sink, example=load_example)

source_data(auto_refresh=5, sink=load_sink, query=load_query)

# Time formatter
formatter = DatetimeTickFormatter(
    microseconds = "%H:%M:%S",
    milliseconds = "%H:%M:%S",
    seconds = "%H:%M:%S",
    minsec = "%H:%M:%S",
    minutes = "%H:%M:%S",
    hourmin = "%H:%M:%S",
    hours="%H:%M:%S",
    days="%H:%M:%S",
    months="%H:%M:%S",
    years="%H:%M:%S",
)

load_df.hvplot(width=450, backlog=50, title='CPU % usage', xlabel='Time', ylabel='%', xformatter=formatter) 

I have Telegraf running and wish to graph ‘load1’, ‘load5’ and ‘load15’.

Issues

  • .pipe(ops.map(lambda data_frame: data_frame.drop(columns=[‘result’, ‘table’]))) \ never recognises any columns to drop > gives error “[table]” not found in axis
  • No data points from load variables show

My guess is that I’m not sourcing the data correctly somehow? Any help is appreciated, thanks!

Hello @bella,
Are you able to successfully query influxdb with the python client and return a DataFrame?
Outside of this script?

Heres a basic write and query example:

from influxdb_client import InfluxDBClient, Point
from influxdb_client.client.write_api import SYNCHRONOUS

bucket = "my-bucket"

client = InfluxDBClient(url="http://localhost:8086", token="my-token", org="my-org")

write_api = client.write_api(write_options=SYNCHRONOUS)
query_api = client.query_api()

p = Point("my_measurement").tag("location", "Prague").field("temperature", 25.3)

write_api.write(bucket=bucket, record=p)

## using Table structure
tables = query_api.query('from(bucket:"my-bucket") |> range(start: -10m)')

for table in tables:
    print(table)
    for row in table.records:
        print (row.values)


## using csv library
csv_result = query_api.query_csv('from(bucket:"my-bucket") |> range(start: -10m)')
val_count = 0
for row in csv_result:
    for cell in row:
        val_count += 1