Hi everyone!!!
I’m fairly new to InfluxDB. Now, I’m using InfluxDB for the second time.
In the first project I built a small API that gets data from a Raspberry Pi and saves it into an InfluxDB bucket.
During the project I noticed that the procedure is pretty mechanical, so I came up with the idea of building a general purpose data API. By that I mean, creating an API that will write data no matter the sensor and will read data no matter the client.
So, as I said, I’m new to InfluxDB and I would like to get the opinion of more experienced users.
To build my API, I created 3 modules.
- Main.py where I will run FastAPI
- Database.py where I connect to the database and I set the writing and reading actions.
- Model.py where I create the pydantic model for the API.
Main.py
from fastapi import FastAPI
import uvicorn
import yaml
from yaml.loader import SafeLoader
from Model.Model import WritingData, ReadingData
from Database.Database import InfluxDataBase
# Read the InfluxDB connection settings from the YAML file in the working directory.
with open("config.yaml", "r") as ymlfile:
    cfg = yaml.load(ymlfile, Loader=SafeLoader)

# All three connection values live under the top-level "InfluxDB" key.
influx_cfg = cfg["InfluxDB"]
server_URL = influx_cfg["server_URL"]
token = influx_cfg["token"]
org = influx_cfg["org"]

# A single database wrapper instance, created once at import time.
Influx = InfluxDataBase(server_URL, token, org)

app = FastAPI()
@app.post('/write/')
async def call_writing_influx(data: WritingData):
    """Write one validated data point to InfluxDB.

    Args:
        data: Request body describing the bucket, measurement, tags, fields
            and optional timestamp of the point.

    Returns:
        None; the write result is not reported back to the client.
    """
    # Function-scope import so this handler does not depend on any other
    # change to the file's import block.
    from fastapi.concurrency import run_in_threadpool

    # The InfluxDB client is configured as SYNCHRONOUS (blocking). Calling it
    # directly inside a coroutine would stall the event loop for every other
    # request, so the call is handed to a worker thread instead.
    await run_in_threadpool(Influx.write_data, data)
@app.post('/read/')
async def call_reading_influx(data: ReadingData):
    """Query InfluxDB and return the result as a JSON object.

    Args:
        data: Request body describing the bucket, time window, measurement,
            tag filters and field filters of the query.

    Returns:
        Whatever ``Influx.read_data`` returns for this request.
    """
    # Function-scope import so this handler does not depend on any other
    # change to the file's import block.
    from fastapi.concurrency import run_in_threadpool

    # The query call is blocking; run it in a worker thread so the event
    # loop stays free to serve other requests while InfluxDB answers.
    return await run_in_threadpool(Influx.read_data, data)
if __name__ == "__main__":
    # Development entry point: serve on localhost:5000 with auto-reload.
    # NOTE(review): the "main:app" import string has to match this module's
    # file name as uvicorn will import it — confirm against the real file name.
    uvicorn.run(
        "main:app",
        host="127.0.0.1",
        port=5000,
        reload=True,
    )
