Flux query to compare data between two tables

MainTable
=======================================================
|timestamp          |IPAddress    |
=======================================================
|123456789          |192.168.5.2  |
|123456788          |192.168.5.3  |
|123456787          |192.168.5.4  |
|123456786          |192.168.5.5  |
=======================================================
LiveStatusTable
=======================================================
|timestamp          |IPAddress             |Value     |
=======================================================
|123456789          |192.168.5.2           |1         |
|123456788          |192.168.5.3           |1         |
=======================================================
I need to compare these tables in Flux to find which IP addresses in MainTable are missing from the second table (LiveStatusTable).

Hi @Aritra666B,
Could you use a join to achieve this? See "Joins in Flux" in the InfluxData documentation.

Hi @Jay_Clifford,
A join only gives me the common values. In my case I need to find which IP addresses are in MainTable but not in LiveStatusTable, to determine which systems are offline.
Is there a way for me to do this using Flux?

Hi @Aritra666B,
What if you did an outer join between the two tables? That would leave null values in rows where there is no match. You could then use map() to conditionally check whether the two columns match.
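A minimal sketch of that idea, assuming a Flux version that ships the `join` package (it is not available on InfluxDB 1.8 / Flux 0.65) and using `array.from()` to stand in for the two tables. `join.left()` keeps every MainTable row; a row with no match in LiveStatusTable gets a null `Value`, and filtering on `not exists r.Value` leaves the offline addresses. The variable name `offline` is illustrative.

```flux
import "array"
import "join"

// Sample data from the question, built inline so the sketch runs without a bucket.
MainTable =
    array.from(
        rows: [
            {timestamp: 123456789, IPAddress: "192.168.5.2"},
            {timestamp: 123456788, IPAddress: "192.168.5.3"},
            {timestamp: 123456787, IPAddress: "192.168.5.4"},
            {timestamp: 123456786, IPAddress: "192.168.5.5"},
        ],
    )

LiveStatusTable =
    array.from(
        rows: [
            {timestamp: 123456789, IPAddress: "192.168.5.2", Value: 1},
            {timestamp: 123456788, IPAddress: "192.168.5.3", Value: 1},
        ],
    )

// Left (outer) join: every MainTable row is kept. When there is no matching
// LiveStatusTable row, r.Value is null in the output.
offline =
    join.left(
        left: MainTable,
        right: LiveStatusTable,
        on: (l, r) => l.IPAddress == r.IPAddress,
        as: (l, r) => ({IPAddress: l.IPAddress, Value: r.Value}),
    )
        // Rows without a live status are the offline systems.
        |> filter(fn: (r) => not exists r.Value)
        |> keep(columns: ["IPAddress"])
        |> sort(columns: ["IPAddress"])

offline |> yield(name: "offline")
```

This should return 192.168.5.4 and 192.168.5.5. Because rows are matched on `IPAddress` rather than by position, the result does not depend on how the two tables are ordered.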

Could you export a sample of your data? I'll see if I can build the query for you.

Hi @Jay_Clifford, sorry for the late reply; I was sick and on leave.

I have exported the data as CSV, but how do I attach it here?

You could just use testing.diff(). Make sure the schema of the two input tables is the same: if you're only trying to see which IP addresses are missing, I'd drop all columns except IPAddress. You'll also need to make sure the rows of both tables are sorted in the same order.

Here’s a working example using the data you provided above:

import "array"
import "testing"

MainTable =
    array.from(
        rows: [
            {timestamp: 123456789, IPAddress: "192.168.5.2"},
            {timestamp: 123456788, IPAddress: "192.168.5.3"},
            {timestamp: 123456787, IPAddress: "192.168.5.4"},
            {timestamp: 123456786, IPAddress: "192.168.5.5"},
        ],
    )
        |> keep(columns: ["IPAddress"])
        |> sort(columns: ["IPAddress"])

LiveStatusTable =
    array.from(
        rows: [
            {timestamp: 123456789, IPAddress: "192.168.5.2", Value: 1},
            {timestamp: 123456788, IPAddress: "192.168.5.3", Value: 1},
        ],
    )
        |> keep(columns: ["IPAddress"])
        |> sort(columns: ["IPAddress"])

testing.diff(got: LiveStatusTable, want: MainTable)

This returns:

_diff IPAddress
- 192.168.5.4
- 192.168.5.5

@scott, @Jay_Clifford I'm trying to do the same thing, but I'm restricted to InfluxDB 1.8 + Flux 0.65.1 and therefore can't use the "join" package. I've tried the testing.diff solution, and it works great as long as the "missing" values are contiguous in the "complete" table.
If LiveStatusTable looked like this:

LiveStatusTable =
    array.from(
        rows: [
            {timestamp: 123456789, IPAddress: "192.168.5.2", Value: 1},
            {timestamp: 123456788, IPAddress: "192.168.5.4", Value: 1},
        ],
    )

then the result would be incorrect.

I've tried to solve this by using getColumn() and/or findColumn() to create an array of the IP addresses in LiveStatusTable, and then using filter() to keep only the rows in MainTable whose IPAddress does not appear in that array:

live_values = LiveStatusTable |> getColumn(column: "IPAddress")

MainTable
  |> filter(fn: (r) => not contains(value: r["IPAddress"], set: live_values))

But this always results in `500 Internal Server Error: type error : missing object properties (schema)`.

@b_pennies The missing object properties (schema) error indicates you’re trying to use the schema package somewhere in your query. Is this your full query?

The schema package wasn’t introduced until Flux 0.88.0, but it’s essentially a port of the influxdata/influxdb/v1 package, so you should be able to use that instead with the version of Flux you’re using.

Here is the full query I’m running.

import "strings"
get_bucket = (datasource, kitchen) => if strings.containsStr(v: datasource, substr:"kitchen-data") then kitchen + "/autogen" else "kitchen/autogen"

bucket = get_bucket(datasource: "kitchen-data (flux)", kitchen: "kitchen3")

data = from(bucket: bucket)
  |> range(start: 2022-08-16T07:00:00Z, stop: 2022-08-17T06:59:59Z)
  |> filter(fn: (r) => r._field == "pizza_id")

expected = data 
  |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
  |> filter(fn: (r) => r.station == "dough-storage" and r._field == "pizza_id")
  |> distinct(column: "pizza_id")
  |> getColumn(column: "pizza_id")

I'm not sure where I'd be invoking the schema package.

@b_pennies Ah, ok, I think I know why. In this particular case, you’re using getColumn() incorrectly. getColumn() actually requires tableFind() to extract a table from a stream of tables. getColumn() then extracts a column from the extracted table and returns an array of column values. So with your current query, getColumn is operating on the stream of tables passed to it from distinct(), not an extracted table.

I’d actually suggest using findColumn() instead of getColumn(). This is a newer implementation that both extracts the table and returns an array of column values. It’s available in InfluxDB 1.8:

// ...

expected = data 
  |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
  |> filter(fn: (r) => r.station == "dough-storage" and r._field == "pizza_id")
  |> distinct(column: "pizza_id")
  |> findColumn(column: "pizza_id", fn: (key) => true)

Another problem is that the InfluxDB query API requires a query to return a stream of tables, and right now this query doesn't. You have variables defined, but the query never calls any of them to return its value. You could call data at the end of the query to return that stream of tables. You can't call expected, because it returns an array, and the InfluxDB API doesn't support returning raw arrays.
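Putting both points together for the original IP-address example: the sketch below extracts the live addresses with `tableFind() |> getColumn()`, filters MainTable with `contains()`, and ends with a stream of tables so the query API can return it. It uses the non-contiguous LiveStatusTable from earlier in the thread, which `testing.diff()` handled incorrectly. `array.from()` is only there to make the sketch self-contained and may not be available in Flux 0.65; on InfluxDB 1.8 you would build the two streams with `from() |> range() |> filter()` instead. The names `liveIPs` and `offline` are illustrative.

```flux
import "array"

// Stand-in data; replace with from() |> range() |> filter() on InfluxDB 1.8.
MainTable =
    array.from(
        rows: [
            {IPAddress: "192.168.5.2"},
            {IPAddress: "192.168.5.3"},
            {IPAddress: "192.168.5.4"},
            {IPAddress: "192.168.5.5"},
        ],
    )

// Non-contiguous gaps: 192.168.5.3 and 192.168.5.5 are missing.
LiveStatusTable =
    array.from(
        rows: [
            {IPAddress: "192.168.5.2", Value: 1},
            {IPAddress: "192.168.5.4", Value: 1},
        ],
    )

// tableFind() extracts one table from the stream; getColumn() then
// returns the values of one column of that table as an array.
liveIPs =
    LiveStatusTable
        |> tableFind(fn: (key) => true)
        |> getColumn(column: "IPAddress")

// Keep only MainTable rows whose address is not in the live array.
// The result is a stream of tables, so the query API can return it.
offline =
    MainTable
        |> filter(fn: (r) => not contains(value: r.IPAddress, set: liveIPs))

offline |> yield(name: "offline")
```

This returns 192.168.5.3 and 192.168.5.5. Note that `tableFind()` returns only the first table matching its predicate, so if the live data is split across several tables (for example, grouped by tag), run `group()` on it first so every address ends up in a single table.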


@scott Ahh ok, that makes sense now. Thanks to your insights, I got my query working:

import "strings"
$get_bucket

bucket = get_bucket(datasource: ${datasource:json}, kitchen: ${kitchen:json})

data = from(bucket: bucket)
  |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
  |> filter(fn: (r) => r._field == "pizza_id")

completed = data
  |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
  |> filter(fn: (r) => r.station == "serve-conveyor" and r._field == "pizza_id")
  |> distinct(column: "_value")
  |> tableFind(fn: (key) => true)
  |> getColumn(column: "_value")

unfinished = data
  |> filter(fn: (r) => r.station == "dough-storage")
  |> aggregateWindow(every: ${window_interval}, fn: distinct)
  |> distinct()
  |> filter(fn: (r) => not contains(value: r._value, set: completed))
  |> yield()

Here I expect the completed array to be the list of pizza IDs that reached the serve conveyor. The final output is a table (or a stream of tables with one table?) that includes only the IDs from the dough-storage data that do not appear in completed.

Interestingly, I get a new error when I take your advice and use findColumn():

// ...
completed = data
  |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
  |> filter(fn: (r) => r.station == "serve-conveyor" and r._field == "pizza_id")
  |> distinct(column: "_value")
  |> findColumn(column: "_value", fn: (key) => true)

// ...

The error is simply undefined identifier "findColumn". From the documentation I’m guessing that findColumn was introduced in v0.68.0 but I only have access up to v0.65.1, which would explain this error.

Yep, you’re right. findColumn() isn’t supported in 1.8 so you’ll have to stick with tableFind() |> getColumn(). Sorry about that. I’m glad the query is working as it should. Let me know if there’s anything else I can help with.


Thanks a ton for all the help @scott!!!
