Yield Within a Reduce or Map Function

I am creating an event management system that is responsible for (1) detecting when data has arrived in Influx, (2) letting a client know there is data waiting, and (3) allowing the client to query the data starting from the last data point it has processed. I’ve got the first two parts working, but I’m struggling with the last part.

I have built a “cache” in Influx that keeps track of the measurements I am watching and the last timestamp the client processed for each measurement. This lets the client know where it left off, so it won’t miss data. This process uses the Tasks feature in Influx and is working great.

My client receives an HTTP POST from the Influx Task when there is data ready to be gathered. Then, when a timer in the client fires and data is waiting, the client queries the cache to get the measurements and last timestamps it needs, and then attempts to query the data. This is the part I’m struggling with. Here is my current query:

GetCacheValues = (uniqueGUID) =>
    from(bucket: "Cache")
        |> range(start: 0)
        |> filter(fn: (r) => r.UniqueGUID == uniqueGUID)

GetRawValues = (measurement, start) =>
    from(bucket: "RawData")
        |> range(start: time(v: start))
        |> filter(fn: (r) => r._measurement == measurement)
        |> filter(fn: (r) => r._field == "Value")
        |> yield(name: measurement) // THIS IS WHERE I WANT TO YIELD
        |> findColumn(fn: (key) => key._measurement == measurement, column: "_value")

GetNewValues = (uniqueGUID) =>
    GetCacheValues(uniqueGUID: uniqueGUID)
        |> filter(fn: (r) => r._field == "LastTimestampProcessed")
        |> reduce(
            fn: (r, accumulator) =>
                ({
                    HasValues:
                        if GetRawValues(measurement: r._measurement, start: r._value) |> length() > 0 then
                            1
                        else
                            0,
                }),
            identity: {HasValues: 0},
        )

GetNewValues(uniqueGUID: "e3485e8a-2722-499e-a4b2-16171349116d")

The issue I’m running into is that I can’t yield data inside a reduce or map function. I am really just trying to iterate through all of the measurements I need to query. I don’t actually need to reduce anything; I’m only using reduce() as a way to iterate through the cache table. The only reason I convert the GetRawValues() output to an array after the yield is so the reduce function has something to check: whether the array contains any values.

Is there a better way to do what I’m trying to do? Also, why does the yield(name: measurement) line not yield anything? There is data coming back at that point.

Hello @ticchioned,
Are you getting any errors?
Unfortunately, I don’t think you can use a conditional statement inside reduce().
I don’t think there’s a way to force iterating over measurements. Flux doesn’t really support looping.
Also, you’re not using the accumulator anywhere.

import "sampledata"

sampledata.int()
    |> reduce(
        fn: (r, accumulator) =>
            ({
                count: accumulator.count + 1,
                total: accumulator.total + r._value,
                avg: float(v: accumulator.total + r._value) / float(v: accumulator.count + 1),
            }),
        identity: {count: 0, total: 0, avg: 0.0},
    )

I believe you need to use the accumulator like this for reduce() to work.

You could group by _measurement, pull the last timestamp for each measurement, and then write a separate query for each measurement using those values, but that is a static, hard-coded solution.
Honestly, I’d recommend using a client library rather than Flux for this kind of work.
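If you want to stay in Flux, the per-measurement start times can be applied without writing one query per measurement: join the raw data to the cache on _measurement, then keep only rows newer than that measurement’s last processed time. This is a minimal sketch under these assumptions: LastTimestampProcessed is stored as an RFC3339 string (time(v:) also accepts an integer nanosecond timestamp), and the array.from tables, measurement names, and timestamps are hypothetical stand-ins for the Cache and RawData buckets. It uses the universe join() function; newer Flux versions also provide a join package.

```flux
import "array"

// HYPOTHETICAL stand-in for the "Cache" bucket after filtering on
// UniqueGUID and _field == "LastTimestampProcessed".
cache = array.from(
    rows: [
        {_measurement: "temp", _field: "LastTimestampProcessed", _value: "2023-01-01T00:02:00Z"},
        {_measurement: "pressure", _field: "LastTimestampProcessed", _value: "2023-01-01T00:01:00Z"},
    ],
)

// HYPOTHETICAL stand-in for the "RawData" bucket filtered to _field == "Value".
raw = array.from(
    rows: [
        {_time: 2023-01-01T00:01:00Z, _measurement: "temp", _field: "Value", _value: 1.0},
        {_time: 2023-01-01T00:03:00Z, _measurement: "temp", _field: "Value", _value: 2.0},
        {_time: 2023-01-01T00:02:00Z, _measurement: "pressure", _field: "Value", _value: 3.0},
    ],
)

// One row per measurement: its name and its last processed time as a time value.
lastProcessed = cache
    |> map(fn: (r) => ({_measurement: r._measurement, lastProcessed: time(v: r._value)}))
    |> group()

// Keep only the columns needed downstream so nothing collides in the join.
rawValues = raw
    |> keep(columns: ["_time", "_measurement", "_value"])
    |> group()

// Pair every raw row with its measurement's last processed time, then keep
// only rows newer than that time. No per-measurement loop is needed.
newValues = join(tables: {raw: rawValues, cache: lastProcessed}, on: ["_measurement"])
    |> filter(fn: (r) => r._time > r.lastProcessed)

newValues |> yield(name: "newValues")
```

With these sample rows, only the temp point at 00:03 and the pressure point at 00:02 pass the filter. Against real data, the array.from tables would be replaced by the GetCacheValues() query and a from(bucket: "RawData") query whose range start covers the oldest cached timestamp.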

Surprisingly, there are no errors, and everything works except the yield statement. I’m using this approach in a different script and, again, everything works fine with conditionals and without using the accumulator. For example, in this script I’m just checking whether new data has arrived in the database. The conditional returns true if there is data and false if there isn’t, and so far there have been no issues with it!

HasNewValues = (uniqueGUID) =>
    GetCacheValues(uniqueGUID: uniqueGUID)
        |> reduce(
            fn: (r, accumulator) =>
                ({
                    HasValues:
                        if GetSnapshot(measurement: r._measurement, start: time(v: r._value))
                                |> length() > 0
                        then
                            true
                        else
                            false,
                }),
            identity: {HasValues: false},
        )
        |> filter(fn: (r) => r.HasValues == true)
        |> yield(name: uniqueGUID)
        |> findColumn(fn: (key) => key._field == "LastTimestamp", column: "HasValues")
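Conditional expressions are valid inside reduce()’s fn, which matches what the script above shows. One detail worth checking: reduce() passes each call’s output to the next row as accumulator and emits the last one, so when fn ignores accumulator, as HasNewValues does, each table’s HasValues reflects only that table’s final row. A minimal sketch on hypothetical in-memory rows (built with array.from, not the real cache) where the accumulator keeps the flag true once any row qualifies:

```flux
import "array"

// HYPOTHETICAL in-memory rows standing in for the cache table.
rows = array.from(
    rows: [
        {_measurement: "temp", _value: 3},
        {_measurement: "pressure", _value: 0},
    ],
)

hasValues = rows
    |> reduce(
        // An if/then/else expression is allowed inside the record returned by fn.
        // OR-ing with accumulator.HasValues keeps the flag true after any match,
        // instead of reporting only the last row's result.
        fn: (r, accumulator) =>
            ({HasValues: if accumulator.HasValues or r._value > 0 then true else false}),
        identity: {HasValues: false},
    )

hasValues |> yield(name: "hasValues")
```

Here the first row sets HasValues to true and the accumulator keeps it true for the second row, so the single output row has HasValues: true.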