WriteAPI starts goroutines and doesn't finish them

Hello,
I’m writing simple parser for storing data from websocket to Influx 2.0 using Go. And everything seemed to be fine till I noticed that every time I use WritePoint method two new goroutines are spawned and they are not finished even after client.Close(). The effect is that after some time I have hundreds of goroutines.
Here is a part of code:

sublogger.Debug().Msgf("Goroutines before client instace: %d", runtime.NumGoroutine())`
client := influxdb2.NewClient(influxdb2URL, influxToken)
sublogger.Debug().Msgf("Goroutines after client instace: %d", runtime.NumGoroutine())
defer func() {
		sublogger.Debug().Msgf("Goroutines before client close: %d", runtime.NumGoroutine())
		client.Close()
		sublogger.Debug().Msgf("Goroutines on exit: %d", runtime.NumGoroutine())

	}()
sublogger.Debug().Msgf("Goroutines before writeApi instance: %d", runtime.NumGoroutine())
writeAPI := client.WriteAPIBlocking(influxOrg, "ISA_data")
sublogger.Debug().Msgf("Goroutines after writeApi instance: %d", runtime.NumGoroutine())

stateDataPoint := influxdb2.NewPointWithMeasurement("raw_state1")
stateDataPoint.AddTag("ISID", rtu.ISID)
stateDataPoint.AddTag("core_id", event.Data.CoreID)
stateDataPoint.SetTime(event.Data.Timestamp)

for idx, reg := range framemap.State.Variable {
	rawValue := string(rtu.State[(reg.StartByte)*2-1 : (reg.StartByte)*2-1+4])
	decodedValue, err := strconv.ParseInt(rawValue, 16, 16)
	if err != nil {
		sublogger.Error().Str("event", "RTUValueDecode").Str("eventData", event.Data.Data).Err(err).Msg("")
		return
	}
	if framemap.State.Variable[idx].Factor != 1 {
		framemap.State.Variable[idx].DecodedValue = float64(decodedValue) / float64(framemap.State.Variable[idx].Factor)
	} else {
		framemap.State.Variable[idx].DecodedValue = decodedValue
	}
framemap.State.Variable[idx].DecodedValue)
stateDataPoint.AddField(framemap.State.Variable[idx].FieldKey, framemap.State.Variable[idx].DecodedValue)
}
sublogger.Debug().Msgf("Goroutines before write: %d", runtime.NumGoroutine())
err = writeAPI.WritePoint(context.Background(), stateDataPoint)
if err != nil {
	sublogger.Error().Err(err).Msg("")
}
sublogger.Debug().Msgf("Goroutines after write: %d", runtime.NumGoroutine())

And here is log:

"Goroutines before client instace: 5"
"Goroutines after client instace: 5"
"Goroutines before writeApi instance: 5"
"Goroutines after writeApi instance: 5"
"Goroutines before write: 5"
"Goroutines after write: 7"
"Goroutines before client close: 7"
"Goroutines on exit: 7"

Next time I start this function number of goroutines will be 7 and two more will be spwned and so on.
Same situation is with asynchronous write.

If I use debuger there are more and more of these net/http and internal/pool:
Code_cmHY3F6J0K

So am I doing something wrong or influx-client is not closing the connection?

Thanks for help in advance.

Hello @Piotr_Piotrowski,
I’m not sure. I would expect client.Close() to work. Maybe @bednar can help here? Thank you. Maybe you need to include Flus() api · pkg.go.dev

@VlastaHajek Can you look at this?

@Piotr_Piotrowski, I’m assuming you are using the latest version. 2 goroutines are created by Go net/http for managing connections. Those should be definitely closed after calling Client.Close, but it needs time. Please, try waiting a bit, e.g. a second, before checking #goroutines after closing the client.

No they are not closed, after an hour I’d more then 2000 goroutines. But I found that issue: Client is leaking TCP connections · Issue #183 · influxdata/influxdb-client-go · GitHub
Which you already know @VlastaHajek :slight_smile:
So as a solution I decided to create one influx client in my goroutines worker and pass it to goroutines. It seemed to resolve the problem but then I have memory leaks.

Now it seems that even though I use defer writeAPI.Close() it is not properly freeing memory. It would be great if there was an example in docs on how to use concurrent client.

Sure, I remember that issue. This why I asked if you are using the latest version. Because, the main problem of that issue was fixed in v2.1.0.
However, if you are using writeAPI.Close(), you must have an obsolete version 1.x.x. This method was removed in v2.0.0.

The best practice is to have a single client per server URL, and a single write api per org/bucker pair.
If you have multiple writing goroutines, still use just single write api, either blocking or non-blocking.

OK, you are perfectly right. I’ve no idea why I had import "github.com/influxdata/influxdb-client-go/" instead of v2. So this was the main problem. I changed the code so that there are two writeapi one for each bucket. Now it seems to be ok, no goroutine and memory leaks.
I mark your post as a solution and best practice for concurrency.
Thank you for help, we can close the discussion.

Great, good work in finding the problem.
I’m going to add more info about concurrent usage to docs.