InfluxDB Client failure when writing large amounts of data

I’m currently writing a golang system that essentially simulates a system of entities over an entire day (which can be viewed on a website.) While the simulation itself is fast to calculate this data, the weakest link of the system is InfluxDB’s golang client, which is failing to cope with the large amount of data.

For an example, say there are 20 entities. They operate over an entire day in seconds, so 86400 steps of simulation. For each step of simulation, they write 8 points of data. This means that, roughly, there are 13,824,000 points to be written. The batch size is 5000; the Retry Buffer limit is 25,000; the Flush Interval is 5000ms. While I am writing this data, I am also simultaneously querying the database every second for information about these entities (in this case, 3 points of data per entity.)

The entire simulation takes 10 seconds to complete, at which point the system flushes all remaining data. It writes data to the database successfully for a while, but begins to slow over time, with high RAM and CPU usage (to be expected). After about 2 minutes, I get this error:

Post "http://localhost:9999/api/v2/query?org=MyOrg": context deadline exceeded (Client.Timeout exceeded while awaiting headers)

Following which the golang system panics, and the signal is killed, with no more data being sent.

Is there a way to set up the system to handle such a large quantity of data to be written? Do I need to consider higher performance computing for this operation?

EDIT: It turns out that the sim is failing before it gets to the flush stage, not after. This means it’s something to do with writing rather than flushing.

@jos, if I understood correctly, you:

  • First generating 13,824,000 points to be written.
  • Then writing points in a loop
  • While writing, querying data.
  • After a while, you get the error about timeout and client process crashes.

What is the crash output?

What server do you have? (v1/v2, OSS/cloud)

Is the client on the same machine as a server?

The server can get overloaded, you can try to increase HTTP timeout to e.g. 1min:

client := influxdb2.NewClientWithOptions(server.URL, "a", 
     influxdb2.DefaultOptions().SetHTTPRequestTimeout(60))

You’ve basically got the gist of the procedure, although the system starts writing the second there are points available, meaning that some of the points are in the buffer while others are being written at the same time. Additionally, the writes stop once all points have been sent to Influx DB, and the queries being immediately upon connection to that website to view the simulation (which, currently, also triggers the simulation to begin.)

I should also say that when I said ‘8 points of data’, I meant that the point written to the InfluxDB database has 8 bits of information (variables) attached to it.

The crash output is as posted in the original message (with a copy directed at the IP address of the server), as well as a traceback to the function it was running in, ending with signal killed.

I am using InfluxDB 2.0, as OSS. Currently, the client is on the same machine as the server, but will be testing this soon with them separated.

I will also try the HTTP timeout option, although I feel that this is only a temporary solution given that more time steps could be used in the future.

Your fist post contains only the error: Post "http://localhost:9999/api/v2/query?org=MyOrg": context deadline exceeded (Client.Timeout exceeded while awaiting headers), I don’t see anything else, no attachment.
If this is the only console output of the client, as you describe it was ended with signal killed, it could mean the system killed it, probably due to the low memory situation.

Could, you, please, describe more about your data? What kind of fields you have and what tags and values of those tags?

AN IMPORTANT NOTE: I should mention that this program has actually been failing before I reach the flush stage, not after, so the title was inaccurate. I have changed the title to reflect this, but I won’t edit the preceeding posts as that would change the context of this discussion. Does this mean I need to do something to increase the batch size, since it is overfilled?

Original post:

A written point is structured as:

p := influxdb2.NewPointWithMeasurement("ObjectData").
		AddTag("name", object.Name). 
		AddField("var1", var1).
		AddField("var2", var2).
		...
		AddField("var8", var8). 
		SetTime(timestamp)   

Vars 1-6 are float64, with the last two Vars being int .

I should have mentioned earlier, but there’s also another type of point that gets written occasionally (but not every time). This is associated with another separate group of objects, which is considerably larger than the ones I am mentioning here. I’m not sure of the exact contribution to writes here, but it would be a bit larger than the satellites…in any case, it’s going to be contributing to load for sure.

Those points are similar to that above, except there are only three Fields: one name string, and two float64s.

The error looks like this:

 2020/07/22 09:50:00 Post "http://localhost:9999/api/v2/query?org=MyOrg": context deadline exceeded (Client.Timeout exceeded while awaiting headers)
 2020/07/22 09:50:00 http: panic serving MyIPAddress:SomePortNumber: Post "http://localhost:9999/api/v2/query?org=MyOrg": context deadline exceeded (Client.Timeout exceeded while awaiting headers)

    goroutine 19 [running]:
    net/http.(*conn).serve.func1(0xc000204280)
            /usr/lib/go-1.14/src/net/http/server.go:1772 +0x139
    panic(0xbcbb40, 0xc194ab39b0)
            /usr/lib/go-1.14/src/runtime/panic.go:975 +0x3e3
    log.Panicln(0xc049fa9420, 0x1, 0x1)
            /usr/lib/go-1.14/src/log/log.go:365 +0xac
    github.com/exampleOrg/sim.MyQueryFunction(0xe001c0, 0xc0002cb240, 0x0, 0xed6a97c97, 0x14d6f00, 0xc0001009b0, 0x3e8, 0xc194addf18)

