I’ve been building a simulator in Go, and I want to store its output in InfluxDB. Recently, however, writes have become slow for reasons I haven't identified (the simulation itself is very quick and takes only a few seconds to run on a single thread), so I wanted to see whether I could speed up the writes with goroutines.
Since starting a goroutine for every data point would overwhelm InfluxDB 2, I implemented a worker pool to limit the number of goroutines in use. However, although the data does get written to the database through the worker pool, it always comes out corrupted, with strange spikes in values that were not present before.
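The semaphore-style limit described above can be sketched in isolation as follows. This is a minimal stand-alone sketch using only the standard library, not the simulator's code: `runBounded`, `work`, and the `peak` counter are hypothetical names introduced for illustration. One detail it is meant to show: taking a slot before the `go` statement bounds how many goroutines exist at once, whereas taking it inside the goroutine only bounds how many are doing work at once while all of them are still created.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// runBounded calls work(i) for every i in [0, n), with at most limit
// goroutines alive at any moment. It returns the highest number of
// goroutines that were observed running work at the same time.
// (Hypothetical helper, for illustration only.)
func runBounded(n, limit int, work func(i int)) int64 {
	sem := make(chan struct{}, limit) // buffered channel used as a counting semaphore
	var wg sync.WaitGroup
	var running, peak int64

	for i := 0; i < n; i++ {
		sem <- struct{}{} // acquire a slot BEFORE spawning; blocks while limit goroutines are alive
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			defer func() { <-sem }() // release the slot when this goroutine returns

			cur := atomic.AddInt64(&running, 1)
			for { // record the peak concurrency seen so far
				old := atomic.LoadInt64(&peak)
				if cur <= old || atomic.CompareAndSwapInt64(&peak, old, cur) {
					break
				}
			}
			work(i)
			atomic.AddInt64(&running, -1)
		}(i)
	}
	wg.Wait()
	return atomic.LoadInt64(&peak)
}

func main() {
	results := make([]int, 100)
	peak := runBounded(len(results), 10, func(i int) {
		results[i] = i * i // each call writes only its own slot, so no lock is needed
	})
	fmt.Println("peak concurrency:", peak, "last result:", results[99])
}
```

The observed peak varies from run to run, but it cannot exceed `limit` because a slot is held for the whole lifetime of each goroutine.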
My current setup is a func named Simulate, which takes a time value, an entity struct (containing all the data to be simulated), and two separate clients, each of which writes a different set of data:
    maxNumGoroutines := flag.Int("maxNumGoroutines", 10, "Max num of goroutines")
    flag.Parse()
    concurrentGoroutines := make(chan struct{}, *maxNumGoroutines) // Semaphore
    var wg sync.WaitGroup // Wait for goroutines to finish
    timeLength := setTimeLength(inputVar) // Example of setting length of time
    simObjects := &entities.Objects // objects are propagated as *Object, meaning no return value
    // Additional entities also exist inside the entities struct
    for timeIterator := 0; timeIterator <= timeLength; timeIterator++ {
        for _, objectID := range entities.Objects.GetObjectIDs() { // all objects within the simulation
            wg.Add(1)
            go func(entityState *entities.EntityHolder, chosenObj string, timeGo time.Time) {
                defer wg.Done()
                concurrentGoroutines <- struct{}{} // Set goroutine as busy
                calc.Propagate(entityState.Objects.GetObject(chosenObj), timeGo) // Edit the value at pointer addr
                calc.Metrics(entityState, chosenObj, timeGo, metricDB) // separate further calcs and write
                PassInflux(entityState.Objects.GetObject(chosenObj), clientDB, timeGo) // send propagated data
                <-concurrentGoroutines // Free up goroutine
            }(entities, objectID, timeIterator) // pass in as variables, otherwise operating on changing pointers
        }
    }
    wg.Wait()
    log.Println("Simulation complete.")
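One thing worth ruling out, offered as a possibility rather than a confirmed diagnosis: in the loop above, goroutines for different time steps of the same object are all started before any `wg.Wait()`, so they can run in any order while mutating the same `*Object`. The sketch below assumes that each propagation step depends on the object's state from the previous step. It runs time steps one after another and only parallelises across objects within a step. The `object` type, `propagate`, and `simulate` are hypothetical stand-ins, not the simulator's real API, and no database is involved.

```go
package main

import (
	"fmt"
	"sync"
)

// object is a hypothetical stand-in for one simulated entity.
type object struct {
	id    string
	value float64
	steps int
}

// propagate advances one object by one time step. The new value depends
// on the previous one, so steps must be applied in order (an assumption
// about the simulation, made for this sketch).
func propagate(o *object, step int) {
	o.value = o.value*0.5 + float64(step)
	o.steps++
}

// simulate runs steps 0..steps sequentially. Within a step, objects are
// processed in parallel by at most limit goroutines.
func simulate(objs []*object, steps, limit int) {
	sem := make(chan struct{}, limit) // counting semaphore
	for step := 0; step <= steps; step++ {
		var wg sync.WaitGroup
		for _, o := range objs {
			sem <- struct{}{} // acquire a slot before spawning
			wg.Add(1)
			go func(o *object, step int) {
				defer wg.Done()
				defer func() { <-sem }() // release the slot
				propagate(o, step)       // only this goroutine touches o during this step
				// A write of (o, step) to the database would go here.
			}(o, step)
		}
		wg.Wait() // barrier: every object finishes this step before the next one starts
	}
}

func main() {
	objs := []*object{{id: "a", value: 1}, {id: "b", value: 2}}
	simulate(objs, 3, 2)
	for _, o := range objs {
		fmt.Println(o.id, o.value, o.steps)
	}
}
```

With this layout each object is touched by at most one goroutine at a time and always in time order, at the cost of one synchronisation point per step. If the values written this way no longer show spikes, the ordering of the original goroutines is a likely suspect.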
Switching the write clients to blocking mode does not change anything, since I already call Flush() right after writing each point.
Am I missing something here? Is there a recommended way to use goroutines with InfluxDB 2?