Hi there!
I am trying to write a Flux task that runs same query multiple times with different regex variable:
import "experimental/array"
option task = {name: "test", every: 10s}
containers = [
{regex: /(?i)access-service/},
{regex: /(?i)lns-service/},
{regex: /(?i)identity-service/},
]
containers
|> array.map(
fn: (x) =>
from(bucket: "test-bucket")
|> range(start: -20s)
|> filter(fn: (r) => r["_measurement"] == "docker_container_status")
|> filter(fn: (r) => r["container_name"] =~ x.regex)
|> filter(fn: (r) => r["_field"] == "uptime_ns")
|> filter(fn: (r) => r["_value"] >= 2000000000)
|> last()
|> keep(columns: ["_value"])
|> bottom(n: 1)
|> map(
fn: (r) =>
({r with _measurement: "container_uptime",
_value: 20,
_time: now(),
_field: "access-service",
}),
)
|> to(bucket: "node-bucket"),
)
However, above query fails!
Only other solution is to write separate tasks per docker swarm service I need to track
Please help ;-;
scott
November 28, 2023, 10:23pm
2
Yeah, Flux doesn’t do well at creating and handling arrays of streams. It’s a known issue that I’ve been hoping would be fixed for a while.
In your case, does the containers
array change with each execution or is it static? If it’s static, for brevity, you can define a function that takes a regex as input and returns a stream of tables. Then use union()
to union all the different streams together into a single stream and write them back to InfluxDB:
option task = {name: "test", every: 10s}
getResults = (regex) =>
from(bucket: "test-bucket")
|> range(start: -20s)
|> filter(fn: (r) => r["_measurement"] == "docker_container_status")
|> filter(fn: (r) => r["container_name"] =~ regex)
|> filter(fn: (r) => r["_field"] == "uptime_ns")
|> filter(fn: (r) => r["_value"] >= 2000000000)
|> last()
|> keep(columns: ["_value"])
|> bottom(n: 1)
|> map(
fn: (r) =>
({r with _measurement: "container_uptime",
_value: 20,
_time: now(),
_field: "access-service",
}),
)
union(tables: [
getResults(regex=/(?i)access-service/),
getResults(regex=/(?i)lns-service/),
getResults(regex=/(?i)identity-service/)
])
|> to(bucket: "node-bucket"),
Thank you for the reply!
The container array is static
I’ve tried copy and pasting your code into the Influx DB Task. However, following error pops up:
Failed to update task : Invalid flux script. Please check your query text.
scott
November 28, 2023, 10:45pm
4
Oh, I left a comma after the to()
call. Try removing that.
option task = {name: "test", every: 10s}
getResults = (regex) =>
from(bucket: "test-bucket")
|> range(start: -20s)
|> filter(fn: (r) => r["_measurement"] == "docker_container_status")
|> filter(fn: (r) => r["container_name"] =~ regex)
|> filter(fn: (r) => r["_field"] == "uptime_ns")
|> filter(fn: (r) => r["_value"] >= 2000000000)
|> last()
|> keep(columns: ["_value"])
|> bottom(n: 1)
|> map(
fn: (r) =>
({r with _measurement: "container_uptime",
_value: 20,
_time: now(),
_field: "access-service",
}),
)
union(tables: [
getResults(regex=/(?i)access-service/),
getResults(regex=/(?i)lns-service/),
getResults(regex=/(?i)identity-service/)
])
|> to(bucket: "node-bucket")
Still same error
I was able to save after commenting out lines 22 and onwards
Not sure why Task does not like the union function
Fixed! replacing “=” with “:” resolved the issue! Thank you
option task = {name: "test", every: 10s}
getResults = (regex) =>
from(bucket: "test-bucket")
|> range(start: -20s)
|> filter(fn: (r) => r["_measurement"] == "docker_container_status")
|> filter(fn: (r) => r["container_name"] =~ regex)
|> filter(fn: (r) => r["_field"] == "uptime_ns")
|> filter(fn: (r) => r["_value"] >= 2000000000)
|> last()
|> keep(columns: ["_value"])
|> bottom(n: 1)
|> map(
fn: (r) =>
({r with _measurement: "container_uptime",
_value: 20,
_time: now(),
_field: "access-service",
}),
)
union(
tables: [
getResults(regex: /(?i)access-service/),
getResults(regex: /(?i)lns-service/),
getResults(regex: /(?i)identity-service/),
],
)
|> to(bucket: "node-bucket")
scott
November 28, 2023, 11:03pm
7
Copy pasta error. Sorry about that.