Building a Data API with InfluxDB and FastAPI

Hi everyone!
I’m fairly new to InfluxDB; this is my second time using it.
In my first project I built a small API that collects data from a Raspberry Pi and saves it into an InfluxDB bucket.
During that project I noticed the procedure is pretty mechanical, so I came up with the idea of building a general-purpose data API. By that I mean an API that will write data no matter the sensor and read data no matter the client.

So, as I said, I’m new to InfluxDB and I would like to get the opinion of more experienced users.

To build my API, I created 3 modules:

  • Main.py, where I run FastAPI
  • Database.py, where I connect to the database and define the write and read actions
  • Model.py, where I define the pydantic models for the API

Main.py

from fastapi import FastAPI
import uvicorn
import yaml
from yaml.loader import SafeLoader
from Model.Model import WritingData, ReadingData
from Database.Database import InfluxDataBase



with open("config.yaml", "r") as ymlfile:
    cfg = yaml.load(ymlfile, Loader=SafeLoader)
server_URL = cfg["InfluxDB"]["server_URL"]
token = cfg["InfluxDB"]["token"]
org = cfg["InfluxDB"]["org"]

Influx = InfluxDataBase(server_URL, token, org)
app = FastAPI()

@app.post('/write/')
async def call_writing_influx(data: WritingData):
    Influx.write_data(data)

@app.post('/read/')
async def call_reading_influx(data: ReadingData):
    return Influx.read_data(data)
    
if __name__ == "__main__":
    uvicorn.run("main:app", host="127.0.0.1", port=5000, reload=True)
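For reference, the config.yaml that Main.py reads would look something like this (placeholder values; the layout is inferred from the keys used above):

```yaml
InfluxDB:
  server_URL: "http://localhost:8086"
  token: "my-influxdb-token"
  org: "my-org"
```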

Database.py

from influxdb_client import InfluxDBClient, Point
from influxdb_client.client.write_api import SYNCHRONOUS

class InfluxDataBase:
    
    def __init__(self, server_URL, token, org) -> None:
        self.client = InfluxDBClient(url=server_URL, token=token, org=org)
        self.write_api = self.client.write_api(write_options=SYNCHRONOUS)
        self.query_api = self.client.query_api()
        self.server_URL = server_URL
        self.token = token
        self.org = org

    def write_data(self, data) -> None:
        # Build the Point directly instead of assembling a code string for
        # eval(); this is simpler and avoids executing generated strings.
        point = Point(data.measurement)
        for key, value in data.tag.items():
            point = point.tag(key, value)
        for key, value in data.field.items():
            point = point.field(key, value)
        if data.timestamp is not None:
            point = point.time(data.timestamp)
        self.write_api.write(bucket=data.bucket_name, record=point)
    
    
    def read_data(self, data):
        query = f'''
        from(bucket: "{data.bucket_name}")
        |> range(start: -{data.time_interval}h, stop: now())
        |> filter(fn: (r) => r["_measurement"] == "{data.measureament_name}")'''
        # Only append a tag filter when tags were given; the previous version
        # appended a stray ')' even when data.tag was empty.
        if data.tag:
            tag_filters = ' or '.join(
                f'r["{key}"] == "{value}"' for key, value in data.tag.items()
            )
            query += f'\n        |> filter(fn: (r) => {tag_filters})'
        if data.field and data.field[0] != 'All':
            field_filters = ' or '.join(
                f'r["_field"] == "{name}"' for name in data.field
            )
            query += f'\n        |> filter(fn: (r) => {field_filters})'
        result = self.query_api.query(org=self.org, query=query)
        # NOTE: a later record with the same field name overwrites an earlier
        # one, so this returns only the most recent value per field.
        results = {}
        for table in result:
            for record in table.records:
                results[record.get_field()] = record.get_value()
        return results
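Since the Flux query is assembled from strings, it helps to see that assembly step in isolation. Here is a standalone sketch of the same filter-building logic (build_flux_query is a hypothetical helper for illustration, not part of influxdb_client):

```python
def build_flux_query(bucket_name, time_interval, measurement, tags, fields):
    """Assemble a Flux query string the same way read_data() does."""
    query = (
        f'from(bucket: "{bucket_name}")\n'
        f'|> range(start: -{time_interval}h, stop: now())\n'
        f'|> filter(fn: (r) => r["_measurement"] == "{measurement}")'
    )
    if tags:
        # Multiple tags become one filter() call joined with `or`.
        tag_filters = " or ".join(f'r["{k}"] == "{v}"' for k, v in tags.items())
        query += f'\n|> filter(fn: (r) => {tag_filters})'
    if fields and fields[0] != "All":
        # The sentinel value "All" skips field filtering entirely.
        field_filters = " or ".join(f'r["_field"] == "{f}"' for f in fields)
        query += f'\n|> filter(fn: (r) => {field_filters})'
    return query

print(build_flux_query("sensors", 24, "climate",
                       {"room": "lab"}, ["temperature", "humidity"]))
```

Keep in mind that interpolating request values straight into Flux like this trusts the client; the parameterized-query support in the client library would be the safer route for untrusted input.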

Model.py

from pydantic import BaseModel
from typing import List,Dict,Optional
from datetime import datetime

class WritingData(BaseModel):
    bucket_name: str
    measurement: str
    tag: Dict[str,str]
    field: Dict[str,float]
    timestamp: Optional[datetime] = None

class ReadingData(BaseModel):
    bucket_name: str
    time_interval: int
    measureament_name: str
    tag: Dict[str,str]
    field: List[str]
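To make the models concrete, here are example request bodies for the two endpoints (bucket, measurement, tag and field names are invented for illustration):

```python
# Example body for POST /write/ (matches the WritingData model).
write_payload = {
    "bucket_name": "sensors",
    "measurement": "climate",
    "tag": {"room": "lab", "device": "rpi-01"},
    "field": {"temperature": 21.5, "humidity": 48.0},
    # Optional; omit it to let InfluxDB assign the timestamp.
    "timestamp": "2023-05-01T12:00:00Z",
}

# Example body for POST /read/ (matches the ReadingData model):
# last 24 hours of temperature and humidity for one room.
read_payload = {
    "bucket_name": "sensors",
    "time_interval": 24,
    "measureament_name": "climate",  # field name as spelled in ReadingData
    "tag": {"room": "lab"},
    "field": ["temperature", "humidity"],  # or ["All"] for every field
}
```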

My idea is to provide an easy interface for writing and querying data that requires minimal configuration effort.

I would like to receive criticism, suggestions and alternatives from more advanced users.

I was thinking about using Telegraf as an alternative, but I’m pretty unfamiliar with the tool and I don’t know whether it will do what I have in mind.
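For what it’s worth, Telegraf could cover the write path: its http_listener_v2 input plugin accepts JSON over HTTP and the influxdb_v2 output forwards it to a bucket. A minimal sketch (untested; the address, token, org and bucket are placeholders):

```toml
[[inputs.http_listener_v2]]
  service_address = ":8080"
  paths = ["/write"]
  data_format = "json"

[[outputs.influxdb_v2]]
  urls = ["http://localhost:8086"]
  token = "my-influxdb-token"
  organization = "my-org"
  bucket = "sensors"
```

That only handles ingestion, though; Telegraf has no query side, so it wouldn’t replace a /read/ endpoint.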

Edit: I changed the read_data() function in Database.py because I realized it wasn’t properly implemented :slight_smile:
