Upload the CSV with millions of rows into influxdb


#1

Hi guys,

I know this topic has been discussed many times, but I need to know how best to write data to InfluxDB, with performance in mind, when the data has around 5 million rows.

We have a CSV file that contains around 5 million rows. We are using Python to upload the data to InfluxDB, but it seems to have performance issues: it takes around 3 minutes to upload 100K rows, each with 3 tags and 3 values.

The steps I am taking at the moment:

  1. Use pandas to read the CSV file.
    The read_csv(file_path) function loads the file and creates the DataFrame.

  2. Use a for loop to create the JSON body with 500K rows:
    json_body = []
    for row_index, row in data.iterrows():
        json_body.append({
            'tags': {'server': rt_server, 'event_status': event_status, 'delivery_status': delivery_status},
            'fields': {'delivery_duration': row[1], 'processing_time': row[3], 'queue_time': row[4]},
            'time': row[0],
            'measurement': table_name,
        })
    The tags and values are assigned dynamically during the loop. The loop takes around 10 seconds to build the entire JSON body.

  3. Finally, use the write_points method to ingest the data into InfluxDB:
    ingestion_status = client.write_points(json_body, batch_size=100000)
    This step takes around 3 minutes for 100K rows and 15 minutes for 500K rows.

Could you please advise how to improve the performance and how to take this forward? This has been frustrating for me: I have read all the documentation but still couldn't find the right solution.

Regards,
Anshul


#2

You shouldn't write the data as JSON; you need to write it as InfluxDB line protocol.

It will be faster for influx to handle.
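
For reference, one line of line protocol has the shape "measurement,tag_key=tag_value field_key=field_value timestamp". Below is a minimal sketch of turning a point dictionary like the one in post #1 into such a line. The function names (escape_key, format_field, point_to_line) and the sample values are made up for illustration, and the sketch assumes the 'time' entry is already an integer epoch in nanoseconds.

```python
def escape_key(value):
    """Escape commas, equals signs and spaces in tag keys, tag values and field keys."""
    return str(value).replace(",", r"\,").replace("=", r"\=").replace(" ", r"\ ")


def format_field(value):
    """Render a field value: booleans as true/false, ints with an 'i' suffix, strings quoted."""
    if isinstance(value, bool):  # check bool first, since bool is a subclass of int
        return "true" if value else "false"
    if isinstance(value, int):
        return f"{value}i"
    if isinstance(value, float):
        return repr(value)
    escaped = str(value).replace("\\", "\\\\").replace('"', '\\"')
    return f'"{escaped}"'


def point_to_line(point):
    """Convert one point dictionary into a single line of line protocol."""
    measurement = str(point["measurement"]).replace(",", r"\,").replace(" ", r"\ ")
    # Tags are sorted by key here; the order is not required for correctness.
    tags = ",".join(
        f"{escape_key(k)}={escape_key(v)}" for k, v in sorted(point["tags"].items())
    )
    fields = ",".join(
        f"{escape_key(k)}={format_field(v)}" for k, v in point["fields"].items()
    )
    head = f"{measurement},{tags}" if tags else measurement
    return f"{head} {fields} {point['time']}"


# Hypothetical sample point, shaped like the dictionaries built in post #1.
point = {
    "measurement": "delivery",
    "tags": {"server": "rt01", "event_status": "ok", "delivery_status": "sent"},
    "fields": {"delivery_duration": 1.5, "processing_time": 20, "queue_time": 3},
    "time": 1546300800000000000,
}
line = point_to_line(point)
print(line)
```

Note that integer fields get the "i" suffix while floats do not, so a column should use one type consistently across all rows.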


#3

As a side comment, we have a 4-node Enterprise cluster for staging, with 4 cores on each data node. It can handle over 200k points per second without hitting 50% CPU utilization. You will get different results with OSS, but InfluxDB can handle a ton of data if it's formatted and batched properly.


#4

Hi Esity, thanks. I need to use Python to ingest the data into InfluxDB. I have read the documentation, which says to use either write_points or the SeriesHelper class to push the data; the example given there converts the data into a dictionary of values and tags and then writes that measurement. See the link below.
https://influxdb-python.readthedocs.io/en/latest/api-documentation.html

From the documentation you shared, I can't work out how to convert the CSV into line protocol and write it using a Python program.
However, I have used line protocol to insert data manually through the command-line interface, for a limited number of rows only, and that worked well.

Could you share simple steps or a sample program showing how I could achieve this, based on your earlier comments?


#5

If you wrote the original code you posted, then you should be able to convert it so that instead of creating a JSON object, you build an InfluxDB line protocol payload to post.

I am not going to write your Python code for you.
Take baby steps: write a Python script that converts a single CSV file (with only one line) and prints the InfluxDB line protocol, then expand from there.
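
As an illustration of that first baby step, here is a rough sketch that reads a one-row CSV and prints a single line of line protocol. The column names, the measurement name, the time format and the row_to_line helper are all hypothetical. It also assumes the timestamps are in UTC, that every field is numeric, and that tag values contain no spaces, commas or equals signs (those would need escaping).

```python
import csv
import io
from datetime import datetime, timezone

# Hypothetical one-row CSV; a real script would open the file instead.
SAMPLE_CSV = (
    "time,server,delivery_duration,queue_time\n"
    "2019-01-01 00:00:00,rt01,1.5,3\n"
)


def row_to_line(row, measurement, time_column, tag_columns, field_columns,
                time_format="%Y-%m-%d %H:%M:%S"):
    """Build one line of line protocol from a csv.DictReader row."""
    tags = ",".join(f"{t}={row[t]}" for t in tag_columns)
    # Every field is written as a float here so the field type stays consistent.
    fields = ",".join(f"{f}={float(row[f])}" for f in field_columns)
    # Assumption: the CSV times are UTC and have whole-second resolution.
    parsed = datetime.strptime(row[time_column], time_format).replace(tzinfo=timezone.utc)
    # Integer arithmetic avoids float rounding at nanosecond scale.
    timestamp_ns = int(parsed.timestamp()) * 1_000_000_000
    return f"{measurement},{tags} {fields} {timestamp_ns}"


reader = csv.DictReader(io.StringIO(SAMPLE_CSV))
first_row = next(reader)
line = row_to_line(first_row, "delivery", "time",
                   ["server"], ["delivery_duration", "queue_time"])
print(line)
```

Once the printed line can be inserted by hand through the CLI, the same function can be applied to every row of the real file.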


#6

Yes, that looks good. I just wanted to make sure that this is the right direction to proceed in.
It helped a lot. I will let you know how it goes and will come back to you.
Thanks a lot so far :grinning:


#7

Hi Esity,
I have been working on this and have come up with the code below. My InfluxDB system has been down for a while, so I cannot measure the performance yet, but I wanted to check with you whether the approach below is the right way to create line protocol :slight_smile:

for row in reader:
    tags, fields = [], []

    # Update the tags
    for t in tag_columns:
        if t in row:
            tags.append("{tag}={value}".format(tag=t, value=format_tags(t, row[t])))
    # Update the values
    for f in field_columns:
        if f in row:
            fields.append("{field}={value}".format(field=f, value=format_fields(row[f])))
    # Update the time
    datetime_naive = datetime.datetime.strptime(str(row[time_column]), timeformat)
    datetime_local = timezone(datatimezone).localize(datetime_naive)
    timestamp = unix_time_millis(datetime_local) * 1000000  # in nanoseconds

    # Create the line protocol
    data_points.append("{measurement},{tags} {fields} {time}".format(measurement=measurement, tags=','.join(tags),
                                                                     fields=','.join(fields), time=timestamp))


#8

Hi Esity,

I have tested both approaches, ingestion using JSON and using line protocol. I see only a marginal difference in ingestion time between the two.

Am I doing something wrong here, or is there a bandwidth limitation in InfluxDB that might be affecting the ingestion speed?

Regards,
Anshul


#9

Hi Esity,

Could you please tell me if there is something I could do differently to improve the ingestion speed? As mentioned above, I followed your feedback and now store the data using line protocol, but I see only a marginal difference compared with JSON.

reader = csv.DictReader(csvfile, delimiter=delimiter)
forstart = time.time()
for row in reader:
    start = time.time()
    tags, fields = [], []
    
    # Update the tags
    for t in tag_columns:
        if t in row:
            tags.append("{tag}={value}".format(tag=t, value=format_tags(t, row[t])))
    # Update the values
    for f in field_columns:
        if f in row:
            fields.append("{field}={value}".format(field=f, value=format_fields(row[f])))
    # Update the time
    datetime_naive = datetime.datetime.strptime(str(row[time_column]), timeformat)
    datetime_local = timezone(datatimezone).localize(datetime_naive)
    timestamp = unix_time_millis(datetime_local) * 1000000  # in nanoseconds

    # Create the line protocol
    data_points.append("{measurement},{tags} {fields} {time}".format(measurement=measurement, tags=','.join(tags),
                                                                     fields=','.join(fields), time=timestamp))
    # Count the number of rows that have been read
    count = count + 1
    if count > 10000:
        print("Exiting the Program")
        break

    # data_points.append(measurement+','+','.join(tags) +' '+ ','.join(fields))
    if len(data_points) % batch_size == 0:
        end = time.time()
        print('Total time to convert the batch of frames to line protocol = ', end - start)
        line = '\n'.join(data_points)
        # print(line)
        print('Read %d lines ' % count)
        print('Inserting %d data_points...' % (len(data_points)))
        start = time.time()
        ingestion_status = client.write_points(line,protocol=protocol)
        end = time.time()
        print('Total time to ingest the data successfully = ', end - start)

Regards,
Anshul
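
One thing worth checking in the loop in post #9: as posted, data_points is never emptied after write_points, so every time the length reaches a multiple of batch_size the whole accumulated list is joined and sent again, and each write grows larger than the last. The start timer is also reset on every row, so the printed conversion time covers only the last row of a batch. That may or may not explain the numbers, but it is cheap to rule out. Below is a minimal sketch of batching that clears the buffer after each write. write_in_batches, write_batch and fake_write are hypothetical names, and the batch size of 5000 is only a placeholder to tune.

```python
import time


def write_in_batches(lines, write_batch, batch_size=5000):
    """Send lines in fixed-size batches, starting a fresh buffer after each write."""
    buffer = []
    written = 0
    for line in lines:
        buffer.append(line)
        if len(buffer) >= batch_size:
            started = time.perf_counter()
            write_batch(buffer)
            elapsed = time.perf_counter() - started
            print(f"wrote {len(buffer)} lines in {elapsed:.3f}s")
            written += len(buffer)
            buffer = []  # new list, so each batch is sent exactly once
    if buffer:  # flush the final partial batch
        write_batch(buffer)
        written += len(buffer)
    return written


# Stand-in writer for demonstration. With influxdb-python, the callable might
# instead be something like:
#     lambda batch: client.write_points(batch, protocol='line')
# (an assumption to verify against the client version in use).
received = []


def fake_write(batch):
    received.append(len(batch))


sample_lines = (f"m,server=rt01 value={i}i {i}" for i in range(12))
total = write_in_batches(sample_lines, fake_write, batch_size=5)
print(received, total)
```

Because the function takes any iterable, the lines can be produced by a generator while the CSV is being read, so the full 5 million rows never need to sit in memory at once.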