I’m still struggling to get a performant bulk insert from C# into InfluxDB (running in Docker).
I have a table/measurement with 71 columns/fields and, just for testing, 500 rows, but later I will need thousands of rows with decent performance.
I’ve tried different approaches; I hope you can improve them for me, or confirm that this is all the performance I can get out of InfluxDB for now.
Nearly the same, but gathering a list of records first (a List&lt;string&gt;) and calling “WriteRecords” after the for-loops. (The for-loops take about 400 ms, so that part is fine performance-wise.)
The last command takes 29.7 seconds to transfer the data (other databases take ~8 s for the same amount of data). And somehow, in approach #1 I got data loss: some columns are not findable. In approach #2 the measurement is not there at all (maybe I’m doing something wrong?). Docker gives me no error message.
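One thing worth checking (an assumption on my part, not a confirmed diagnosis): InfluxDB treats two points with the same measurement, tag set, and timestamp as the same point, and the later write silently replaces the earlier one. If no trailing timestamp is written in line protocol, the server assigns one at receive time, so many rows of a fast bulk write can collapse onto each other, which would look exactly like missing data. A minimal sketch of building a record with an explicit per-row timestamp (measurement and field names mirror the ones used later in this thread):

```csharp
using System;
using System.Globalization;

public static class LineProtocolSketch
{
    // Builds one line-protocol record with an explicit trailing timestamp,
    // so that each row keeps its own identity on the server.
    public static string BuildLine(string measurement, string[] fields,
                                   double[] values, long timestampSeconds)
    {
        var parts = new string[fields.Length];
        for (var i = 0; i < fields.Length; i++)
        {
            parts[i] = fields[i] + "=" +
                       values[i].ToString("0.00", CultureInfo.InvariantCulture);
        }
        // layout: measurement<space>field1=v1,field2=v2<space>timestamp
        return measurement + " " + string.Join(",", parts) + " " + timestampSeconds;
    }

    public static void Main()
    {
        var line = BuildLine("aseries",
            new[] { "Col_AA", "Col_AB" }, new[] { 1.5, 2.25 }, 60);
        Console.WriteLine(line); // aseries Col_AA=1.50,Col_AB=2.25 60
    }
}
```

The write precision passed to the client (e.g. `WritePrecision.S`) then has to match the unit of the trailing number.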
Your code works, but unfortunately the performance is quite bad. The whole procedure takes about 30 seconds (SQLite and SQL CE take about 10 s for everything). Based on your helpful instructions I’ve prepared a .cs file so you can fully reproduce my results. I have two more questions:
Why does the bucket-creation command not work? I commented it out; it gives me the error "failed to unmarshal json: id must have a length of 16 bytes", which suggests that I have to supply the bucket ID? Isn’t that generated when the bucket is created…?
Second question: when I query the data I get 70 tables with 4549 lines of results (the actual row count is 1296000), which seems to indicate that either the query limits the results or there is data loss. Or does this have to do with an inherent feature of InfluxDB, meaning that it returns all values of a certain timespan merged into a mean value?
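For what it’s worth, the 70 tables are most likely not data loss: a Flux result is split into one table per series, i.e. per combination of measurement, tag set, and `_field`, so 70 fields naturally come back as 70 tables. If one row per timestamp with the fields as columns is wanted (the relational shape), the usual tool is a `pivot()` stage. A sketch of building such a query string, reusing the bucket and measurement names from the example:

```csharp
using System;

public static class PivotQuerySketch
{
    public static void Main()
    {
        var bucket = "test_bucket";   // names assumed from this thread
        var measurement = "aseries";

        // Flux emits one table per series (measurement + tag set + _field),
        // so 70 fields -> 70 tables. pivot() merges the fields back into
        // one row per timestamp.
        var flux =
            "from(bucket:\"" + bucket + "\")" +
            " |> range(start: 0)" +
            " |> filter(fn: (r) => r._measurement == \"" + measurement + "\")" +
            " |> pivot(rowKey:[\"_time\"], columnKey: [\"_field\"], valueColumn: \"_value\")";

        Console.WriteLine(flux);
    }
}
```

Note that pivoting a very large result makes the server do extra work, so it is best combined with a narrower `range()` or an aggregation.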
Here’s my code for reproduction (create the bucket manually first and fill in your token):
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Threading.Tasks;
using InfluxDB.Client;
using InfluxDB.Client.Api.Domain;

namespace Examples
{
    public static class InfluxDBExample
    {
        public static async Task Main(string[] args)
        {
            var bucket = "test_bucket";
            var org = "vstest";
            var measurement = "aseries";
            var url = "http://localhost:8086";
            char[] token = "thetoken".ToCharArray();
            var nr_of_rows = 129600;

            Console.Write("\tinflux datapreparation...");

            // data preparation: generate column names Col_AA, Col_AB, ...
            var headers = new string[71];
            var col_upper = -1;
            for (int i = 0; i < 71; i++)
            {
                if (i % 26 == 0)
                {
                    col_upper++;
                }
                headers[i] = "Col_" + (char)(65 + col_upper) + ((char)(i % 26 + 65));
            }

            double[,] double_vals = new double[headers.Length, nr_of_rows];
            var timestamp = 0;
            Random rnd = new Random();
            for (int min_idx = 0; min_idx < nr_of_rows; min_idx++)
            {
                timestamp += 60;
                for (int col_idx = 0; col_idx < headers.Length - 1; col_idx++)
                {
                    if (col_idx == 0) double_vals[col_idx, min_idx] = timestamp;
                    double_vals[col_idx, min_idx] = rnd.NextDouble() * 100;
                }
            }

            var stopWatch = new Stopwatch();
            stopWatch.Start();

            Console.Write("\n\tinflux client setup...");
            using var client = InfluxDBClientFactory.Create(url, token);
            //await CreateBucket(bucket, org) - not working ?!;
            Console.Write("\r\tinflux client setup..." + stopWatch.Elapsed);

            Console.Write("\n\tcommandlines setup...");
            var list_of_lines = new List<string>();
            for (var min_idx = 0; min_idx < nr_of_rows; min_idx++)
            {
                var datarecord = measurement + " ";
                for (var col_idx = 0; col_idx < headers.Length - 1; col_idx++)
                {
                    datarecord += headers[col_idx] + "=" + double_vals[col_idx, min_idx].ToString("0.00", System.Globalization.CultureInfo.InvariantCulture) + ",";
                }
                datarecord = datarecord.Remove(datarecord.Length - 1, 1);
                //datarecord += " " + min_idx;
                list_of_lines.Add(datarecord);
            }
            Console.Write("\r\tinflux client setup...(last step in " + stopWatch.Elapsed + ")");

            const int batchSize = 10000;
            Console.Write("\n\tinflux writing async batches(" + batchSize + ")");
            // configure batching
            var batches = list_of_lines
                .Select((s, i) => list_of_lines.Skip(i * batchSize).Take(batchSize).ToList())
                .Where(it => it.Any())
                .ToList();
            var batchnr = 0;
            foreach (var batch in batches)
            {
                batchnr++;
                Console.Write("\r\tinflux writing async batches(" + batchSize + ") " + batchnr + "/" + batches.Count + "............. (" + stopWatch.Elapsed + ")");
                await client.GetWriteApiAsync().WriteRecordsAsync(bucket, org, WritePrecision.Ns, batch);
            }
            //Console.Write("\n\tinflux writing async ... (last step in " + stopWatch.Elapsed + ")");
            //await client.GetWriteApiAsync().WriteRecordsAsync(bucket, org, WritePrecision.Ns, list_of_lines);

            //
            // Query data
            //
            Console.Write("\n\tinflux reading async ... (last step in " + stopWatch.Elapsed + ")");
            var flux_from_manual = "from(bucket:\"" + bucket + "\") |> range(start: 0)";
            var ownflux = "from(bucket:\"" + bucket + "\") |> range(start: 0) |> filter(fn: (r) => r._measurement == \"" + measurement + "\")";
            var fluxTables = await client.GetQueryApi().QueryAsync(ownflux, org);
            fluxTables.ForEach(fluxTable =>
            {
                var fluxRecords = fluxTable.Records;
                fluxRecords.ForEach(fluxRecord =>
                {
                    Console.WriteLine($"{fluxRecord.GetTime()}: {fluxRecord.GetValue()}");
                });
            });
            Console.WriteLine("TABLES: " + fluxTables.Count);

            client.Dispose();
            Console.Write("\n\tfinished..." + stopWatch.Elapsed);

            async Task CreateBucket(string bucket_s, string org_s)
            {
                //
                // Create bucket with data retention set to 3,600 seconds
                //
                var retention = new BucketRetentionRules(BucketRetentionRules.TypeEnum.Expire, 3600);
                // renamed from "bucket" - a local of that name would shadow the
                // enclosing "bucket" variable and fail to compile (CS0136)
                var created = await client.GetBucketsApi().CreateBucketAsync(bucket_s, retention, org_s);
                Console.Write("\n\tcreated new bucket with id:" + created.Id);
            }
        }
    }
}
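As a side note on the write path: besides `GetWriteApiAsync()`, the client also offers a background-batching `WriteApi` that buffers points and flushes them in configurable batches, which avoids the manual Skip/Take batching above. A minimal sketch, assuming the same URL, token, bucket, and org as in the example (it needs a running InfluxDB to actually do anything):

```csharp
using System;
using System.Collections.Generic;
using InfluxDB.Client;
using InfluxDB.Client.Api.Domain;

public static class BatchedWriteSketch
{
    public static void Main()
    {
        // url/token/bucket/org are placeholders taken from the example above
        using var client = InfluxDBClientFactory.Create(
            "http://localhost:8086", "thetoken".ToCharArray());

        var options = WriteOptions.CreateNew()
            .BatchSize(10000)      // points per HTTP request
            .FlushInterval(1000)   // ms before a partial batch is flushed
            .Build();

        // GetWriteApi batches in the background; Dispose() flushes the rest.
        using (var writeApi = client.GetWriteApi(options))
        {
            var lines = new List<string>
            {
                "aseries Col_AA=1.50 60",
                "aseries Col_AA=2.50 120"
            };
            writeApi.WriteRecords("test_bucket", "vstest", WritePrecision.S, lines);
        }
    }
}
```

The batch size and flush interval here are guesses to tune, not recommended values.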
Alright, thanks - that works much better with org_id!
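For anyone else hitting the "id must have a length of 16 bytes" error: it is consistent with `CreateBucketAsync` expecting the 16-byte organization ID rather than the organization name. A sketch of looking the ID up first (same placeholder URL, token, and names as above; needs a running server):

```csharp
using System;
using System.Linq;
using System.Threading.Tasks;
using InfluxDB.Client;
using InfluxDB.Client.Api.Domain;

public static class CreateBucketSketch
{
    public static async Task Main()
    {
        using var client = InfluxDBClientFactory.Create(
            "http://localhost:8086", "thetoken".ToCharArray());

        // Resolve the organization ID from its name -
        // CreateBucketAsync wants the ID, not the name.
        var orgs = await client.GetOrganizationsApi().FindOrganizationsAsync();
        var orgId = orgs.First(o => o.Name == "vstest").Id;

        // bucket with data retention set to 3,600 seconds
        var retention = new BucketRetentionRules(BucketRetentionRules.TypeEnum.Expire, 3600);
        var bucket = await client.GetBucketsApi()
            .CreateBucketAsync("test_bucket", retention, orgId);
        Console.WriteLine("created bucket with id: " + bucket.Id);
    }
}
```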
I added timestamps to my data, and the series now spans 90 days.
I can see the data in the browser UI, but the C# console query is stuck in a seemingly endless (?) loop, grabbing more and more data (it started at 1 GB of RAM and was up to 4 GB after 5 minutes, then I cancelled; the data can’t be that huge…). My Flux is still unchanged:
QueryAsync(String, String) is not meant to be used with this amount of data (9072000 rows). You can apply the following tweaks to achieve better performance:
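One such tweak (a sketch on my part, not necessarily the exact list that was posted) is the callback overload of `QueryAsync`, which streams each `FluxRecord` to a handler instead of materializing every table in memory first - that is what makes the RAM grow to gigabytes with the list-returning overload:

```csharp
using System;
using System.Threading.Tasks;
using InfluxDB.Client;

public static class StreamingQuerySketch
{
    public static async Task Main()
    {
        // placeholder URL/token/names taken from this thread; needs a live server
        using var client = InfluxDBClientFactory.Create(
            "http://localhost:8086", "thetoken".ToCharArray());

        var flux = "from(bucket:\"test_bucket\") |> range(start: 0)" +
                   " |> filter(fn: (r) => r._measurement == \"aseries\")";

        var count = 0L;
        // The onNext callback receives records one by one, so memory use
        // stays flat regardless of result size.
        await client.GetQueryApi().QueryAsync(flux, "vstest",
            (cancellable, record) =>
            {
                count++; // process each record here instead of keeping it
            },
            e => Console.WriteLine("query error: " + e.Message),
            () => Console.WriteLine("records: " + count));
    }
}
```

Pushing the aggregation into Flux (e.g. `aggregateWindow()` or `mean()`) so less data crosses the wire is the other usual lever.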
I now get 00:00:08.9128856 just for ingesting the data, measured from var batchnr = 0; to Console.Write("\n\tinflux reading async ... (last step in "+stopWatch.Elapsed+")");.