How to use query result in range()

My query results in a list of timestamps. I would like to use these timestamps as the stop parameter in a range of another query.

Query:

charging_sessions = from(bucket: "test")
  |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
  |> filter(fn: (r) => r["_measurement"] == "charging_sessions")
  |> filter(fn: (r) => r["_field"] == "session_start_dts")
  |> aggregateWindow(every: 1d, fn: last, createEmpty: false)
  |> keep(columns: ["_field", "_value"])
  |> yield(name: "session_start")

results in:

table            _field               _value
session_start    group                no group
                 string               long
0                session_start_dts    1646479320
0                session_start_dts    1646843541
0                session_start_dts    1648457833
0                session_start_dts    1651316596

How can I use these timestamps to construct the stop parameter of a range()?

Hello @cville

Hey, I know a way to do this, but instead of the result you show, you will only get a single row. You can then pull the column you want into another query:

charging_sessions = from(bucket: “test”)
|> range(start: v.timeRangeStart, stop: v.timeRangeStop)
|> filter(fn: (r) => r["_measurement"] == “charging_sessions”)
|> filter(fn: (r) => r["_field"] == “session_start_dts”)
|> aggregateWindow(every: 1d, fn: last, createEmpty: false)
|> keep(columns: ["_field", “_value”])
|> last()
|> findRecord(fn: (key) => key._field == “fieldName”,idx: 0,)

from(bucket: “test”)
|> range(start: v.timeRangeStart, stop: charging_sessions ._value)

They mention this topic here also, if you want to check it out:

Hi @Juan_Alonso_Pla,

Thanks for responding. I tried your code but it doesn’t run:

error calling function "range" @12:4-12:66: value is not a time, got null

This is with InfluxDB 2.2.0. Any idea?

I had to make a few edits after copy/paste: change illegal quotes into regular quotes and change “charging_sessions ._value” into “charging_sessions._value” (remove the space before the dot).

Query:

charging_sessions = from(bucket: "test")
  |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
  |> filter(fn: (r) => r["_measurement"] == "charging_sessions")
  |> filter(fn: (r) => r["_field"] == "session_start_dts")
  |> aggregateWindow(every: 1d, fn: last, createEmpty: false)
  |> keep(columns: ["_field", "_value"])
  |> last()
  |> findRecord(fn: (key) => key._field == "fieldName", idx: 0,)

from(bucket: "test")
|> range(start: v.timeRangeStart, stop: charging_sessions._value)

EDIT:

When I only run (excluding the “range” part):

charging_sessions = from(bucket: "test")
  |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
  |> filter(fn: (r) => r["_measurement"] == "charging_sessions")
  |> filter(fn: (r) => r["_field"] == "session_start_dts")
  |> aggregateWindow(every: 1d, fn: last, createEmpty: false)
  |> keep(columns: ["_field", "_value"])
  |> last()
  |> findRecord(fn: (key) => key._field == "fieldName", idx: 0,)
  |> yield(name: "session_start")

I get a different error:

error @8:6-8:65: stream[A] is not Record (argument tables)

The following error:

error calling function "range" @12:4-12:66: value is not a time, got null

arises because the value passed to stop is not a time value, so you have to convert it into one. Try this instead:

charging_sessions = from(bucket: "test")
  |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
  |> filter(fn: (r) => r["_measurement"] == "charging_sessions")
  |> filter(fn: (r) => r["_field"] == "session_start_dts")
  |> aggregateWindow(every: 1d, fn: last, createEmpty: false)
  |> keep(columns: ["_field", "_value"])
  |> last()
  // "fieldName" is a placeholder; use the actual field name here
  |> findRecord(fn: (key) => key._field == "fieldName", idx: 0)
  // no yield() here: findRecord() returns a record, not a stream

from(bucket: "test")
  // _value holds Unix seconds; time() expects nanoseconds
  |> range(start: v.timeRangeStart, stop: time(v: charging_sessions._value * 1000000000))

As for the other error you got:

error @8:6-8:65: stream[A] is not Record (argument tables)

this is because findRecord() returns a record, not a stream of tables, so its result cannot be piped into yield().

Let me know if this solves your question
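
For reference, here is a minimal sketch of the scalar-extraction pattern discussed above, run against simulated data so it does not need the bucket. It assumes that findRecord() returns an empty record when its predicate matches no table, which would explain the "got null" in the error when the "fieldName" placeholder is left in. The simulated rows are grouped by _field only to mimic the group key shown in the original result, and the names lastSession and stopTime are made up for illustration.

```flux
import "array"

// Simulated result of the first query; _value holds Unix timestamps in seconds.
// Grouping by _field mimics the group key of the real query result.
charging_sessions =
    array.from(
        rows: [
            {_time: now(), _field: "session_start_dts", _value: 1646479320},
            {_time: now(), _field: "session_start_dts", _value: 1651316596},
        ],
    )
        |> group(columns: ["_field"])

// findRecord() must be the last step of the pipeline: it returns a record,
// so nothing can be piped into yield() after it.
// The predicate has to match the real field name, otherwise no table is found.
lastSession =
    charging_sessions
        |> last()
        |> findRecord(fn: (key) => key._field == "session_start_dts", idx: 0)

// Convert seconds to nanoseconds, then to a time value usable in range().
stopTime = time(v: lastSession._value * 1000000000)

// A scalar is not a stream, so wrap it in a one-row table to display it.
array.from(rows: [{stop: stopTime}])
    |> yield(name: "stop_time")
```

The stopTime value can then be passed as the stop parameter of range() in a second query.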

@Juan_Alonso_Pla It didn’t work. However, to make it easier for you to test the code, I wrote a few lines that simulate the output of the first part of my query, so you can run the code without having access to my bucket.

The code is now as follows:

import "array"

charging_sessions = array.from(rows: // create simulated query result
[
{_time: now(), _field: "session_start_dts", _value: 1646479320}, 
{_time: now(), _field: "session_start_dts", _value: 1646843541}, 
{_time: now(), _field: "session_start_dts", _value: 1648457833}, 
{_time: now(), _field: "session_start_dts", _value: 1651316596}, 
])

charging_sessions 
  |> last()
  |> map(fn: (r) => ( { r with _time: time(v: r._value * 1000000000) }))
  |> keep(columns: ["_field", "_time"])
  |> yield(name: "timestamps")
  |> range(start: -90d, stop: time(v: charging_sessions._time))

which yields the following error:

error @17:39-17:56: expected {A with _time:B} (record) but found stream[{_value:int, _time:time, _field:string}]

Hopefully that will allow you to propose fixes to the code.

I am not 100% sure what you are trying to do.

A stop parameter can be achieved with:

stopTime_var = time(v: (charging_sessions  |> last(column: "_value") |> findRecord(fn: (key) => true, idx: 0))._value * 1000000000) 

and get the result with

charging_sessions
    |> range(start: -90d  , stop: stopTime_var)
    |> yield(name: "timestamps")

In the documentation you can find how to extract scalar values here:

Here’s what I’m trying to do:

  1. run a query that will generate a list of timestamps. This is the query that I’ve simulated so anyone can run the sample code
  2. use the column with timestamps as stop parameter in another query that runs against another bucket. This will get me the data I’m looking for.

IMHO, in order to achieve #2, I need to loop through all timestamps and use each one as the stop parameter in a range(). With help from @Juan_Alonso_Pla I’ve successfully converted the integer Unix timestamps into valid time instances, but have not yet succeeded in getting these accepted by the range() function (see error above).

I hope this makes my objective clear enough for you to show me the right code. Any help will be much appreciated.

I think you cannot do that. First, range() only accepts a single time value for each parameter, not a list. Also, there is no loop construct in Flux; the closest thing is map(), which might work with your data.

And that error happens because your “charging_sessions” is a stream of tables and not a record.

I am also very interested in your problem and hope you find a better working solution. :smiley:

EDIT: Together, these two return a single record (like a row in a CSV):
|> last()
|> findRecord(fn: (key) => key._field == "fieldName", idx: 0)
This one returns a table (like a sheet in a CSV):
|> tableFind(fn: (key) => true)

Both findRecord() and tableFind() return only the first “thing” that matches the given filter, so tableFind(fn: (key) => true) returns the first table!
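
To make the record-versus-column distinction concrete, here is a small sketch on simulated data. It uses findColumn() as well, which is not mentioned above but extracts a whole column as an array in one step. The variable names firstRow and values are only illustrative.

```flux
import "array"

// Simulated stream of tables with two rows.
data =
    array.from(
        rows: [
            {_time: now(), _field: "session_start_dts", _value: 1646479320},
            {_time: now(), _field: "session_start_dts", _value: 1646843541},
        ],
    )

// findRecord(): one row (here the first) of the first matching table, as a record.
firstRow = data |> findRecord(fn: (key) => true, idx: 0)

// findColumn(): one column of the first matching table, as an array.
values = data |> findColumn(fn: (key) => true, column: "_value")

// Records and arrays are not streams, so wrap the extracted scalars
// in a new one-row table to display them.
array.from(rows: [{from_record: firstRow._value, from_column: values[1]}])
    |> yield(name: "scalars")
```

Neither firstRow nor values can be piped into functions such as range() or yield(), because those expect a stream of tables.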

No solution, but a quick and bad workaround:

import "array"

testbucket = array.from(rows: // "test bucket"
[
{_time: now(), _field: "session_start_dts", _value: 1646479320}, 
{_time: now(), _field: "session_start_dts", _value: 1646843541}, 
{_time: now(), _field: "session_start_dts", _value: 1648457833}, 
{_time: now(), _field: "session_start_dts", _value: 1651316596}, 
])

charging_sessions = array.from(rows: // create simulated query result
[
{_time: now(), _field: "session_start_dts", _value: 1646479320}, 
{_time: now(), _field: "session_start_dts", _value: 1646843541}, 
{_time: now(), _field: "session_start_dts", _value: 1648457833}, 
{_time: now(), _field: "session_start_dts", _value: 1651316596}, 
])

// last() keeps only the last row of the table, so you get a single timestamp
stopTime_var = time(v: (charging_sessions  |> last(column: "_value") |> findRecord(fn: (key) => true, idx: 0))._value * 1000000000) 

// get all data that is older than the last() time extracted above
testbucket
    |> range(start: -90d  , stop: stopTime_var)
    |> yield(name: "timestamps")


// dirty version of getting your simulated example timestamps out:
// extract the _value column once as an array, then index into it
stops = charging_sessions |> tableFind(fn: (key) => true) |> getColumn(column: "_value")
stopTime_var0 = time(v: stops[0] * 1000000000)
stopTime_var1 = time(v: stops[1] * 1000000000)
stopTime_var2 = time(v: stops[2] * 1000000000)
stopTime_var3 = time(v: stops[3] * 1000000000)


testbucket
    |> range(start: -90d  , stop: stopTime_var0)
    |> yield(name: "timestamps0")

testbucket
    |> range(start: -90d  , stop: stopTime_var1)
    |> yield(name: "timestamps1")

testbucket
    |> range(start: -90d  , stop: stopTime_var2)
    |> yield(name: "timestamps2")

testbucket
    |> range(start: -90d  , stop: stopTime_var3)
    |> yield(name: "timestamps3")    

Thanks @Koma. I realize that range() only accepts scalars but I was hoping that code could be written that loops outside the range() function and invokes range() one list item at a time.
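
One possible way to get that kind of loop, sketched here under assumptions rather than as a confirmed solution: newer Flux releases include array.map(), which is likely not available in the Flux version bundled with InfluxDB 2.2.0. The idea is to extract the timestamps as an array, build one range() query per element, and merge the results with union(). The other_bucket stream is a hypothetical stand-in for the second bucket, and the start time is arbitrary.

```flux
import "array"

// Simulated timestamps from the first query (Unix seconds).
charging_sessions =
    array.from(
        rows: [
            {_time: now(), _field: "session_start_dts", _value: 1646479320},
            {_time: now(), _field: "session_start_dts", _value: 1646843541},
        ],
    )

// Hypothetical stand-in for the data in the second bucket.
other_bucket =
    array.from(
        rows: [
            {_time: 2022-03-01T00:00:00Z, _value: 1.0},
            {_time: 2022-03-07T00:00:00Z, _value: 2.0},
            {_time: 2022-03-20T00:00:00Z, _value: 3.0},
        ],
    )

// Extract the whole _value column as an array of integers.
stops = charging_sessions |> findColumn(fn: (key) => true, column: "_value")

// Build one range() query per timestamp (seconds converted to nanoseconds).
queries =
    array.map(
        arr: stops,
        fn: (x) =>
            other_bucket
                |> range(start: 2022-01-01T00:00:00Z, stop: time(v: x * 1000000000)),
    )

// Merge the per-timestamp streams into a single result.
union(tables: queries)
    |> yield(name: "per_session")
```

Since range() adds _start and _stop columns, the rows produced for each timestamp can be told apart by their _stop value. Note that union() needs at least two input streams, so this sketch assumes the first query returns two or more timestamps.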

If no solution presents itself, I’ll write a Python program that does the work and stores the result in InfluxDB. Not as elegant, but since my requirements only create new records every few weeks, only a very small amount of additional storage is needed.

I’m still somewhat surprised that this is so difficult in Flux and would like to hear an opinion from InfluxData. Anyone?