Calculate a correlation and write the return value to a bucket using a task

Hello guys,

I am trying to create a task that runs every 3 hours and calculates the Pearson correlation (pearsonr). So far I have:

option task = {
    name: "correlation",
    every: 3h,
}

corr1 = from(bucket: "aggregates")
    |> range(start: -3h)
    |> filter(fn: (r) => r["_measurement"] == "sensors")
    |> filter(fn: (r) => r["device_id"] == "1")
    |> filter(fn: (r) => r["device_name"] == "device_1")
    |> filter(fn: (r) => r["location"] == "ke")
    |> filter(fn: (r) => r["_field"] == "temperature")

corr2 = from(bucket: "aggregates")
    |> range(start: -3h)
    |> filter(fn: (r) => r["_measurement"] == "sensors")
    |> filter(fn: (r) => r["device_id"] == "1")
    |> filter(fn: (r) => r["device_name"] == "device_1")
    |> filter(fn: (r) => r["location"] == "ke")
    |> filter(fn: (r) => r["_field"] == "pressure")

data = pearsonr(x: corr1, y: corr2, on: ["_time"])

data

This script calculates the correlation and stores the result in the data variable. Now I want to write that return value into a bucket called test.

I have edited the above script to:

option task = {
    name: "correlation",
    every: 3h,
}

corr1 = from(bucket: "aggregates")
    |> range(start: -3h)
    |> filter(fn: (r) => r["_measurement"] == "sensors")
    |> filter(fn: (r) => r["device_id"] == "1")
    |> filter(fn: (r) => r["device_name"] == "device_1")
    |> filter(fn: (r) => r["location"] == "ke")
    |> filter(fn: (r) => r["_field"] == "temperature")

corr2 = from(bucket: "aggregates")
    |> range(start: -3h)
    |> filter(fn: (r) => r["_measurement"] == "sensors")
    |> filter(fn: (r) => r["device_id"] == "1")
    |> filter(fn: (r) => r["device_name"] == "device_1")
    |> filter(fn: (r) => r["location"] == "ke")
    |> filter(fn: (r) => r["_field"] == "pressure")

data = pearsonr(x: corr1, y: corr2, on: ["_time"])

data
    |> to(
        bucket: "test",
        org: "**",
        token: "***",
        measurementColumn: "correlation",
        tagColumns: ["temp_press"],
        host: "***",
    )

This results in an error.

The data downloaded as CSV is in the following format, and I do not understand why there is an _x and _y version of every column:

,result,table,_field_x,_field_y,_measurement_x,_measurement_y,_start_x,_start_y,_stop_x,_stop_y,device_id_x,device_id_y,device_name_x,device_name_y,location_x,location_y,_value

I think this might be the root cause of why it's not working and why I cannot see the calculated data in the new bucket called test.

Hello @D_B,
The reason you don't see new data in the new bucket is that your task doesn't include the following code (though I think we're saying the same thing):

data
    |> to(
        bucket: "test",
        org: "example-org",
    )

However:

Output data requirements
to() writes data structured using the standard InfluxDB Cloud and v2.x data structure that includes, at a minimum, the following columns:

  • _time
  • _measurement
  • _field
  • _value

pearsonr() joins the two input streams on _time, so columns that exist in both inputs but aren't part of the join key come back with _x and _y suffixes; that's why your CSV has _field_x, _field_y, and so on. So you must use the rename() function and make sure a _time column exists to meet those output data requirements. You have to add something like

data 
    |> rename(columns: {_measurement_x: "_measurement", _field_x: "_field", _start_x: "_time"})

You can also use the drop() function to remove any columns you don't want to keep in the result.
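
For example (just an illustration; the column names are taken from your CSV header, so adjust them to whatever your output actually contains):

data
    |> drop(columns: ["_measurement_y", "_field_y", "_start_y", "_stop_x", "_stop_y"])
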
You can also use the map() function to set the _time column to the current time instead of using the _start time as _time:

    |> map(fn: (r) => ({r with _time: now()}))
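
Putting the pieces together, a rough sketch of the write stage of the task could look something like this (untested; the column names come from your CSV export and "example-org" is a placeholder, so adjust both to your setup):

data
    |> rename(columns: {_measurement_x: "_measurement", _field_x: "_field"})
    |> map(fn: (r) => ({r with _time: now()}))
    |> keep(columns: ["_time", "_measurement", "_field", "_value"])
    |> to(bucket: "test", org: "example-org")

With this approach the field name will be whatever _field_x held ("temperature" here); if you'd rather label it something like "correlation", you could add |> set(key: "_field", value: "correlation") before the to() call.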

Up to you!