Help aggregating and writing to new table

Hi,
I’m migrating from Influx 1 to influx 2 and trying to convert my continuous queries that aggregate/reduce the raw data using tasks and Flux.

I almost got it working, but in the last step to write the aggregated data back to a table, somehow one of the fields is not written back.

As probably the flux-query can also be improved I’ll describe the full use case:

The source table contains energy measurements from my solar panels. I have dashboards to look at all the details of the raw data, but after a week most of it is no longer interesting and can be compacted massively.

Basically the only thing to preserve is the Kwh meter data. This is just a simple increasing counter value that is sampled every 10 seconds.

What I want to do is to reduce the data with a daily task that writes into a new table the last meter reading of each day and also the total daily usage (= for each day: meter end value - start value).

So to do this for e.g. the last 7 days, the query I came up that seems to do the job is:

last = from(bucket: "testtelegraf")
    |> range(start: -7d)
    |> filter(fn: (r) => r["topic"] == "Import")
    |> aggregateWindow(every: 1d, fn: last, createEmpty: false)
first = from(bucket: "testtelegraf")
    |> range(start: -7d)
    |> filter(fn: (r) => r["topic"] == "Import")
    |> aggregateWindow(every: 1d, fn: first, createEmpty: false)

join(tables: {lastusage: last, firstusage: first}, on: ["_time", "_stop", "_start", "host", "_field", "_measurement","topic"])
    |> map(fn: (r) => ({_value: r._value_lastusage - r._value_firstusage,
     _meter_endvalue : r._value_lastusage,
     _time: r._time   
    }))
    |> set(key: "_measurement", value: "solar_daily")
    |> set(key: "_field", value: "day_usage")

This output look exactly as I would like, having the “_time”, “day_usage” and “_meter_endvalue” columns nicely populated.

So, to write this to a daily table I have added this to the the above query:

   |> to(bucket: "test_daily", org: "my-org", tagColumns: [""])

I now would expect the “test_daily” table to have the exact contents as the query results from above.
However, I’m missing the “meter_endvalue” column:

Why is the last column missing, what am I doing wrong here?

Hi @MarcoB,
Welcome to the community! If you remove tagColumns from the to() function what result do you see?

   |> to(bucket: "test_daily", org: "my-org")

Since you are adding an extra column it’s technically no longer a field. So when you specify an empty tag last the column is being dropped. Check out this example here: Custom to() operation

You have a few options on how to deal with the column

Thanks!
I did remove the tagColumns but that doesn’t seem to make a difference.
What do you mean with “Since you are adding an extra column it’s technically no longer a field”?

I did make the query a bit simpler:

last = from(bucket: "testtelegraf")
    |> range(start: -7d)
    |> filter(fn: (r) => r["topic"] == "modbus/sdm1-2/Import")
    |> aggregateWindow(every: 1d, fn: last, createEmpty: false)
first = from(bucket: "testtelegraf")
    |> range(start: -7d)
    |> filter(fn: (r) => r["topic"] == "modbus/sdm1-2/Import")
    |> aggregateWindow(every: 1d, fn: first, createEmpty: false)

join(
    tables: {lastusage: last, firstusage: first},
    on: [
        "_time",
        "topic",
    ],
)
    |> map(
        fn: (r) => ({
            _value: r._value_lastusage - r._value_firstusage,
            _endvalue: r._value_lastusage,
            _time: r._time,
            _measurement: "solar_daily",
            _field: "day_usage"
        }),
    )

Interactively this exactly shows what I want:

So I would expect that after writing this to a table like this, that would be a one to one copy:

|> to(
  bucket:"testtest",
  org:"my-org",
) 

Unfortunately if I look in the “testtest” bucket I still miss the _endvalue column.

I do note however that this bucket behaves strange: I get a “nog tag keys found” when using the Query Builder of the Influx 2 GUI, but it does give me the result above using:

from(bucket: "testtest")
  |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
  |> filter(fn: (r) => r["_measurement"] == "solar_daily")

So It looks like I should tags (but I already have _measurement and _field columns filled wit tags?)?

Update:
I now have found a solution by experimenting with the experimental.to function.
Having changed the query to:

last = from(bucket: "testtelegraf")
    |> range(start: -7d)
    |> filter(fn: (r) => r["topic"] == "Import")
    |> aggregateWindow(every: 1d, fn: last, createEmpty: false)
first = from(bucket: "testtelegraf")
    |> range(start: -7d)
    |> filter(fn: (r) => r["topic"] == "Import")
    |> aggregateWindow(every: 1d, fn: first, createEmpty: false)

join(
    tables: {lastusage: last, firstusage: first},
    on: [
        "_time",
        "topic",
    ],
)
    |> map(
        fn: (r) => ({
            _value: r._value_lastusage - r._value_firstusage,
            _endvalue: r._value_lastusage,
            _time: r._time,
            _measurement: "solar_daily",
        }),
    )
    |> to(
  bucket:"test_daily",
  org:"my-org",
  fieldFn: (r) => ({"_value": r._value, "_endvalue": r._endvalue})
)

Now results in basically 2 tables on the test_daily bucket:

Apparently the _field is now tagged with “_value” or “_endvalue” according to which value there is in the “_value” field.

So,I now have saved both the daily usage and the end-value in tables, and can use that in reports, so that is fine.

Still puzzled why I can’t see to create a single table with both values in each row, just like the result of the initial query. If somebody would show me how to do that (and how I seem not to understand to work with fields and tags), that would be great!

Hi @MarcoB,
In my opinion, you are treating both values correctly. They both should exist as fields within your new bucket. You can then perform a pivot() to align your field data in columns like you have previously based on time:

pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")

Note: unlike a SQL database in the times series table structure a new column = metadata = tag. Since all rows are bound to _time I do not recommend making a generated numeric value a tag. If you still want to take this approach then here is how you go about it:
Note: that all tags are stored as strings so need to be converted as such:

    |> map(
        fn: (r) => ({
            _value: r._value_lastusage - r._value_firstusage,
            _endvalue: string(r._value_lastusage),
            _time: r._time,
            _measurement: "solar_daily",
        }),
    )
    |> to(
  bucket:"test_daily",
  org:"my-org",
  fieldFn: (r) => ({"_value": r._value}),
  tag: ["_endvalue"] 

)

Though be careful as this can lead to high cardinality.

Hi @Jay_Clifford,

Thanks so much for your patience!
Yes, the Pivot does indeed make this a single table again, nice.

It’s just that I was trying to have the data also stored like this, thinking (probably old-school SQL) to try to “normalize” the data and not having 2 tables with “redundant” timestamps, and “_measurement” columns.

The key thing that is new for me is that you said:

Note: unlike a SQL database in the times series table structure a new column = metadata = tag.

So, what you are saying is that in time series databases you don’t store multiple value’s (sharing the same tags) in a single row, but always “pivot” the data on-the-fly using a query?
Good to know, voor a time series noob :wink: !

Hi @MarcoB,
No worries at all and happy to help. So although what you see is two tables with equivalent timestamps both fields are bound to the same timestamp as that represents their key. They are considered part of the same series. Pivoting as you point out allows you to normalize your data series representation making it far easier to read and apply calculations. Note this does not change the underlining structure of the data. To learn more about the data schema within InfluxDB I highly recommend checking out: