Setting up a Go worker pool with a write operation - InfluxDB 2 data corrupted?

I’ve been building a simulator in Go, and I want to store its output in InfluxDB. However, writes have recently become slow for reasons I can't identify (the simulation itself is quick and takes only a few seconds on a single thread), so I wanted to see whether goroutines could speed up the writes.

Since starting a goroutine for every data point would overwhelm InfluxDB 2, I implemented a worker pool to restrict the number of goroutines in use. However, although data was written to the database through the worker pool, it was always corrupted, with strange spikes in values that were not present before.

My current setup is a func named Simulate, which takes a time value, an entity struct (containing all the data to be simulated), and two separate clients, each of which writes a different set of data.

maxNumGoroutines := flag.Int("maxNumGoroutines", 10, "Max num of goroutines")
flag.Parse()

concurrentGoroutines := make(chan struct{}, *maxNumGoroutines) // Semaphore 
var wg sync.WaitGroup // Wait for goroutines to finish

timeLength := setTimeLength(inputVar) // Example of setting length of time

simObjects := &entities.Objects // objects are propagated as *Object, meaning no return value
// Additional entities also exist inside the entities struct

for timeIterator := 0; timeIterator <= timeLength; timeIterator++ { 
    for _, objectID := range entities.Objects.GetObjectIDs() { // all objects within the simulation
        wg.Add(1)
        go func(entityState *entities.EntityHolder, chosenObj string, timeGo time.Time) { 
            defer wg.Done()
            concurrentGoroutines <- struct{}{} // Set goroutine as busy
            calc.Propagate(entityState.Objects.GetObject(chosenObj), timeGo) // Edit the value at pointer addr

            calc.Metrics(entityState, chosenObj, timeGo, metricDB) // separate further calcs and write

            PassInflux(entityState.Objects.GetObject(chosenObj), clientDB, timeGo) // send propagated data 
            <-concurrentGoroutines // Free up goroutine
        }(entities, objectID, timeIterator) // pass in as variables, otherwise operating on changing pointers
    }
}
wg.Wait()
log.Println("Simulation complete.")

Making the write clients blocking does not change anything, as I already call Flush() right after writing each point.

Am I missing something here? Is there a canonical way to set up goroutines with InfluxDB 2?

It turns out that I was using the InfluxDB 2 client incorrectly: I should have been passing a write API into the propagate function instead of the entire client. With that change, writes became dramatically faster, so the goroutines are no longer required.

I did not previously know that you should pass in a write API instead of a client in situations like this, so consider this a lesson learned.
