Synchronizing goroutines

via sync package

Ways of declaring goroutines

// 1. a named function
func main() {
    go sayHello()
}

func sayHello() {
    fmt.Println("hello")
}

// 2. an anonymous function
go func() {
    fmt.Println("hello")
}() // <--- the anonymous function must be invoked immediately

// 3. a function value assigned to a variable
sayHello := func() {
    fmt.Println("hello")
}
go sayHello()

To make sure your goroutines get to run before the main goroutine exits, we need join points. These can be created via:

WaitGroup primitive

Used to wait for a set of concurrent operations to complete when you either don't care about their results or have other means of collecting them.

var wg sync.WaitGroup
sayHello := func() {
    defer wg.Done() // <- before we exit the goroutine, we indicate to the WaitGroup that we have exited
    fmt.Println("hello")
}
wg.Add(1) // <- one goroutine is starting
go sayHello() 
wg.Wait() // <---- join point
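
The "other means of collecting results" case can be sketched as follows. This is only an illustration: the helper name squares is made up for this note, and it assumes each goroutine writes to its own slice index, so wg.Wait() is the only synchronisation needed.

```go
package main

import (
	"fmt"
	"sync"
)

// squares is a hypothetical helper: every goroutine writes only to its own
// index of results, so the slice itself needs no lock. wg.Wait() is the
// join point that makes the writes visible to the caller.
func squares(n int) []int {
	results := make([]int, n)
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1) // one more goroutine is starting
		go func(i int) {
			defer wg.Done()
			results[i] = i * i
		}(i)
	}
	wg.Wait()
	return results
}

func main() {
	fmt.Println(squares(5)) // [0 1 4 9 16]
}
```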

Closures = a function value that references variables from outside its body.

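A goroutine's closure sees the variable itself, not a snapshot of its value. Before Go 1.22 a range loop reused a single loop variable, so by the time the goroutines below ran they could all see its last value. Passing a copy of the variable into the closure makes each goroutine operate on the data from its own iteration of the loop. (In modules that declare go 1.22 or later, each iteration already gets a fresh variable.)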

var wg sync.WaitGroup
for _, salutation := range []string{"hello", "greetings", "good day"} {
    wg.Add(1)
    go func() {
        defer wg.Done()
        fmt.Println(salutation)
    }()
}
wg.Wait()

// Typical output before Go 1.22:
// good day
// good day
// good day
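
A minimal sketch of the fix described above, passing a copy of the loop variable into the closure. The helper name collectSalutations, the mutex-guarded slice and the final sort are assumptions added here so the result is deterministic; they are not part of the original example.

```go
package main

import (
	"fmt"
	"sort"
	"sync"
)

// collectSalutations is a hypothetical helper: it starts one goroutine per
// salutation and hands each goroutine its own copy of the loop value.
func collectSalutations() []string {
	var (
		wg  sync.WaitGroup
		mu  sync.Mutex
		got []string
	)
	for _, salutation := range []string{"hello", "greetings", "good day"} {
		wg.Add(1)
		go func(s string) { // s is a copy of the current loop value
			defer wg.Done()
			mu.Lock()
			defer mu.Unlock()
			got = append(got, s)
		}(salutation)
	}
	wg.Wait()
	sort.Strings(got) // goroutine order is not deterministic, so sort for a stable result
	return got
}

func main() {
	fmt.Println(collectSalutations()) // [good day greetings hello]
}
```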

Mutex

Provides a concurrent-safe way to express exclusive access to shared resources.

sync.Mutex is a struct type with Lock() and Unlock() methods

It shares memory by establishing a convention that developers must follow to synchronise access to that memory.

var count int
var lock sync.Mutex

increment := func() {
    lock.Lock() // <--- locking section
    defer lock.Unlock() // <--- unlocking
    count++
}

decrement := func() {
    lock.Lock() 
    defer lock.Unlock()
    count--
}

var arithmetic sync.WaitGroup
for i := 0; i<=5; i++ {
    arithmetic.Add(1)
    go func() {
        defer arithmetic.Done()
        increment()
    }()
}

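for i := 0; i <= 5; i++ {
    arithmetic.Add(1)
    go func() {
        defer arithmetic.Done()
        decrement()
    }()
}

arithmetic.Wait() // <--- join point: wait for all increments and decrements
fmt.Println(count) // 0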

RWMutex

Same as Mutex, but it provides a read/write lock. Any number of readers can hold the reader lock as long as nobody is holding the writer lock.

sync.RWMutex is a struct type that adds RLock() and RUnlock() methods to Lock() and Unlock()

An RWMutex can be held either by n readers at a time or by a single writer
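
A small sketch of the reader/writer split, assuming a made-up counter type and runCounter helper (neither appears in the notes above). Writers take the exclusive lock, readers take the shared one.

```go
package main

import (
	"fmt"
	"sync"
)

// counter is a hypothetical type whose field n is guarded by an RWMutex.
type counter struct {
	mu sync.RWMutex
	n  int
}

// Inc takes the writer lock: only one goroutine may hold it at a time.
func (c *counter) Inc() {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.n++
}

// Value takes the reader lock: many readers may hold it concurrently.
func (c *counter) Value() int {
	c.mu.RLock()
	defer c.mu.RUnlock()
	return c.n
}

// runCounter starts the given number of writer and reader goroutines and
// returns the final count once all of them have finished.
func runCounter(writers, readers int) int {
	var c counter
	var wg sync.WaitGroup
	for i := 0; i < writers; i++ {
		wg.Add(1)
		go func() { defer wg.Done(); c.Inc() }()
	}
	for i := 0; i < readers; i++ {
		wg.Add(1)
		go func() { defer wg.Done(); _ = c.Value() }()
	}
	wg.Wait()
	return c.Value()
}

func main() {
	fmt.Println(runCounter(10, 100)) // 10
}
```

An RWMutex tends to pay off only when reads clearly outnumber writes; with few readers a plain Mutex is usually simpler.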

Cond

A rendezvous point for goroutines waiting for or announcing the occurrence of an event (= a signal between 2 or more goroutines that carries no information other than that it happened).

sync.NewCond(&sync.Mutex{}) returns a *sync.Cond with 2 notification methods

  • Signal - notifies one goroutine blocked on a Wait call (the runtime picks the one that has been waiting the longest) that the condition has been triggered

c := sync.NewCond(&sync.Mutex{})
c.L.Lock()
for !conditionTrue() {
    c.Wait() // <--- we wait to be notified that the condition has occurred
             // this is a blocking call and the goroutine will be suspended
             // allows other goroutines to run on the OS thread
}
c.L.Unlock()

c := sync.NewCond(&sync.Mutex{})
queue := make([]interface{}, 0, 10)

removeFromQueue := func(delay time.Duration) {
    time.Sleep(delay)
    c.L.Lock()
    queue = queue[1:]
    c.L.Unlock()
    c.Signal() // <- let a goroutine waiting on the condition know that something happened
}

for i:=0; i<10; i++ {
    c.L.Lock()
    for len(queue) == 2 {
        c.Wait()
    }
    queue = append(queue, struct{}{})
    go removeFromQueue(1*time.Second)
    c.L.Unlock()
}

  • Broadcast - sends a signal to all waiting goroutines

type Button struct { // contains a condition
    Clicked *sync.Cond
}

button := Button{Clicked: sync.NewCond(&sync.Mutex{})}

subscribe := func(c *sync.Cond, fn func()) { // allows us to register functions
    var goroutineRunning sync.WaitGroup      // to handle signals from conditions
    goroutineRunning.Add(1)
    
    go func() {
        goroutineRunning.Done()
        c.L.Lock()
        defer c.L.Unlock()
        c.Wait()
        fn()
    }()
    goroutineRunning.Wait()
}

var clickRegistered sync.WaitGroup
clickRegistered.Add(3)
subscribe(button.Clicked, func() {
    // do something
    clickRegistered.Done()
})
subscribe(button.Clicked, func() {
    // do something
    clickRegistered.Done()
})
subscribe(button.Clicked, func() {
    // do something
    clickRegistered.Done()
})
    
button.Clicked.Broadcast()
clickRegistered.Wait()

Once

Ensures that only one call to Do ever runs the function passed in, even when Do is called from different goroutines.

It counts the number of times Do is called, not how many distinct functions are passed into Do.

var count int
increment := func() {count++}
decrement := func() {count--}

var once sync.Once
once.Do(increment)
once.Do(decrement)

fmt.Println(count) // 1

Pool

= concurrent-safe implementation of the object pool pattern.

Get method - checks whether there are any available instances in the pool to return to the caller and, if not, calls the pool's New member variable to create one.

Put method - called when finished, to put the instance the caller was working with back in the pool

myPool := &sync.Pool{
    New: func() interface{}{
        fmt.Println("Creating new instance.")
        return struct{}{}
    },
}

myPool.Get()             // no instance available, calls New
instance := myPool.Get() // no instance available, calls New
myPool.Put(instance)     // instances in pool = 1
myPool.Get()             // reuses the instance from the pool

// Creating new instance.
// Creating new instance.

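Use cases:

  • memory optimisation: reusing pooled objects avoids repeated allocations and reduces garbage-collector pressure (idle objects left in the pool can still be garbage collected).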

var numCalcsCreated int64
calcPool := &sync.Pool{
    New: func() interface{} {
        // New can run in several goroutines at once, so count atomically (sync/atomic)
        atomic.AddInt64(&numCalcsCreated, 1)
        mem := make([]byte, 1024)
        return &mem
    },
}

// Seed the pool with 4KB
calcPool.Put(calcPool.New())
calcPool.Put(calcPool.New())
calcPool.Put(calcPool.New())
calcPool.Put(calcPool.New())

const numWorkers = 1024*1024
var wg sync.WaitGroup
wg.Add(numWorkers)

for i := numWorkers; i > 0; i-- {
    go func() {
        defer wg.Done()

        mem := calcPool.Get().(*[]byte)
        defer calcPool.Put(mem)
    }()
}

wg.Wait()
fmt.Printf("%d calculators were created.\n", atomic.LoadInt64(&numCalcsCreated)) // e.g. 8; the number varies from run to run

  • warming up a cache of pre-allocated objects for operations that must run as quickly as possible. (Here the goal is not to guard the host machine's memory but to front-load the time it takes to get a reference to another object.)

func warmServiceConnCache() *sync.Pool {
    p := &sync.Pool {
        New: connectToService, // connectToService is assumed to be defined elsewhere as func() interface{}
    }
    for i:=0; i<10; i++ {
        p.Put(p.New())
    }
    return p
}

func startNetworkDaemon() *sync.WaitGroup {
    var wg sync.WaitGroup
    wg.Add(1)
    go func() {
        connPool := warmServiceConnCache()
        
        server, err := net.Listen("tcp", "localhost:8080")
        if err != nil {
            log.Fatalf("cannot listen: %v", err)
        }
        defer server.Close()
        
        wg.Done()
        
        for {
            conn, err := server.Accept()
            if err != nil {
                log.Printf("cannot accept connection: %v", err)
                continue
            }
            svcConn := connPool.Get()
            fmt.Fprintln(conn, "")
            connPool.Put(svcConn)
            conn.Close()
        }
    }()
    return &wg
}

Channels

Can be used to synchronise access to memory and to communicate information between goroutines.

Instantiating

var dataStream chan interface{}
dataStream = make(chan interface{})

Channels also support a unidirectional flow of data:

  • a channel that can only be read from (receive-only)

var dataStream <-chan interface{}
dataStream = make(<-chan interface{})

  • a channel that can only be sent to (send-only)

var dataStream chan<- interface{}
dataStream = make(chan<- interface{})
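
In practice directional channels are rarely created with make directly. A common pattern, sketched here with made-up function names (produce, sum, sumTo), is to create a bidirectional channel and let Go convert it implicitly at function boundaries.

```go
package main

import "fmt"

// produce only needs to send, so it accepts a send-only channel.
func produce(out chan<- int, n int) {
	defer close(out) // closing is permitted on a send-only channel
	for i := 0; i < n; i++ {
		out <- i
	}
}

// sum only needs to receive, so it accepts a receive-only channel.
func sum(in <-chan int) int {
	total := 0
	for v := range in {
		total += v
	}
	return total
}

// sumTo wires the two together with a single bidirectional channel.
func sumTo(n int) int {
	ch := make(chan int) // bidirectional
	go produce(ch, n)    // implicitly converted to chan<- int
	return sum(ch)       // implicitly converted to <-chan int
}

func main() {
	fmt.Println(sumTo(5)) // 0+1+2+3+4 = 10
}
```

Declaring the direction in the signature lets the compiler reject a receive inside produce or a send inside sum.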

Sending/Receiving

stringStream := make(chan string)
go func() {
    stringStream <- "hello" // pass literal onto channel
}()
fmt.Println(<-stringStream) // read the literal from channel

  • Channels are blocking.

  • Any goroutine that attempts to write to a channel that is full will wait until the channel has room again.

  • Any goroutine that attempts to read from a channel that is empty will wait until at least one item is placed on it.

Closing

stringStream := make(chan string)
close(stringStream)
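
What a reader sees after close can be sketched with the two-value receive form. The helper name readAfterClose and the buffer size of 1 are assumptions chosen for this illustration.

```go
package main

import "fmt"

// readAfterClose is a hypothetical helper that shows what receives return
// once a channel has been closed.
func readAfterClose() (first int, firstOK bool, second int, secondOK bool) {
	ch := make(chan int, 1)
	ch <- 42
	close(ch)

	first, firstOK = <-ch   // the buffered value is still delivered: 42, true
	second, secondOK = <-ch // closed and drained: zero value and false
	return first, firstOK, second, secondOK
}

func main() {
	fmt.Println(readAfterClose()) // 42 true 0 false
}
```

The second return value is how a consumer can tell a real zero value apart from a closed channel.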

Ranging over a channel

intStream := make(chan int)
go func() {
    defer close(intStream)
    for i:=1; i<=5; i++ {
        intStream <- i
    }
}()

for integer := range intStream {
    fmt.Printf("%v ", integer)
}

// 1 2 3 4 5

Closing a channel signals to multiple goroutines

begin := make(chan interface{})
var wg sync.WaitGroup
for i:=0; i<5; i++ {
    wg.Add(1)
    go func(i int) {
        defer wg.Done()
        <-begin // blocks until the channel is closed
        fmt.Printf("%v has begun\n", i)
    }(i) // pass i so that each goroutine gets its own copy
}

fmt.Println("unblocking goroutines...")
close(begin)
wg.Wait()

Buffered Channels

channels that are given a capacity when they're instantiated.

var dataStream chan interface{}
dataStream = make(chan interface{}, 4)

A buffered channel is an in-memory FIFO queue that concurrent processes communicate over.
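
The queue behaviour can be observed with len and cap. This is a small sketch; the helper name fillBuffer and the capacity of 2 are arbitrary choices for the example.

```go
package main

import "fmt"

// fillBuffer is a hypothetical helper that fills a buffered channel and
// reports its length before and after one receive.
func fillBuffer() (lenBefore, lenAfter, capacity int, first string) {
	ch := make(chan string, 2)

	ch <- "a" // does not block: the buffer has room
	ch <- "b" // does not block: the buffer is now full
	lenBefore = len(ch)

	first = <-ch // FIFO order: the oldest value comes out first
	lenAfter = len(ch)

	return lenBefore, lenAfter, cap(ch), first
}

func main() {
	fmt.Println(fillBuffer()) // 2 1 2 a
}
```

A third send before any receive would block, because the buffer holds only two values.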

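Result of channel operation given a channel's state

  • Read: blocks on a nil channel and on an open, empty channel; returns a value from an open, non-empty channel; returns the zero value and false on a closed channel once any buffered values are drained; does not compile on a send-only channel.

  • Write: blocks on a nil channel and on an open, full channel; writes the value to an open channel that has room; panics on a closed channel; does not compile on a receive-only channel.

  • Close: panics on a nil channel and on an already closed channel; closes an open channel (remaining buffered values can still be read); does not compile on a receive-only channel.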

Channel Owners & Consumers

A channel owner should:

  • instantiate the channel.

  • perform writes, or pass ownership to another goroutine.

  • close the channel.

  • encapsulate the previous three things in this list and expose them via a read-only channel.

A channel consumer should:

  • know when a channel is closed.

  • be responsible for handling blocking for any reason.

chanOwner := func() <-chan int {
    resultStream := make(chan int, 5) // create buffered channel
    go func() {
        defer close(resultStream) // close the channel
        for i := 0; i <= 5; i++ {
            resultStream <- i // write to it
        }
    }()
    return resultStream // return the read-only channel
}

resultStream := chanOwner()
for result := range resultStream {
    fmt.Printf("received: %d\n", result)
}
fmt.Println("done receiving")