Database.py
from influxdb_client import InfluxDBClient, Point
from influxdb_client.client.write_api import SYNCHRONOUS
import json
class InfluxDataBase:
    """Small wrapper around an InfluxDB client for generic writes and reads.

    The request objects passed to ``write_data`` and ``read_data`` only need
    to expose the attributes each method reads (see the method docstrings).
    """

    def __init__(self, server_URL, token, org) -> None:
        """Open the client and create the write and query APIs.

        Args:
            server_URL: URL of the InfluxDB server.
            token: Authentication token.
            org: Organization name; also used for every query.
        """
        self.client = InfluxDBClient(server_URL, token=token, org=org)
        self.write_api = self.client.write_api(write_options=SYNCHRONOUS)
        self.query_api = self.client.query_api()
        self.server_URL = server_URL
        self.token = token
        self.org = org

    @staticmethod
    def _flux_string(value) -> str:
        """Return *value* as a double-quoted, escaped Flux string literal."""
        text = str(value)
        # Backslashes first, so the escapes added afterwards are not doubled.
        text = text.replace("\\", "\\\\")
        text = text.replace('"', '\\"')
        # "${" would otherwise start a Flux string interpolation.
        text = text.replace("${", "\\${")
        return '"' + text + '"'

    @staticmethod
    def _or_filter(conditions) -> str:
        """Return one Flux ``filter`` stage matching any of *conditions*."""
        return "|> filter(fn: (r) => " + " or ".join(conditions) + ")"

    def write_data(self, data) -> None:
        """Write a single point to ``data.bucket_name``.

        Args:
            data: Object with ``measurement``, ``tag`` (mapping), ``field``
                (mapping), ``timestamp`` (or ``None``) and ``bucket_name``.
        """
        # Build the point with direct method calls. The previous version
        # assembled Python source text and ran it through eval(), which is
        # unnecessary and dangerous in a request-handling code path.
        point = Point(data.measurement)
        for tag_key, tag_value in data.tag.items():
            point = point.tag(tag_key, tag_value)
        for field_key, field_value in data.field.items():
            point = point.field(field_key, field_value)
        if data.timestamp is not None:
            point = point.time(data.timestamp)
        self.write_api.write(bucket=data.bucket_name, record=point)

    def read_data(self, data):
        """Query the last ``data.time_interval`` hours of a measurement.

        Args:
            data: Object with ``bucket_name``, ``time_interval`` (hours),
                ``measureament_name``, ``tag`` (mapping; a record matches if
                ANY listed tag matches) and ``field`` (list of field names;
                an empty list or a first element of ``'All'`` means no field
                filter).

        Returns:
            dict mapping each field name to a value. Every record overwrites
            the previous entry for its field, so one value per field remains.
        """
        quote = self._flux_string
        stages = [
            f"from(bucket: {quote(data.bucket_name)})",
            # int() guarantees that only a number reaches the query text.
            f"|> range(start: -{int(data.time_interval)}h, stop: now())",
            f'|> filter(fn: (r) => r["_measurement"] == {quote(data.measureament_name)})',
        ]

        # Only emit the tag filter when there are tags; an empty mapping
        # previously left a dangling ")" in the query.
        tag_conditions = [
            f"r[{quote(key)}] == {quote(value)}" for key, value in data.tag.items()
        ]
        if tag_conditions:
            stages.append(self._or_filter(tag_conditions))

        # An empty field list is treated like 'All' instead of raising
        # IndexError on data.field[0].
        fields = list(data.field)
        if fields and fields[0] != 'All':
            field_conditions = [f'r["_field"] == {quote(name)}' for name in fields]
            stages.append(self._or_filter(field_conditions))

        query = "\n".join(stages)
        tables = self.query_api.query(org=self.org, query=query)

        results = {}
        for table in tables:
            for record in table.records:
                results[record.get_field()] = record.get_value()
        return results
Model.py
from pydantic import BaseModel
from typing import List,Dict,Optional
from datetime import datetime
class WritingData(BaseModel):
    """Request body for the write endpoint: one point to store."""

    # Destination bucket.
    bucket_name: str
    # Measurement the point belongs to.
    measurement: str
    # Tag key -> tag value.
    tag: Dict[str, str]
    # Field key -> numeric field value.
    field: Dict[str, float]
    # Optional point time; omitted or None means no explicit time is set.
    timestamp: Optional[datetime] = None
class ReadingData(BaseModel):
    """Request body for the read endpoint: what to query."""

    # Bucket to read from.
    bucket_name: str
    # Look-back window; the query uses it as a number of hours.
    time_interval: int
    # Measurement to filter on. The spelling is part of the public request
    # schema, so it is kept as-is.
    measureament_name: str
    # Tag key -> tag value used as filters.
    tag: Dict[str, str]
    # Field names to return; a first element of 'All' disables the filter.
    field: List[str]
My idea is to provide an easy interface to write and query data that requires minimal configuration effort.
I would like to receive criticism, suggestions and alternatives from more advanced users.
I was thinking about using Telegraf as an alternative, but I’m pretty unfamiliar with the tool and I don’t know whether it will do what I have in mind.
Edit: I changed the read_data() function in database.py because I realized it wasn’t properly implemented.