Computing the sum and difference of two streams

Hi Team,
My data set consists of three time series (streams of tables).

signal_name (Tag)             last (Field)
prediction                    double
prediction_error              double
fair_value                    double

The cycle_id correlates a prediction data point to its corresponding prediction_error. My assignment is to transform these streams into the following (for visualization in Grafana):

signal_name (Tag)             last (Field)
prediction + prediction_error double
prediction - prediction_error double
fair_value                    double

Here, I sum a prediction and its corresponding prediction_error (the one with the same cycle_id) to create a new stream, and I subtract the corresponding prediction_error from the prediction to form a second new stream. The fair_value stream passes through unchanged. For example, if prediction = 4.5 and prediction_error = 0.25 for cycle_id 1, the output rows for that cycle are 4.75 and 4.25.

I have a rough idea of how to perform this transform, but I am struggling with the precise syntax and the optimal solution. What follows is mostly pseudocode, especially towards the end of the snippet.

//Let's get the earliest time per cycle_id
baseData =
    from(bucket: "bkt")
        |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
        |> filter(fn: (r) => r["signal_name"] == "fv_stats_" or r["signal_name"] == "prediction_stats_" or r["signal_name"] == "prediction_error_exp_stats_")

getEarliest = (cycle_id) => {
    earliestTimes =
        baseData
            |> group(columns: ["cycle_id"])
            |> first()
            |> keep(columns: ["_time", "cycle_id"])

    cycleTime = 
        earliestTimes
            |> findRecord(fn: (key) => key.cycle_id == cycle_id, idx: 0)

    return cycleTime._time
}
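For reference, this is how I intend to call the helper (the cycle id literal is just a hypothetical example):

```
// Hypothetical usage: look up the earliest timestamp recorded for cycle 1
earliest = getEarliest(cycle_id: 1)
```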

//Only the fair value
fairValue = () =>
    from(bucket: "bkt")
        |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
        |> filter(fn: (r) => r["_measurement"] == "signals")
        |> filter(fn: (r) => r["signal_name"] == "fv_stats_")

//Only the prediction
prediction = () =>
    from(bucket: "bkt")
        |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
        |> filter(fn: (r) => r["_measurement"] == "signals")
        |> filter(fn: (r) => r["signal_name"] == "prediction_stats_")

//Only the prediction error
predictionError = () =>
    from(bucket: "bkt")
        |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
        |> filter(fn: (r) => r["_measurement"] == "signals")
        |> filter(fn: (r) => r["signal_name"] == "prediction_error_exp_stats_")

//Now try to join on cycle_id (wonder if pivot is better)
output = join(tables: {fv: fairValue(), prediction: prediction(), perror: predictionError()}, on: ["cycle_id"])
    //Retain the fair value after map
    |> map(fn: (r) => ({_time: getEarliest(cycle_id: r.cycle_id), sum: expr1, difference: expr2}))

I am, however, struggling with the following:

  1. At the point where I do a join, each of my streams (fairValue, prediction, and predictionError) stores its value in a field called "last". How does joining work when there are duplicate columns? What is the best way to achieve this?
  2. The last line in the code snippet has expr1 and expr2 as placeholders, as I am unable to figure out the correct syntax. For expr1, I want to sum the value in the last field for prediction and the value in the last field for prediction_error in the same cycle. expr2 is similar, but I want the difference rather than the sum.

@ValentineO I think union() |> pivot() |> map() is the way to go here:

// ... The rest of your query

//Only the fair value
fairValue = () =>
    from(bucket: "bkt")
        |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
        |> filter(fn: (r) => r["_measurement"] == "signals")
        |> filter(fn: (r) => r["signal_name"] == "fv_stats_")

//Only the prediction
prediction = () =>
    from(bucket: "bkt")
        |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
        |> filter(fn: (r) => r["_measurement"] == "signals")
        |> filter(fn: (r) => r["signal_name"] == "prediction_stats_")

//Only the prediction error
predictionError = () =>
    from(bucket: "bkt")
        |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
        |> filter(fn: (r) => r["_measurement"] == "signals")
        |> filter(fn: (r) => r["signal_name"] == "prediction_error_exp_stats_")

union(tables: [fairValue(), prediction(), predictionError()])
    |> pivot(rowKey: ["_time"], columnKey: ["_field"], valueColumn: "_value")
    |> map(
        fn: (r) =>
            ({
                _time: getEarliest(cycle_id: r.cycle_id),
                cycle_id: r.cycle_id,
                sum: r.prediction_stats_ + r.prediction_error_exp_stats_,
                difference: r.prediction_stats_ - r.prediction_error_exp_stats_,
                fair_value: r.fv_stats_,
            }),
    )
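One caveat (this is an assumption in the snippet above): pivot(columnKey: ["_field"]) only produces columns named prediction_stats_, etc. if the signal names are stored as _field values. A quick way to sanity-check which columns pivot() actually produces before wiring up the map():

```
// Inspect the pivoted shape first
union(tables: [fairValue(), prediction(), predictionError()])
    |> pivot(rowKey: ["_time"], columnKey: ["_field"], valueColumn: "_value")
    |> limit(n: 5)
    |> yield(name: "pivot_check")
```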

Hi @scott,
Thanks for the reply. This looks like a feasible solution, although I have one (hopefully last) question.
My data looks like this

_time  _measurement  signal_name (tag)            _field    _value
t0     signals       fv_stats_                    last      100.34
t0     signals       fv_stats_                    cycle_id  1
t1     signals       prediction_stats_            last      4.5
t1     signals       prediction_stats_            cycle_id  1
t2     signals       prediction_error_exp_stats_  last      0.25
t2     signals       prediction_error_exp_stats_  cycle_id  1

So for a given cycle_id (I have shown cycle_id 1 above), the addition and subtraction need to be performed on the _field named last.

    |> map(
        fn: (r) =>
            ({
                _time: getEarliest(cycle_id: r.cycle_id),
                cycle_id: r.cycle_id,
                sum: r.prediction_stats_ + r.prediction_error_exp_stats_,
                difference: r.prediction_stats_ - r.prediction_error_exp_stats_,
                fair_value: r.fv_stats_,
            }),
    )

In the snippet above, I suspect this works if prediction_stats_, prediction_error_exp_stats_, and fv_stats_ are fields, but they are actually tags. The sum and difference would need to refer to the _field named last, and that was my biggest challenge.

Ok, due to the structure of the data this gets a little complicated, but I think the following will give you what you want. Note that pivoting on both signal_name and _field builds each output column name by joining the columnKey values with an underscore (for example, signal_name fv_stats_ plus _field last becomes fv_stats__last), which is what the first map() below unpacks:

import "internal/debug"

//Let's get the earliest time per cycle_id
baseData =
    from(bucket: "bkt")
        |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
        |> filter(fn: (r) => r["signal_name"] == "fv_stats_" or r["signal_name"] == "prediction_stats_" or r["signal_name"] == "prediction_error_exp_stats_")

getEarliest = (cycle_id) => {
    earliestTimes =
        baseData
            |> group(columns: ["cycle_id"])
            |> first()
            |> keep(columns: ["_time", "cycle_id"])

    cycleTime = 
        earliestTimes
            |> findRecord(fn: (key) => key.cycle_id == cycle_id, idx: 0)

    return cycleTime._time
}

//Only the fair value
fairValue = () =>
    from(bucket: "bkt")
        |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
        |> filter(fn: (r) => r["_measurement"] == "signals")
        |> filter(fn: (r) => r["signal_name"] == "fv_stats_")

//Only the prediction
prediction = () =>
    from(bucket: "bkt")
        |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
        |> filter(fn: (r) => r["_measurement"] == "signals")
        |> filter(fn: (r) => r["signal_name"] == "prediction_stats_")

//Only the prediction error
predictionError = () =>
    from(bucket: "bkt")
        |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
        |> filter(fn: (r) => r["_measurement"] == "signals")
        |> filter(fn: (r) => r["signal_name"] == "prediction_error_exp_stats_")

union(tables: [fairValue(), prediction(), predictionError()])
    |> pivot(rowKey: ["_time"], columnKey: ["signal_name", "_field"], valueColumn: "_value")
    |> map(
        fn: (r) => {
            cycle_id =
                if exists r.fv_stats__cycle_id then
                    r.fv_stats__cycle_id
                else if exists r.prediction_stats__cycle_id then
                    r.prediction_stats__cycle_id
                else if exists r.prediction_error_exp_stats__cycle_id then
                    r.prediction_error_exp_stats__cycle_id
                else
                    0.0

            return {
                _time: r._time,
                cycle_id: cycle_id,
                fv_stats_: r.fv_stats__last,
                prediction_stats_: r.prediction_stats__last,
                prediction_error_exp_stats_: r.prediction_error_exp_stats__last,
            }
        },
    )
    |> group(columns: ["cycle_id"])
    |> reduce(
        identity: {fv_stats_: debug.null(type: "float"), prediction_error_exp_stats_: debug.null(type: "float"), prediction_stats_: debug.null(type: "float")},
        fn: (r, accumulator) =>
            ({
                fv_stats_: if exists r.fv_stats_ then r.fv_stats_ else accumulator.fv_stats_,
                prediction_error_exp_stats_:
                    if exists r.prediction_error_exp_stats_ then
                        r.prediction_error_exp_stats_
                    else
                        accumulator.prediction_error_exp_stats_,
                prediction_stats_:
                    if exists r.prediction_stats_ then
                        r.prediction_stats_
                    else
                        accumulator.prediction_stats_,
            }),
    )
    |> map(
        fn: (r) =>
            ({
                _time: getEarliest(cycle_id: r.cycle_id),
                sum: r.prediction_stats_ + r.prediction_error_exp_stats_,
                difference: r.prediction_stats_ - r.prediction_error_exp_stats_,
                fair_value: r.fv_stats_,
            }),
    )

Hi @scott,
This looks feasible, many thanks. On a separate note, do you think the current structure of the data can be improved? I am open to ideas, as this is my first foray into InfluxDB, and I would not be surprised if there are better ways to model the data.

I’m just basing these recommendations off the data structure you included above:

_time  _measurement  signal_name (tag)            _field    _value
t0     signals       fv_stats_                    last      100.34
t0     signals       fv_stats_                    cycle_id  1
t1     signals       prediction_stats_            last      4.5
t1     signals       prediction_stats_            cycle_id  1
t2     signals       prediction_error_exp_stats_  last      0.25
t2     signals       prediction_error_exp_stats_  cycle_id  1

I would suggest adding cycle_id as a tag and storing each of the signal names as fields. So the new structure would look like this:

_time  _measurement  cycle_id (tag)  _field                       _value
t0     signals       1               fv_stats_                    100.34
t1     signals       1               prediction_stats_            4.5
t2     signals       1               prediction_error_exp_stats_  0.25
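With that structure, the whole transformation could collapse to something like this (a sketch, untested against your data, assuming one point per field per cycle and that cycle_id is now part of the series key):

```
from(bucket: "bkt")
    |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
    |> filter(fn: (r) => r["_measurement"] == "signals")
    |> group(columns: ["cycle_id"])
    // one row per cycle, one column per signal
    |> pivot(rowKey: ["cycle_id"], columnKey: ["_field"], valueColumn: "_value")
    |> map(
        fn: (r) =>
            ({
                cycle_id: r.cycle_id,
                sum: r.prediction_stats_ + r.prediction_error_exp_stats_,
                difference: r.prediction_stats_ - r.prediction_error_exp_stats_,
                fair_value: r.fv_stats_,
            }),
    )
```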

Hi @scott, cycle_id as a tag makes sense. Alas, each signal data point has multiple properties such as min, max, last, and valid which change with respect to time, so I made them fields. This is what the line protocol looks like for a single data point (cycle_id omitted).

signals,signal_name=fv_stats_,strategy=gemini-eurex min=123.09,max=140.9,last=139.0,valid=T 1716395066129941982

@ValentineO Ah, ok, that schema makes sense. Just structuring the cycle_id as a tag instead of a field would remove a lot of the query complexity.
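For example, the line protocol above would just gain a cycle_id tag (tag value assumed):

```
signals,signal_name=fv_stats_,strategy=gemini-eurex,cycle_id=1 min=123.09,max=140.9,last=139.0,valid=T 1716395066129941982
```

cycle_id would then be a real column in every row, so you could group and pivot on it directly instead of reassembling it from a pivoted field.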