Query InfluxDB records in batches

Hey guys. I have a question about how Kapacitor works and how it connects to InfluxDB.

  1. I have around 20,000 records in InfluxDB in a measurement called new_data, ranging from 2019-12-03 to 2019-12-11.
  2. I created a Kapacitor batch task that should yield a day’s worth of data every hour.
  3. I can see that the batch task is created and is executing.
  4. I tried to ingest this “batched” data into a Python script by querying the new_data measurement.

However, I still get all 20,000 records, when I should be ingesting only about 1,000 records into the Python script.
It would be great if someone could point out the mistake in my approach. I have searched for answers, but I am really at a dead end. Any help is appreciated! Thanks in advance!

Hello @adityal2810,
I’m not sure, so I’m going to ask around. However, I would imagine that you need to save the batched data to a new database with a retention policy (RP) of 1 day if you’re hoping to store only 1 day’s worth of data. Could you please share your Python script and your TICKscript?
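
The retention policy suggestion above can be sketched in Python. This is a minimal sketch, not a confirmed fix: it assumes the `influxdb` Python client, the database name `dummydb`, and a policy named `daily` (the name the TICKscript later in the thread writes to). The helper name `ensure_retention_policy` and the replication value of "1" are assumptions for illustration.

```python
def ensure_retention_policy(client, database="dummydb", name="daily", duration="1d"):
    """Create the retention policy if it is missing.

    `client` is expected to behave like influxdb.InfluxDBClient.
    Returns True if the policy was created, False if it already existed.
    """
    existing = {rp["name"] for rp in client.get_list_retention_policies(database)}
    if name in existing:
        return False
    # Replication "1" is an assumption that fits a single-node setup.
    client.create_retention_policy(
        name, duration, "1", database=database, default=False
    )
    return True
```

With the `influxdb` package installed, calling `ensure_retention_policy(influxdb.InfluxDBClient('localhost', 8086, database='dummydb'))` before starting the task would make sure the target policy exists for `influxDBOut` to write into.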

Hey @Anaisdg, thank you so much for the help.

This is the TICKscript:

dbrp "dummydb"."autogen"

var chunk = batch
                |query('SELECT * FROM "dummydb"."autogen".new_data')
                .period(1d)
                .every(5s)
                |influxDBOut()
                .database('dummydb')
                .retentionPolicy('daily')
                .measurement('out_data')

The measurement ‘out_data’ is not created when I run that script, so I queried new_data instead, which returns everything.

This is the python script:

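import influxdb
import pandas as pd

# Connect to the local InfluxDB instance and select the database
client = influxdb.InfluxDBClient('localhost', 8086, database='dummydb')
# No time filter here, so the query returns every point in new_data
q = "select * from new_data"
data = pd.DataFrame(client.query(q).get_points())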

I asked this in the Slack channel, and I believe Kapacitor UDFs are the only way to create batches of data and ingest those batches into Python.

It would be great if you could point out how to save batched data into a new measurement. That might solve most of my problems.
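
Independently of Kapacitor, the Python query above returns every point because it has no time filter. One possible way to pull only a day's worth of points is to bound the query by time. This is a minimal sketch assuming UTC timestamps and the `influxdb` and `pandas` packages. `window_query` and `fetch_window` are hypothetical helper names, not part of either library.

```python
from datetime import datetime, timedelta, timezone

TIME_FORMAT = "%Y-%m-%dT%H:%M:%SZ"  # RFC 3339, assuming `end` is already in UTC


def window_query(measurement, end, period=timedelta(days=1)):
    """Build an InfluxQL query covering [end - period, end)."""
    start = end - period
    return (
        f'SELECT * FROM "{measurement}" '
        f"WHERE time >= '{start.strftime(TIME_FORMAT)}' "
        f"AND time < '{end.strftime(TIME_FORMAT)}'"
    )


def fetch_window(client, measurement, end, period=timedelta(days=1)):
    """Run the windowed query and return the points as a DataFrame."""
    import pandas as pd  # imported lazily so window_query works without pandas

    result = client.query(window_query(measurement, end, period))
    return pd.DataFrame(result.get_points())


# Example: the last day of the data range mentioned in the thread
example = window_query("new_data", datetime(2019, 12, 11, tzinfo=timezone.utc))
```

Calling `fetch_window(client, "new_data", end)` once per hour with a moving `end` would approximate the "one day of data every hour" behaviour described in the question, without writing to a second measurement.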

Hey @Anaisdg

dbrp "dummydb"."autogen"

var chunk = batch
                |query('SELECT * FROM "dummydb"."autogen".new_data')
                .period(52w)
                .every(15m)
                |influxDBOut()
                .create()
                .database('dummydb')
                .retentionPolicy('autogen')
                .measurement('batch_data1')

I am not able to create the ‘batch_data1’ measurement in InfluxDB. Isn’t this the right approach? When I included a groupBy clause, I was able to create a new measurement, but it does not work the second time, which is weird. Please advise!
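
To confirm from Python whether the task actually wrote anything, one option is to list the measurements in the database and check for the expected names. This is a minimal sketch assuming the `influxdb` Python client. `missing_measurements` is a hypothetical helper name, and it only reports which measurements are absent, not why.

```python
def missing_measurements(client, expected):
    """Return the names in `expected` that the database does not contain.

    `client` is expected to behave like influxdb.InfluxDBClient, whose
    get_list_measurements() returns a list of dicts with a "name" key.
    """
    present = {m.get("name") for m in client.get_list_measurements()}
    return [name for name in expected if name not in present]
```

For example, `missing_measurements(client, ["new_data", "batch_data1"])` returning `["batch_data1"]` would indicate that the source data is there but the output measurement was never written.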