Hi Team,
My data set consists of 3 timeseries/streams of tables, distinguished by the signal_name tag (fv_stats_, prediction_stats_ and prediction_error_exp_stats_), each carrying a last field of type double. The cycle_id correlates a prediction data point to its corresponding prediction_error. My assignment is to transform this stream to the following (for Grafana visualization):
signal_name (Tag)               last (Field)
prediction + prediction_error   double
prediction - prediction_error   double
fair_value                      double
Here, I sum a prediction and its corresponding prediction_error (with the same cycle_id) to create one new stream, and I compute the difference of a prediction and its corresponding prediction_error (again with the same cycle_id) to form another. The fair_value stream remains unchanged.
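As a concrete example, using the sample values that appear later in this thread for cycle_id 1 (prediction 4.5, prediction_error 0.25, fair_value 100.34), the transform should produce:

```text
prediction + prediction_error = 4.5 + 0.25 = 4.75
prediction - prediction_error = 4.5 - 0.25 = 4.25
fair_value                    = 100.34 (passed through unchanged)
```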
I have a rough idea of how to perform this transform, but I am struggling with the precise syntax and the optimal solution. What follows is mostly pseudocode, especially towards the end of the snippet.
//Let's get the earliest time per cycle_id
baseData =
    from(bucket: "bkt")
        |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
        |> filter(fn: (r) => r["signal_name"] == "fv_stats_" or r["signal_name"] == "prediction_stats_" or r["signal_name"] == "prediction_error_exp_stats_")

getEarliest = (cycle_id) => {
    earliestTimes =
        baseData
            |> group(columns: ["cycle_id"])
            |> first()
            |> keep(columns: ["_time", "cycle_id"])
    cycleTime =
        earliestTimes
            |> findRecord(fn: (key) => key.cycle_id == cycle_id, idx: 0)

    return cycleTime._time
}

//Only the fair value
fairValue = () =>
    from(bucket: "bkt")
        |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
        |> filter(fn: (r) => r["_measurement"] == "signals")
        |> filter(fn: (r) => r["signal_name"] == "fv_stats_")

//Only the prediction
prediction = () =>
    from(bucket: "bkt")
        |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
        |> filter(fn: (r) => r["_measurement"] == "signals")
        |> filter(fn: (r) => r["signal_name"] == "prediction_stats_")

//Only the prediction error
predictionError = () =>
    from(bucket: "bkt")
        |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
        |> filter(fn: (r) => r["_measurement"] == "signals")
        |> filter(fn: (r) => r["signal_name"] == "prediction_error_exp_stats_")
//Now try to join on cycle_id (I wonder if pivot is better)
output = join(tables: {fv: fairValue(), prediction: prediction(), perror: predictionError()}, on: ["cycle_id"])
    //Retain the fair value after the map
    |> map(fn: (r) => ({_time: getEarliest(cycle_id: r.cycle_id), sum: expr1, difference: expr2}))
I am however struggling with the following:
- At the point where I do the join, each of my streams (fairValue, prediction and predictionError) stores its value in a field called "last". How does joining work when there are duplicate columns? What is the best way to achieve this?
- The last line in the code snippet uses expr1 and expr2 as placeholders, as I am unable to figure out the correct syntax. For expr1, I want to sum the value in the last field for the prediction and the value in the last field for the prediction_error in the same cycle. expr2 is similar, but I want the difference rather than the sum.
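For context on the duplicate-column question: as I understand the built-in join(), it accepts exactly two tables, and columns that share a name but are not listed in on are suffixed with the keys from the tables record. A hedged, untested two-way sketch (the table keys p and e are illustrative, and it assumes cycle_id has already been surfaced as a column):

```
//Sketch only: join() takes exactly two tables, so a three-way join
//would need two chained join() calls.
joined =
    join(tables: {p: prediction(), e: predictionError()}, on: ["cycle_id"])
        //Colliding columns are suffixed with the table keys,
        //e.g. _value_p and _value_e
        |> map(fn: (r) => ({r with
            sum: r._value_p + r._value_e,
            difference: r._value_p - r._value_e,
        }))
```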
scott
May 21, 2024, 2:57pm
2
@ValentineO I think union() |> pivot() |> map() is the way to go here:
// ... The rest of your query

//Only the fair value
fairValue = () =>
    from(bucket: "bkt")
        |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
        |> filter(fn: (r) => r["_measurement"] == "signals")
        |> filter(fn: (r) => r["signal_name"] == "fv_stats_")

//Only the prediction
prediction = () =>
    from(bucket: "bkt")
        |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
        |> filter(fn: (r) => r["_measurement"] == "signals")
        |> filter(fn: (r) => r["signal_name"] == "prediction_stats_")

//Only the prediction error
predictionError = () =>
    from(bucket: "bkt")
        |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
        |> filter(fn: (r) => r["_measurement"] == "signals")
        |> filter(fn: (r) => r["signal_name"] == "prediction_error_exp_stats_")

union(tables: [fairValue(), prediction(), predictionError()])
    |> pivot(rowKey: ["_time"], columnKey: ["_field"], valueColumn: "_value")
    |> map(
        fn: (r) =>
            ({
                _time: getEarliest(cycle_id: r.cycle_id),
                cycle_id: r.cycle_id,
                sum: r.prediction_stats_ + r.prediction_error_exp_stats_,
                difference: r.prediction_stats_ - r.prediction_error_exp_stats_,
                fair_value: r.fv_stats_,
            }),
    )
Hi @scott ,
Thanks for the reply. This looks like a feasible solution although I had one (hopefully last) query.
My data looks like this
_time  _measurement  signal_name (tag)            _field    _value
t0     signals       fv_stats_                    last      100.34
t0     signals       fv_stats_                    cycle_id  1
t1     signals       prediction_stats_            last      4.5
t1     signals       prediction_stats_            cycle_id  1
t2     signals       prediction_error_exp_stats_  last      0.25
t2     signals       prediction_error_exp_stats_  cycle_id  1
So for a given cycle_id (I have shown cycle_id 1 above), we will be performing the addition and subtraction on the _field named last.
|> map(
    fn: (r) =>
        ({
            _time: getEarliest(cycle_id: r.cycle_id),
            cycle_id: r.cycle_id,
            sum: r.prediction_stats_ + r.prediction_error_exp_stats_,
            difference: r.prediction_stats_ - r.prediction_error_exp_stats_,
            fair_value: r.fv_stats_,
        }),
)
In the snippet above, I suspect this works if prediction_stats_, prediction_error_exp_stats_ and fv_stats_ are fields, but they are actually tags. The sum and difference would need to refer to the _field named last, and that was my biggest challenge.
scott
May 21, 2024, 8:55pm
4
Ok, due to the structure of the data, this gets a little complicated, but I think the following will give you what you want:
import "internal/debug"

//Let's get the earliest time per cycle_id
baseData =
    from(bucket: "bkt")
        |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
        |> filter(fn: (r) => r["signal_name"] == "fv_stats_" or r["signal_name"] == "prediction_stats_" or r["signal_name"] == "prediction_error_exp_stats_")

getEarliest = (cycle_id) => {
    earliestTimes =
        baseData
            |> group(columns: ["cycle_id"])
            |> first()
            |> keep(columns: ["_time", "cycle_id"])
    cycleTime =
        earliestTimes
            |> findRecord(fn: (key) => key.cycle_id == cycle_id, idx: 0)

    return cycleTime._time
}

//Only the fair value
fairValue = () =>
    from(bucket: "bkt")
        |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
        |> filter(fn: (r) => r["_measurement"] == "signals")
        |> filter(fn: (r) => r["signal_name"] == "fv_stats_")

//Only the prediction
prediction = () =>
    from(bucket: "bkt")
        |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
        |> filter(fn: (r) => r["_measurement"] == "signals")
        |> filter(fn: (r) => r["signal_name"] == "prediction_stats_")

//Only the prediction error
predictionError = () =>
    from(bucket: "bkt")
        |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
        |> filter(fn: (r) => r["_measurement"] == "signals")
        |> filter(fn: (r) => r["signal_name"] == "prediction_error_exp_stats_")

union(tables: [fairValue(), prediction(), predictionError()])
    |> pivot(rowKey: ["_time"], columnKey: ["signal_name", "_field"], valueColumn: "_value")
    |> map(
        fn: (r) => {
            cycle_id =
                if exists r.fv_stats__cycle_id then
                    r.fv_stats__cycle_id
                else if exists r.prediction_stats__cycle_id then
                    r.prediction_stats__cycle_id
                else if exists r.prediction_error_exp_stats__cycle_id then
                    r.prediction_error_exp_stats__cycle_id
                else
                    0.0

            return {
                _time: r._time,
                cycle_id: cycle_id,
                fv_stats_: r.fv_stats__last,
                prediction_stats_: r.prediction_stats__last,
                prediction_error_exp_stats_: r.prediction_error_exp_stats__last,
            }
        },
    )
    |> group(columns: ["cycle_id"])
    |> reduce(
        identity: {fv_stats_: debug.null(), prediction_error_exp_stats_: debug.null(), prediction_stats_: debug.null()},
        fn: (r, accumulator) =>
            ({
                fv_stats_: if exists r.fv_stats_ then r.fv_stats_ else accumulator.fv_stats_,
                prediction_error_exp_stats_:
                    if exists r.prediction_error_exp_stats_ then
                        r.prediction_error_exp_stats_
                    else
                        accumulator.prediction_error_exp_stats_,
                prediction_stats_:
                    if exists r.prediction_stats_ then
                        r.prediction_stats_
                    else
                        accumulator.prediction_stats_,
            }),
    )
    |> map(
        fn: (r) =>
            ({
                _time: getEarliest(cycle_id: r.cycle_id),
                sum: r.prediction_stats_ + r.prediction_error_exp_stats_,
                difference: r.prediction_stats_ - r.prediction_error_exp_stats_,
                fair_value: r.fv_stats_,
            }),
    )
Hi @scott ,
This looks feasible, many thanks. On a separate note, do you think the current structure of the data can be improved? I am open to ideas, as this is my first foray into InfluxDB and I will not be surprised if there are more optimal ways to model the data.
scott
May 22, 2024, 2:18pm
6
I’m just basing these recommendations off the data structure you included above:
_time  _measurement  signal_name (tag)            _field    _value
t0     signals       fv_stats_                    last      100.34
t0     signals       fv_stats_                    cycle_id  1
t1     signals       prediction_stats_            last      4.5
t1     signals       prediction_stats_            cycle_id  1
t2     signals       prediction_error_exp_stats_  last      0.25
t2     signals       prediction_error_exp_stats_  cycle_id  1
I would suggest adding cycle_id as a tag and storing each of the signal names as fields. So the new structure would look like this:
_time  _measurement  cycle_id  _field                       _value
t0     signals       1         fv_stats_                    100.34
t1     signals       1         prediction_stats_            4.5
t2     signals       1         prediction_error_exp_stats_  0.25
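With that structure, the transform could reduce to a single pivot. A hedged, untested sketch; since the three fields are written at different timestamps (t0, t1, t2), it keys the pivot on cycle_id rather than _time, which drops the individual timestamps:

```
from(bucket: "bkt")
    |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
    |> filter(fn: (r) => r["_measurement"] == "signals")
    //Drop the group key so all fields for a cycle land in one table
    |> group()
    |> pivot(rowKey: ["cycle_id"], columnKey: ["_field"], valueColumn: "_value")
    |> map(fn: (r) => ({
        cycle_id: r.cycle_id,
        sum: r.prediction_stats_ + r.prediction_error_exp_stats_,
        difference: r.prediction_stats_ - r.prediction_error_exp_stats_,
        fair_value: r.fv_stats_,
    }))
```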
Hi @scott, cycle_id as a tag makes sense. Alas, each signal data point has multiple properties such as min, max, last and valid, which change with respect to time, so I made them fields. This is what the line protocol looks like for a single data point (cycle_id omitted).
signals,signal_name=fv_stats_,strategy=gemini-eurex min=123.09,max=140.9,last=139.0,valid=T 1716395066129941982
scott
May 22, 2024, 7:51pm
8
@ValentineO Ah, ok, that schema makes sense. Just structuring the cycle_id as a tag instead of a field would remove a lot of the query complexity.
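For illustration, promoting cycle_id to a tag would change the earlier line protocol sample to the following (cycle_id=1 is borrowed from the sample data above; everything else is unchanged):

```text
signals,signal_name=fv_stats_,strategy=gemini-eurex,cycle_id=1 min=123.09,max=140.9,last=139.0,valid=T 1716395066129941982
```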