Make difference between the duration of an order in a process with the mean time off all orders of the last 12 months

Hey,
this is a complicated one but i hope you can help me.
I have 2 different measurements
In the first one is all the data which is important for an order like articlenumber and order-ID.
In the second one i get the live data from a process where i can extract how long an order was within each processing step.
In all the processing steps are between 2 and 20 machines where the order can go through. Sometimes the order gets processed by 2 machines at the step at the same time.

What i want to achive is to get the duration of how long each order ID was within a processing step.
Then i want to make the difference between this time and the 12 months mean time of the corresponding articlenumber. I was able to get this far with 2 queries which i combined with the “transform” functions of Grafana. The problem is that the calculating time is way to long.
Is there a way to achive this in maybe one single query?

My working queries look like this:

import "contrib/tomhollingworth/events"
tab1 = from(bucket: "Bucket")
  |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
  |> filter(fn: (r) =>
    r._measurement == "OrderData" or
    r.short == "quantity" or
    r.short == "number" or
    r.short == "ID"
  )
  |>keep(columns: ["_field", "_time","short","_value"])
  |> pivot(
    rowKey:["_time"],
    columnKey: ["short","_field"],
    valueColumn: "_value"
  )
  |> map(fn: (r) => ({
      _time: r._time,
      ID: string(v: r.ID_value),
      Bestellmenge: r.quantity_value,
      Artikelnummer: string(v: r.number_value_str)
  }))
tab2 = from(bucket: "Bucket")
|> range(start: v.timeRangeStart, stop: v.timeRangeStop)
|> filter(fn: (r) =>
r._measurement == "LiveData" and
r.short == "IDmachine1step1"
)
|>keep(columns: ["_field", "_time","short","_value","_stop"])
|> pivot(
rowKey:["_time"],
columnKey: ["short","_field"],
valueColumn: "_value"
)
|> map(fn: (r) => ({
      _time: r._time,
      _stop:r._stop,
      ID: string(v: r.IDmachine1step1_value_str),
  }))
|>events.duration(
    unit:1m,
    columnName: "M1",
    timeColumn: "_time",
    stopColumn: "_stop"
)
|>drop(columns: ["_time","_stop"])
|>group(columns:["ID"])
|>sum(column:"M1")
|>group()

tab3 = from(bucket: "Bucket")
|> range(start: v.timeRangeStart, stop: v.timeRangeStop)
|> filter(fn: (r) =>
r._measurement == "LiveData" and
r.short == "IDmachine2step1"
)
|>keep(columns: ["_field", "_time","short","_value","_stop"])
|> pivot(
rowKey:["_time"],
columnKey: ["short","_field"],
valueColumn: "_value"
)
|> map(fn: (r) => ({
      _time: r._time,
      _stop:r._stop,
      ID: string(v: r.IDmachine2step1_value_str),
  }))
|>events.duration(
    unit:1m,
    columnName: "M2",
    timeColumn: "_time",
    stopColumn: "_stop"
)
|>drop(columns: ["_time","_stop"])
|>group(columns:["ID"])
|>sum(column:"M2")
|>group()
tab4=union(
  tables: [tab2,tab3]
)
join(
  tables: {t1: tab1, t2: tab4},
  on: ["ID"]
)
|>fill(column: "M1", value: 0)
|>fill(column: "M2", value: 0)
|> map(fn: (r) => ({
      "time per quantity": (float(v: r.M1)+float(v: r.M2))/(r.quantity/1000.0),
      number: r.number,
      ID: r.ID
  }))
import "contrib/tomhollingworth/events"
tab1 = from(bucket: "Bucket")
  |> range(start: -365d)
  |> filter(fn: (r) =>
    r._measurement == "OrderData" or
    r.short == "quantity" or
    r.short == "number" or
    r.short == "ID"
  )
  |>keep(columns: ["_field", "_time","short","_value"])
  |> pivot(
    rowKey:["_time"],
    columnKey: ["short","_field"],
    valueColumn: "_value"
  )
  |> map(fn: (r) => ({
      _time: r._time,
      ID: string(v: r.ID_value),
      Bestellmenge: r.quantity_value,
      Artikelnummer: string(v: r.number_value_str)
  }))
tab2 = from(bucket: "Bucket")
|> range(start: -365d)
|> filter(fn: (r) =>
r._measurement == "LiveData" and
r.short == "IDmachine1step1"
)
|>keep(columns: ["_field", "_time","short","_value","_stop"])
|> pivot(
rowKey:["_time"],
columnKey: ["short","_field"],
valueColumn: "_value"
)
|> map(fn: (r) => ({
      _time: r._time,
      _stop:r._stop,
      ID: string(v: r.IDmachine1step1_value_str),
  }))
|>events.duration(
    unit:1m,
    columnName: "M1",
    timeColumn: "_time",
    stopColumn: "_stop"
)
|>drop(columns: ["_time","_stop"])
|>group(columns:["ID"])
|>sum(column:"M1")
|>group()

tab3 = from(bucket: "Bucket")
|> range(start: -365d)
|> filter(fn: (r) =>
r._measurement == "LiveData" and
r.short == "IDmachine2step1"
)
|>keep(columns: ["_field", "_time","short","_value","_stop"])
|> pivot(
rowKey:["_time"],
columnKey: ["short","_field"],
valueColumn: "_value"
)
|> map(fn: (r) => ({
      _time: r._time,
      _stop:r._stop,
      ID: string(v: r.IDmachine2step1_value_str),
  }))
|>events.duration(
    unit:1m,
    columnName: "M2",
    timeColumn: "_time",
    stopColumn: "_stop"
)
|>drop(columns: ["_time","_stop"])
|>group(columns:["ID"])
|>sum(column:"M2")
|>group()
tab4=union(
  tables: [tab2,tab3]
)
join(
  tables: {t1: tab1, t2: tab4},
  on: ["ID"]
)
|>fill(column: "M1", value: 0)
|>fill(column: "M2", value: 0)
|> map(fn: (r) => ({
      "time per quantity 12M": (float(v: r.M1)+float(v: r.M2))/(r.quantity/1000.0),
      number: r.number,
      ID: r.ID
  }))
|>group(columns:["number"])
|>mean(column:"time per quantity 12M")
|>group()

With these two queries i used the transformations “OuterJoin”, “filter data by values” and “Add field from calculation” to get the difference.
I hope somebody can help me with this. Thank you so much in advance.

Hello @Patse,
Honestly it’s pretty difficult for me to help you without some more information. What would be super helpful is if you could export a few lines to annotated csv (maybe 20 points or less) and then share your expected output. Or alternatively if you could create an sample dataset with line protocol and describe your expected output so that I can try to figure out a solution on my end.

Finally, (and I’m not sure of this) is there an opportunity to use reduce? instead of a map or join?
https://docs.influxdata.com/influxdb/v2.0/reference/flux/stdlib/built-in/transformations/aggregates/reduce/