Hi,
I’m using Python to write data to an Influx db and have seen mention of being able to read response codes to check how successful the write is…
The online docs don’t have it:
https://influxdb-client.readthedocs.io/en/latest/usage.html#synchronous-client
Can anyone add an example I might be able to use?
Does this help?
from influxdb_client import InfluxDBClient
from influxdb_client.client.exceptions import InfluxDBError
from influxdb_client.client.write_api import SYNCHRONOUS
with InfluxDBClient(url="http://localhost:8086", token="my-token", org="my-org") as client:
try:
client.write_api(write_options=SYNCHRONOUS).write("my-bucket", record="mem,tag=a value=86")
except InfluxDBError as e:
if e.response.status == 401:
raise Exception(f"Insufficient write permissions to 'my-bucket'.") from e
raise
@Anaisdg that was quick! Thanks!
I’ll give it a go, but I think from memory it might cause my scripts to become slower.
My script uses a Twisted Internet reactor to run every 1s to read sensor data and opening/closing a connection to the database each time the method is called slows things down.
This is how I have written it:
#!/usr/bin/env python3
###Imports go here. Cut them out for brevity.
client = InfluxDBClient(url=URL_HERE, token=TOKEN_HERE, org=ORG_HERE, verify_ssl=False)
write_api = client.write_api(write_options=SYNCHRONOUS)
query_api = client.query_api()
def Trend():
trend_query = ' from(bucket:"' + influx_client_bucket + '")\
|> range(start: -10s)\
|> filter(fn: (r) => r["_measurement"] == "Thermocouple")\
|> filter(fn: (r) => r["_field"] == "value")\
|> drop(columns: ["_start", "_stop", "_field", "_measurement"])\
|> group(columns: ["Sensor"])\
|> tail(n: ' + str(trend_sample_count) + ')'
try:
trend_query_result = query_api.query(org=influx_client_org, query=trend_query)
except:
#Exception handling goes here
else:
try:
write_api.write(influx_client_bucket, influx_client_org, data_list, write_precision='s')
except:
#Exception handling here
finally:
#Tidy up the variables here.
if __name__ == "__main__":
l = task.LoopingCall(Trend)
l.start(1.0s)
reactor.run()
I appreciate the connections are never closed, but it allows the code to run much more quickly.
Should I be using a different write method? And do they all have response codes? I seem to remember seeing that only the write_API has them.
Thanks