Increasing InfluxDB insertion rate via Influx-Python lib


#1

I am running into an issue and am not sure how to move forward with it.

I have integrated InfluxDB insertion using the Python library, and I have around 10 million records that need to be inserted into InfluxDB.
Here’s what I do:

  1. I perform some operation on the data.
  2. I convert it into the JSON format needed for insertion.
  3. I write it to InfluxDB.

My problem is that the data operation takes 4 units of time while the InfluxDB insertion takes 10 units of time. To handle this I have used threads, but they are not giving significant results.
The entire process of inserting ~10 million records into the database takes more than 4 hours.
But if I write the data to a file and then use the import command, the import takes just 15 minutes.

Is there a way I can speed up my insertions without writing an intermediate file and then importing it?


#2

When you convert the data to JSON, it needs to be converted back to Line Protocol before it is sent. Instead of creating JSON from the data, create Line Protocol.

Look at the write_points() function and SeriesHelper class and ensure that you are writing points to InfluxDB in batches instead of one at a time. We recommend writing batches of around 5000-10000 points; you can specify the batch size by providing an argument to write_points().
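As a rough sketch of the two suggestions above (the measurement, tag, and field names here are invented for illustration, not taken from the thread), building line protocol directly and chunking it into batches could look like this:

```python
# Sketch: build InfluxDB line protocol strings directly instead of JSON
# dictionaries. Measurement/tag/field names below are illustrative only.

def to_line(measurement, tags, fields, timestamp_ns):
    """Format one point as line protocol:
    measurement,tag1=v1 field1=v1 timestamp"""
    tag_str = ",".join(f"{k}={v}" for k, v in sorted(tags.items()))
    field_str = ",".join(f"{k}={v}" for k, v in fields.items())
    return f"{measurement},{tag_str} {field_str} {timestamp_ns}"

def batches(items, size):
    """Yield successive chunks of at most `size` items."""
    for i in range(0, len(items), size):
        yield items[i:i + size]

lines = [
    to_line("cpu", {"host": "server01"}, {"value": 0.64}, 1434055562000000000),
    to_line("cpu", {"host": "server02"}, {"value": 0.72}, 1434055562000000000),
]

# With influxdb-python, each batch of lines could then be written with
# e.g. client.write_points(batch, protocol="line"), one call per batch.
```

This skips the JSON-to-line-protocol conversion entirely, which is where much of the per-point overhead goes.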


#3

Can you please explain how batching works? Do I just specify the attribute value and it will do the batching by itself?


#4

I was looking at the write_points function recently. You give it a Python list of dictionaries that define the points, so you don’t have to construct the lines yourself. The function will generate the lines and post them to the API in a single, giant POST. If you set the batch_size argument, it will internally POST the lines in groups no larger than batch_size.
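For example (a minimal sketch; the measurement, host, and database names are invented, and the client call is shown commented out rather than executed), the list-of-dicts format looks like:

```python
# Sketch of the point-dictionary format accepted by influxdb-python's
# write_points(); all names and values here are illustrative.
points = [
    {
        "measurement": "cpu_load",        # series name
        "tags": {"host": "server01"},     # indexed metadata
        "fields": {"value": 0.64},        # actual data values
        "time": "2015-06-11T20:46:02Z",   # optional timestamp
    }
    for _ in range(3)
]

# With a real client, this single call would internally POST the points
# in groups of at most 5000:
# from influxdb import InfluxDBClient
# client = InfluxDBClient(host="localhost", port=8086, database="mydb")
# client.write_points(points, batch_size=5000)
```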


#5

Thank you both… It was very helpful. I am now able to insert approximately 15k records/sec.
Can I increase the insertion rate any further?


#6

Hi Ammar/Fluffynukeit/noahcrowley,

Just like Ammar got stuck uploading data to InfluxDB, I am stuck here too. It takes around 150 seconds for 100K rows in InfluxDB.

I am first converting the CSV to a DataFrame using pandas, then using a for loop to prepare the JSON body, and then using the write_points method to ingest the data with batch size = 10000.

Here is some sample code which I have.

  1. read CSV using pandas
    .read_csv function

  2. for loop to iterate over the rows and create the JSON body
    json_body = []
    for row_index, row in data.iterrows():
        json_body.append({
            'measurement': table_name,
            'tags': {'server': servername,
                     'event_status': event_status,
                     'delivery_status': delivery_status},
            'fields': {'delivery_duration': row[1],
                       'processing_time': row[3],
                       'queue_time': row[4]},
            'time': row[0],
        })

  3. ingestion_status = client.write_points(json_body, batch_size=100000)

Can you please guide on how this can be improved?
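One common bottleneck in the steps above is iterrows(), which is slow on large DataFrames. A sketch of the same JSON-body construction using the faster itertuples() (the column layout, table name, and tag values below are placeholders assumed from the snippet above):

```python
import pandas as pd

# Placeholder data standing in for the CSV; column order assumed from the
# snippet above: time, delivery_duration, unused, processing_time, queue_time.
data = pd.DataFrame({
    "time": ["2021-01-01T00:00:00Z"],
    "delivery_duration": [1.2],
    "unused": [0],
    "processing_time": [0.3],
    "queue_time": [0.1],
})

table_name = "events"   # placeholder values for the sketch
servername, event_status, delivery_status = "srv1", "ok", "sent"

# itertuples() avoids building a pandas Series per row, unlike iterrows().
json_body = [
    {
        "measurement": table_name,
        "tags": {"server": servername,
                 "event_status": event_status,
                 "delivery_status": delivery_status},
        "fields": {"delivery_duration": row[1],
                   "processing_time": row[3],
                   "queue_time": row[4]},
        "time": row[0],
    }
    for row in data.itertuples(index=False)
]

# client.write_points(json_body, batch_size=10000)
```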

