Advanced Concurrency
Package singleflight
The singleflight package (golang.org/x/sync/singleflight) provides a duplicate function call suppression mechanism.
Group.Do executes and returns the results of the given function, making sure that only one execution is in flight for a given key at a time. If a duplicate call comes in, the duplicate caller waits for the original to complete and receives the same results.
// Scenario: you have a database with weather information per city and want to expose it as an API. Several users may ask for the weather in the same city at the same time.
// Solution: query the database once, then share the result with all the waiting requests.
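package weather

import (
	"fmt"

	"golang.org/x/sync/singleflight"
)

// Info holds the current weather for one city.
type Info struct {
	TempC, TempF int    // temperature in Celsius and Fahrenheit
	Conditions   string // "sunny", "snowing", etc.
}

// group deduplicates concurrent lookups that use the same key (the city name).
var group singleflight.Group

// City returns the weather for city. Concurrent calls for the same city share one database query.
func City(city string) (*Info, error) {
	results, err, _ := group.Do(city, func() (interface{}, error) {
		info, err := fetchWeatherFromDB(city) // slow operation
		return info, err
	})
	if err != nil {
		return nil, fmt.Errorf("weather.City %s: %w", city, err)
	}
	return results.(*Info), nil
}
Package errgroup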
errgroup is best described as a sync.WaitGroup whose tasks return errors that are propagated back to the waiter. It is useful when you have multiple operations to wait for and also want to know whether they all completed successfully.
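To make the "WaitGroup with errors" idea concrete, here is a minimal sketch that approximates it with the standard library only. The group type and the checkAll helper are hypothetical names invented for this illustration; this is not the errgroup source, just one way the behaviour could be reproduced.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// group is a hypothetical, stripped-down stand-in for errgroup.Group:
// a sync.WaitGroup that also remembers the first error a task returned.
type group struct {
	wg      sync.WaitGroup
	errOnce sync.Once
	err     error
}

// Go runs task in its own goroutine and records the first non-nil error.
func (g *group) Go(task func() error) {
	g.wg.Add(1)
	go func() {
		defer g.wg.Done()
		if err := task(); err != nil {
			g.errOnce.Do(func() { g.err = err })
		}
	}()
}

// Wait blocks until every task has finished, then returns the
// recorded error, or nil if all tasks succeeded.
func (g *group) Wait() error {
	g.wg.Wait()
	return g.err
}

// checkAll is a hypothetical helper: it validates every name concurrently
// and reports whether all validations succeeded.
func checkAll(names []string) error {
	var g group
	for _, name := range names {
		name := name // capture per iteration (needed before Go 1.22)
		g.Go(func() error {
			if name == "" {
				return errors.New("empty name")
			}
			return nil
		})
	}
	return g.Wait()
}

func main() {
	fmt.Println(checkAll([]string{"oslo", "lima"}))
	fmt.Println(checkAll([]string{"oslo", ""}))
}
```

The real golang.org/x/sync/errgroup package has the same Go/Wait shape. Its errgroup.WithContext constructor additionally cancels a context when the first task fails, which this sketch leaves out.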
Bounded concurrency via buffered channels
Concurrency can be bounded with a semaphore, which keeps track of how many tasks are running and blocks until there is room for another task to start. In Go, a buffered channel can serve as that semaphore.
To allow up to 10 tasks to run at once, we create a channel with space for 10 items:
semaphore := make(chan struct{}, 10)
To start a new task, blocking if too many tasks are already running, we simply attempt to send a value on the channel:
semaphore <- struct{}{}
When a task completes, mark it as such by taking a value out of the channel:
<-semaphore
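The three steps above can be combined into one small program. This is a sketch under assumptions: runBounded and the peak-tracking bookkeeping are hypothetical names added only so the limit can be observed, and the tasks simply sleep.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// runBounded runs every task with at most limit of them executing at once.
// It returns the highest number of tasks seen running simultaneously.
func runBounded(limit int, tasks []func()) int {
	semaphore := make(chan struct{}, limit)
	var wg sync.WaitGroup

	var mu sync.Mutex // guards running and peak
	running, peak := 0, 0

	for _, task := range tasks {
		semaphore <- struct{}{} // acquire: blocks while limit tasks are running
		wg.Add(1)
		go func(task func()) {
			defer wg.Done()
			defer func() { <-semaphore }() // release the slot when the task ends

			mu.Lock()
			running++
			if running > peak {
				peak = running
			}
			mu.Unlock()

			task()

			mu.Lock()
			running--
			mu.Unlock()
		}(task)
	}

	wg.Wait()
	return peak
}

func main() {
	tasks := make([]func(), 50)
	for i := range tasks {
		tasks[i] = func() { time.Sleep(time.Millisecond) }
	}
	fmt.Println("peak concurrency:", runBounded(10, tasks))
}
```

Acquiring the slot in the launching loop, before the goroutine starts, means at most limit goroutines exist at any moment. Acquiring inside the goroutine would also cap the running work, but would create all goroutines up front.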
Weighted bounded concurrency
Not all tasks are equally expensive. Instead of reasoning about the number of tasks we want to run concurrently, we assign a "cost" to every task, then acquire that cost from a semaphore before the task starts and release it when the task finishes.
The golang.org/x/sync/semaphore package provides a weighted semaphore implementation:
The sem <- struct{}{} operation is called "Acquire". The Acquire method returns an error because it takes a context.Context, which can be used to abort the operation early.
The <-sem operation is called "Release".
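To show what acquiring and releasing a cost looks like, here is a dependency-free sketch built on sync.Cond. The weighted type, newWeighted, and runWeighted are hypothetical names for this illustration. Unlike the real package, this version does not accept a context, so Acquire cannot be aborted and returns no error, and it makes no fairness guarantee between waiters.

```go
package main

import (
	"fmt"
	"sync"
)

// weighted is a hypothetical, simplified weighted semaphore.
type weighted struct {
	mu   sync.Mutex
	cond *sync.Cond
	free int64 // capacity that is not currently held
}

func newWeighted(capacity int64) *weighted {
	w := &weighted{free: capacity}
	w.cond = sync.NewCond(&w.mu)
	return w
}

// Acquire blocks until cost units are free, then takes them.
// Asking for more than the total capacity would block forever.
func (w *weighted) Acquire(cost int64) {
	w.mu.Lock()
	defer w.mu.Unlock()
	for w.free < cost {
		w.cond.Wait()
	}
	w.free -= cost
}

// Release returns cost units and wakes any waiting acquirers.
func (w *weighted) Release(cost int64) {
	w.mu.Lock()
	w.free += cost
	w.mu.Unlock()
	w.cond.Broadcast()
}

// runWeighted starts one task per cost while keeping the total cost in
// flight at or below capacity. It returns the peak total cost observed.
func runWeighted(capacity int64, costs []int64) int64 {
	sem := newWeighted(capacity)
	var wg sync.WaitGroup

	var mu sync.Mutex // guards inFlight and peak
	var inFlight, peak int64

	for _, cost := range costs {
		sem.Acquire(cost) // blocks until the task's cost fits
		wg.Add(1)
		go func(cost int64) {
			defer wg.Done()
			defer sem.Release(cost)

			mu.Lock()
			inFlight += cost
			if inFlight > peak {
				peak = inFlight
			}
			mu.Unlock()

			// The real work for this task would run here.

			mu.Lock()
			inFlight -= cost
			mu.Unlock()
		}(cost)
	}

	wg.Wait()
	return peak
}

func main() {
	fmt.Println("peak cost in flight:", runWeighted(10, []int64{4, 3, 5, 2, 6}))
}
```

With golang.org/x/sync/semaphore the same pattern uses semaphore.NewWeighted(n), Acquire(ctx, cost), and Release(cost), with the error from Acquire checked before starting the task.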