Is it http.post()’s bug?

I try to use http.post() in flux task and My steps are as follows:

  1. Login in to influxdb on web
  2. Create schedule task which call http.post()
  3. Manual trigger the task
  4. App receive the http request successed
  5. Task triggered by influxdb allways are success, But the app can not receive the http request.

My question :
Manual success and received http request .
Influxdb schedule success but no http request received, Why?

Ps:

  1. example event data send by another mock app, so the task can read/get event data successfully everytime.

  2. Influxdb deployed in docker env.

  3. Task script content


import “http”

option task = {name: “testtask”, every: 1h}

from(bucket: “systembucket”)

|> range(start: -task.every)

|> filter(fn: (r) => (r._measurement == “du2”))
|> filter(fn: (r) => (r._value > 10000))

|> aggregateWindow(every: 24h, fn: mean, createEmpty: false)

|> limit(n: 1, offset: 0)

|> map(fn: (r) =>

({r with status_code: http.post(url: "http://localhost:8088/api/v2/notify", headers: {"Content-Type": "application/json", Authorization: "Token faketoken"}, data: bytes(v: "{\"start\": 2021-01-11T22:00:00Z, \"stop\": 2021-01-12T00:00:00Z}"))}))

@tfnick The best thing here to do to troubleshoot would be to write the results of your task back to InfluxDB. It will include the response code of each sent request and should at least give you an idea if the requests are succeeding or failing.

import "http"

option task = {name: "testtask", every: 1h}

from(bucket: "systembucket")
    |> range(start: -task.every)
    |> filter(fn: (r) => (r._measurement == "du2"))
    |> filter(fn: (r) => (r._value > 10000))
    |> aggregateWindow(every: 24h, fn: mean, createEmpty: false)
    |> map(fn: (r) => ({r with
        status_code: http.post(url: "http://localhost:8088/api/v2/notify", headers: {"Content-Type": "application/json", Authorization: "Token faketoken"}, data: bytes(v: "{\"start\": 2021-01-11T22:00:00Z, \"stop\": 2021-01-12T00:00:00Z}")),
        _measurement: "notifications",
    }))
    |> to(bucket: "example-bucket")

If you want the task to fail if the http.post() returns a non-200 repsonse, you can conditionally execute die() in a map() call.

Note: die() currently isn’t documented, but it will be soon.

import "http"

option task = {name: "testtask", every: 1h}

from(bucket: "systembucket")
    |> range(start: -task.every)
    |> filter(fn: (r) => (r._measurement == "du2"))
    |> filter(fn: (r) => (r._value > 10000))
    |> aggregateWindow(every: 24h, fn: mean, createEmpty: false)
    |> map(fn: (r) => ({r with
        status_code: http.post(url: "http://localhost:8088/api/v2/notify", headers: {"Content-Type": "application/json", Authorization: "Token faketoken"}, data: bytes(v: "{\"start\": 2021-01-11T22:00:00Z, \"stop\": 2021-01-12T00:00:00Z}")),
        _measurement: "notifications",
    }))
    |> map(fn: (r) => {
        check = if string(v: r.status_code) !~ /^2/ then die(msg: "http.post() received a ${r.status_code} status code") else r.status_code

        return { r with status_code: check }
    })

This will cause the task to fail and get logged to the InfluxDB task logs.

@scott thank u I will try

@scott die function cannt help me locate the proplem.

The complete script is as follows.

import “json”
import “http”

option task = {name: “example_1m_xxx”, every: 1m, offset: 1s}

myEndpoint = http.endpoint(url: “http://192.168.1.199:8888/collect/api/notify”)

from(bucket: “example”)
|> range(start: -task.every)
|> filter(fn: (r) => r._measurement == “cpu”)
|> aggregateWindow(every: task.every, fn: first, createEmpty: false)
|> duplicate(column: “_value”, as: “_value_price”)
|> derivative()
|> map(fn: (r) => ({r with _value: r._value / r._value_price}))
|> drop(columns: ["_value_price"])
|> group()
|> map(
fn: (r) => ({r with
jsonStr: string(
v: json.encode(
v: {
“_measurement”: r._measurement,
“exchange”: r.exchange,
“currency”: r.currency,
“_field”: r._field,
“_value”: r._value,
“_time”: r._time,
},
),
),
}),
)
|> myEndpoint(
mapFn: (r) => ({headers: {Authorization: “Bearer mySuPerSecRetTokEn”, “Content-type”: “application/json”}, data: bytes(v: r.jsonStr)}),
)()
|> map(
fn: (r) => {
check = if string(v: r.status_code) !~ /^2/ then die(msg: “http.post() received a ${r.status_code} status code”) else r.status_code

        return {r with status_code: check}
    },
)

@tfnick Ah, ok. I didn’t realize you were using http.endpoint(). That already appends the HTTP response code in the _sent column. Try this:

import "json"
import "http"

option task = {name: "example_1m_xxx", every: 1m, offset: 1s}

myEndpoint = http.endpoint(url: "http://192.168.1.199:8888/collect/api/notify")

from(bucket: "example")
    |> range(start: -task.every)
    |> filter(fn: (r) => r._measurement == "cpu")
    |> aggregateWindow(every: task.every, fn: first, createEmpty: false)
    |> duplicate(column: "_value", as: "_value_price")
    |> derivative()
    |> map(fn: (r) => ({r with _value: r._value / r._value_price}))
    |> drop(columns: ["_value_price"])
    |> group()
    |> map(
        fn: (r) => ({r with
            jsonStr: string(
                v: json.encode(
                    v: {
                        "_measurement": r._measurement,
                        "exchange": r.exchange,
                        "currency": r.currency,
                        "_field": r._field,
                        "_value": r._value,
                        "_time": r._time,
                    },
                ),
            ),
        }),
    )
    |> myEndpoint(mapFn: (r) => ({headers: {Authorization: "Bearer mySuPerSecRetTokEn", "Content-type": "application/json"}, data: bytes(v: r.jsonStr)}))()
    |> map(
        fn: (r) => {
            check = if r._sent !~ /^2/ then die(msg: "http.post() received a ${r._sent} status code") else r._sent

            return {r with _sent: check}
        },
    )