Flux query to compare data between two tables

MainTable
===================================
|timestamp          |IPAddress    |
===================================
|123456789          |192.168.5.2  |
|123456788          |192.168.5.3  |
|123456787          |192.168.5.4  |
|123456786          |192.168.5.5  |
===================================

LiveStatusTable
=======================================================
|timestamp          |IPAddress             |Value     |
=======================================================
|123456789          |192.168.5.2           |1         |
|123456788          |192.168.5.3           |1         |
=======================================================
Now I need to compare these tables using Flux to check which IPAddresses are missing from the second table.

Hi @Aritra666B,
Could you use a join to achieve this: Joins in Flux | Additional resources | InfluxData Documentation

Hi @Jay_Clifford ,
With a join I will get the common values. In my case I need to check which IP addresses are not in LiveStatusTable but are in MainTable, to determine which systems are offline.
Is there a way for me to do this using Flux?

Hi @Aritra666B,
I am wondering if you could do an outer join between the two tables. This would leave null values in rows where there is no match. You could then use map() to conditionally check whether the two columns match one another.
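
For reference, a sketch of that outer-join approach, assuming the newer join package (Flux 0.172+) and the MainTable/LiveStatusTable column names shown above, could look roughly like this:

import "join"

// Full outer join on IPAddress: MainTable rows with no match in
// LiveStatusTable come through with a null Value, i.e. the offline systems.
join.full(
    left: MainTable,
    right: LiveStatusTable,
    on: (l, r) => l.IPAddress == r.IPAddress,
    as: (l, r) => ({
        // In a full join either side can be missing, so guard with exists
        IPAddress: if exists l.IPAddress then l.IPAddress else r.IPAddress,
        Value: r.Value,
    }),
)
    |> filter(fn: (r) => not exists r.Value)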

Could you export a sample of your data and I will see if I can build the query for you?

Hi @Jay_Clifford, sorry for the late reply as I was sick and on leave.

I have exported them to CSV, but how do I attach them here?

You could just use testing.diff(). You should make sure the schema of the two input tables is the same, so if you’re just trying to see which IP addresses are missing, I’d drop all columns except IPAddress. You’ll also need to make sure rows are sorted in the same order.

Here’s a working example using the data you provided above:

import "array"
import "testing"

MainTable =
    array.from(
        rows: [
            {timestamp: 123456789, IPAddress: "192.168.5.2"},
            {timestamp: 123456788, IPAddress: "192.168.5.3"},
            {timestamp: 123456787, IPAddress: "192.168.5.4"},
            {timestamp: 123456786, IPAddress: "192.168.5.5"},
        ],
    )
        |> keep(columns: ["IPAddress"])
        |> sort(columns: ["IPAddress"])

LiveStatusTable =
    array.from(
        rows: [
            {timestamp: 123456789, IPAddress: "192.168.5.2", Value: 1},
            {timestamp: 123456788, IPAddress: "192.168.5.3", Value: 1},
        ],
    )
        |> keep(columns: ["IPAddress"])
        |> sort(columns: ["IPAddress"])

testing.diff(got: LiveStatusTable, want: MainTable)

This returns:

_diff IPAddress
- 192.168.5.4
- 192.168.5.5

@scott , @Jay_Clifford I’m trying to do the same thing, but am restricted to InfluxDB 1.8 + Flux 0.65.1 and therefore can’t use the “join” package. I’ve tried the solution using testing.diff and this works great as long as the “missing” values are contiguous in the “complete” table.
If LiveStatusTable looked like this:

LiveStatusTable =
    array.from(
        rows: [
            {timestamp: 123456789, IPAddress: "192.168.5.2", Value: 1},
            {timestamp: 123456788, IPAddress: "192.168.5.4", Value: 1},
        ],
    )

then the result would be incorrect.

I’ve tried to solve this using getColumn() and/or findColumn() to create an array of expected values, and then use filter() to leave only those rows in LiveStatusTable that do not appear in MainTable:

expected_values = LiveStatusTable |> getColumn(column: "IPAddress")

LiveStatusTable 
  |> filter(fn: (r) => not contains(value: r["IPAddress"], set: expected_values))

But this always results in a `500 Internal Server Error: type error : missing object properties (schema)`.

@b_pennies The missing object properties (schema) error indicates you’re trying to use the schema package somewhere in your query. Is this your full query?

The schema package wasn’t introduced until Flux 0.88.0, but it’s essentially a port of the influxdata/influxdb/v1 package, so you should be able to use that instead with the version of Flux you’re using.
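
For example, where newer Flux would call schema.tagValues(), the v1 package exposes the same function, so something along these lines should work on Flux 0.65 (the bucket and tag names here are just placeholders):

import "influxdata/influxdb/v1"

// Equivalent of schema.tagValues() on older Flux versions
v1.tagValues(bucket: "example-bucket", tag: "host")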

Here is the full query I’m running.

import "strings"
get_bucket = (datasource, kitchen) => if strings.containsStr(v: datasource, substr:"kitchen-data") then kitchen + "/autogen" else "kitchen/autogen"

bucket = get_bucket(datasource: "kitchen-data (flux)", kitchen: "kitchen3")

data = from(bucket: bucket)
  |> range(start: 2022-08-16T07:00:00Z, stop: 2022-08-17T06:59:59Z)
  |> filter(fn: (r) => r._field == "pizza_id")

expected = data 
  |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
  |> filter(fn: (r) => r.station == "dough-storage" and r._field == "pizza_id")
  |> distinct(column: "pizza_id")
  |> getColumn(column: "pizza_id")

I’m not sure where I’d be invoking the schema package.

@b_pennies Ah, ok, I think I know why. In this particular case, you’re using getColumn() incorrectly. getColumn() actually requires tableFind() to extract a table from a stream of tables. getColumn() then extracts a column from the extracted table and returns an array of column values. So with your current query, getColumn is operating on the stream of tables passed to it from distinct(), not an extracted table.
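
For illustration, the pattern described above looks roughly like this (a sketch reusing the pizza_id column from your query):

expected = data
  |> distinct(column: "pizza_id")
  |> tableFind(fn: (key) => true)    // extract the first table from the stream
  |> getColumn(column: "pizza_id")   // return that table's column as an array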

I’d actually suggest using findColumn() instead of getColumn(). This is a newer implementation that both extracts the table and returns an array of column values. It’s available in InfluxDB 1.8:

// ...

expected = data 
  |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
  |> filter(fn: (r) => r.station == "dough-storage" and r._field == "pizza_id")
  |> distinct(column: "pizza_id")
  |> findColumn(column: "pizza_id", fn: (key) => true)

Another problem here is that the InfluxDB query API requires that a query returns a stream of tables. Right now, this query doesn’t. You have variables defined, but the query doesn’t call any variable to return its value. You could just call data at the end of the query to return that stream of tables. You wouldn’t be able to call expected because it returns an array. The InfluxDB API doesn’t support returning raw arrays.
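
For example, simply ending the script by yielding the data stream would satisfy that requirement (a minimal sketch):

// Return the stream of tables so the query API has something to serialize
data
  |> yield(name: "raw")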


@scott Ahh ok, that makes some sense now. I was able to get my query working thanks to your insights, as such:

import "strings"
$get_bucket

bucket = get_bucket(datasource: ${datasource:json}, kitchen: ${kitchen:json})

data = from(bucket: bucket)
  |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
  |> filter(fn: (r) => r._field == "pizza_id")

completed = data
  |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
  |> filter(fn: (r) => r.station == "serve-conveyor" and r._field == "pizza_id")
  |> distinct(column: "_value")
  |> tableFind(fn: (key) => true)
  |> getColumn(column: "_value")

unfinished = data
  |> filter(fn: (r) => r.station == "dough-storage")
  |> aggregateWindow(every: ${window_interval}, fn: distinct)
  |> distinct()
  |> filter(fn: (r) => not contains(value: r._value, set: completed))
  |> yield()

where I expect the completed array to be a list of pizza ids that is a superset of those found in the raw unfinished data. The final output is a table (or stream of tables with 1 table?) that only includes those ids that do not appear in the unfinished data.

Interestingly, I get a new error when trying to take your advice and use findColumn().

// ...
completed = data
  |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
  |> filter(fn: (r) => r.station == "serve-conveyor" and r._field == "pizza_id")
  |> distinct(column: "_value")
  |> findColumn(column: "_value", fn: (key) => true)

// ...

The error is simply `undefined identifier "findColumn"`. From the documentation I’m guessing that findColumn was introduced in v0.68.0, but I only have access up to v0.65.1, which would explain this error.

Yep, you’re right. findColumn() isn’t supported in 1.8 so you’ll have to stick with tableFind() |> getColumn(). Sorry about that. I’m glad the query is working as it should. Let me know if there’s anything else I can help with.


Thanks a ton for all the help @scott!!!


@scott I recently started learning Flux and ran into the same problem. I want to compare data between two tables.
I want to find the number of active imies (which sent data in the last 30 minutes) and the number of inactive imies that did not send any data in the last 40 minutes. All devices were active 24 hours ago.
A device can send data multiple times within 5 minutes, and imie is a tag.
Help me…

The first thing to understand is that InfluxDB can only provide data that does exist. It can’t tell you what data doesn’t exist (inactive imies). To figure this out, you have to query the list of all imies within the past 24 hours. To do this, you can just query all data from the last day, downsample the data to make it a little more manageable, ungroup it into a single table, and return all the unique values of imie:

imieAll = from(bucket: "example-bucket")
    |> range(start: -24h)
    |> last()
    |> group()
    |> unique(column: "imie")

You can do the same thing to find all the active imies from the last 30 minutes and then count them:

imieActive = from(bucket: "example-bucket")
    |> range(start: -30m)
    |> last()
    |> group()
    |> unique(column: "imie")
    |> count()

To find the number of inactive imies, you need to subtract the number of those active in the last 30 minutes from the total number of imies:

imieTotal =
    (from(bucket: "example-bucket")
        |> range(start: -24h)
        |> last()
        |> group()
        |> unique(column: "imie")
        |> count()
        |> findColumn(fn: (key) => true, column: "_value"))[0]

imieActive =
    from(bucket: "example-bucket")
        |> range(start: -30m)
        |> last()
        |> group()
        |> unique(column: "imie")
        |> count()

imieActive
    |> map(fn: (r) => ({_value: imieTotal - r._value}))

@scott I don’t know why I’m getting the error `unique: schema collision detected: column "_value" is both of type float and string` when running the above query. I’m sending the imie number as a string but still getting this error. Example imie: 12AC1243224SD12

and sending this as a tag.

Ok, that error is being caused by ungrouping the tables (group()). This combines all rows into a single table, but all values in a column must be of the same type. This means you have fields of different types. I think the best way around this would be to query a specific field by adding filter() after both range() calls:

imieTotal =
    (from(bucket: "example-bucket")
        |> range(start: -24h)
        |> filter(fn: (r) => r._field == "example-field")
        |> last()
        |> group()
        |> unique(column: "imie")
        |> count()
        |> findColumn(fn: (key) => true, column: "_value"))[0]

imieActive =
    from(bucket: "example-bucket")
        |> range(start: -30m)
        |> filter(fn: (r) => r._field == "example-field")
        |> last()
        |> group()
        |> unique(column: "imie")
        |> count()

imieActive
    |> map(fn: (r) => ({_value: imieTotal - r._value}))

Thanks for all the help @scott!!!