Task fails with unexpected error from queryd: queue length exceeded

I periodically get the error “unexpected error from queryd: queue length exceeded”
on downsampling tasks (separate tasks for 1m, 5m, 15m, 1h, 3h, 1w; the bigger the period, the more often the error occurs), but the next run can be successful.
Please suggest where I should dig first.

I run InfluxDB OSS v2.7 on a dedicated server with 8 CPUs (single thread) and 16 GB RAM.

config.toml

session-renew-disabled=true
session-length=1440
query-concurrency=8
query-queue-size=15
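
If I read the docs right, queryd returns “queue length exceeded” when more than `query-queue-size` queries are already waiting behind the `query-concurrency` running ones, and each run of my task issues several queries at once (5 calls, each building two pipelines). A sketch of what I could try (the value 50 is just illustrative, not a tuned recommendation):

```toml
query-concurrency=8
# Let more queries wait instead of being rejected outright; this trades
# "queue length exceeded" errors for longer queueing delays.
query-queue-size=50
```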

Task:

option task = {name: "Calculate Quantile with count  3h", every: 3h}

aggregationInterval = task.every
aggregationIntervalStr = string(v: task.every)

fromBucket = "MEG"
toBucket = "MEG_1y"

aggregateMethod = (q) =>
    if q == 0.5 then
        "p50"
    else if q == 0.8 then
        "p80"
    else if q == 0.9 then
        "p90"
    else if q == 0.95 then
        "p95"
    else if q == 0.99 then
        "p99"
    else
        "p90"

filteredData = () =>
    from(bucket: fromBucket)
        |> range(start: -aggregationInterval)
        |> filter(fn: (r) => r["_measurement"] == "user_timing")
        |> filter(fn: (r) => not exists r.aggregate_method)
        |> map(fn: (r) => ({r with _value: float(v: r._value)}))

calculateQuantile = (q) => {
    quantileData =
        filteredData()
            |> aggregateWindow(
                every: aggregationInterval,
                fn: (tables=<-, column) => tables |> quantile(q: q),
                createEmpty: false,
            )
            |> set(key: "_field", value: "quantile")

    countData =
        filteredData()
            |> aggregateWindow(every: aggregationInterval, fn: count, createEmpty: false)
            |> set(key: "_field", value: "count")

    quantileWithCount =
        union(tables: [quantileData, countData])
            |> pivot(rowKey: ["_time"], columnKey: ["_field"], valueColumn: "_value")
            |> map(
                fn: (r) =>
                    ({r with aggregate_method: aggregateMethod(q),
                        aggregate_interval: aggregationIntervalStr,
                    }),
            )
            |> to(bucket: toBucket, fieldFn: (r) => ({"quantile": r.quantile, "count": r.count}))

    return true
}

calculateQuantile(q: 0.5)
calculateQuantile(q: 0.8)
calculateQuantile(q: 0.9)
calculateQuantile(q: 0.95)
calculateQuantile(q: 0.99)


And, if I understand the log correctly, the task does not fail because of a timeout.


@va2dim,
Yeah, it's no secret the Flux task engine isn't great; problems like this are common. One workaround is, yes, to decrease the query duration. Another is to split the processing among several subsequent tasks… I know, a headache. I would suggest using something like Quix, Mage, or Bytewax for stream processing instead, and writing your logic in Python.
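
For instance, the per-window quantile + count logic from your Flux task could move into plain Python. A hedged sketch: the sample values are hypothetical, and the InfluxDB read/write plumbing (e.g. via the `influxdb-client` package) is left out.

```python
# Sketch of doing the task's per-window aggregation in Python instead of Flux.
# Only the quantile + count logic from the Flux task is shown; reading from
# and writing back to InfluxDB is omitted.
from statistics import quantiles

def aggregate_window(values, interval="3h", qs=(0.5, 0.8, 0.9, 0.95, 0.99)):
    """Return one record per quantile, tagged the way the Flux task tags rows."""
    # statistics.quantiles(n=100) yields the 1st..99th percentile cut points.
    cuts = quantiles(values, n=100)
    records = []
    for q in qs:
        p = round(q * 100)  # 0.5 -> 50, 0.99 -> 99
        records.append({
            "quantile": cuts[p - 1],
            "count": len(values),
            "aggregate_method": f"p{p}",  # p50, p80, ... like aggregateMethod()
            "aggregate_interval": interval,
        })
    return records

# Hypothetical raw values for one 3h window of the "user_timing" measurement.
window = [120.0, 95.5, 210.0, 87.0, 150.0, 300.0, 99.0, 101.0]
for record in aggregate_window(window):
    print(record)
```

Running this once per window (e.g. from a scheduler) keeps the heavy aggregation off the Flux task engine entirely.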