… (traces through my personal functions)… then into:

    net/http.HandlerFunc.ServeHTTP(0xc00037a0a0, 0xe05280, 0xc0001935e0, 0xc0002e5000)
            /usr/lib/go-1.14/src/net/http/server.go:2012 +0x44
    net/http.(*ServeMux).ServeHTTP(0x14d6e60, 0xe05280, 0xc0001935e0, 0xc0002e5000)
            /usr/lib/go-1.14/src/net/http/server.go:2387 +0x1a5
    net/http.serverHandler.ServeHTTP(0xc000193500, 0xe05280, 0xc0001935e0, 0xc0002e5000)
            /usr/lib/go-1.14/src/net/http/server.go:2807 +0xa3
    net/http.(*conn).serve(0xc000204280, 0xe06a40, 0xc0002caf80)
            /usr/lib/go-1.14/src/net/http/server.go:1895 +0x86c
    created by net/http.(*Server).Serve
            /usr/lib/go-1.14/src/net/http/server.go:2933 +0x35c
    2020/07/22 09:50:47 [E]! Write error: Post "http://localhost:9999/api/v2/write?bucket=MyBucket&org=MyOrg&precision=ns": context deadline exceeded (Client.Timeout exceeded while awaiting headers)
    write error: Post "http://localhost:9999/api/v2/write?bucket=MyOrg&org=MyBucket&precision=ns": context deadline exceeded (Client.Timeout exceeded while awaiting headers)
    signal: killed

Increasing the timeout via SetHttpRequestTimeout did not solve the query error issue, but it did prevent write error from appearing. Increasing the Batch size to 10,000, RetryBufferLimit to 50,000 and Flush Interval to 1000 didn’t fix the query error.

Transferring the program to a much more powerful setup (without using the http request timeout option) allowed the operation to complete successfully. However, it took 2 minutes after the simulation was complete to write all the data.

Since the failure is actually in the writing stage, rather than the flush stage, what influxdb options should I use for this kind of data that I am writing?

I don’t see any traces that panic is caused by the code of the library. It looks that there is some weak part in the sim.MyQueryFunction.

Not sure how do you write points. In the case you are not already doing that so, you can benefit from the asynchronous WriteAPI by using WritePoint in different goroutines to speed up writing.

I’m currently using the async WritePoint function to write points, and have previously tried to use WritePoint in Goroutines, but I ran into issues where the client is spammed with too many connections at once; however, I take it that you were suggesting that writeAPI.WritePoint(p) should be run as go writeAPI.WritePoint(p) - is that right?

I tried it in this manner (on the slower PC), and ran into the same errors again, with more WriteErrors this time. Enabling the SetHttpRequestTimeout(60) line again removed the WriteErrors, leaving the Query related errors behind. Running this new code on the more powerful PC did increase write speed, but not massively (by about 10-20 seconds)

What sort of weakness do you expect could be in sim.MyQueryFunction that would cause these problems?

You have to control the number of go routines, when using multiple go routines for writing. Good practice would be:

  • have a channel of Point
  • start a defined number of write routines, e.g. half of the MAXPROCS
  • range over the point channel in the write routine

Regarding crashing, I suspect some insufficient error checking or nil checking. Can you share the source code?

Again, just to clarify - did you mean that I should run writeAPI.WritePoint(p) as go writeAPI.WritePoint(p) instead, except using channels to manage number of goroutines?

I did use channels with goroutines previously previously (albeit with the whole influxdb2.NewPointWithMeasurement process) which resulted in strange outliers in the data, as I think the live state of the variables were being edited at the same time. This was the incorrect use case however…

I can share an version of that function that looks like this:

func MyQueryFunction(queryAPI api.QueryApi, timeVal time.Time, objects *entities.Objects, timeRes int) map[string]ObjectInformation {
	// Turn timeVal into the same format seen on InfluxDB
	const timeCheckConstant = "2006-01-02T15:04:05-07:00"
	timeAfter := timeVal.Add(time.Duration(timeRes) * time.Millisecond) // Only look at small section of time.

	timeFrmt := timeVal.Format(timeCheckConstant)
	timeAfterFrmt := timeAfter.Format(timeCheckConstant)

	querytimeReturnContainer := make(map[string]ObjectInformation)
	objNameList := sanitise(objects.GetObjIDs())

	nameString := `|> filter(fn: (r) => r["name"] == "` + strings.Join(objNameList, `" or r["name"] == "`) + `")`
	queryInput := fmt.Sprintf(`from(bucket: "MyBucket")
	    |> range(start: %s, stop: %s)
	    |> filter(fn: (r) => r["_measurement"] == "MyData")
	    |> filter(fn: (r) => r["_field"] == "Var1" or r["_field"] == "Var2" or r["_field"] == "Var3")
		|> filter(fn: (r) => r["_time"] == %s)
		%s
		|> pivot(
	      rowKey:["_time"],
	      columnKey: ["_field"],
	      valueColumn: "_value"
	    )`, timeFrmt, timeAfterFrmt, timeFrmt, nameString) // Query for all objects present
	result, err := queryAPI.Query(context.Background(), queryInput)

	var objName string
	var objQueryStruct queryObjInfo
	if err == nil {
		for result.Next() {
			RecordColm := result.Record()
			RecordVal := RecordColm.Values()

			objName = RecordVal["name"].(string)
			objQueryStruct.Var1Val = RecordVal["Var1"].(float64)
			objQueryStruct.Var2Val = RecordVal["Var2"].(float64)
			objQueryStruct.Var3Val = RecordVal["Var3"].(float64)

			querytimeReturnContainer[objName] = objQueryStruct
		}
		if result.Err() != nil {
			log.Printf("Query error: %s\n", result.Err().Error())
		}
	} else {
		log.Panicln(err)
	}
	return querytimeReturnContainer

}

