Convert InfluxQL subqueries to Flux

I have converted this below query to flux but it does not give me the correct answer.
SELECT sum(“value”) FROM ( SELECT non_negative_derivative(mean(“log_end_offset”), 1s) AS “value” FROM “kafka_consumer_group” WHERE (“topic” =~ /^$topic$/ AND “resourcegroup” = ‘clelastic’ AND “client_id” ==vm-kafka-mirrormaker) AND $timeFilter GROUP BY time($__interval), “topic”, “partition” fill(none) ) GROUP BY time($__interval), “topic” fill(none)

from(bucket: “myvc”)
|> range(start: v.timeRangeStart, stop: v.timeRangeStop)
|> filter(fn: (r) => r[“_measurement”] == “kafka_consumer_group”)
|> filter(fn: (r) => r[“_field”] == “log_end_offset”)
|> filter(fn: (r) => r[“topic”] == “to_metrics_shared” and r[“resourcegroup”] == “clelastic” and r[“client_id”] == vm-kafka-mirrormaker)
|> aggregateWindow(every: v.windowPeriod, fn: mean, createEmpty: false)
|> derivative(unit: 1s, nonNegative: true)
|> mean(column: “_value”)
|> group()
|> sum()
This flux query doesn’t provide me the exact result, can some please help me on this. ?

Hello @Murugan_P,
Welcome!
Thanks for your question. :slight_smile:
Hmm your query loooks good to me except for it looks like you have an extra mean. You’re already taking the mean with the aggregateWindow() function. I don’t see you taking the mean of the derivative with your influxQL query. Can you please try:

from(bucket: “myvc”)
|> range(start: v.timeRangeStart, stop: v.timeRangeStop)
|> filter(fn: (r) => r[“_measurement”] == “kafka_consumer_group”)
|> filter(fn: (r) => r[“_field”] == “log_end_offset”)
|> filter(fn: (r) => r[“topic”] == “to_metrics_shared” and r[“resourcegroup”] == “clelastic” and r[“client_id”] == vm-kafka-mirrormaker)
|> aggregateWindow(every: v.windowPeriod, fn: mean, createEmpty: false)
|> derivative(unit: 1s, nonNegative: true)
|> group()
|> sum()

When making translations I recommend trying to get the subqueries right first and translating the queries bit by bit instead of making the translation all at once.

Thank you and its working as expected.

@Murugan_P
Awesome to hear! Thank you!
PS what are you using InfluxDB for? It really makes my day to learn about what the community is doing with InfluxDB.
Thank you!

I have one question, currently i have this querry.

from(bucket: “abc”)
|> range(start: v.timeRangeStart, stop: v.timeRangeStop)
|> filter(fn: (r) => r[“_measurement”] == “mqtt_consumer” or r[“_measurement”] == “mqtt_float” or r[“_measurement”] == “mqtt_json” or r[“_measurement”] == “mqtt_string”)
|> filter(fn: (r) => r[“topic”] == “abc/0176/mission/waypoints” or r[“topic”] == “abc/069209ac-ce9d-4af0-b786-1f1d9e9d05e5/mission/destinations” or r[“topic”] == “abc/069209ac-ce9d-4af0-b786-1f1d9e9d05e5/mission/home_position”)
|> aggregateWindow(every: v.windowPeriod, fn: mean, createEmpty: false)
|> yield(name: “mean”)

Currently the query is displaying the topics that i have selected in the influxdb dashboard. I want another query that displays all the topics in the certian time frame. I don’t have to type the exact topics in the query.