InfluxDB 2.0: inactive tasks run when created by influxdb-client-java

Hi,

I use InfluxDB 2.0.7 and influxdb-client-java (the InfluxDB 2 JVM-based clients from the influxdata/influxdb-client-java GitHub repository).

I tried to create a new Task from Java:

InfluxDBClient influxDBClient = InfluxDBClientFactory.create(URL, token.toCharArray(), org);

Task t = new Task();
t.setFlux(flux);
t.setOrgID(orgID);

if (checked) {
    t.setStatus(TaskStatusType.ACTIVE);
} else {
    t.setStatus(TaskStatusType.INACTIVE);
}

Task task = influxDBClient.getTasksApi().createTask(t);

userEventConf.setTaskId(task.getId());
dataManager.commit(userEventConf);

influxDBClient.close();

I have noticed that the new task is created in InfluxDB as inactive, but when I check the logs I can see that it is running.

When I set the task to active from the InfluxDB UI and then set it back to inactive, it stops.

@bednar is this a known issue? Thank you!
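The manual workaround described above (activate the task, then deactivate it again) could in principle be replayed right after the task is created. The following is only a minimal sketch of that sequence: `TaskStatusUpdater` and `forceInactive` are hypothetical names, not part of influxdb-client-java, and the lambda stands in for whatever call updates a task's status on the server. Whether doing this from code has the same effect as doing it from the UI is not confirmed in this thread.

```java
import java.util.ArrayList;
import java.util.List;

class InactiveTaskWorkaround {

    /** The two task states mentioned in the thread. */
    enum Status { ACTIVE, INACTIVE }

    /** Hypothetical stand-in for a call that updates a task's status on the server. */
    interface TaskStatusUpdater {
        void updateStatus(String taskId, Status status);
    }

    /** Replays the UI workaround: set the task active, then back to inactive. */
    static void forceInactive(TaskStatusUpdater updater, String taskId) {
        updater.updateStatus(taskId, Status.ACTIVE);
        updater.updateStatus(taskId, Status.INACTIVE);
    }

    public static void main(String[] args) {
        // In-memory fake that only records the requested status changes.
        List<String> calls = new ArrayList<>();
        forceInactive((id, status) -> calls.add(id + " -> " + status), "example-task-id");
        calls.forEach(System.out::println);
    }
}
```

In a real application the lambda would be replaced by a call into the client's tasks API; the task ID used here is a placeholder.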

Hi @Geo_Gkez,

I am not able to reproduce your problem. I created a Task with:

String flux = "option task = {\n"
        + "    name: \"it task1623433845264-IT\",\n"
        + "    every: 1h,\n"
        + "    offset: 30m\n"
        + "}\n\n" 
        + "from(bucket:\"my-bucket\") |> range(start: -1m) |> last()";

Task t = new Task();
t.setFlux(flux);
t.setOrgID(organization.getId());
t.setStatus(TaskStatusType.INACTIVE);

Task created = influxDBClient.getTasksApi().createTask(t);
System.out.println("created = " + created);

and everything looks fine.

By the way, "Last completed at" shows the time when the Task was created.

Regards

Thanks for the reply, @bednar.

I tried creating a task similar to yours and it works, but the problem appears when I use a more complicated task.

I don't know if this is the right way to create a custom check, but I tried to follow the "create custom checks" docs.

The generated task script works as I expected.

For example, I create a Task in the same way with this Flux:

        String flux = "import \"influxdata/influxdb/monitor\"\n" +
                "import \"influxdata/influxdb/schema\"\n" +
                "import \"contrib/tomhollingworth/events\"\n" +
                "import \"math\"\n" +
                "\n" +
                "option v = {bucket: \"\", timeRangeStop: now()}\n" +
                "option task = {name: \"test-if\", every: 2m}\n" +
                "\n" +
                "RPM_MCR = 105.0\n" +
                "Power_MCR = 13560.0\n" +
                "  val1 = from(bucket: \"B_7637494\")\n" +
                "\t|> range(start: -10m, stop: v.timeRangeStop)\n" +
                "\t|> filter(fn: (r) =>\n" +
                "\t\t(r[\"_measurement\"] == \"iot_data\"))\n" +
                "\t|> filter(fn: (r) =>\n" +
                "\t\t(r[\"sid\"] == \"7637494\"))\n" +
                "\t|> filter(fn: (r) =>\n" +
                "\t\t(r[\"_field\"] == \"00020102\"))\n" +
                "\t|> filter(fn: (r) =>\n" +
                "\t\t(r[\"_value\"] < 0.0 ))\n" +
                "\t|> count()\n" +
                "\t|> map(fn: (r) =>\n" +
                "\t\t({r with \"condition\": if r._value == 5 then 1 else 0}))\n" +
                "\t|> keep(columns: [\"condition\"])\n" +
                "\t|> toFloat() val7 = from(bucket: \"B_7637494\")\n" +
                "\t|> range(start: -10m, stop: v.timeRangeStop)\n" +
                "\t|> filter(fn: (r) =>\n" +
                "\t\t(r[\"_measurement\"] == \"iot_data\"))\n" +
                "\t|> filter(fn: (r) =>\n" +
                "\t\t(r[\"sid\"] == \"7637494\"))\n" +
                "\t|> filter(fn: (r) =>\n" +
                "\t\t(r[\"_field\"] == \"0101014C\"))\n" +
                "\t|> filter(fn: (r) =>\n" +
                "\t\t(r[\"_value\"] < 50.0 ))\n" +
                "\t|> count()\n" +
                "\t|> map(fn: (r) =>\n" +
                "\t\t({r with \"condition\": if r._value == 5 then 1 else 0}))\n" +
                "\t|> keep(columns: [\"condition\"])\n" +
                "\t|> toFloat() con_1 =val1  con_2 =val7 con_and = union(tables: [con_1,con_2])\n" +
                "\t|> sum(column: \"condition\")\n" +
                "\t|> map(fn: (r) =>\n" +
                "\t\t({r with \"condition\": if r.condition == 2 then 1 else 0, \"_time\": now()}))\n" +
                "notification = from(bucket: \"_monitoring\")\n" +
                "\t|> range(start: -1h, stop: v.timeRangeStop)\n" +
                "\t|> filter(fn: (r) =>\n" +
                "\t\t(r[\"_check_id\"] == \"73ac7e02-8818-01\"))\n" +
                "\t|> filter(fn: (r) =>\n" +
                "\t\t(r[\"_check_name\"] == \"test-if\"))\n" +
                "\t|> filter(fn: (r) =>\n" +
                "\t\t(r[\"_measurement\"] == \"notifications\"))\n" +
                "\t|> filter(fn: (r) =>\n" +
                "\t\t(r[\"_level\"] == \"crit\"))\n" +
                "\t|> filter(fn: (r) =>\n" +
                "\t\t(r[\"_sent\"] == \"true\"))\n" +
                "\t|> filter(fn: (r) =>\n" +
                "\t\t(r[\"_field\"] == \"condition\"))\n" +
                "\t|> filter(fn: (r) =>\n" +
                "\t\t(r[\"_value\"] == 1))\n" +
                "\t|> last()\n" +
                "\t|> keep(columns: [\"_value\", \"_time\"])\n" +
                "\t|> rename(columns: {\"_value\": \"condition\"})\n" +
                "note = union(tables: [notification, con_and])\n" +
                "final_check = note\n" +
                "\t|> group(columns: [\"_measurement\"])\n" +
                "\t|> events.duration(unit: 1s, stop: now())\n" +
                "\t|> sort(columns: [\"_time\"])\n" +
                "\t|> first(column: \"_time\")\n" +
                "\t|> map(fn: (r) =>\n" +
                "\t\t({r with \"condition\": if r.duration > 480 or r.duration == 0 then 1 else 0}))\n" +
                "\t|> map(fn: (r) =>\n" +
                "\t\t({r with \n" +
                "\t\t\t\"_field\": \"notification\",\n" +
                "\t\t\t\"_measurement\": \"iot_event\",\n" +
                "\t\t\t\"sid\": \"7637494\",\n" +
                "\t\t\t\"_value\": 1,\n" +
                "\t\t}))\n" +
                "\t|> group(columns: [\"_measurement\"])\n" +
                "check = {\n" +
                "\t_check_id: \"73ac7e02-8818-01\",\n" +
                "\t_check_name: \"test-if\",\n" +
                "\t_type: \"custom\",\n" +
                "\ttags: {},\n" +
                "}\n" +
                "ok = (r) =>\n" +
                "\t(r[\"condition\"] == 0)\n" +
                "crit = (r) =>\n" +
                "\t(r[\"condition\"] == 1)\n" +
                "messageFn = (r) =>\n" +
                "\t(\"test-if\")\n" +
                "\n" +
                "final_check\n" +
                "\t|> monitor[\"check\"](\n" +
                "\t\tdata: check,\n" +
                "\t\tmessageFn: messageFn,\n" +
                "\t\tok: ok,\n" +
                "\t\tcrit: crit,\n" +
                "\t)";

and InfluxDB correctly creates the task with the script:

import "influxdata/influxdb/monitor"
import "influxdata/influxdb/schema"
import "contrib/tomhollingworth/events"
import "math"

option v = {bucket: "", timeRangeStop: now()}
option task = {name: "test-if", every: 2m}

RPM_MCR = 105.0
Power_MCR = 13560.0
  val1 = from(bucket: "B_7637494")
    |> range(start: -10m, stop: v.timeRangeStop)
    |> filter(fn: (r) =>
        (r["_measurement"] == "iot_data"))
    |> filter(fn: (r) =>
        (r["sid"] == "7637494"))
    |> filter(fn: (r) =>
        (r["_field"] == "00020102"))
    |> filter(fn: (r) =>
        (r["_value"] < 0.0 ))
    |> count()
    |> map(fn: (r) =>
        ({r with "condition": if r._value == 5 then 1 else 0}))
    |> keep(columns: ["condition"])
    |> toFloat() val7 = from(bucket: "B_7637494")
    |> range(start: -10m, stop: v.timeRangeStop)
    |> filter(fn: (r) =>
        (r["_measurement"] == "iot_data"))
    |> filter(fn: (r) =>
        (r["sid"] == "7637494"))
    |> filter(fn: (r) =>
        (r["_field"] == "0101014C"))
    |> filter(fn: (r) =>
        (r["_value"] < 50.0 ))
    |> count()
    |> map(fn: (r) =>
        ({r with "condition": if r._value == 5 then 1 else 0}))
    |> keep(columns: ["condition"])
    |> toFloat() con_1 =val1  con_2 =val7 con_and = union(tables: [con_1,con_2])
    |> sum(column: "condition")
    |> map(fn: (r) =>
        ({r with "condition": if r.condition == 2 then 1 else 0, "_time": now()}))
notification = from(bucket: "_monitoring")
    |> range(start: -1h, stop: v.timeRangeStop)
    |> filter(fn: (r) =>
        (r["_check_id"] == "73ac7e02-8818-01"))
    |> filter(fn: (r) =>
        (r["_check_name"] == "test-if"))
    |> filter(fn: (r) =>
        (r["_measurement"] == "notifications"))
    |> filter(fn: (r) =>
        (r["_level"] == "crit"))
    |> filter(fn: (r) =>
        (r["_sent"] == "true"))
    |> filter(fn: (r) =>
        (r["_field"] == "condition"))
    |> filter(fn: (r) =>
        (r["_value"] == 1))
    |> last()
    |> keep(columns: ["_value", "_time"])
    |> rename(columns: {"_value": "condition"})
note = union(tables: [notification, con_and])
final_check = note
    |> group(columns: ["_measurement"])
    |> events.duration(unit: 1s, stop: now())
    |> sort(columns: ["_time"])
    |> first(column: "_time")
    |> map(fn: (r) =>
        ({r with "condition": if r.duration > 480 or r.duration == 0 then 1 else 0}))
    |> map(fn: (r) =>
        ({r with
            "_field": "notification",
            "_measurement": "iot_event",
            "sid": "7637494",
            "_value": 1,
        }))
    |> group(columns: ["_measurement"])
check = {
    _check_id: "73ac7e02-8818-01",
    _check_name: "test-if",
    _type: "custom",
    tags: {},
}
ok = (r) =>
    (r["condition"] == 0)
crit = (r) =>
    (r["condition"] == 1)
messageFn = (r) =>
    ("test-if")

final_check
    |> monitor["check"](
        data: check,
        messageFn: messageFn,
        ok: ok,
        crit: crit,
    )
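In the script above, some assignments (`val7`, `con_1`, `con_2`, `con_and`) end up on the same line as the preceding `toFloat()` call, which makes the generated Flux hard to read. One way to avoid that when assembling the script in Java is to keep each statement as its own string and join them with newlines. This is only a readability sketch: `FluxScriptBuilder` and `joinStatements` are hypothetical names, the Flux fragments are shortened from the script above, and nothing in this thread establishes that the single-line layout is related to the reported behaviour.

```java
import java.util.List;

class FluxScriptBuilder {

    /** Joins Flux statements so that every statement starts on its own line. */
    static String joinStatements(List<String> statements) {
        return String.join("\n", statements) + "\n";
    }

    public static void main(String[] args) {
        // Shortened fragments of the script from the thread, one statement per entry.
        String flux = joinStatements(List.of(
                "option task = {name: \"test-if\", every: 2m}",
                "val1 = from(bucket: \"B_7637494\")\n\t|> range(start: -10m)\n\t|> count()",
                "val7 = from(bucket: \"B_7637494\")\n\t|> range(start: -10m)\n\t|> count()",
                "con_and = union(tables: [val1, val7])"));
        System.out.println(flux);
    }
}
```

The resulting string can be passed to `t.setFlux(flux)` exactly as in the original code.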

@Geo_Gkez thanks for the detailed info. If I use your code, the task also runs on my side.

@Anaisdg The client correctly creates the Task as inactive, but the server still tries to run it. It looks like a server issue.
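To tell a genuine run of an inactive task apart from the creation timestamp mentioned earlier ("Last completed at" initially shows when the Task was created), one could compare the run start times against the creation time. The sketch below assumes the creation time and the run start times have already been fetched from the server by some other means; `InactiveRunCheck` and `unexpectedRuns` are hypothetical names and the timestamps are made up for illustration.

```java
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;

class InactiveRunCheck {

    /**
     * Returns the run start times that fall strictly after the task's creation time.
     * For a task that has been inactive since creation, any such run is unexpected.
     * A timestamp equal to the creation time is skipped on purpose.
     */
    static List<Instant> unexpectedRuns(Instant createdAt, List<Instant> runStarts) {
        List<Instant> unexpected = new ArrayList<>();
        for (Instant start : runStarts) {
            if (start.isAfter(createdAt)) {
                unexpected.add(start);
            }
        }
        return unexpected;
    }

    public static void main(String[] args) {
        // Illustrative values only: a task created at noon with "every: 2m".
        Instant created = Instant.parse("2021-06-11T12:00:00Z");
        List<Instant> runs = List.of(created, created.plusSeconds(120), created.plusSeconds(240));
        System.out.println("unexpected runs: " + unexpectedRuns(created, runs).size());
    }
}
```

A non-empty result for a task whose status has been inactive the whole time would match the behaviour reported in this thread.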

Do you have any news about this issue?

Dear all, please let us know if you have been able to identify and resolve this issue. Thanks.