Hello,
I have a number of devices sending telemetry to InfluxDB. For each device, identified by its unique device_id, I’m trying to record whether this is the first time it has been seen and, if so, write a record to another measurement. The query will run as a task, say every 5 minutes. I think I’ve got it working, but it seems overly complex. Have I missed something? Is there an easier way to do this?
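For context, a Flux task is scheduled with the `task` option, and a common pattern is for each run to read back exactly one interval with `range(start: -task.every)`. Below is a minimal sketch of just the scheduling part. The task name is hypothetical, and the filters assume the schema shown in the test data below:

```flux
// Hypothetical task name; `every: 5m` makes InfluxDB run the script every 5 minutes.
option task = {name: "device-first-seen", every: 5m}

// Each run reads only the last task interval of status points.
from(bucket: "test")
    |> range(start: -task.every)
    |> filter(fn: (r) => r._measurement == "device_status" and r._field == "device_id")
    // status is a tag, so it can be filtered directly without a pivot.
    |> filter(fn: (r) => r.status == "online")
```

Points that arrive after the run covering their timestamp fall outside every window. The task `offset` option delays execution while keeping the scheduled time range, which is one way to allow for late data.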
Here is the test data (line protocol, written with second-precision timestamps) I’m working with in my bucket “test”:
device_status,status=online device_id="a1" 1635811200
device_status,status=online device_id="a2" 1635811300
device_status,status=offline device_id="a1" 1635811400
device_status_history,event=created device_id="a1" 1604275200
device_status_history,event=created device_id="a3" 1604275300
The 1635811xxx timestamps are just after midnight UTC on 2 November 2021, and the 1604275xxx timestamps in “device_status_history” are a year earlier, simulating the time each device was first seen. device_id is a field rather than a tag to avoid cardinality issues (in real life there will be 100k+ unique values).
When I run the query, I want to write a record for device “a2” only: it has come online and has not been seen before (there is no record for it in device_status_history). Device “a1” has already been seen (a record exists in device_status_history). Device “a3” has also been seen, even though it hasn’t come online within the range; I’ve included it for completeness, since it confused some earlier approaches based on counting device_id values.
Conceptually, I want to find the members of A (device_status) that are not in B (device_status_history), i.e. a set difference. Here is something that I think works:
status = from(bucket: "test")
// in reality, this will be -5m, but I'm using -24h so the test data works at the time of writing
|> range(start: -24h)
|> filter(fn: (r) => r["_measurement"] == "device_status")
|> filter(fn: (r) => r["_field"] == "device_id")
// drop any tags we have in real life data
|> keep(columns:["_time","_field","_value","status"])
|> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")
|> filter(fn: (r) => r["status"] == "online")
|> distinct(column:"device_id")
|> map(fn: (r) => ({_time: now(), device_id: r._value, _field:"online",_value:true}))
|> group(columns:["device_id"])
created = from(bucket: "test")
// look back since records began
|> range(start: -5y)
|> filter(fn: (r) => r["_measurement"] == "device_status_history")
|> filter(fn: (r) => r["event"] == "created")
|> filter(fn: (r) => r["_field"] == "device_id")
|> keep(columns:["_time","_field","_value", "event"])
// get all the unique values we have seen
|> distinct(column:"_value")
// make the output look the same as "status" query and add in _time again (removed by distinct)
|> map(fn: (r) => ({_time: now(), device_id: r._value, _field:"created", _value:true}))
|> group(columns:["device_id"])
union(tables:[status, created])
|> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")
|> group()
// use fill here, testing for null or != doesn't work
|> fill(column: "created", value: false)
// drop the devices that have already been created
|> filter(fn: (r) => r["created"] == false )
|> set(key: "_measurement", value: "device_status_history")
|> set(key: "event", value: "created")
// |> to(
// bucket: "test",
// tagColumns: ["event"],
// fieldFn: (r) => ({"device_id": r.device_id, "description": "device created"}),
// )
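On the “testing for null doesn’t work” comment: Flux has no `null` literal, so a check like `r.created == null` can’t be written, but the `exists` operator returns false when a value is null. A comparison such as `r.created != true` probably fails because a comparison against a null value does not evaluate to true, so filter() drops that row. A sketch of the union/pivot/fill/filter steps using `exists` instead of fill(), assuming the `status` and `created` streams defined above:

```flux
union(tables: [status, created])
    |> pivot(rowKey: ["_time"], columnKey: ["_field"], valueColumn: "_value")
    |> group()
    // `exists` is false for a null value, so this keeps only rows with no "created"
    // value, i.e. devices that came online but were never recorded as created.
    |> filter(fn: (r) => not exists r.created)
```

With the test data, a1 (online and created) and a3 (created only) are dropped, leaving a2.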
If I uncomment the “to” section, the first time this runs device a2 gets created at time “now” (which is tolerably close to when it came online, since the task runs every 5m), and the second time I get “no results”.
But again, this feels overworked. Is there a simpler way?
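One possibly simpler shape for “A not in B”, offered as an untested sketch rather than a known answer: collect the already-created IDs into an array with `findColumn()`, then drop any online device whose ID is in that array using `contains()`. Because `status` is a tag, the status filter works without a pivot. The bucket, measurements, and time windows are taken from the query above, and the `to()` call is left commented out, as in the original:

```flux
// B: every device_id already recorded as "created", as an array of strings.
createdIds = from(bucket: "test")
    |> range(start: -5y)
    |> filter(fn: (r) => r._measurement == "device_status_history" and r._field == "device_id")
    |> filter(fn: (r) => r.event == "created")
    |> keep(columns: ["_value"])
    |> group()
    |> distinct(column: "_value")
    // findColumn() returns an empty array if no table matches.
    |> findColumn(fn: (key) => true, column: "_value")

// A minus B: devices that came online in the window and are not in createdIds.
from(bucket: "test")
    |> range(start: -24h)
    |> filter(fn: (r) => r._measurement == "device_status" and r._field == "device_id")
    |> filter(fn: (r) => r.status == "online")
    |> keep(columns: ["_value"])
    |> group()
    |> distinct(column: "_value")
    |> filter(fn: (r) => not contains(value: r._value, set: createdIds))
    // Shape each new device as a device_status_history point.
    |> map(fn: (r) => ({_time: now(), _measurement: "device_status_history", event: "created", _field: "device_id", _value: r._value}))
    // |> to(bucket: "test", tagColumns: ["event"])
```

With 100k+ IDs, `contains()` scans the array for each row, so it may be worth checking how this performs on real data. A device that comes online more than once within the window is collapsed by distinct() and written only once.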
Thanks,
Tom