By Scott Frazer


2016-07-03 14:20:02

I'm attempting to write a simple worker pool with goroutines.

  • Is the code I wrote idiomatic? If not, then what should change?
  • I want to be able to set the maximum number of worker threads to 5 and block until a worker becomes available if all 5 are busy. How would I extend this to only have a pool of 5 workers max? Do I spawn the static 5 goroutines, and give each the work_channel?

code:

package main

import (
    "fmt"
    "math/rand"
    "sync"
    "time"
)

func worker(id string, work string, o chan string, wg *sync.WaitGroup) {
    defer wg.Done()
    sleepMs := rand.Intn(1000)
    fmt.Printf("worker '%s' received: '%s', sleep %dms\n", id, work, sleepMs)
    time.Sleep(time.Duration(sleepMs) * time.Millisecond)
    o <- work + fmt.Sprintf("-%dms", sleepMs)
}

func main() {
    var work_channel = make(chan string)
    var results_channel = make(chan string)

    // create goroutine per item in work_channel
    go func() {
        var c = 0
        var wg sync.WaitGroup
        for work := range work_channel {
            wg.Add(1)
            go worker(fmt.Sprintf("%d", c), work, results_channel, &wg)
            c++
        }
        wg.Wait()
        fmt.Println("closing results channel")
        close(results_channel)
    }()

    // add work to the work_channel
    go func() {
        for c := 'a'; c <= 'z'; c++ {
            work_channel <- fmt.Sprintf("%c", c)
        }
        close(work_channel)
        fmt.Println("sent work to work_channel")
    }()

    for x := range results_channel {
        fmt.Printf("result: %s\n", x)
    }
}


@icza 2016-07-03 16:54:21

Your solution is not a worker goroutine pool in any sense: your code does not limit concurrent goroutines, and it does not "reuse" goroutines (it always starts a new one when a new job is received).

Producer-consumer pattern

As posted at Bruteforce MD5 Password cracker, you can make use of the producer-consumer pattern. You could have a designated producer goroutine that would generate the jobs (things to do / calculate), and send them on a jobs channel. You could have a fixed pool of consumer goroutines (e.g. 5 of them) which would loop over the channel on which jobs are delivered, and each would execute / complete the received jobs.

The producer goroutine could simply close the jobs channel when all jobs were generated and sent, properly signalling consumers that no more jobs will be coming. The for ... range construct on a channel handles the "close" event and terminates properly. Note that all jobs sent before closing the channel will still be delivered.

This results in a clean design with a fixed (but arbitrary) number of goroutines, and for CPU-bound jobs it can keep all CPU cores busy if the number of consumer goroutines is at least the number of CPU cores. It also has the advantage that it can be "throttled" by choosing the channel capacity (buffered channel) and the number of consumer goroutines.

Note that having a single designated producer goroutine is not mandatory. You could have multiple goroutines producing jobs too, but then you must also synchronize them so that the jobs channel is closed only when all producer goroutines are done producing; otherwise, a producer attempting to send another job on the already closed jobs channel causes a runtime panic. Usually jobs are cheap to produce and can be generated much faster than they can be executed, so producing them in one goroutine while many goroutines consume / execute them works well in practice.

Handling results:

If jobs have results, you may choose to have a designated results channel on which results are delivered ("sent back"), or you may choose to handle the results in the consumer as soon as a job is completed. The latter may even be implemented with a "callback" function that handles the results. The important question is whether results can be processed independently, or whether they need to be merged (e.g. in a map-reduce framework) or aggregated.

If you go with a results channel, you also need a goroutine that receives values from it; otherwise the consumers would block as soon as the results channel's buffer fills up.
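The blocking behavior can be observed without actually deadlocking by attempting non-blocking sends with select and a default branch. This sketch is only a demonstration of what a consumer would run into; trySend is a hypothetical helper, and real consumers would use a plain send.

```go
package main

import "fmt"

// trySend attempts a send without blocking: select picks the default branch
// when the send cannot proceed immediately (here: the buffer is full and no
// goroutine is receiving).
func trySend(results chan<- string, r string) bool {
	select {
	case results <- r:
		return true
	default:
		return false
	}
}

func main() {
	// Buffer of 2, and nobody is receiving yet.
	results := make(chan string, 2)

	fmt.Println(trySend(results, "a-1ms")) // true: fits in the buffer
	fmt.Println(trySend(results, "b-2ms")) // true: buffer is now full
	fmt.Println(trySend(results, "c-3ms")) // false: a plain send would block here

	// A receiver frees one slot, so the same send can now proceed.
	<-results
	fmt.Println(trySend(results, "c-3ms")) // true
}
```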

With results channel

Instead of sending simple string values as jobs and results, I would create a wrapper type that can hold any additional info, which makes it much more flexible:

type Job struct {
    Id     int
    Work   string
    Result string
}

Note that the Job struct also wraps the result, so when we send back the result, it also carries the original Job as context, which is often very useful. Also note that it is beneficial to send pointers (*Job) on the channels instead of Job values: there is no need to make "countless" copies of Jobs, and the size of the Job struct becomes irrelevant.

Here is how this producer-consumer setup could look:

I would use two sync.WaitGroup values (their roles are explained below):

var wg, wg2 sync.WaitGroup

The producer is responsible for generating the jobs to be executed:

func produce(jobs chan<- *Job) {
    // Generate jobs:
    id := 0
    for c := 'a'; c <= 'z'; c++ {
        id++
        jobs <- &Job{Id: id, Work: fmt.Sprintf("%c", c)}
    }
    close(jobs)
}

When done (no more jobs), the jobs channel is closed which signals consumers that no more jobs will arrive.

Note that produce() sees the jobs channel as send-only, because sending jobs is the only thing the producer needs to do with it (besides closing it, which is also permitted on a send-only channel). An accidental receive in the producer would be a compile-time error, so the mistake is detected early.

The consumer's responsibility is to receive jobs as long as jobs can be received, and execute them:

func consume(id int, jobs <-chan *Job, results chan<- *Job) {
    defer wg.Done()
    for job := range jobs {
        sleepMs := rand.Intn(1000)
        fmt.Printf("worker #%d received: '%s', sleep %dms\n", id, job.Work, sleepMs)
        time.Sleep(time.Duration(sleepMs) * time.Millisecond)
        job.Result = job.Work + fmt.Sprintf("-%dms", sleepMs)
        results <- job
    }
}

Note that consume() sees the jobs channel as receive-only; the consumer only needs to receive from it. Similarly, the results channel is send-only for the consumer.

Also note that the results channel cannot be closed here: there are multiple consumer goroutines, and if one of them closed it, any other consumer still sending on it (or trying to close it again) would cause a runtime panic! The results channel can (and must) be closed only after all consumer goroutines have ended, because only then can we be sure no further values (results) will be sent on it.

We have results which need to be analyzed:

func analyze(results <-chan *Job) {
    defer wg2.Done()
    for job := range results {
        fmt.Printf("result: %s\n", job.Result)
    }
}

As you can see, this also receives results as long as they may come (until results channel is closed). The results channel for the analyzer is receive only.

Please note the use of channel types: whenever it is sufficient, use only a unidirectional channel type to detect and prevent errors early, at compile time. Only use bidirectional channel type if you do need both directions.

And this is how all these are glued together:

func main() {
    jobs := make(chan *Job, 100)    // Buffered channel
    results := make(chan *Job, 100) // Buffered channel

    // Start consumers:
    for i := 0; i < 5; i++ { // 5 consumers
        wg.Add(1)
        go consume(i, jobs, results)
    }
    // Start producing
    go produce(jobs)

    // Start analyzing:
    wg2.Add(1)
    go analyze(results)

    wg.Wait() // Wait for all consumers to finish processing jobs

    // All jobs are processed, no more values will be sent on results:
    close(results)

    wg2.Wait() // Wait for the analyzer to analyze all results
}

Example output:

As you can see, results are already being analyzed while the workers are still picking up new jobs:

worker #4 received: 'e', sleep 81ms
worker #0 received: 'a', sleep 887ms
worker #1 received: 'b', sleep 847ms
worker #2 received: 'c', sleep 59ms
worker #3 received: 'd', sleep 81ms
worker #2 received: 'f', sleep 318ms
result: c-59ms
worker #4 received: 'g', sleep 425ms
result: e-81ms
worker #3 received: 'h', sleep 540ms
result: d-81ms
worker #2 received: 'i', sleep 456ms
result: f-318ms
worker #4 received: 'j', sleep 300ms
result: g-425ms
worker #3 received: 'k', sleep 694ms
result: h-540ms
worker #4 received: 'l', sleep 511ms
result: j-300ms
worker #2 received: 'm', sleep 162ms
result: i-456ms
worker #1 received: 'n', sleep 89ms
result: b-847ms
worker #0 received: 'o', sleep 728ms
result: a-887ms
worker #1 received: 'p', sleep 274ms
result: n-89ms
worker #2 received: 'q', sleep 211ms
result: m-162ms
worker #2 received: 'r', sleep 445ms
result: q-211ms
worker #1 received: 's', sleep 237ms
result: p-274ms
worker #3 received: 't', sleep 106ms
result: k-694ms
worker #4 received: 'u', sleep 495ms
result: l-511ms
worker #3 received: 'v', sleep 466ms
result: t-106ms
worker #1 received: 'w', sleep 528ms
result: s-237ms
worker #0 received: 'x', sleep 258ms
result: o-728ms
worker #2 received: 'y', sleep 47ms
result: r-445ms
worker #2 received: 'z', sleep 947ms
result: y-47ms
result: u-495ms
result: x-258ms
result: v-466ms
result: w-528ms
result: z-947ms

Try the complete application on the Go Playground.

Without a results channel

The code simplifies significantly if we don't use a results channel and instead let the consumer goroutines handle each result right away (print it, in our case). Then we don't need 2 sync.WaitGroup values (the 2nd was only needed to wait for the analyzer to complete).

Without a results channel the complete solution is like this:

package main

import (
    "fmt"
    "math/rand"
    "sync"
    "time"
)

var wg sync.WaitGroup

type Job struct {
    Id   int
    Work string
}

func produce(jobs chan<- *Job) {
    // Generate jobs:
    id := 0
    for c := 'a'; c <= 'z'; c++ {
        id++
        jobs <- &Job{Id: id, Work: fmt.Sprintf("%c", c)}
    }
    close(jobs)
}

func consume(id int, jobs <-chan *Job) {
    defer wg.Done()
    for job := range jobs {
        sleepMs := rand.Intn(1000)
        fmt.Printf("worker #%d received: '%s', sleep %dms\n", id, job.Work, sleepMs)
        time.Sleep(time.Duration(sleepMs) * time.Millisecond)
        fmt.Printf("result: %s\n", job.Work+fmt.Sprintf("-%dms", sleepMs))
    }
}

func main() {
    jobs := make(chan *Job, 100) // Buffered channel

    // Start consumers:
    for i := 0; i < 5; i++ { // 5 consumers
        wg.Add(1)
        go consume(i, jobs)
    }
    // Start producing
    go produce(jobs)

    wg.Wait() // Wait for all consumers to finish processing jobs
}

The output is similar to that of the results-channel version (though of course the execution/completion order is random).

Try this variant on the Go Playground.

@Anfernee 2016-07-03 17:50:40

I like this answer. Why remove and create new goroutines when they're all doing the same thing? Even though goroutines are light and cheap, we shouldn't take that for granted. I believe this is good practice.

@Scott Frazer 2016-07-03 19:14:44

Whoa, amazing! Thanks!

@AmaJayJB 2018-03-07 20:06:11

I wish I could double/triple/quadruple like this!

@AmaJayJB 2018-03-07 22:44:18

Firstly thanks, this is awesome. I think you may have one extra wg.Done() than wg.Add(1), since you wg.Add(1) for each consumer and defer a wg.Done() for each consumer, but you also wg.Done() for produce. Which means that the wg.Wait() might not wait till all 5 consumers are done. Just checking?

@icza 2018-03-08 10:46:57

@AmaJayJB You're right, nice catch. Thanks. I fixed it by removing the wg.Done() from the producer.

@Markus W Mahlberg 2018-12-26 11:03:43

Wouldn’t the actual Go terminology call this a fan out pipeline?

@icza 2018-12-26 18:05:21

@MarkusWMahlberg Fan out is used when multiple channels are merged into a single one. This is not used in the answer.

@Markus W Mahlberg 2018-12-26 18:05:57

Wouldn’t that be a fan in?

@icza 2018-12-26 18:07:46

@MarkusWMahlberg You're right, fan out is when multiple functions read from a channel until it is closed. Although that is also not used in the answer, there is only one consumer.

@abhink 2016-07-03 15:14:41

You can implement a counting semaphore to limit goroutine concurrency.

var tokens = make(chan struct{}, 20)

func worker(id string, work string, o chan string, wg *sync.WaitGroup) {
    defer wg.Done()
    tokens <- struct{}{} // acquire a token before performing work
    sleepMs := rand.Intn(1000)
    fmt.Printf("worker '%s' received: '%s', sleep %dms\n", id, work, sleepMs)
    time.Sleep(time.Duration(sleepMs) * time.Millisecond)
    <-tokens // release the token
    o <- work + fmt.Sprintf("-%dms", sleepMs)
}

This is the general design used to limit the number of workers running at the same time. You can of course move where tokens are acquired/released to fit your code.

@Anfernee 2016-07-03 17:12:51

You should be careful when mentioning parallelism. go func(){}() does not always guarantee parallelism. With that aside, I think this is the simplest way to limit the number of goroutines in flight.
