Nested query using array

Hi there!

I am trying to write a Flux task that runs the same query multiple times, each time with a different regex:

import "experimental/array"

option task = {name: "test", every: 10s}

containers = [
    {regex: /(?i)access-service/},
    {regex: /(?i)lns-service/},
    {regex: /(?i)identity-service/},
]

containers
    |> array.map(
        fn: (x) =>
            from(bucket: "test-bucket")
                |> range(start: -20s)
                |> filter(fn: (r) => r["_measurement"] == "docker_container_status")
                |> filter(fn: (r) => r["container_name"] =~ x.regex)
                |> filter(fn: (r) => r["_field"] == "uptime_ns")
                |> filter(fn: (r) => r["_value"] >= 2000000000)
                |> last()
                |> keep(columns: ["_value"])
                |> bottom(n: 1)
                |> map(
                    fn: (r) =>
                        ({r with _measurement: "container_uptime",
                            _value: 20,
                            _time: now(),
                            _field: "access-service",
                        }),
                )
                |> to(bucket: "node-bucket"),
    )

However, the query above fails!

The only other solution I can see is to write a separate task for each Docker Swarm service I need to track.
Please help ;-;

Yeah, Flux doesn’t do well at creating and handling arrays of streams. It’s a known issue that I’ve been hoping would be fixed for a while.

In your case, does the containers array change with each execution or is it static? If it’s static, you can keep things short by defining a function that takes a regex as input and returns a stream of tables. Then use union() to merge the streams into a single stream and write it back to InfluxDB:

option task = {name: "test", every: 10s}

getResults = (regex) =>
    from(bucket: "test-bucket")
        |> range(start: -20s)
        |> filter(fn: (r) => r["_measurement"] == "docker_container_status")
        |> filter(fn: (r) => r["container_name"] =~ regex)
        |> filter(fn: (r) => r["_field"] == "uptime_ns")
        |> filter(fn: (r) => r["_value"] >= 2000000000)
        |> last()
        |> keep(columns: ["_value"])
        |> bottom(n: 1)
        |> map(
            fn: (r) =>
                ({r with _measurement: "container_uptime",
                    _value: 20,
                    _time: now(),
                    _field: "access-service",
                }),
        )

union(tables: [
    getResults(regex=/(?i)access-service/),
    getResults(regex=/(?i)lns-service/),
    getResults(regex=/(?i)identity-service/)
])
    |> to(bucket: "node-bucket"),

Thank you for the reply!

The container array is static.

I tried copying and pasting your code into an InfluxDB task. However, the following error pops up:

Failed to update task : Invalid flux script. Please check your query text.

Oh, I left a comma after the to() call. Try removing that.

option task = {name: "test", every: 10s}

getResults = (regex) =>
    from(bucket: "test-bucket")
        |> range(start: -20s)
        |> filter(fn: (r) => r["_measurement"] == "docker_container_status")
        |> filter(fn: (r) => r["container_name"] =~ regex)
        |> filter(fn: (r) => r["_field"] == "uptime_ns")
        |> filter(fn: (r) => r["_value"] >= 2000000000)
        |> last()
        |> keep(columns: ["_value"])
        |> bottom(n: 1)
        |> map(
            fn: (r) =>
                ({r with _measurement: "container_uptime",
                    _value: 20,
                    _time: now(),
                    _field: "access-service",
                }),
        )

union(tables: [
    getResults(regex=/(?i)access-service/),
    getResults(regex=/(?i)lns-service/),
    getResults(regex=/(?i)identity-service/)
])
    |> to(bucket: "node-bucket")

Still the same error.
I was able to save after commenting out lines 22 and onwards.
Not sure why the task does not like the union function.

Fixed! Replacing “=” with “:” resolved the issue! Thank you.

option task = {name: "test", every: 10s}

getResults = (regex) =>
    from(bucket: "test-bucket")
        |> range(start: -20s)
        |> filter(fn: (r) => r["_measurement"] == "docker_container_status")
        |> filter(fn: (r) => r["container_name"] =~ regex)
        |> filter(fn: (r) => r["_field"] == "uptime_ns")
        |> filter(fn: (r) => r["_value"] >= 2000000000)
        |> last()
        |> keep(columns: ["_value"])
        |> bottom(n: 1)
        |> map(
            fn: (r) =>
                ({r with _measurement: "container_uptime",
                    _value: 20,
                    _time: now(),
                    _field: "access-service",
                }),
        )

union(
    tables: [
        getResults(regex: /(?i)access-service/),
        getResults(regex: /(?i)lns-service/),
        getResults(regex: /(?i)identity-service/),
    ],
)
    |> to(bucket: "node-bucket")
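
For anyone who hits the same "Invalid flux script" message: as far as I can tell, the difference comes down to how Flux separates defining a parameter from passing one. A minimal sketch of the two forms, where scale, doubled, and tripled are hypothetical names made up for illustration and are not part of the task above:

```flux
// Hypothetical helper, used only to illustrate the syntax.
// In a function definition, `=` gives a parameter its default value.
scale = (n, factor=2) => n * factor

// In a function call, every argument is passed as name: value.
doubled = scale(n: 21)
tripled = scale(n: 21, factor: 3)
```

So getResults(regex=/.../) is rejected by the parser, while getResults(regex: /.../) is accepted.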

:man_facepalming: Copy pasta error. Sorry about that.
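
One more thing that may be worth checking: every stream in the task above is written with _field: "access-service" and _time: now(), and keep() has already dropped the tag columns. Points that share a measurement, tag set, field key, and timestamp overwrite each other in InfluxDB, so the three services probably end up as a single point. If that is not what you want, here is a rough sketch that passes the field name in as well. The service parameter is my own addition and an assumption about the intent, not something from the original task:

```flux
option task = {name: "test", every: 10s}

// `service` is a hypothetical extra parameter (not in the original task).
// It is used as the field key so each service writes its own point.
getResults = (regex, service) =>
    from(bucket: "test-bucket")
        |> range(start: -20s)
        |> filter(fn: (r) => r["_measurement"] == "docker_container_status")
        |> filter(fn: (r) => r["container_name"] =~ regex)
        |> filter(fn: (r) => r["_field"] == "uptime_ns")
        |> filter(fn: (r) => r["_value"] >= 2000000000)
        |> last()
        |> keep(columns: ["_value"])
        |> bottom(n: 1)
        |> map(
            fn: (r) =>
                ({r with _measurement: "container_uptime",
                    _value: 20,
                    _time: now(),
                    _field: service,
                }),
        )

union(
    tables: [
        getResults(regex: /(?i)access-service/, service: "access-service"),
        getResults(regex: /(?i)lns-service/, service: "lns-service"),
        getResults(regex: /(?i)identity-service/, service: "identity-service"),
    ],
)
    |> to(bucket: "node-bucket")
```

If a single combined point is actually what you are after, the task as posted is fine and you can ignore this.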