Compare lists and find members of A not in B

Hello,

I have a number of devices sending telemetry to Influxdb and I’m trying to record, based on the unique identifier of the device (device_id) whether it is the first time the device has been seen, and if yes, write a record to another measurement. The query will run as a task every 5 minutes say. I think I’ve got it working, but it seems overly complex - have I missed something, is there an easier way to do this?

Here is the test data I’m working with in my bucket “test”:

device_status,status=online device_id="a1" 1635811200
device_status,status=online device_id="a2" 1635811300
device_status,status=offline device_id="a1" 1635811400
device_status_history,event=created device_id="a1" 1604275200
device_status_history,event=created device_id="a3" 1604275300

where the times like 1635811xxx are around midnight 2nd November 2021 and 1604275xxx in the “device_status_history” is a year ago, simulating the time the device was first seen. Device_id is a field to avoid cardinality issues (in real life there will be 100k+ unique values).

When I run the query, I want to write a record for device “a2” only; it has come online and has not been seen before (no record in device_status_history). Device “a1” has already been seen (record exists in device_status_history) and device “a3”, while it hasn’t come online in the “range”, has also been seen - including this for completeness since it confused some earlier approaches based around counts of device_id.

Conceptually, I want to find members of A (device_status) not in B (device_status_history). Here is something that I think works:

status = from(bucket: "test")
// in reality, this will be -5m but using -24 so the test data works at the time of writing
  |> range(start: -24h)
  |> filter(fn: (r) => r["_measurement"] == "device_status")
  |> filter(fn: (r) => r["_field"] == "device_id")
// drop any tags we have in real life data
  |> keep(columns:["_time","_field","_value","status"])
  |> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")
  |> filter(fn: (r) => r["status"] == "online")  
  |> distinct(column:"device_id")
  |> map(fn: (r) => ({_time: now(), device_id: r._value, _field:"online",_value:true}))
  |> group(columns:["device_id"])

created = from(bucket: "test")
// look back since records began
  |> range(start: -5y)
  |> filter(fn: (r) => r["_measurement"] == "device_status_history")
  |> filter(fn: (r) => r["event"] == "created")
  |> filter(fn: (r) => r["_field"] == "device_id")
  |> keep(columns:["_time","_field","_value", "event"])
// get all the unique values we have seen
  |> distinct(column:"_value")
// make the output look the same as "status" query and add in _time again (removed by distinct)
  |> map(fn: (r) => ({_time: now(), device_id: r._value, _field:"created", _value:true}))
  |> group(columns:["device_id"])

union(tables:[status, created])
  |> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")
  |> group()
// use fill here, testing for null or != doesn't work
  |> fill(column: "created", value: false)
// drop the devices that have already been created
  |> filter(fn: (r) => r["created"] == false )
  |> set(key: "_measurement", value: "device_status_history")
  |> set(key: "event", value: "created")
//  |> to(
//    bucket: "test",
//    tagColumns: ["event"],
//    fieldFn: (r) => ({"device_id": r.device_id, "description": "device created"}),
//  )

If I comment out the “to” section, the first time this is run device a2 gets created at time “now” (which is tolerably close to when it comes online if the task runs every 5m) and the second time, I get “no results”.
But again, this feels overworked - is there a simpler way?

Thanks,
Tom

Hello @thopewell,
So I just want to verify that I understand? It sounds like you want to perform a left or right join?
Sorry for the delay sometimes I miss questions always feel free to tag me.
Your query is really creative.

Hi Anaisdg,

I think I’m really asking if there is some way to conceptually do this in flux:

A = select device_id from device_status where time > -5m
B = select A from device_status_history
take some action on device_id in A not in B

which if possible, seems more efficient than what I’m doing now:

A = select device_id from device_status where time > -5m
B = select all device_id from device_status_history
take some action on device_id in A not in B

because in real life, B might have 100k+ entries in it where as A will be small (say 10).

To your point, instead of doing union I did originally try join, but I don’t think it works, maybe because _time is treated as the primary key where as I want to ignore _time and just join on device_id?

If I put this in place of the union:

experimental.join(
  left: status,
  right: created,
  fn: (left, right) => ({
    left with
    lv: left._value,
    rv: right._value,
  })
)

I get this:

I was hoping to get a row for each device_id in status (left) with rv as either true or null/empty.

In any case, what I have appears to work!

Thanks,
Tom

Hello @thopewell,
To do:
B = select A from device_status_history
You could use regex:

  |> filter(fn: (r) => r["_value"] =~ /a[0-n]$/

where n = number of different a
When I see:

take some action on device_id in A not in B

I think of left joins as well.

Yes experimental.join() does function joins two streams of tables on the group key and _time column.
However you specify the columns with join(). You could give that a try. And then add your function with map()