For-each loop for tables (or other possibilities to randomly join two tables)

Hi all,

I know this is very specific, but I hope maybe someone can still help me out.

I am basically trying to join two tables. The rows of both tables belong to the same measurements but have slightly different timestamps (similar, but not perfectly aligned).
What I am missing is some kind of for-each loop, or an arbitrary function that could be applied to each element of one table and called from another table.

The syntax would be somewhat similar to:

I tried to implement getElement as a custom function, but it already returns
an error I do not understand (“Cannot mix implicit and explicit properties”):

getElement = (tables=<-, StartTime, StopTime, val) =>
  tables
    |> range(start: MyStartTime, stop: MyEndTime)
    |> filter(fn: (r) => r.a == val )
    |> tableFind(fn: (key) => key._measurement == "m1")
    |> getRecord(idx: 0)

Then I would like to call this function for each element of a new stream, something like:

myTableStream1 = from(bucket: "test/autogen")
  |> range(start: MyStartTime, stop: MyEndTime)
  |> filter(fn: (r) => r._measurement == "m1" )

myTableStream2 = from(bucket: "test/autogen")
  |> range(start: MyStartTime, stop: MyEndTime)
  |> filter(fn: (r) => r._measurement == "m2" )
  /// This is the part where I should join the m2 stream and m1 stream - I supposed it would be something like:
  |> filter(fn: getElement(myTableStream1, r._time -1s, r._time+1s, r.b)

Unfortunately, I do not know how to store the result of the operation in a temporary variable and pass it to the next step, where a function would either add the columns of the matching m1 measurement or drop the whole row.

Also, I have not found a way to simply join two rows that do not share the same timestamp. Assume I have two rows, r1 and r2, and want to merge them into one row. As far as I can see, the map() function only works if you know the names of the columns. Are there any other ways to do this?

Any hints would be greatly appreciated.

All the best

Hello @Aga-Schwarz,
Thanks for your question. Can you please help me out by providing an example input and output of what you want your Flux script to do? That way we can use the same data too.

I second @Anaisdg - example input and output would be really helpful.

Also, to explain the “Cannot mix implicit and explicit properties” error you’re getting, it has to do with how you’re calling the getElement function. Flux does not support positional parameters (yet), meaning all parameters must be named. So in your function definition, you name each of the parameters:

getElement = (tables=<-, StartTime, StopTime, val) =>
  tables
    |> range(start: MyStartTime, stop: MyEndTime)
    |> filter(fn: (r) => r.a == val )
    |> tableFind(fn: (key) => key._measurement == "m1")
    |> getRecord(idx: 0)

tables is the only parameter with a default value, so the others (StartTime, StopTime, and val) must be set when calling the function. Flux does allow a shorthand method where, if a parameter key and value are the same, you don’t need to provide a key-value pair; just the key. This would be considered an “implicit” property.

In your call to getElement(), you don’t specify any parameter keys (which I assume is an attempt at positional parameters), so Flux is trying to parse them as implicit key-value pairs.
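To make the difference between explicit and implicit properties concrete, here is a minimal sketch. The add function and the variables a and b are hypothetical and exist only to illustrate call syntax; they are not part of the original question.

```flux
import "array"

// Hypothetical helper, used only to illustrate how arguments are passed.
add = (a, b) => a + b

a = 1
b = 2

// Explicit properties: every argument is written as key: value.
explicit = add(a: a, b: b)

// Implicit properties: a bare identifier stands for key: value where the
// parameter and the variable share a name, so this means add(a: a, b: b).
implicit = add(a, b)

// Mixing both styles in a single call is what the parser rejects:
// add(a, b: 2)  // "cannot mix implicit and explicit properties"

// Emit both results as one row so the script returns something when run.
array.from(rows: [{explicit: explicit, implicit: implicit}])
```

Because the shorthand only works for bare identifiers whose names match the parameter names, naming every argument explicitly is usually the safer choice for a function like getElement.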

As for solving the larger problem, I think I understand what you’re trying to do, but seeing your expected output will definitely help.

Hi @Anaisdg, hi @scott

@scott: Thank you for the hint about positional parameters; obviously I forgot to name the parameters…

What I am trying to achieve:

Assume I have two timestamps T1 and T2 in m1 and two slightly different timestamps T1’ and T2’ in m2, which differ from them by a few nanoseconds (the difference can be ignored, and it varies between measurements). I want to join both tables based on matching timestamps and a matching additional parameter z.

Also, the names of the columns in the measurements are not known (besides column z), so I would need a function that joins rows without knowing the names of the columns. I considered adjusting the timestamps T1’ and T2’ in m2 so that they exactly match the timestamps in m1, and then using a regular join. For this I would also need a function similar to getElement, e.g. a getTimestamp with the same input as getElement, which would return the matching timestamp as its output. But I do not know how to return a value from a function and use it for further computation…

Thanks a lot for your support!

@Aga-Schwarz I would go the timestamp-normalization route and use a normal join. I think this will give you exactly what you’re looking for. The best way to normalize timestamps is to use the date.truncate() function, which truncates timestamps to a specified unit of time (milliseconds, seconds, minutes, etc.). It essentially reduces the precision of your timestamps, allowing you to remove the subtle nanosecond differences.
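As a quick illustration of what truncation does to two nearly identical timestamps, consider the sketch below. The two time values are made up for this example and are not taken from the thread.

```flux
import "array"
import "date"

// Two sample timestamps (invented for illustration) that differ by 300ns.
t1 = 2021-01-01T00:00:00.123456100Z
t2 = 2021-01-01T00:00:00.123456400Z

// Truncated to the millisecond, both become 2021-01-01T00:00:00.123000000Z.
n1 = date.truncate(t: t1, unit: 1ms)
n2 = date.truncate(t: t2, unit: 1ms)

// Emit the normalized values and whether they are now equal.
array.from(rows: [{n1: n1, n2: n2, equal: n1 == n2}])
```

Note that truncation rounds down to a unit boundary, so two timestamps that sit on opposite sides of a boundary (for example just below and just above a full millisecond) would still end up different; choosing a unit comfortably larger than the jitter makes that less likely but does not rule it out.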

In the example below, I import the date package, create a normalizeTime() function that uses date.truncate() to normalize timestamps in the _time column, create the two different data streams, then join them together on _time and z.

import "date"

normalizeTime = (tables=<-, unit=1ms) =>
  tables
    |> map(fn: (r) => ({ r with _time: date.truncate(t: r._time, unit:unit)}))

m1 = from(bucket: "test/autogen")
  |> range(start: MyStartTime, stop: MyEndTime)
  |> filter(fn: (r) => r._measurement == "m1" )
  |> normalizeTime()

m2 = from(bucket: "test/autogen")
  |> range(start: MyStartTime, stop: MyEndTime)
  |> filter(fn: (r) => r._measurement == "m2" )
  |> normalizeTime()

join(tables: {m1:m1, m2:m2}, on: ["_time", "z"])
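For anyone who wants to try this approach without access to the original bucket, the following sketch runs the same normalize-then-join steps on in-memory rows built with array.from(). The sample rows and the column names x and y are assumptions made for illustration; only _time and z come from the thread.

```flux
import "array"
import "date"

// Same idea as the normalizeTime() function in the example above.
normalizeTime = (tables=<-, unit=1ms) =>
    tables
        |> map(fn: (r) => ({r with _time: date.truncate(t: r._time, unit: unit)}))

// Invented stand-in for measurement m1 (the x column is hypothetical).
m1 =
    array.from(
        rows: [
            {_time: 2021-01-01T00:00:00.000000100Z, z: "a", x: 1.0},
            {_time: 2021-01-01T00:00:01.000000100Z, z: "b", x: 2.0},
        ],
    )
        |> normalizeTime()

// Invented stand-in for m2: timestamps are off by a few hundred nanoseconds.
m2 =
    array.from(
        rows: [
            {_time: 2021-01-01T00:00:00.000000350Z, z: "a", y: 10.0},
            {_time: 2021-01-01T00:00:01.000000420Z, z: "b", y: 20.0},
        ],
    )
        |> normalizeTime()

// After normalization the rows should pair up on _time and z.
join(tables: {m1: m1, m2: m2}, on: ["_time", "z"])
```

Because map() uses the `r with` syntax, every existing column is carried through unchanged, so the column names do not need to be known in advance.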

@scott
Thanks a lot for your great input, this should solve most of the problems!

Happy to help! Let me know if you run into anything else.