Write JSON with Python into InfluxDB v2

Hi,
I’m collecting data from some temperature and humidity sensors. I saw some examples where Python writes JSON-formatted data into an InfluxDB v1 database.

— SNIP —
# Write Dictionary to InfluxDB
json_body = [
    {
        'measurement': measurement,
        'fields': rd
    }
]
try:
    db.write_points(json_body)

— SNAP —

But I’m still failing to write a JSON format with the new API syntax.

— SNIP —
# Write Dictionary to InfluxDB
json_payload = []
data = {
    "measurement": measurement,
    "tags": {
        "Ort": "Rxxxx"
    },
    "time": datetime.now(),
    "fields": rd
}
json_payload.append(data)
try:
    write_api = client.write_api(write_options=SYNCHRONOUS)
    write_api.write(bucket, org, data)
— SNAP —

Is there perhaps a solution for this?

Kind Regards

Hi @carsten-re,

thanks for using our client.

Here is an example of how to write a dictionary-style object: Migration Guide (influxdb_client 1.40.0dev0 documentation).

But I’m still failing to write a JSON format with the new API syntax.

Can you share a stacktrace of your error?


What does the rd object look like?

Regards

Sure:

Here is the JSON payload:

{'measurement': 'Messungen', 'tags': {'Ort': 'Rxxxx'}, 'fields': {'temp1': 2.6, 'hum1': 66.0, 'temp2': 2.3, 'hum2': 81.0, 'temp3': 22.0, 'hum3': 32.0}}

And here is the error (exit with CTRL+C):

Exception ignored in: <function WriteApi.__del__ at 0xffff8c7bcb80>
Traceback (most recent call last):
  File "/usr/local/lib/python3.8/dist-packages/influxdb_client/client/write_api.py", line 404, in __del__
    if self._subject:
AttributeError: 'WriteApi' object has no attribute '_subject'
^CTraceback (most recent call last):
  File "./influxDBv2.py", line 31, in <module>
    serData = ser.readline()
  File "/usr/local/lib/python3.8/dist-packages/serial/serialposix.py", line 565, in read
    ready, _, _ = select.select([self.fd, self.pipe_abort_read_r], [], [], timeout.time_left())
KeyboardInterrupt

It looks like you are killing the script before it finishes.

I tried your data with:

import json

from influxdb_client import InfluxDBClient
from influxdb_client.client.write_api import SYNCHRONOUS

with InfluxDBClient(url="http://localhost:8086", token="my-token", org="my-org", debug=True) as client:
    with client.write_api(write_options=SYNCHRONOUS) as write_api:
        loaded = json.loads(
            '{"measurement": "Messungen", "tags": {"Ort": "Rxxxx"}, "fields": {"temp1": 2.6, "hum1": 66.0, "temp2": 2.3, "hum2": 81.0, "temp3": 22.0, "hum3": 32.0}}')

        write_api.write(bucket="my-bucket", record=loaded)

and everything works correctly.

Regards
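
For reference, the dictionary-style record used above can be assembled with a small helper. This is only a sketch: build_record is a hypothetical name, not part of influxdb_client, and coercing every reading to float is an assumption made here to keep the field types consistent between writes.

```python
from datetime import datetime, timezone


def build_record(measurement, fields, tags=None, when=None):
    """Return a dictionary-style record (hypothetical helper, not a library call)."""
    # Coerce every reading to float so a field never flips between int and float
    numeric = {key: float(value) for key, value in fields.items()}
    if not numeric:
        # A point without any field cannot be written
        raise ValueError("record needs at least one field")
    return {
        "measurement": measurement,
        "tags": dict(tags or {}),
        "fields": numeric,
        # Timezone-aware UTC timestamp, used when the caller passes none
        "time": when or datetime.now(timezone.utc),
    }


record = build_record(
    "Messungen",
    {"temp1": 2.6, "hum1": 66},
    tags={"Ort": "Rxxxx"},
)
print(record)
```

The resulting dictionary has the same shape as the loaded object in the example above and could be passed as record= in the same way.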


Yes, you’re right, I stopped the script with CTRL+C after the error occurred.

But even with your last example, I ran into this message:

Traceback (most recent call last):
  File "./influxDBv2.py", line 64, in <module>
    with client.write_api(write_options=SYNCHRONOUS) as write_api:
  File "/usr/local/lib/python3.8/dist-packages/influxdb_client/client/influxdb_client.py", line 336, in write_api
    return WriteApi(influxdb_client=self, write_options=write_options, point_settings=point_settings, **kwargs)
  File "/usr/local/lib/python3.8/dist-packages/influxdb_client/client/write_api.py", line 246, in __init__
    self._write_service = WriteService(influxdb_client.api_client)
  File "/usr/local/lib/python3.8/dist-packages/influxdb_client/service/write_service.py", line 32, in __init__
    raise ValueError("Invalid value for `api_client`, must be defined.")
ValueError: Invalid value for `api_client`, must be defined.
Exception ignored in: <function WriteApi.__del__ at 0xffffbbc58b80>
Traceback (most recent call last):
  File "/usr/local/lib/python3.8/dist-packages/influxdb_client/client/write_api.py", line 404, in __del__
    if self._subject:
AttributeError: 'WriteApi' object has no attribute '_subject'
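
Reading the traceback above, the ValueError is raised inside WriteApi.__init__ before the _subject attribute is set, so the later AttributeError from __del__ looks like a follow-on effect rather than a second problem. A minimal sketch of that pattern in plain Python, using hypothetical stand-in classes (FragileWriter and RobustWriter are not part of influxdb_client):

```python
class FragileWriter:
    """Hypothetical stand-in that mimics the failure pattern in the traceback."""

    def __init__(self, api_client):
        if api_client is None:
            # Raised before _subject is assigned
            raise ValueError("Invalid value for `api_client`, must be defined.")
        self._subject = object()

    def __del__(self):
        # Also runs for a half-built instance, where _subject does not exist
        if self._subject:
            pass


class RobustWriter(FragileWriter):
    def __del__(self):
        # getattr with a default tolerates an instance whose __init__ failed
        if getattr(self, "_subject", None):
            pass


for cls in (FragileWriter, RobustWriter):
    try:
        cls(None)
    except ValueError as exc:
        print(cls.__name__, "->", exc)
```

With FragileWriter, Python additionally reports an "Exception ignored in" message on stderr when the half-built object is cleaned up. The error worth chasing is the ValueError.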

Sorry, I’m not a professional. I have only tried to adapt this example from InfluxDB v1 to v2:
https://struband.net/sensordaten-ab-usb-wde1-mit-influxdb-grafana-loggen-und-visualisieren/

This script works fine if the destination is a v1 DB. The interesting section is between lines 50 and 57.

It looks like a broken installation of the InfluxDB client. Try reinstalling the client. If you use pip, try the following commands:

pip uninstall influxdb-client
pip install influxdb-client
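
To check what is actually installed after the reinstall, something like the following may help. It is a sketch using only the standard library (importlib.metadata, Python 3.8+); installed_version is a hypothetical helper name.

```python
from importlib import metadata


def installed_version(distribution):
    """Return the installed version of a pip distribution, or None if absent."""
    try:
        return metadata.version(distribution)
    except metadata.PackageNotFoundError:
        return None


# The v2 client is distributed as "influxdb-client"
print("influxdb-client:", installed_version("influxdb-client"))
# The v1 client is a separate distribution named "influxdb"
print("influxdb (v1 client):", installed_version("influxdb"))
```

Both distributions can be installed side by side, so seeing the v1 client here is not a problem in itself.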

OK, I think I have to rearrange the Python script. It works with your (simple) example, but the error occurred when the whole script was executed with its loops and try / except statements.

I think this thread can be closed. Many thanks for your help

It took some time, but I now have a working version.
I also moved my Python script to Docker.

Here is the whole python script to handle WDE-1 USB temperature / humidity sensors:


#!/usr/bin/env python

# Python Modules
import serial
import sys
import time
from datetime import datetime
from influxdb_client import InfluxDBClient
from influxdb_client.client.write_api import SYNCHRONOUS

# You can generate an API token from the "API Tokens Tab" in the UI
token = "<your-token>"
url = "http://influxdbV2:8086"
org = "my-org"
bucket = "weather"
measurement = "Messungen"
location = "my-location"

# Configuration
serialPort = '/dev/ttyUSB0'

# Open serial port
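try:
    ser = serial.Serial(serialPort, baudrate=9600, timeout=None)
except serial.SerialException as e:
    # Exit with a message if the port cannot be opened
    print("Could not open " + serialPort + ": " + str(e), file=sys.stderr)
    sys.exit(1)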
# Main loop
while True:
    # Wait for data and try error recovery on disconnect
    try:
        serData = ser.readline()
        # Take the timestamp after the blocking read so it matches the reading
        iso = datetime.utcnow()
        # Decode the raw bytes; str() on bytes would keep the b'...' wrapper
        dataset = serData.decode('ascii', errors='replace').split(';')
        # Parse meter readings into dictionary (abbr. to rd for typing laziness of yours truly)
        rd = {}
        for n in range(1, 9):
            # Conversion from german decimal mark , to international .
            try:
                rd['temp'+str(n)] = float(dataset[2+n].replace(',', '.'))
            except ValueError:
                pass
            try:
                rd['hum'+str(n)] = float(dataset[10+n].replace(',', '.'))
            except ValueError:
                pass
        # Kombisensor is mapped to temp9/hum9
        try:
            rd['temp9'] = float(dataset[19].replace(',', '.'))
        except ValueError:
            pass
        try:
            rd['hum9'] = float(dataset[20].replace(',', '.'))
        except ValueError:
            pass
        try:
            rd['windspeed'] = float(dataset[21].replace(',', '.'))
        except ValueError:
            pass
        try:
            rd['rainfall'] = float(dataset[22].replace(',', '.'))
        except ValueError:
            pass
        # Write Dictionary to InfluxDB
        try:
            with InfluxDBClient(url=url, token=token, org=org) as client:
                write_api = client.write_api(write_options=SYNCHRONOUS)
                loaded = [
                    {
                        "measurement": measurement,
                        "tags": {
                            "Ort": location,
                            "domain": "sensor"
                        },
                        "time": iso,
                        "fields": rd
                    }
                ]
                print(loaded)
                write_api.write(bucket=bucket, record=loaded)
            time.sleep(120)
        except KeyboardInterrupt:
            # The with block has already closed the client; only release the serial port
            ser.close()
            sys.exit()
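    except serial.SerialException:
        # Try to reopen the port and carry on with the next loop iteration
        try:
            ser.close()
            ser = serial.Serial(serialPort, baudrate=9600, timeout=None)
        except serial.SerialException:
            # Port is still unavailable; wait briefly before retrying
            time.sleep(5)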
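
The repeated try / except blocks in the parsing part could be collapsed into one helper. The following is a sketch, not a tested replacement: parse_wde1_line is a hypothetical name, the field positions are copied from the script above, and the sample line is made up for illustration rather than captured from a real device.

```python
def parse_wde1_line(raw):
    """Parse one semicolon-separated line (bytes) into a dict of float readings."""
    dataset = raw.decode("ascii", errors="replace").strip().split(";")

    # Field name -> index in the line, using the positions from the script above
    positions = {}
    for n in range(1, 9):
        positions["temp" + str(n)] = 2 + n
        positions["hum" + str(n)] = 10 + n
    positions.update({"temp9": 19, "hum9": 20, "windspeed": 21, "rainfall": 22})

    readings = {}
    for name, index in positions.items():
        try:
            # Convert the German decimal comma to a decimal point
            readings[name] = float(dataset[index].replace(",", "."))
        except (ValueError, IndexError):
            # Empty slot (no sensor) or truncated line: skip this reading
            continue
    return readings


# Made-up sample: sensor 1 plus the combined sensor, all other slots empty
parts = (["$1", "1", "", "21,5"] + [""] * 7 + ["45"] + [""] * 7
         + ["7,3", "81", "0,0", "12", "0", "0"])
sample = (";".join(parts) + "\r\n").encode("ascii")
print(parse_wde1_line(sample))
```

Catching IndexError as well as ValueError means a truncated line yields fewer readings instead of stopping the script.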