How to bulk insert (from C#)?

Hey there,

I’m still struggling to get a performant bulk insert from C# into InfluxDB (running in Docker).
I have a table/measurement with 71 cols/fields and, just for testing, 500 rows; later I will need thousands of rows with decent performance.

I’ve tried different approaches. I hope you can improve them for me, or maybe confirm that this is all the performance I can get out of InfluxDB for now.

Approach #1

for (int min_idx = 0; min_idx < values.Length; min_idx++)
{
    string datarecord = "aseries ";

    for (int col_idx = 0; col_idx < headers.Length - 1; col_idx++)
    {
        // note: ToString() without a culture uses the current locale;
        // a comma decimal separator breaks the line protocol
        datarecord += headers[col_idx] + "=" + double_vals[col_idx, min_idx].ToString() + ",";
    }
    datarecord = datarecord.Remove(datarecord.Length - 1, 1);
    datarecord += " " + UnixTime(min_idx);

    using (var writeApi = influxDBClient.GetWriteApi())
    {
        writeApi.WriteRecord(bucketid, org, WritePrecision.Ns, datarecord);
    }
}

Approach #2

Nearly the same, but gathering a list of records first (List&lt;string&gt;) and calling WriteRecords after the for-loops… (the for-loops take 400 ms, so that part is fine performance-wise)

using (var writeApi = influxDBClient.GetWriteApi())
{
    writeApi.WriteRecords(bucket, org, WritePrecision.S, list_of_lines);
}

The last command takes 29.7 seconds to transfer the data (other DBs take ~8 s for the same amount of data). And somehow in approach #1 I got data loss, some columns are not findable; in approach #2 the measurement is not there at all (maybe I’m doing something wrong?). Docker gives me no error message.

Hope someone can help me here a bit…

Hi @ben_bln,

thanks for using our client.

The WriteApi is supposed to run as a long-lived singleton, because it uses a background thread for batching and ingesting data into InfluxDB.
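
For illustration, the intended pattern looks roughly like this (a sketch reusing the variable names from your snippets):

// create the WriteApi once and keep it for the lifetime of the application
var writeApi = influxDBClient.GetWriteApi();

// reuse the same instance for every record; batching happens on the background thread
foreach (var datarecord in list_of_lines)
{
    writeApi.WriteRecord(bucketid, org, WritePrecision.Ns, datarecord);
}

// dispose on shutdown to flush the remaining batch
writeApi.Dispose();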

For your purpose, a bulk insert, it is better to use WriteApiAsync.

You can use something like:

using System.Collections.Generic;
using System.Threading.Tasks;
using InfluxDB.Client;
using InfluxDB.Client.Api.Domain;

namespace Examples
{
    public static class InfluxDBExample
    {
        public static async Task Main(string[] args)
        {
            const string url = "http://localhost:9999";
            const string token = "my-token";
            const string org = "my-org";
            const string bucket = "my-bucket";
            using var client = InfluxDBClientFactory.Create(url, token.ToCharArray());

            var list_of_lines = new List<string>();
            for (var min_idx = 0; min_idx < 500; min_idx++)
            {
                var datarecord = "aseries ";

                for (var col_idx = 0; col_idx < 100 - 1; col_idx++)
                {
                    datarecord += $"header{col_idx}={col_idx + min_idx},";
                }

                datarecord = datarecord.Remove(datarecord.Length - 1, 1);
                datarecord += " " + min_idx;

                list_of_lines.Add(datarecord);
            }


            await client.GetWriteApiAsync().WriteRecordsAsync(bucket, org, WritePrecision.Ns, list_of_lines);
        }
    }
}

You can also split your records into batches by:

// configure batching
const int batchSize = 100;
var batches = Enumerable
    .Range(0, (list_of_lines.Count + batchSize - 1) / batchSize)
    .Select(i => list_of_lines.Skip(i * batchSize).Take(batchSize).ToList())
    .ToList();

foreach (var batch in batches)
{
    Console.WriteLine($"Ingesting batch... {batch.Count}");

    await client.GetWriteApiAsync().WriteRecordsAsync(bucket, org, WritePrecision.Ns, batch);
}

Regards

Well thanks a lot!

Your code works, but unfortunately the performance is quite bad. The whole procedure takes about 30 seconds (SQLite & SQL CE take about 10 s for everything). I’ve prepared a .cs file based on your good instructions so you can fully reproduce my results. I have two more questions:

  • Why does the bucket creation command not work? I commented it out; it gives me the error "failed to unmarshal json: id must have a length of 16 bytes", which indicates that I have to pass the bucketId? Isn’t that generated after creating a bucket…?
  • Second question: when I query data I get 70 tables with 4549 lines of results (the actual row count is 1296000), which seems to indicate that either the query limits the results or there is data loss? Or does this have something to do with an inherent feature of InfluxDB, meaning that it gives back all values of a certain timespan merged into a mean value?

Here’s my code for reproduction (create bucket manually first and fill in your token):

using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Threading.Tasks;
using InfluxDB.Client;
using InfluxDB.Client.Api.Domain;

namespace Examples
{
    public static class InfluxDBExample
    {

        public static async Task Main(string[] args)
        {
            var bucket = "test_bucket";
            var org = "vstest";
            var measurement = "aseries";
            var url = "http://localhost:8086";
            char[] token = "thetoken".ToCharArray();
            var nr_of_rows = 129600;

            Console.Write("\tinflux datapreparation...");
            //data_preparation

            var headers = new string[71];

            var col_upper = -1;
            for (int i = 0; i < 71; i++)
            {
                if (i % 26 == 0)
                {
                    col_upper++;
                }
                headers[i] = "Col_" + (char)(65 + col_upper) + ((char)(i % 26 + 65));
            }

            double[,] double_vals = new double[headers.Length, nr_of_rows];
            var timestamp = 0;
            Random rnd = new Random();
            for (int min_idx = 0; min_idx < nr_of_rows; min_idx++)
            {
                timestamp += 60;

                for (int col_idx = 0; col_idx < headers.Length - 1; col_idx++)
                {
                    // column 0 carries the timestamp; the remaining columns get random values
                    if (col_idx == 0)
                    {
                        double_vals[col_idx, min_idx] = timestamp;
                        continue;
                    }
                    double_vals[col_idx, min_idx] = rnd.NextDouble() * 100;
                }
            }

            var stopWatch = new Stopwatch();
            stopWatch.Start();
            Console.Write("\n\tinflux client setup...");
            using var client = InfluxDBClientFactory.Create(url, token);
            //await CreateBucket(bucket, org) - not working ?!;

            Console.Write("\r\tinflux client setup..."+stopWatch.Elapsed);
            Console.Write("\n\tcommandlines setup...");
            var list_of_lines = new List<string>();
            for (var min_idx = 0; min_idx < nr_of_rows; min_idx++)
            {
                var datarecord = measurement+" ";

                for (var col_idx = 0; col_idx < headers.Length -1; col_idx++)
                {
                    datarecord += headers[col_idx] + "=" + double_vals[col_idx, min_idx].ToString("0.00", System.Globalization.CultureInfo.InvariantCulture) + ",";
                }

                datarecord = datarecord.Remove(datarecord.Length - 1, 1);
                //datarecord += " " + min_idx;

                list_of_lines.Add(datarecord);
            }
            Console.Write("\r\tinflux client setup...(last step in " + stopWatch.Elapsed + ")");
            const int batchSize = 10000;
            Console.Write("\n\tinflux writing async batches(" + batchSize + ")" );

            // configure batching
            
            var batches = Enumerable
                .Range(0, (list_of_lines.Count + batchSize - 1) / batchSize)
                .Select(i => list_of_lines.Skip(i * batchSize).Take(batchSize).ToList())
                .ToList();

            var batchnr = 0;

            foreach (var batch in batches)
            {
                batchnr++;
                Console.Write("\r\tinflux writing async batches("+batchSize+") "+batchnr+"/"+batches.Count+"............. (" + stopWatch.Elapsed + ")");
                await client.GetWriteApiAsync().WriteRecordsAsync(bucket, org, WritePrecision.Ns, batch);
            }

            //Console.Write("\n\tinflux writing async ... (last step in " + stopWatch.Elapsed + ")");
            //await client.GetWriteApiAsync().WriteRecordsAsync(bucket, org, WritePrecision.Ns, list_of_lines);

            //
            // Query data
            //
            Console.Write("\n\tinflux reading async ... (last step in "+stopWatch.Elapsed+")" );
            var flux_from_manual = "from(bucket:\""+bucket+"\") |> range(start: 0)";
            var ownflux = "from(bucket:\""+bucket+ "\") |> range(start: 0) |> filter(fn: (r) => r._measurement == \"" + measurement+"\")";


            var fluxTables = await client.GetQueryApi().QueryAsync(ownflux, org);
            fluxTables.ForEach(fluxTable =>
            {
                var fluxRecords = fluxTable.Records;
                fluxRecords.ForEach(fluxRecord =>
                {
                    Console.WriteLine($"{fluxRecord.GetTime()}: {fluxRecord.GetValue()}");
                });
            });

            Console.WriteLine("TABLES: "+fluxTables.Count);
            client.Dispose();
            Console.Write("\n\tfinished..."+stopWatch.Elapsed);

            async Task CreateBucket(string bucket_s, string org_s)
            {

                //
                // Create bucket with data retention set to 3,600 seconds
                //
                var retention = new BucketRetentionRules(BucketRetentionRules.TypeEnum.Expire, 3600);
                var bucket = await client.GetBucketsApi().CreateBucketAsync(bucket_s, retention, org_s);
                Console.Write("\n\tcreated new bucket with id:" + bucket.Id);
            }
        }
    }
}

The org parameter for CreateBucketAsync has to be the Organization.Id, not the organization name.
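
For example, something like this (a sketch; it looks the organization up by name through the OrganizationsApi and reuses the client variable from your code):

// find the organization's 16-byte id by its name
var organization = (await client.GetOrganizationsApi().FindOrganizationsAsync())
    .First(o => o.Name == org);

// pass the id, not the name, to CreateBucketAsync
var retention = new BucketRetentionRules(BucketRetentionRules.TypeEnum.Expire, 3600);
var created = await client.GetBucketsApi().CreateBucketAsync(bucket, retention, organization.Id);
Console.Write("\n\tcreated new bucket with id: " + created.Id);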

There is no query limit.

Try adding timestamps to your data. Points that arrive with the same timestamp are merged into one point in the series.
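
In your reproduction code the timestamp line is commented out, so every row collapses onto the same point. A sketch of the fix, reusing the variables from your code (one minute per row, written with second precision):

var list_of_lines = new List<string>();
for (var min_idx = 0; min_idx < nr_of_rows; min_idx++)
{
    var datarecord = measurement + " ";

    for (var col_idx = 0; col_idx < headers.Length - 1; col_idx++)
    {
        datarecord += headers[col_idx] + "=" + double_vals[col_idx, min_idx].ToString("0.00", System.Globalization.CultureInfo.InvariantCulture) + ",";
    }

    datarecord = datarecord.Remove(datarecord.Length - 1, 1);

    // unique epoch-second timestamp for every point: one minute per row
    datarecord += " " + (min_idx * 60);

    list_of_lines.Add(datarecord);
}

// write with second precision to match the epoch-second timestamps
await client.GetWriteApiAsync().WriteRecordsAsync(bucket, org, WritePrecision.S, list_of_lines);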

Alright, thanks - that works much better with the org id!

I added timestamps to my data and the series now spans 90 days (129,600 rows at one minute each).

I can see the data in the browser UI, but the C# console query is in an endless (?) loop grabbing more and more data (it started at 1 GB RAM and was up to 4 GB after 5 minutes, then I cancelled; the data can’t be that huge…). My Flux is still unchanged:

var ownflux = "from(bucket:\""+bucket+ "\") |> range(start: 0) |> filter(fn: (r) => r._measurement == \"" + measurement+"\")";


            var fluxTables = await client.GetQueryApi().QueryAsync(ownflux, org);
            fluxTables.ForEach(fluxTable =>
            {
                var fluxRecords = fluxTable.Records;
                fluxRecords.ForEach(fluxRecord =>
                {
                    Console.WriteLine($"{fluxRecord.GetTime()}: {fluxRecord.GetValue()}");
                });
            });

Hope you can help me with that + performance (have you tested my data transmission on your PC?)

This query returns a lot of redundant data which has to be parsed by the QueryApi:

QueryAsync(String, String) is not supposed to be used with this amount of data (9,072,000 records: 129,600 rows × 70 fields). You can make the following tweaks to achieve better performance:

  1. drop() useless data. Use something like: |> drop(columns: ["_start", "_stop", "_measurement"])
  2. use the streaming version of the QueryApi - QueryAsync(String, String, Action<ICancellable, FluxRecord>); see the sketch below
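
For example, a sketch combining both tweaks (assuming the client, bucket, org and measurement variables from your code):

var flux = "from(bucket:\"" + bucket + "\")" +
           " |> range(start: 0)" +
           " |> filter(fn: (r) => r._measurement == \"" + measurement + "\")" +
           " |> drop(columns: [\"_start\", \"_stop\", \"_measurement\"])";

var count = 0;

// the streaming overload hands each record to the callback as it is parsed,
// instead of materializing the whole result set in memory
await client.GetQueryApi().QueryAsync(flux, org, (cancellable, fluxRecord) =>
{
    count++;
});

Console.WriteLine("records: " + count);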

Hope you can help me with that + performance (have you tested my data transmission on your PC?)

Yes, on my laptop the ingesting takes 8 seconds.


@bednar I have the following results at the moment:

  • influx client setup completed in 00:00:01.2798261
  • influx commandlines setup completed in 00:00:09.7095961
  • influx writing async batches completed in 00:00:14.2104267
  • influx reading async (70 tables) completed in 00:00:00.3324641
  • finished... 00:00:25.5335830

So you have 8 s for phases 2 & 3 from C#?! How?! (with other databases I achieve ~9 seconds for all phases + deleting)

My specs:
Win10 Pro
AMD Ryzen 5 1600X, 3.60 GHz
Samsung SSD EVO 950
16 GB RAM

Running InfluxDB via Docker

I have 00:00:08.9128856 just for ingesting the data, measured from var batchnr = 0; to Console.Write("\n\tinflux reading async ... (last step in "+stopWatch.Elapsed+")");.

My specs:

[screenshot]


OK, that’s better performance, but still not what I need. You are still at around 15-20 s overall, I guess… I’m aiming for <8 s for everything.