Performance when using window function

I am using InfluxDB 2.7.6 and created a measurement called cgm_glucose_history. It has a tag called device_sn, a field called glucose, and a field called device_time that records the exact time the glucose reading was generated. In this measurement, each device_sn generates a glucose value every minute.

I use the Java client to save the data:

List<Point> pointList = new ArrayList<>();
        for (CgmGlucose cgmGlucose : list) {
            // Align the timestamp to whole minutes (00:00, 00:01, 00:02, 00:03, ...).
            // getDeviceTime() is in milliseconds, so one minute is 60_000 ms.
            long time = cgmGlucose.getDeviceTime() - cgmGlucose.getDeviceTime() % 60_000L;
            Point point = Point.measurement(MEASUREMENT_CGM_GLUCOSE_HISTORY)
                    .time(time, WritePrecision.MS)
                    .addTag("patient_code", cgmGlucose.getPatientCode())
                    .addTag("device_sn", cgmGlucose.getDeviceSn())
                    .addField("glucose", cgmGlucose.getGlucose())
                    .addField("device_time", cgmGlucose.getDeviceTime());
            pointList.add(point);
            if (pointList.size() >= 1000) {
                writeApi.writePoints(InfluxConfig.getBucket(), InfluxConfig.getOrg(), pointList);
                pointList.clear();
            }
        }
        if (!pointList.isEmpty()) {
            writeApi.writePoints(InfluxConfig.getBucket(), InfluxConfig.getOrg(), pointList);
            pointList.clear();
        }
        writeApi.close();
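
A minimal sketch of minute alignment on its own, assuming getDeviceTime() returns epoch milliseconds (which WritePrecision.MS suggests). A remainder of 1000 only truncates to the second, while 60 000 truncates to the minute. The class and method names here are made up for illustration and are not part of the InfluxDB client.

```java
final class MinuteAlignment {
    private static final long MILLIS_PER_MINUTE = 60_000L;

    /** Truncates an epoch-millisecond timestamp to the start of its minute. */
    static long alignToMinute(long epochMillis) {
        // floorMod keeps the result correct for negative (pre-1970) timestamps too.
        return epochMillis - Math.floorMod(epochMillis, MILLIS_PER_MINUTE);
    }

    public static void main(String[] args) {
        long deviceTime = 1_718_679_994_123L; // hypothetical device timestamp in ms
        System.out.println(alignToMinute(deviceTime));
    }
}
```

The aligned value would be passed to .time(..., WritePrecision.MS), while the untouched device time still goes into the device_time field.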

I now have a requirement to query the time when a specific device_sn first had a sustained hyperglycemia event (if any). Sustained hyperglycemia is defined as a glucose value greater than 13.9 for more than two hours. I used the window and reduce methods, and the code is as follows:

import "interpolate"
from(bucket: "cdm_dm")
  |> range(start: 1718679994)
  |> filter(fn: (r) => r["_measurement"] == "cgm_glucose_history")
  |> filter(fn: (r) => r["device_sn"] == "TT22222AN2")
  |> filter(fn: (r) => r["_field"] == "glucose" or r["_field"] == "device_time")
  |> map(fn: (r) => ({r with _value: float(v: r._value)}))
  |> interpolate.linear(every: 1m)
  |> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")
  |> window(every: 1m, period: 122m)
  // The core logic is to count the points where the glucose value is greater than 13.9 within two hours.
  |> reduce(
      fn: (r, accumulator) => ({
          count: if r.glucose > 13.9 then accumulator.count + 1 else 0,
          event_start_time: if r.glucose > 13.9 then r.device_time else 0.0,
          glucose:
              if (r.glucose > 13.9 and accumulator.count == 121) then r.glucose
              else if (r.glucose > 13.9 and accumulator.count < 121) then accumulator.glucose
              else 0.0
      }),
      identity: {count: 0, event_start_time: 0.0, glucose: 0.0}
  )
  |> duplicate(column: "_start", as: "_time")
  |> window(every: inf)
  |> filter(fn: (r) => r["count"] == 122)
  |> limit(n:1)

However, in practice the query is very slow: even with very little test data, it takes more than 30 seconds to return a result. Is there something wrong with my logic? Is there a better way to achieve this in InfluxDB?

Can you try these things and see if they help the performance of the query:

  1. Try to reduce the data size of the window by filtering beforehand where glucose is greater than 13.9. This significantly reduces the data size for the subsequent window
    |> filter(fn: (r) => r.glucose > 13.9)

  2. Use a window function with a smaller period to identify consecutive occurrences of the event per minute for a total duration of two hours, for example window(every: 1m).

You might also benefit from using the stateDuration() function, instead of using reduce and then filtering for whether the duration is greater than 2 hours.


Thank you very much for your reply. The problem is indeed the 1m window interval, but if I filter first, I can't tell whether the readings are consecutive.

This works really well: the query time dropped from 40 seconds to 0.1 seconds.

from(bucket: "cdm_dm")
  |> range(start: 1718679994)
  |> filter(fn: (r) => r["_measurement"] == "cgm_glucose_history")
  |> filter(fn: (r) => r["device_sn"] == "TT22222AN2")
  |> filter(fn: (r) => r["_field"] == "glucose")
  |> stateDuration(fn: (r) => r._value > 13.9)
  |> filter(fn:(r) => r["stateDuration"] > 120*60)
  |> limit(n:1)
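
For anyone who wants to sanity-check the result outside InfluxDB, here is a rough Java sketch of the same logic as I understand it: track how long the glucose value has stayed above the threshold, reset when it drops, and return the first sample whose running duration exceeds two hours. The class, method, and parameter names are hypothetical, and the sketch assumes samples sorted by time with timestamps in epoch seconds.

```java
import java.util.OptionalLong;

final class SustainedHigh {
    /**
     * Returns the timestamp (epoch seconds) of the first sample at which glucose has
     * stayed above threshold for more than minSeconds, or empty if that never happens.
     * times[i] belongs to glucose[i]; both arrays must be sorted by time.
     */
    static OptionalLong firstSustainedHigh(long[] times, double[] glucose,
                                           double threshold, long minSeconds) {
        boolean inState = false;
        long stateStart = 0L;
        for (int i = 0; i < times.length; i++) {
            if (glucose[i] > threshold) {
                if (!inState) {
                    // First sample above the threshold: its running duration is 0.
                    inState = true;
                    stateStart = times[i];
                }
                if (times[i] - stateStart > minSeconds) {
                    return OptionalLong.of(times[i]);
                }
            } else {
                // Any reading at or below the threshold resets the running duration.
                inState = false;
            }
        }
        return OptionalLong.empty();
    }

    public static void main(String[] args) {
        // Made-up data: 130 one-minute samples, all above 13.9.
        long[] times = new long[130];
        double[] glucose = new double[130];
        for (int i = 0; i < times.length; i++) {
            times[i] = i * 60L;
            glucose[i] = 14.0;
        }
        System.out.println(firstSustainedHigh(times, glucose, 13.9, 120 * 60));
    }
}
```

Note that the returned time is the point at which the event has already lasted more than two hours, not the moment it began; subtracting the running duration gives the start of the event.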