Ok, now it is clear why the function panics in case of error: log.Panicln(err). You probably want: log.Println("Error:", err) to log an error and continue execution.

Using simple go writeAPI.WritePoint(point) is dangerous because you don’t have any control. There could be thousands of such goroutines.
You are getting timeouts because the server is most probably overloaded. Especially when you have the server and client on the same machine.

I would suggest you create simulation points in one or more goroutines and feed them into a channel that is being read by defined number of write routines. In fact, we have this pattern in our benchmarks.

How quickly will the system recover after a query error like that, if it not treated as a panic? In that case, is there often a loss of data being written or recieved?

The simulation itself is fast, so I’m not sure about requiring the use of goroutines for sim points at this time. Actually, using go writeAPI.WritePoint(point) didn’t cause any overloading moreso than before - the overload occured when using a goroutine with NewPointWithMeasurement - but I can see how easily it could occur.

Would it still be useful to calculate points normally, but then control the amount of goroutines using WritePoint via a worker pool/waitgroup?

What you mean by “system”?
The server will recover from overload when the load decreases. If you are getting occasional timeout you should take this into account when setting up the client. Note, you cannot change timeout in existing client.

If your simulation mimics what will a real application do and you are experiencing huge timeouts, you have several options:

  • change deployment - keep the db server alone on the computer, have a cluster
  • get a more powerful host computer
  • increase timeouts

However, based on your description, what are you doing now looks more like a benchmark.

1 Like

By ‘system’ I meant the InfluxDB Client - If there were timeouts, I didn’t want the system to start dropping points when writing.

I tried using a set number of goroutines for writing on a test program, but the writing speed ended up being slower than not using goroutines at all. My test program looked like this:

concurrentGoroutines := make(chan struct{}, 50000) // Semaphore
var wg sync.WaitGroup                              // Wait for goroutines to finish

errorsCh := writeAPI.Errors()
// Create go proc for reading and formatting errors
go func() {
	for err := range errorsCh {
		fmt.Printf("write error: %s\n", err.Error())
	}
}()

p := influxdb2.NewPointWithMeasurement("TestData").
	AddTag("name", placeStruct.ID).
	AddField("TestValue", placeStruct.Val).
	AddField("TestValue2", placeStruct.Val+1).
	SetTime(timestamp)

wg.Add(1)
go func(pointToWrite *write.Point) {
	defer wg.Done()
	concurrentGoroutines <- struct{}{} // Set goroutine as busy
	writeAPI.WritePoint(p)             // Write to bucket, and panic if this can't happen
	<-concurrentGoroutines             // Free up goroutine
}(p)
wg.Wait()

Potentially doing something wrong here. My next step of testing is to not panic when timing out…

EDIT: Not panicing didn’t fix the crash, so I think it’s ultimately down to the limited specs of the testing computer. Ideally, though, I’d like to know if my goroutine implementation here is incorrect.

Sorry for late reply.
IMHO, using goroutines just for writing a point slows down writing.
To leverage goroutines, you could either move generating point to the same goroutine as writing, or, as I pointed before generate points in different gouroutines and feed them to a channel, which is read for by defined number of writing goroutines.
You don’t need many goroutines for writing, just 2 works ok. The main improvement is to move generating points to another goroutine.

       pointsCh := make(chan *write.Point,200)
	threads := 1
	var wg sync.WaitGroup
	go func(points int) {
			for i:=0;i<points;i++ {
				p := influxdb2.NewPoint("meas",
					map[string]string{"tag": "tagvalue"},
					map[string]interface{}{"val1": rand.Int63n(1000) , "val2": rand.Float64()*100.0-50.0},
					time.Now())
				pointsCh <- p
			}
			close(pointsCh)
	}(1000000)
	start := time.Now()
	for t :=0;t<threads;t++ {
		wg.Add(1)
		go func() {
			for p := range pointsCh {
				writeApi.WritePoint(p)
			}
			wg.Done()
		}()
	}
	wg.Wait()
	client.Close()
	fmt.Println("Took:", time.Now().Sub(start).Seconds(),"sec")