To make sure your goroutines execute before the main goroutine we need join points. These can be created via:
WaitGroup primitive
for waiting for a set of concurrent operations to complete when you either don't care about the result of the concurrent operation, or you have other means of collecting their results
var wg sync.WaitGroupsayHello :=func() {defer wg.Done() // <- before we exit the goroutine, we indicate to the WaitGroup that we have exited fmt.Println("hello")}wg.Add(1) // <- one goroutine is startinggosayHello() wg.Wait() // <---- join point
Closures = a function value that references variables from outside its body.
With closures, we'd have to pass a copy of the variable into the closure so by the time a goroutine is run, it will be operating on the data from its iteration of the loop.
var wg sync.WaitGroupfor _, salutation :=range []string{"hello", "greetings", "good day"} { wg.Add(1)gofunc() {defer wg.Done() fmt.Println(salutation) }()}wg.Wait()// good day// good day// good day
var wg sync.WaitGroupfor _, salutation :=range []string{"hello", "greetings", "good day"} { wg.Add(1)gofunc(salutation string) {defer wg.Done() fmt.Println(salutation) }(salutation) // <-- we pass in the current iteration's variable to the closure. // a copy of the string struct is made// when the goroutine is run, we'll be refering to the proper string}wg.Wait()// good day// hello// greetings
Mutex
provides a concurrent-safe way to express exclusive access to these shared resources.
sync.Mutex interface with Lock() and Unlock() methods
Shares memory by creating a convention developers must follow to synchronise access to the memory.
same as Mutex but it provides a read/write lock. We can have a multiple number of readers holding a reader lock as long as nobody is holding a writer lock.
sync.RWMutex interface with RLock() and RUnlock() methods
RWMutex can only be held by n readers at a time, or by a single writer
Cond
a rendezvous point for goroutines waiting for or announcing the occurence of an event (=signal between 2 or more goroutines, has no info other than it happened).
sync.NewCond(&sync.Mutex{}) with 2 methods
Signal - notifies goroutines (runtime picks the one that has been waiting the longest) blocked on a Wait call that the condition has been triggered
c := sync.NewCond(&sync.Mutex{})c.L.Lock()forconditionTrue() ==false { c.Wait() // <--- we wait to be notified that the condition has occurred// this is a blocking call and the goroutine will be suspended// allows other goroutines to run on the OS thread}c.L.Unlock()
c := sync.NewCond(&sync.Mutex{})queue :=make([]interface{}, 0, 10)removeFromQueue :=func(delay time.Duration) { time.Sleep(delay) c.L.Lock() queue = queue[1:] c.L.Unlock() c.Signal() // <- let a goroutine waiting for a condition to know that smth happened}for i:=0; i<10; i++ { c.L.Lock()forlen(queue) ==2 { c.Wait() } queue =append(queue, struct{}{})goremoveFromQueue(1*time.Second) c.L.Unlock()}
Brodcast - sends signal to all waiting goroutines
typeButtonstruct { // contains a condition Clicked *sync.Cond}button :=Button{Clicked: sync.NewCond(&sync.Mutex{})}subscribe :=func(c *sync.Cond, fn func()) { // allows us to register functionsvar goroutineRunning sync.WaitGroup// to handle signals from conditions goroutineRunning.Add(1)gofunc() { goroutineRunning.Done() c.L.Lock()defer c.L.Unlock() c.Wait()fn() }() goroutineRunning.Wait()}var clickRegistered sync.WaitGroupclickRegistered.Add(3)subscribe(button.Clicked, func() {// do smth clickRegistered.Done()})subscribe(button.Clicked, func() {// do smth clickRegistered.Done()})subscribe(button.Clicked, func() {// do smth clickRegistered.Done()})button.Clicked.Broadcast()clickRegistered.Wait()
Once
ensures that only one call to Do ever calls the function passed in
counts the no of times Do is called, not how many unique functions passed into Do are called
var count intincrement :=func() {count++}decrement :=func() {count--}var once sync.Onceonce.Do(increment)once.Do(decrement)fmt.Println(count) // 1
Pool
= concurrent-safe implementation of the object pool pattern.
Get interface - checks wether the are any available instances within the pool to return to the caller, and if not, call its New member variable to create one.
Put interface - to put the instance they were working with back in the pool
myPool :=&sync.Pool{ New: func() interface{}{ fmt.Println("Creating new instance.")returnstruct{}{} }}myPool.Get() // no instance available, calls New;instance := myPool.Get() // no instance available, calls New;myPool.Put(instance) // instances in pool = 1myPool.Get() // get instance from pool// Creating new instance.// Creating new instance.
Uses cases:
memory optimisations as instantiated objects are garbage collected.
var numCalcsCreated intcalclPool :=&sync.Pool{ New: func() interface{}{ numCalcsCreated +=1 mem :=make([]byte, 1024)return&mem },}// Seed the pool with 4KBcalclPool.Put(calclPool.New())calclPool.Put(calclPool.New())calclPool.Put(calclPool.New())calclPool.Put(calclPool.New())const numWorkers =1024*1024var wg sync.WaitGroupwg.Add(numWorkers)for i:=numWorkers; i>0; i-- {gofunc() {defer wg.Done() mem := calcPool.Get().(*[]byte)defer calcPool.Put(mem) }()}wg.Wait()fmt.Printf(numCalcsCreated) // 8
warming up a cache of pre-allocated objects for operations that must run as quickly as possible. (by trying to guard the host machine's memory front-loading the time it takes to get a reference to another object)
begin :=make(chaninterface{})var wg sync.WaitGroupfor i:=0; i<5; i++ { wg.Add(1)gofunc(i int) {defer wg.Done()<- begin fmt.Printf("%v has begun\n", i) }()}fmt.Println("unblocking goroutines...")close(begin)wg.Wait()
Buffered Channels
channels that are given a capacity when they're instantiated.
var dataStream chaninterface{}dataStream =make(chaninterface{}, 4)
Buffered channels are in-memory FIFO queue for concurrent processes to communicate over.
Result of channel operation given a channel's state
Channel Owners&Consumers
A channel owner should:
instantiate the channel.
perform writes, or pass ownership to another goroutine.
close the channel.
encapsulate the previous three things in this list and expose them via a reader channel.
A channel consumer should:
knowing when a channel is closed.
be responsible for handling blocking for any reason.
chanOwner :=func() <-chanint { resultStream :=make(chanint, 5) // create buffered channelgofunc() {deferclose(resultStream) // close the channelfor i:=0; i<=5; i++ { resultStream <- i // write to it }()return resultStream // return the read-only channel }}resultStream :=chanOwner()for result :=range resultStream { fmt.Printf("received: %d\n", result)}fmt.Printf("done receiving")