Taking historical data from SQL and writing it to an InfluxDB bucket

Hello
I’m trying to create a task that takes historical data from SQL and writes it to an InfluxDB bucket, for example by pulling the table data every 1 minute.

The Flux below works okay, but I think there should be a better way to write it.
Do you have any recommendations?

import "sql"

test = sql.from(
    driverName: "mysql",
    dataSourceName: "usradmin:passwd@tcp(localhost:3306)/prod",
    query: "SELECT PJ AS PJ,task AS task FROM prod.linkdata",
)

test2 = test

test
    |>map(fn: (r) => ({_time: now(), _measurement:"test", _field: "PJ", _value: r.PJ}))
    //|>yield()
    |> to(bucket: "mariadbtest")

test2
    |>map(fn: (r) => ({_time: now(), _measurement:"test", _field: "task", _value: r.task}))
    //|>yield()
    |> to(bucket: "mariadbtest")
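To make the data shape explicit: to() writes one _field/_value pair per row, so the query above has to turn each SQL row into one record per column, which is why it needs a separate map() per field. A minimal JavaScript sketch of that reshaping (JavaScript because that is what the Node-RED function nodes in this thread use). The function name and the nanosecond timestamp string are illustrative assumptions. Every record shares one timestamp, just as now() is fixed for the duration of a Flux query.

```javascript
// Sketch: reshape each SQL row into one record per column ("long" format),
// which is what the two map() calls above build before each to() call.
// timeNs is a placeholder standing in for Flux's now().
function rowsToFieldRecords(rows, measurement, timeNs) {
  const records = [];
  for (const row of rows) {
    for (const field of Object.keys(row)) {
      // Every column of the row becomes its own _field/_value record.
      records.push({ _time: timeNs, _measurement: measurement, _field: field, _value: row[field] });
    }
  }
  return records;
}

// One sample row (values taken from the debug output discussed in this thread).
const records = rowsToFieldRecords([{ PJ: "AB1234", task: "C12587" }], "test", "1672531200000000000");
console.log(records.length); // 2
```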

I do this exact process using Node-RED. It works well and is easy to use.

@grant1 Thank you for your suggestion.
Node-RED also looks like a good way to take data from SQL.
I am struggling to transform the SQL data into InfluxDB points when the result contains multiple objects.
Are you using a “function” node to transform the data, or a “change” node?

@grant1 Hello again.
I found a way to transform the objects.
For the moment, I can use a “split” node (from the sequence category).
By the way, I noticed that my Flux above only displays the last data row.
I will try to find out how to solve it.

@Yutaka

Sorry for the late reply.

Here is my flow in Node-RED:

  1. In the MSSQL-PLUS node, I run a SELECT query that returns the measurements (called MeasuredValue) and timestamps (in epoch format, called DateTimeTaken).
  2. In the function node, I put this:
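// Convert every row returned by the SQL node into a point object
// (measurement, fields, tags, timestamp).
msg.payload = msg.payload.map(function(value) {
    return {
        measurement: "MyMeasurementData",
        fields: {
            temperature: value.MeasuredValue,   // measured value becomes a field
        },
        tags: {
            MeasType: "actual",                 // fixed tags for this equipment
            EquipNumber: "305"
        },
        timestamp: value.DateTimeTaken          // epoch timestamp from the query
    };
});
return msg;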
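To check this mapping outside Node-RED, the function body can be wrapped in an ordinary JavaScript function and fed a sample payload. A minimal sketch: the function name `toBatchPoints` and the sample rows are hypothetical, while the measurement, field, and tag names are the ones used above.

```javascript
// Same mapping as the function node above, wrapped in a plain function so it
// can be run outside Node-RED. The sample rows below are hypothetical.
function toBatchPoints(rows) {
  return rows.map((value) => ({
    measurement: "MyMeasurementData",
    fields: { temperature: value.MeasuredValue },
    tags: { MeasType: "actual", EquipNumber: "305" },
    timestamp: value.DateTimeTaken, // epoch value taken straight from the query
  }));
}

// Stand-in for msg.payload as delivered by the SQL node.
const samplePayload = [
  { MeasuredValue: 21.5, DateTimeTaken: 1672531200 },
  { MeasuredValue: 21.7, DateTimeTaken: 1672531260 },
];

console.log(JSON.stringify(toBatchPoints(samplePayload), null, 2));
```

Inside the function node, with the function defined in the same node, the body then reduces to `msg.payload = toBatchPoints(msg.payload); return msg;`.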

@grant1
Thank you for your reply.
I’m trying to use your method.
The debug output looks good, but I get an error message.
I will keep looking for the reason.


Current versions:
Node-RED 3.0.2
node-red-contrib-influxdb 0.6.1
node-red-node-mysql 1.0.3

Place the debug node after the MySQL node and post the output here.

@Yutaka It could be simplified a little. You can use influxdb.wideTo() to write pivoted data back to InfluxDB, so you don’t need separate variables to structure each field. The following example takes the queried SQL data and appends two columns to each returned row: _time and _measurement.

import "sql"
import "influxdata/influxdb"

sql.from(
    driverName: "mysql",
    dataSourceName: "usradmin:passwd@tcp(localhost:3306)/prod",
    query: "SELECT PJ AS PJ,task AS task FROM prod.linkdata",
)
    |> map(fn: (r) => ({r with _time: now(), _measurement: "test"}))
    |> influxdb.wideTo(bucket: "mariadbtest")
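Conceptually, writing wide data means each SQL row is stored as one point that carries both PJ and task as fields, rather than one point per field. A minimal JavaScript sketch of that idea, expressed as InfluxDB line protocol. It is an illustration only, not how wideTo() is implemented, and it assumes every non-special column is a string field and that measurement and field names need no escaping; the function names are hypothetical.

```javascript
// Sketch: one wide row becomes one line-protocol point carrying several fields.
// Assumptions: every column except _measurement and _time is a string field,
// and measurement/field names need no escaping (no spaces, commas, or "=").
function quoteStringField(value) {
  // Backslashes and double quotes inside string field values must be escaped.
  return '"' + String(value).replace(/\\/g, "\\\\").replace(/"/g, '\\"') + '"';
}

function wideRowToLine(row) {
  const fieldSet = Object.keys(row)
    .filter((key) => key !== "_measurement" && key !== "_time")
    .map((key) => `${key}=${quoteStringField(row[key])}`)
    .join(",");
  return `${row._measurement} ${fieldSet} ${row._time}`;
}

// Nanosecond timestamp kept as a string so no precision is lost in a JS number.
const line = wideRowToLine({ PJ: "AB1234", task: "C12587", _measurement: "test", _time: "1672531200000000000" });
console.log(line); // test PJ="AB1234",task="C12587" 1672531200000000000
```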

@grant1
Thanks again.
I hope that is enough information for you.


@Yutaka

It looks like this error message is complaining about the boolean.

The fields PJ and task appear to be strings (“AB1234” and “C12587” in the first object). Shouldn’t these be integers or floats?
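For reference, InfluxDB line protocol distinguishes field types by syntax: strings are double-quoted, integers carry a trailing `i`, floats are plain numbers, and booleans are written as `true`/`false`. Once a field has been written with one type, writing it with another type can be rejected as a field type conflict. A minimal JavaScript sketch of these formatting rules; the function name is hypothetical.

```javascript
// Sketch: format a JS value as an InfluxDB line-protocol field value.
// Caveat: JavaScript cannot tell 20 from 20.0, so whole numbers are treated
// as integers here, and a value such as 20.0 comes out as "20i".
function formatFieldValue(value) {
  if (typeof value === "boolean") return value ? "true" : "false";
  if (typeof value === "number") {
    if (!Number.isFinite(value)) throw new Error("NaN/Infinity are not valid field values");
    return Number.isInteger(value) ? `${value}i` : String(value);
  }
  // Everything else is written as a double-quoted, escaped string.
  return '"' + String(value).replace(/\\/g, "\\\\").replace(/"/g, '\\"') + '"';
}

console.log(formatFieldValue("AB1234")); // "AB1234"  (string field)
console.log(formatFieldValue(305));      // 305i      (integer field)
console.log(formatFieldValue(21.5));     // 21.5      (float field)
console.log(formatFieldValue(true));     // true      (boolean field)
```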

@scott
Thank you for the information.
When I try this method, the following error appears.
I tried a few things (see the commented-out lines in my Flux),
but had no luck.
Do you have any suggestions?

required column "_measurement" not in group key

Screenshot: raw data view before wideTo()

@grant1
I confirmed that the column data type in the database is VARCHAR; I also tested with TEXT and got the same message.
Just in case, I recreated the bucket.

The following code is accepted by InfluxDB,
but the tags are lost:

msg.payload = msg.payload.map(function (value) {
   return {
       PJ: value.PJ,
       task: value.task
   }
});
return msg;


It looks like the array structure is involved.
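As we read the node-red-contrib-influxdb README, the two nodes expect different payload shapes: the “influxdb batch” node takes an array of point objects (`measurement`, `fields`, `tags`, `timestamp`), matching grant1's function node above, while the “influxdb out” node treats a plain object as a set of fields, a two-element array as `[fields, tags]`, and an array of such arrays as a series of points. Please verify this against the README of the installed version (0.6.1). Under that assumption, a minimal sketch that keeps tags when using “influxdb out”; the function name and the tag `source: "mysql"` are hypothetical placeholders.

```javascript
// Sketch: build a payload for the "influxdb out" node in which every row is a
// [fields, tags] pair, so the tags travel with each point.
// The tag key "source" and its value "mysql" are hypothetical placeholders.
function rowsToOutPayload(rows) {
  return rows.map((row) => [
    { PJ: row.PJ, task: row.task }, // first object: fields
    { source: "mysql" },            // second object: tags
  ]);
}

// In a function node this would be:
//   msg.payload = rowsToOutPayload(msg.payload);
//   return msg;
const payload = rowsToOutPayload([{ PJ: "AB1234", task: "C12587" }]);
console.log(JSON.stringify(payload)); // [[{"PJ":"AB1234","task":"C12587"},{"source":"mysql"}]]
```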

Hi @Yutaka

You wrote: “The following code will be accepted by InfluxDB, but tags will be lost.”

Did your MySQL data use tags? My examples (“305” and “actual”) came from my data, so if you do not have any tags to import, I think you are good to go.

Did you successfully import your MySQL data into InfluxDB 2.5 using Node-RED?

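@Yutaka Oh, sorry about that. You do need to group by measurement: influxdb.wideTo() requires _measurement to be in the group key. In your commented-out group() call, you’re using the except mode, which groups by every column except the ones listed. This should work: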

import "sql"
import "influxdata/influxdb"

sql.from(
    driverName: "mysql",
    dataSourceName: "usradmin:passwd@tcp(localhost:3306)/prod",
    query: "SELECT PJ AS PJ,task AS task FROM prod.linkdata",
)
    |> map(fn: (r) => ({r with _time: now(), _measurement: "test"}))
    |> group(columns: ["_measurement"])
    |> influxdb.wideTo(bucket: "mariadbtest")
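The Flux documentation for influxdb.wideTo() describes how columns are mapped: columns in the group key (other than `_measurement`) are written as tags, and columns outside the group key (other than `_time`) are written as fields. That is why `_measurement` has to be in the group key. A small JavaScript model of those rules; it only mimics the behavior and is not Flux's implementation, and the function name is hypothetical.

```javascript
// Sketch of the column rules documented for influxdb.wideTo(): group-key
// columns other than _measurement become tags, and non-group-key columns
// other than _time become fields. The thrown message mimics the Flux error
// seen in this thread; this is an illustration, not Flux's implementation.
function classifyWideToColumns(columns, groupKey) {
  if (!groupKey.includes("_measurement")) {
    throw new Error('required column "_measurement" not in group key');
  }
  return {
    tags: groupKey.filter((col) => col !== "_measurement"),
    fields: columns.filter((col) => !groupKey.includes(col) && col !== "_time"),
  };
}

const columns = ["PJ", "task", "_time", "_measurement"];

// After group(columns: ["_measurement"]): PJ and task become fields, no tags.
console.log(classifyWideToColumns(columns, ["_measurement"]));

// Without the group() call the group key is presumably empty, so the write fails.
try {
  classifyWideToColumns(columns, []);
} catch (err) {
  console.log(err.message);
}
```

Under these rules, adding a column to the group() call, for example `group(columns: ["_measurement", "PJ"])`, would write PJ as a tag instead of a field, which may be one way to keep tags with the Flux approach.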

Hello @scott
Sorry, I misunderstood the error message.
Grouping works fine now. 😀
But I noticed that after applying the Flux above, the bucket only displays the last data row.
Is there any way to write all the data from SQL?

Hello @grant1

First, I must apologize.
I was using the “influxdb out” node for storage.
When I use “influxdb batch”, your function node works.
But when I check the InfluxDB bucket, it only displays the last data row.



Using “influxdb out” for storage together with a split node,
all rows are received.

I would like to use your method, but for my current purpose the split method works.

Are you sure your SQL query is returning more than one row? It looks like you’re having the same issue with Node-RED, so I’m thinking it’s the SQL query that is returning only the last row.

Hello

I believe the SQL is fine.
I can see all the rows after applying the Flux.

Below is a screenshot after querying the bucket. 🤔

Ah, I see. Within a measurement, InfluxDB uniquely identifies points by their timestamp, tag set, and field key. If you write a point with the same timestamp, tag set, and field as a point that has already been written, InfluxDB overwrites the existing value with the more recent one.

All the points you’re writing have the same timestamp, tag set, and field keys. So InfluxDB is writing them all, but each write immediately overwrites the previous one because InfluxDB sees them as the same point, just with updated data.
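A small JavaScript model of this overwrite rule; it only illustrates the behavior and is not how InfluxDB stores data. The function name and the second row's values are hypothetical, and the tag set is empty because the query in this thread writes no tags.

```javascript
// Sketch: model storage keyed by measurement, tag set, field key, and
// timestamp. Writing a second point with the same key replaces the first
// value, so rows that all got the same timestamp collapse into the last row.
function writePoints(points) {
  const store = new Map();
  for (const p of points) {
    const tagSet = Object.keys(p.tags)
      .sort()
      .map((k) => `${k}=${p.tags[k]}`)
      .join(",");
    for (const [field, value] of Object.entries(p.fields)) {
      const key = `${p.measurement}|${tagSet}|${field}|${p.timestamp}`;
      store.set(key, value); // same key: the later write overwrites
    }
  }
  return store;
}

const sameTime = "1672531200000000000"; // every row received the same timestamp
const stored = writePoints([
  { measurement: "test", tags: {}, fields: { PJ: "AB1234", task: "C12587" }, timestamp: sameTime },
  { measurement: "test", tags: {}, fields: { PJ: "AB9999", task: "C00001" }, timestamp: sameTime }, // hypothetical second row
]);

console.log(stored.size);                       // 2 (one entry per field key)
console.log([...stored.values()].join(", "));   // AB9999, C00001
```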

So one approach is to offset the timestamp of each row incrementally. This gets a little tricky, but here’s an example:

import "date"
import "sql"
import "influxdata/influxdb"

sql.from(
    driverName: "mysql",
    dataSourceName: "usradmin:passwd@tcp(localhost:3306)/prod",
    query: "SELECT PJ AS PJ,task AS task FROM prod.linkdata",
)
    |> map(fn: (r) => ({r with _time: now(), _measurement: "test"}))
    |> group(columns: ["_measurement"])
    |> map(fn: (r) => ({r with index: 1}))
    |> cumulativeSum(column: "index")
    |> map(fn: (r) => ({r with _time: date.add(d: date.scale(d: 10s, n: r.index), to: r._time)}))
    |> influxdb.wideTo(bucket: "mariadbtest")
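If the write happens from Node-RED instead, the same idea can be applied in a function node before the write node. A minimal sketch, assuming millisecond timestamps (the write node's precision setting would have to match) and a 10-second step to mirror the Flux above; the function name and the second and third rows are hypothetical.

```javascript
// Sketch of the same idea on the Node-RED side: give row i (counting from 1)
// the timestamp base + i * step, like cumulativeSum() + date.scale() above.
// Assumes millisecond timestamps; the write node's precision must match.
function withIncrementingTimes(rows, baseMs, stepMs) {
  return rows.map((row, i) => ({ ...row, timestamp: baseMs + (i + 1) * stepMs }));
}

const rows = [
  { PJ: "AB1234", task: "C12587" },
  { PJ: "AB9999", task: "C00001" }, // hypothetical
  { PJ: "AB5555", task: "C22222" }, // hypothetical
];
const stamped = withIncrementingTimes(rows, 1672531200000, 10000);
console.log(stamped.map((r) => r.timestamp).join(", ")); // 1672531210000, 1672531220000, 1672531230000
```

If the base time is the current time, these timestamps lie in the future, so a query whose range stops at now() would only show each row once its time has passed. Using `baseMs - (i + 1) * stepMs` instead would keep every timestamp in the past.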

@scott
Thank you for that.

I got an error message:

found unexpected argument column (Expected `columns`)

So I changed it to:

|> cumulativeSum(columns: ["index"])

After 30 seconds, 3 rows appear.
I will tune the date.scale() duration, and then I should be able to monitor historical data.

Thank you very much again. 🙏