Write CSV data from my S3 bucket to InfluxDB Cloud

Hi everyone,

I have an S3 bucket that receives new CSV data every 5 minutes. I now want to send this data automatically to my AWS-managed InfluxDB Cloud.

Besides sending this data via Telegraf + Kinesis, is there any simpler solution for this kind of task?

Really looking forward to hearing possible solutions.

Greetings
Wig

Since I don’t know the specific application, I can only give a few general hints:

  • Personally, I would write an AWS Lambda function that is triggered by the S3 bucket and then writes the data to the InfluxDB instance (see the sketch after this list).
  • What should Kinesis be used for?
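For illustration, here is a minimal sketch of such a Lambda function in Python. Everything in it is an assumption: it presumes the influxdb-client package is bundled with the deployment, that the CSV has time, sensor, and value columns, and that the connection details live in hypothetical environment variables.

import csv
import io
import os

import boto3
from influxdb_client import InfluxDBClient, Point
from influxdb_client.client.write_api import SYNCHRONOUS

s3 = boto3.client("s3")

def handler(event, context):
    # Triggered by an S3 ObjectCreated notification; fetch the new CSV object.
    record = event["Records"][0]["s3"]
    body = s3.get_object(
        Bucket=record["bucket"]["name"],
        Key=record["object"]["key"],
    )["Body"].read().decode("utf-8")

    # Hypothetical column layout: time (RFC3339 string), sensor (tag), value (field).
    points = [
        Point("sensors")
        .tag("sensor", row["sensor"])
        .field("value", float(row["value"]))
        .time(row["time"])
        for row in csv.DictReader(io.StringIO(body))
    ]

    # Connection details from the environment (variable names are examples).
    with InfluxDBClient(
        url=os.environ["INFLUX_URL"],
        token=os.environ["INFLUX_TOKEN"],
        org=os.environ["INFLUX_ORG"],
    ) as client:
        client.write_api(write_options=SYNCHRONOUS).write(
            bucket=os.environ["INFLUX_BUCKET"],
            record=points,
        )

A nice side effect: the S3 ObjectCreated notification fires for every new file, so the 5-minute cadence takes care of itself.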

Hi there Franky,

Thanks for the response. I’d like to monitor different sensors (each one has its own folder inside my S3 bucket) and see whether they are working correctly.

Athena only comes into play once I have a very high volume of data. A few months ago I read an article in which someone sent a huge amount of data generated in his S3 bucket via the Amazon Kinesis Consumer Telegraf plugin.

If you can somehow marry S3, Kinesis, and Telegraf, that would also be a possibility. Unfortunately I don’t know Kinesis, so I can’t help there; AWS offers too many services to stay up to date on them all… :smirk:

AWS offers too many services to stay up to date on them all… :smirk:

True that ^^. Anyway, thanks for your input. Have a great week.

Cheers
Wig

@Seventeen In theory, you could do this with a Flux task and the S3 HTTP API, but I don’t know that I would call it “simpler”. Essentially, you would use an API request to grab the CSV and then convert it into a stream of tables with csv.from(). This function has a raw mode that parses all columns as strings and the first row as column labels. This lets you pull any CSV file into Flux, but you will need to type-cast each column to its appropriate data type. For example:

import "csv"
import "experimental"
import "experimental/http/requests"

response = requests.get(
    url: "https://example-s3-api-url.com",
    headers: ["Authorization": "..."],
    body: bytes(v: "example-request-body")
)

csvData = string(v: response.body)

// Parse the raw CSV, then cast each column to its appropriate type.
csv.from(csv: csvData, mode: "raw")
    |> map(fn: (r) => ({ r with
        _measurement: "example-measurement",
        _time: time(v: r.time),
        fieldColumn1: int(v: r.column1),
        fieldColumn2: float(v: r.column2),
        fieldColumn3: bool(v: r.column3),
        tagColumn1: r.tagColumn1,
        tagColumn2: r.tagColumn2,
        tagColumn3: r.tagColumn3,
    }))

Now, I’m guessing your data is “pivoted”, meaning each field has its own column per row (that’s generally how most CSV is structured). If that’s the case, you can use experimental.to() to write the data. With this function, the _measurement column is used as the measurement, all columns that are in the group key are written as tags, and all columns that are not part of the group key are written as fields. So you need to group the data by your measurement column and all of your tag columns before writing it to InfluxDB. To continue from the Flux above:

// ...
    |> group(columns: ["_measurement", "tagColumn1", "tagColumn2", "tagColumn3"])
    |> experimental.to(bucket: "example-bucket")

Hi,

Can I use the columns country, state, city in place of r.column1, r.column2, etc. in the code below?

csv.from(csv: csvData)
    |> map(fn: (r) => ({ r with
        _measurement: "Hydrotest",
        _time: time(v: r.time),
        fieldColumn1: int(v: r.column1),
        fieldColumn2: float(v: r.column2),
        fieldColumn3: bool(v: r.column3),
        tagColumn1: r.plant,
        tagColumn2: r.state,
        tagColumn3: r.tag,
    }))

Hi Scott, is there any way to write the stored data into a CSV file?
(with reference to this issue)

Thank you

@Mr_Influx Flux returns results in annotated CSV by default, so you can just use the InfluxDB query API and write the results to a file. By default, the returned CSV contains only column headers, no annotations. If you want the full annotations, you need to specify them in the dialect field of the query request:

curl --request POST \
  "http://localhost:8086/api/v2/query?org=<INFLUX_ORG_NAME>" \
  --header 'Authorization: Token <INFLUX_TOKEN>' \
  --header 'Accept: application/csv' \
  --header 'Content-type: application/json' \
  --data '{
    "query": "from(bucket: \"example-bucket\") |> range(start: -1d) |> ...",
    "dialect": {
        "annotations": [
            "group",
            "datatype",
            "default"
        ]
    }
}' > /path/to/target/file.csv
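If you would rather script this than call curl, the influxdb-client Python package can do the same thing. A rough sketch, with the same placeholder query, credentials, and file path as the curl example above:

import csv

from influxdb_client import Dialect, InfluxDBClient

with InfluxDBClient(
    url="http://localhost:8086",
    token="<INFLUX_TOKEN>",
    org="<INFLUX_ORG_NAME>",
) as client:
    # query_csv() streams the annotated-CSV response row by row;
    # the Dialect mirrors the "annotations" field of the curl request.
    rows = client.query_api().query_csv(
        'from(bucket: "example-bucket") |> range(start: -1d)',
        dialect=Dialect(
            header=True,
            delimiter=",",
            comment_prefix="#",
            annotations=["group", "datatype", "default"],
            date_time_format="RFC3339",
        ),
    )

    with open("/path/to/target/file.csv", "w", newline="") as f:
        csv.writer(f).writerows(rows)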

It worked… Thank you.

PS: My response came a bit late, as the RPi is in a remote place and was disconnected from the network; I had to go there and test it with an Ethernet cable.

Thank you Scott