Nested Flux query with variable time range

Hi,

I am struggling with the following Flux query challenge:

We use InfluxDB v1.8.3 and store a lot of process data in a single InfluxDB measurement.
For instance, we store tank level readings every second, plus pump start and stop events, typically every 10 minutes or so.

I would like to write a query that finds every pump event (this is not a problem) and then looks up the corresponding tank level for each pump event.
The timestamp of a pump event and the timestamp of a level measurement do not match, so an average of the level measurements in the range of pump event +/- 3 seconds is fine.
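
To make the intended result concrete, here is a minimal client-side sketch in plain Python of the +/- 3 second averaging. It is not a Flux solution; the function name `window_means` and the sample data are hypothetical, and the level timestamps are assumed to be sorted in ascending order.

```python
from bisect import bisect_left, bisect_right
from datetime import datetime, timedelta


def window_means(level_times, level_values, event_times,
                 half_width=timedelta(seconds=3)):
    """Average the level readings within +/- half_width of each event.

    level_times must be sorted ascending and aligned with level_values.
    Returns one mean per event, or None when no reading is in the window.
    """
    means = []
    for event in event_times:
        # Binary search for the readings inside [event - w, event + w].
        lo = bisect_left(level_times, event - half_width)
        hi = bisect_right(level_times, event + half_width)
        window = level_values[lo:hi]
        means.append(sum(window) / len(window) if window else None)
    return means


# Hypothetical sample: one reading per second, level equal to the second index.
t0 = datetime(2022, 1, 14, 11, 23, 50)
level_times = [t0 + timedelta(seconds=i) for i in range(20)]
level_values = [float(i) for i in range(20)]
events = [t0 + timedelta(seconds=8, milliseconds=500)]

print(window_means(level_times, level_values, events))  # [8.5]
```

Because each event only needs two binary searches, this kind of lookup avoids comparing every pump event against every level reading.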

I have succeeded in doing this using a join, but it is very slow, so I am hoping for a more efficient way.

A level query will look something like this:

from(bucket: "machines")
|> range(start: {start_time}, stop: {stop_time})
|> filter(fn: (r) => r._measurement == "SN000022")
|> filter(fn: (r) => r.equipment == "T5001")
|> filter(fn: (r) => r.data == "level")
|> drop(columns: ["_start", "_stop", "_field", "_measurement", "equipment", "data", "system", "unit"])

Output:

    result  table                             _time     _value
0  _result      0  2022-01-14 11:23:54.354000+00:00  77.390625
1  _result      0  2022-01-14 11:23:55.356000+00:00  77.531250
2  _result      0  2022-01-14 11:23:56.357000+00:00  77.656250

A pump event query will look something like this, including two new columns with time-shifted timestamps:

from(bucket: "machines")
|> range(start: {start_time}, stop: {stop_time})
|> filter(fn: (r) => r._measurement == "SN000022")
|> filter(fn: (r) => r.equipment == "P5001")
|> filter(fn: (r) => r.data == "running")
|> filter(fn: (r) => r._field == "value")
|> difference()
|> filter(fn: (r) => r._value != 0)
|> map(fn: (r) => ({{r with t1: r._time}}))
|> map(fn: (r) => ({{r with t2: r._time}}))
|> timeShift(duration: -3s, columns: ["t1"])
|> timeShift(duration: 3s, columns: ["t2"])
|> keep(columns: ["_time", "t1", "t2"])

Output:

    result  table                             _time                                t1                                t2
0  _result      0  2022-01-14 11:23:58.501000+00:00  2022-01-14 11:23:55.501000+00:00  2022-01-14 11:24:01.501000+00:00
1  _result      0  2022-01-14 11:26:08.067000+00:00  2022-01-14 11:26:05.067000+00:00  2022-01-14 11:26:11.067000+00:00
2  _result      0  2022-01-14 11:26:49.831000+00:00  2022-01-14 11:26:46.831000+00:00  2022-01-14 11:26:52.831000+00:00

So, my dream solution would be to embed the level query as a nested query in the pump event query and use t1 and t2 as start: and stop: parameters in the nested level query.
So far, I have not been able to figure out how to do this…?
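
As a rough client-side approximation of that idea (not a true nested Flux query), one could build one level query per pump event and use t1 and t2 as the range bounds. The sketch below only builds the query string; the helper names `flux_time` and `level_query_for_event` are hypothetical, timestamps are assumed to be timezone-aware, and `mean()` is assumed to be an acceptable aggregate for the window.

```python
from datetime import datetime, timedelta, timezone


def flux_time(ts: datetime) -> str:
    """Format a timezone-aware datetime as an RFC 3339 UTC timestamp."""
    return ts.astimezone(timezone.utc).strftime("%Y-%m-%dT%H:%M:%S.%fZ")


def level_query_for_event(event: datetime,
                          half_width: timedelta = timedelta(seconds=3)) -> str:
    """Build a level query limited to event +/- half_width (hypothetical helper)."""
    t1 = flux_time(event - half_width)
    t2 = flux_time(event + half_width)
    return f'''from(bucket: "machines")
|> range(start: {t1}, stop: {t2})
|> filter(fn: (r) => r._measurement == "SN000022")
|> filter(fn: (r) => r.equipment == "T5001")
|> filter(fn: (r) => r.data == "level")
|> mean()'''


# First pump event from the output above.
event = datetime(2022, 1, 14, 11, 23, 58, 501000, tzinfo=timezone.utc)
print(level_query_for_event(event))
```

Whether issuing one small query per event is actually faster than the join depends on the number of events, so this would need to be measured.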

Any advice would be appreciated.

BTW, the examples are run from Jupyter using the InfluxDBClient Python library, hence the single and double { } around variables.
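
For readers unfamiliar with that brace convention, here is a minimal sketch of how a Python f-string renders such a query. The helper name `pump_event_query` and the shortened query are illustrative assumptions, not the exact code used above.

```python
def pump_event_query(start_time: str, stop_time: str) -> str:
    """Render a shortened Flux query from an f-string template.

    Single braces are substituted by Python; doubled braces come out as
    the literal { } that Flux needs for its record syntax.
    """
    return f'''from(bucket: "machines")
|> range(start: {start_time}, stop: {stop_time})
|> filter(fn: (r) => r._measurement == "SN000022")
|> map(fn: (r) => ({{r with t1: r._time}}))'''


query = pump_event_query("2022-01-14T11:00:00Z", "2022-01-14T12:00:00Z")
print(query)
```

The rendered text that reaches the server contains `({r with t1: r._time})` with single braces.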

Thanx


Hello @jonlula,
Out of curiosity, are you developing an application on top of InfluxDB? Why do you use Jupyter to execute Flux queries? I’m interested in learning about your experience and goals.

You could do something like:

// findColumn() extracts the values of one column as an array
get_times = from(bucket: "machines")
|> range(start: {start_time}, stop: {stop_time})
|> filter(fn: (r) => r._measurement == "SN000022")
|> filter(fn: (r) => r.equipment == "T5001")
|> filter(fn: (r) => r.data == "level")
|> drop(columns: ["_start", "_stop", "_field", "_measurement", "equipment", "data", "system", "unit"])
|> findColumn(
  fn: (key) => true,
  column: "_time")

// Flux variables taken from the extracted array
start_time = get_times[0]
stop_time = get_times[1]

from(bucket: "machines")
|> range(start: start_time, stop: stop_time)
|> filter(fn: (r) => r._measurement == "SN000022")
|> filter(fn: (r) => r.equipment == "P5001")
|> filter(fn: (r) => r.data == "running")
|> filter(fn: (r) => r._field == "value")
|> difference()
|> filter(fn: (r) => r._value != 0)
|> map(fn: (r) => ({{r with t1: r._time}}))
|> map(fn: (r) => ({{r with t2: r._time}}))
|> timeShift(duration: -3s, columns: ["t1"])
|> timeShift(duration: 3s, columns: ["t2"])

This uses the findColumn() function, or optionally the findRecord() function.

Let me know if this helps.

Hi @Anaisdg,

Thank you for the quick reply.
We are experimenting with InfluxDB as our primary storage for time-series data collected from our machines. We use Grafana for dashboards and Python, either as standalone scripts or run from within Jupyter, as our main tool for analyzing machine performance.

Regarding the proposed solution:
I cannot get the findColumn() function to work and keep getting the following error:

ApiException: (500)

Reason: Internal Server Error
HTTP response headers: HTTPHeaderDict({'Content-Type': 'application/json', 'Request-Id': 'f4f9eed3-85d2-11ec-8fbf-eaace8e65cbe', 'X-Influxdb-Build': 'OSS', 'X-Influxdb-Error': 'type error 8:8-8:18: undefined identifier "findColumn"', 'X-Influxdb-Version': '1.8.3', 'X-Request-Id': 'f4f9eed3-85d2-11ec-8fbf-eaace8e65cbe', 'Date': 'Fri, 04 Feb 2022 15:56:04 GMT', 'Content-Length': '69'})
HTTP response body: b'{"error":"type error 8:8-8:18: undefined identifier \"findColumn\""}\n'

I have also tried findRecord(), tableFind() and getColumn() with similar results.

Thank you