I added data to the database (InfluxDB OSS), about 200k rows in total. I verified this with the CLI (select * from m_prices, where m_prices is the measurement name). I am now trying to fetch this data into a pandas DataFrame using the Python client. However, the query only returns some of the rows (around 70k or 90k) when I print the length of the DataFrame, and the number of rows fetched varies every time I run the code.
Steps
- Upload the data using an endpoint provided by my service. The data is written to the database using the following code.
I have created a singleton class for
class DB:
self.write_api is created using the InfluxDBClientAsync client and is accessible to any method that needs to write to the database.
Write method:
async def write_df_data(self, req: DBWriteRequest):
    res = await self.write_api.write(
        bucket=req.bucket,
        record=req.records,
        data_frame_measurement_name=req.measurement,
        data_frame_tag_columns=req.tags,
        write_precision=WritePrecision.NS,
    )
    return res
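For context, the singleton described above could look roughly like the sketch below. It is an assumption about the shape of the class, not the actual service code: `write_api_factory` is a hypothetical parameter standing in for whatever creates the write API (for example, something built on `InfluxDBClientAsync`).

```python
from typing import Any, Callable, Optional


class DB:
    """Singleton holder for a shared write API (illustrative sketch only)."""

    _instance: Optional["DB"] = None

    def __new__(cls, write_api_factory: Optional[Callable[[], Any]] = None) -> "DB":
        # Create the single instance on first use; later calls reuse it.
        if cls._instance is None:
            if write_api_factory is None:
                raise ValueError("the first call must supply write_api_factory")
            instance = super().__new__(cls)
            # Hypothetical: in the real service this would be the write API
            # obtained from the InfluxDBClientAsync client.
            instance.write_api = write_api_factory()
            cls._instance = instance
        return cls._instance
```

With this shape, `DB(factory)` builds the write API once, and every later `DB()` call returns the same object, so all writers share one `write_api`.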
- Query the uploaded data
client = InfluxDBClient(
    url=settings.influx_url,
    token=settings.influx_token.get_secret_value(),
    org=settings.influx_org,
)
query_api = client.query_api()
query = f'''
from(bucket: "{query_req.bucket}")
|> range(start: -30d)
|> filter(fn: (r) => r._measurement == "{query_req.measurement}")
|> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")
|> drop(columns: ["_start", "result", "_stop", "table", "_field"])
'''
tables = query_api.query_data_frame(query=query)
logger.debug(f"number of records fetched {len(tables)=}")
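One detail that may affect the logged number: `query_data_frame` can return either a single DataFrame or a list of DataFrames, and `len()` of a list gives the number of frames rather than the number of rows. A minimal sketch of a row count that handles both cases; `total_rows` is a hypothetical helper name, and whether a list is actually returned here is not confirmed.

```python
from typing import Any, List, Union


def total_rows(result: Union[Any, List[Any]]) -> int:
    """Count rows whether `result` is one DataFrame or a list of DataFrames."""
    # A list result means several frames came back; sum their lengths
    # instead of taking len() of the list itself.
    frames = result if isinstance(result, list) else [result]
    return sum(len(frame) for frame in frames)
```

It could be used in place of the plain `len(tables)` call, for example `logger.debug(f"number of records fetched {total_rows(tables)}")`.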
Please note:
- I tried using the async client to perform the same query and ran into the same issue.
- If I query again after some time, the number keeps increasing. It started at 40k, later went to 70k and 100k, and finally, after a day or two, the query fetches all the rows.
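Because the row count grows as time passes, one thing that may be worth ruling out is the query's time window. In Flux, `range(start: -30d)` has an implicit `stop` of `now()`, so any points whose timestamps lie after the moment of the query are left out. The sketch below builds the same query with an optional explicit `stop`; `build_query` is a hypothetical helper, and this is a diagnostic idea under that assumption, not a confirmed cause.

```python
from typing import Optional


def build_query(bucket: str, measurement: str, start: str, stop: Optional[str] = None) -> str:
    """Build the Flux query from above, optionally with an explicit stop time."""
    # Without stop, Flux's range() ends at now(), so points stamped later
    # than the query time are excluded from the result.
    time_range = f"start: {start}" if stop is None else f"start: {start}, stop: {stop}"
    return f'''
from(bucket: "{bucket}")
|> range({time_range})
|> filter(fn: (r) => r._measurement == "{measurement}")
|> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")
'''
```

Running the query once with `start="-30d"` and once with absolute bounds that cover every timestamp in the uploaded data (placeholder values such as `"2024-01-01T00:00:00Z"` and `"2025-01-01T00:00:00Z"`) would show whether the two counts differ.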