How to combine returned tables in Flux

Dear All,

I have the following query:

from(bucket: "home_assistant")
  |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
  |> filter(fn: (r) => r["_measurement"] == "device_tracker.mate10")
  |> filter(fn: (r) => r["_field"] == "gps_accuracy" or r["_field"] == "latitude" or r["_field"] == "longitude" or r["_field"] == "velocity")
  |> aggregateWindow(every: v.windowPeriod, fn: mean, createEmpty: false)
  |> yield(name: "mean")

As a result I am getting 4 tables, instead of one table with the relevant rows (as in InfluxDB 1.x). How do I combine them?

I am sure this is a basic question, but I am completely new to Flux and I haven’t been able to find this in the docs.

Thanks in advance for any pointers!

Ok, I managed to do what I wanted with this query:

tableGPSAccuracy = from(bucket: "home_assistant")
  |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
  |> filter(fn: (r) => r["_measurement"] == "device_tracker.mate10")
  |> filter(fn: (r) => r["_field"] == "gps_accuracy")
  |> drop(columns: ["_start", "_stop", "domain", "_measurement", "entity_id", "source", "_field"])

tableLatitude = from(bucket: "home_assistant")
  |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
  |> filter(fn: (r) => r["_measurement"] == "device_tracker.mate10")
  |> filter(fn: (r) => r["_field"] == "latitude")
  |> drop(columns: ["_start", "_stop", "domain", "_measurement", "entity_id", "source", "_field"])

tableLongitude = from(bucket: "home_assistant")
  |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
  |> filter(fn: (r) => r["_measurement"] == "device_tracker.mate10")
  |> filter(fn: (r) => r["_field"] == "longitude")
  |> drop(columns: ["_start", "_stop", "domain", "_measurement", "entity_id", "source", "_field"])

tableVelocity = from(bucket: "home_assistant")
  |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
  |> filter(fn: (r) => r["_measurement"] == "device_tracker.mate10")
  |> filter(fn: (r) => r["_field"] == "velocity")
  |> drop(columns: ["_start", "_stop", "domain", "_measurement", "entity_id", "source", "_field"])


tableTmp1 = join(tables: {tLat: tableLatitude, tLon: tableLongitude}, on: ["_time"], method: "inner")
tableTmp2 = join(tables: {tAcc: tableGPSAccuracy, tVel: tableVelocity}, on: ["_time"], method: "inner")

ResultTable = join(tables: {tTmp1: tableTmp1, tTmp2: tableTmp2}, on: ["_time"], method: "inner")
  |> duplicate(column: "_value_tLat", as: "lat")
  |> duplicate(column: "_value_tLon", as: "lon")
  |> drop(columns: ["_value_tLat", "_value_tLon"])

ResultTable |> yield()
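(As a side note for anyone puzzled by the `_value_tLat` / `_value_tLon` names above: when joined tables share a non-key column, `join()` suffixes it with the table key from the `tables` record. A minimal sketch on made-up rows, assuming the `array` package is available in your Flux version:)

```flux
import "array"

left = array.from(rows: [{_time: 2022-01-01T00:00:00Z, _value: 47.5}])
right = array.from(rows: [{_time: 2022-01-01T00:00:00Z, _value: 19.0}])

// The conflicting _value columns come back as _value_tLat and _value_tLon
join(tables: {tLat: left, tLon: right}, on: ["_time"], method: "inner")
```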

This does exactly what I wanted. Now I would just like someone more experienced to confirm that this is the way to do it. To me it looks a bit convoluted…

I believe the group() function will merge all of your tables into one.

Try this:

from(bucket: "home_assistant")
  |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
  |> filter(fn: (r) => r["_measurement"] == "device_tracker.mate10")
  |> filter(fn: (r) => r["_field"] == "gps_accuracy" or r["_field"] == "latitude" or r["_field"] == "longitude" or r["_field"] == "velocity")
  |> aggregateWindow(every: v.windowPeriod, fn: mean, createEmpty: false)
  |> group()
  |> yield(name: "mean")

@mhall119 I’m afraid it doesn’t do it right: it does combine the tables, but the resulting table has all the values in a single `_value` column rather than separate columns per field.
Well, by “right” I mean it doesn’t do what I want; I am not implying that it does not do what it is supposed to… :slight_smile:

Ah, that’s because Flux processes data in columns by default, rather than rows. If you want the field values returned as separate columns of a single row per timestamp, call schema.fieldsAsCols() on it.
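For reference, fieldsAsCols() lives in the schema package, so it needs an import. A minimal sketch against this bucket (in older Flux versions the same function was exposed as `v1.fieldsAsCols()` from `influxdata/influxdb/v1`, so adjust the import to match your version):

```flux
import "influxdata/influxdb/schema"

from(bucket: "home_assistant")
  |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
  |> filter(fn: (r) => r["_measurement"] == "device_tracker.mate10")
  |> filter(fn: (r) => r["_field"] == "gps_accuracy" or r["_field"] == "latitude" or r["_field"] == "longitude" or r["_field"] == "velocity")
  // Pivot fields into columns: one row per timestamp with
  // gps_accuracy, latitude, longitude, and velocity columns
  |> schema.fieldsAsCols()
```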

@mhall119 thank you for your input.

Reading the documentation on fieldsAsCols(), I am unable to get it working - probably my (very) limited experience with Flux is to blame…

However, over the weekend I came up with a much shorter version than my original one. It is very similar to what you just suggested, but without the fieldsAsCols() function:

from(bucket: "home_assistant")
  |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
  |> filter(fn: (r) => r["_measurement"] == "device_tracker.mate10")
  |> filter(fn: (r) => r["_field"] == "gps_accuracy" or r["_field"] == "latitude" or r["_field"] == "longitude" or r["_field"] == "velocity")
  |> pivot(columnKey: ["_field"], rowKey: ["_time"], valueColumn: "_value")
  |> duplicate(column: "latitude", as: "lat")
  |> duplicate(column: "longitude", as: "lon")
  |> duplicate(column: "_time", as: "tooltip")
  |> duplicate(column: "velocity", as: "popup")
  |> map(fn: (r) => ({ r with popup: "Speed: " + string(v: r.popup) + " km/h" }))
  |> keep(columns: ["_time", "tooltip", "lat", "lon", "popup"])

This seems to be working perfectly, and is concise.
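(For anyone else landing here: pivot() is the key step. A minimal sketch of what it does, on made-up rows, assuming the `array` package is available in your Flux version:)

```flux
import "array"

// Two field records sharing one timestamp...
array.from(rows: [
  {_time: 2022-01-01T00:00:00Z, _field: "latitude", _value: 47.5},
  {_time: 2022-01-01T00:00:00Z, _field: "longitude", _value: 19.0},
])
  // ...become a single row with columns _time, latitude, and longitude
  |> pivot(rowKey: ["_time"], columnKey: ["_field"], valueColumn: "_value")
```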

@viktak Looks like you got it figured out. I’m not sure why schema.fieldsAsCols() didn’t work as it’s just a special use case for pivot(). To add to the conciseness and possibly performance of your query, try:

from(bucket: "home_assistant")
  |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
  |> filter(fn: (r) => r._measurement == "device_tracker.mate10")
  |> filter(fn: (r) => r._field == "gps_accuracy" or r._field == "latitude" or r._field == "longitude" or r._field == "velocity")
  |> pivot(columnKey: ["_field"], rowKey: ["_time"], valueColumn: "_value")
  |> rename(columns: {latitude: "lat", longitude: "lon", velocity: "popup"})
  |> duplicate(column: "_time", as: "tooltip")
  |> map(fn: (r) => ({ r with popup: "Speed: ${string(v:r.popup)} km/h" }))
  |> keep(columns: ["_time", "tooltip", "lat", "lon", "popup"])
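(The `${...}` in the map() call is Flux string interpolation: it embeds an expression inside a double-quoted string literal. A standalone sketch on a made-up row, assuming the `array` package and a recent Flux version; older versions may not support interpolation, which would make string concatenation with `+` the fallback:)

```flux
import "array"

array.from(rows: [{speed: 42.0}])
  // Interpolate the stringified speed into the label
  |> map(fn: (r) => ({ r with popup: "Speed: ${string(v: r.speed)} km/h" }))
```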

@scott The snippet you suggested fails at ${string(v:r.popup)}. If I remove it, the query works. Can you point me to the documentation for this construct?

Thank you for drawing my attention to the rename() function - somehow I did not notice it (well, everything is new to me in Flux…).

Could you show me an exact example with the fieldsAsCols() function? I’m sure it works, I just didn’t use it properly.