Suggest approach to Calculate KPIs

I am getting data from aws kinesis and I am pushing to influxdb. Data is separated by Venue tag, denoting Venue as a unique location. In influxdb I am calculating kpi using task which is running every hour, so what happens is there is delay in the oms which can extend from 15 min to 2 hours. So, for example I am receiving 1 tar file from each venue. Data for 11 am should come at 12 pm, however due to delay, it might go until 2 pm.

I need to run tasks in such a way that, if for any venue I am got the data, I need to calculate the kpi. How Can I do that. I thought of running tasks at 15 mins or 30 mins instead of 1 hour, but in that case the already computed Venue will again be computed.

Hello @Anaisdg I need your help for following query:
Say table1 = return all values of a particular tag in last 7 days
and table2 = return all values of particular tag in last 1 hour

I need all those values which are present in table1 but not in table2. How to do that?

table1 = from(bucket: "bucket")
  |> range(start: -7d)
  |> filter(fn: (r) => r["_measurement"] == "measurement_name")
  |> filter(fn: (r) => r["_field"] == "field_name")
  |> keep(columns: ["node_cellId"])
  |> distinct(column: "node_cellId")
  |> drop(columns:["node_cellId"])
  |> group()

table2 = from(bucket: "bucket")
  |> range(start: -1h)
  |> filter(fn: (r) => r["_measurement"] == "measurement_name")
  |> filter(fn: (r) => r["_field"] == "field_name")
  |> keep(columns: ["node_cellId"])
  |> distinct(column: "node_cellId")
  |> drop(columns:["node_cellId"])
  |> group()

Hi @Pratik_Das_Baghel

Maybe the join() function to combine the two tables and then filter the results based on whether or not the value is present in both tables, like this?

table1 = from(bucket: "bucket")
  |> range(start: -7d)
  |> filter(fn: (r) => r["_measurement"] == "measurement_name")
  |> filter(fn: (r) => r["_field"] == "field_name")
  |> keep(columns: ["node_cellId"])
  |> distinct(column: "node_cellId")
  |> group()

table2 = from(bucket: "bucket")
  |> range(start: -1h)
  |> filter(fn: (r) => r["_measurement"] == "measurement_name")
  |> filter(fn: (r) => r["_field"] == "field_name")
  |> keep(columns: ["node_cellId"])
  |> distinct(column: "node_cellId")
  |> group()

join(
  tables: {t1: table1, t2: table2},
  on: ["node_cellId"]
)
  |> map(fn: (r) => ({
    _time: r._time,
    _field: "difference",
    _value: if exists r._value_t2 then 0 else 1
  }))
  |> filter(fn: (r) => r._value == 1)
  |> drop(columns: ["_value", "_time", "_field"])