Limit the number of goroutines running at the same time

Recently I needed to write a program that would continuously read from a log file, parse it and deliver matching data to an endpoint.

The pseudocode would look like this:

for read line from file
  thread(do_something(line))
end

The program would run do_something() on each line in a separate thread, without blocking the loop or the rest of the process. I would also need to limit the number of threads running in the background: this program has one job to do, and I didn’t want it to use more system resources than it needs or to add noticeable load to the system it runs on.

Go’s concurrency and channel communication seemed ideal for this program. I would end up learning a bit more about Go as well.

First, I need a buffered channel.

concurrent := make(chan bool, MAX_ROUTINES)

The program will receive from the concurrent channel whenever it needs to run a goroutine to deliver data. When there is nothing to receive, the receive blocks until a value is sent to the channel, so let’s fill it first:

for i := 0; i < MAX_ROUTINES; i++ {
  concurrent <- true
}
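The filled channel works as a counting semaphore: each receive takes one token, and once all MAX_ROUTINES tokens have been taken, the next receive blocks. The sketch below makes that visible with a non-blocking receive (select with a default case), which reports false instead of blocking. newSemaphore and tryAcquire are illustrative helpers, and MAX_ROUTINES is set to 3 only for the demo.

```go
package main

import "fmt"

// MAX_ROUTINES is set to 3 just for this demo.
const MAX_ROUTINES = 3

// newSemaphore mirrors the concurrent channel above: a buffered channel
// pre-filled with n tokens.
func newSemaphore(n int) chan bool {
	sem := make(chan bool, n)
	for i := 0; i < n; i++ {
		sem <- true
	}
	return sem
}

// tryAcquire is a non-blocking receive: it takes a token if one is available
// and returns false, instead of blocking, when the channel is empty.
func tryAcquire(sem chan bool) bool {
	select {
	case <-sem:
		return true
	default:
		return false
	}
}

func main() {
	concurrent := newSemaphore(MAX_ROUTINES)

	acquired := 0
	for tryAcquire(concurrent) {
		acquired++
	}
	fmt.Println("tokens taken before the channel is empty:", acquired) // 3

	concurrent <- true // a finished goroutine hands its token back
	fmt.Println("token available again:", tryAcquire(concurrent)) // true
}
```

A plain receive (<-concurrent) in place of tryAcquire would block at the point where tryAcquire returns false, which is what keeps the number of running goroutines at or below MAX_ROUTINES.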

When a goroutine finishes its job, it sends to the done channel.

done := make(chan bool)

The wait channel shown below will determine when the program exits. We’ll send to this channel only once – when all the goroutines have finished running – and the program will not exit until it has received from it:

wait := make(chan bool)

The total and finished counters are used to find out whether any goroutines are still running. allDone is set to true when there is nothing left to do, that is, once the file has been read and it is time to exit:

total := 0
finished := 0
allDone := false

The goroutine below runs an infinite loop. Whenever a goroutine finishes, the loop checks whether allDone is true and whether the number of finished goroutines equals the number of launched goroutines [1]. If both hold, it breaks out of the loop and sends to the wait channel [2]. Otherwise, it makes room for another goroutine by sending to concurrent, since one has just finished:

go func() {
    // uses the finished counter declared above; redeclaring it here with
    // := would shadow it and leave the outer variable unused (a compile error)
    for {
        // block until a goroutine has finished
        <-done

        // a goroutine has just finished
        finished++

        // check if we should break
        if allDone && finished == total {
            break
        }

        // there are still goroutines running
        // another goroutine can start since one has finished
        concurrent <- true
    }

    // all goroutines have finished running
    // the program can now terminate
    wait <- true
}()
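Two properties of the loop above are worth knowing. total and allDone are written by the reading loop and read by this goroutine with no synchronization, which is a data race that the race detector (go run -race) will flag. And because allDone is only checked right after a receive from done, if the last goroutine finishes before allDone is set to true (or no goroutine is launched at all), the loop goes back to waiting on <-done, and the final <-wait never returns. The sketch below shows one way around both, assuming the reading loop sends its final count on a hypothetical totalCh channel instead of setting allDone, so that finished and total are only touched by the monitoring goroutine. monitor and run are illustrative names, and a slice of strings stands in for the log file.

```go
package main

import "fmt"

// MAX_ROUTINES is an example value for this sketch.
const MAX_ROUTINES = 2

// monitor plays the role of the infinite-loop goroutine. Instead of reading
// shared total/allDone variables, it receives the final count on totalCh
// (a hypothetical channel), so finished and total are local to it.
func monitor(concurrent chan<- bool, done <-chan bool, totalCh <-chan int, wait chan<- bool) {
	finished := 0
	total := -1 // unknown until the reading loop reports it
	for finished != total {
		select {
		case <-done:
			// a goroutine has just finished: make room for another one
			finished++
			concurrent <- true
		case total = <-totalCh:
			// the reading loop is done and reports how many it launched
		}
	}
	// all goroutines have finished running
	wait <- true
}

// run mirrors the reading loop and returns how many goroutines it launched.
func run(lines []string, doSomething func(string)) int {
	concurrent := make(chan bool, MAX_ROUTINES)
	for i := 0; i < MAX_ROUTINES; i++ {
		concurrent <- true
	}
	done := make(chan bool)
	totalCh := make(chan int)
	wait := make(chan bool)
	go monitor(concurrent, done, totalCh, wait)

	total := 0
	for _, line := range lines {
		<-concurrent // block until a slot is free
		total++
		go func(data string) {
			doSomething(data)
			done <- true
		}(line)
	}
	totalCh <- total // replaces allDone = true
	<-wait
	return total
}

func main() {
	fmt.Println(run([]string{"a", "b", "c", "d", "e"}, func(string) {})) // 5
	fmt.Println(run(nil, func(string) {}))                               // 0, no hang
}
```

Since the monitor only exits its loop after it has both received the total and counted that many finished goroutines, the order in which the last done and the final count arrive no longer matters.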

While the program is reading data from the file, it receives from the concurrent channel before running a goroutine to do something with the data [3]. If MAX_ROUTINES goroutines are already running, that receive blocks, and no data is read from the Reader until it returns. When a goroutine finishes, the infinite loop shown above sends to concurrent [4], which unblocks the receive operation below. The program then runs a goroutine to deal with the data:

for {
    // read data
    // ...
    if readFailed {
        break
    }

    // ...

    // block until receive
    <-concurrent

    // increment the total counter
    total++

    // run a goroutine to do something with the data
    go func(data []string) {
        DoSomething(data)
        // finishing up
        done <- true
    }(data)
}

After we have finished reading from the file, we set allDone to true to let the infinite loop know that the program can now terminate. The program terminates as soon as it receives from the wait channel:

allDone = true
<-wait

There it is.

The program continuously reads data from the file, launches as many goroutines as it needs with no more than MAX_ROUTINES running at the same time, and terminates only after reading stops (at EOF or on another read failure) and all goroutines have finished.

  1. Infinite loop: if allDone && finished == total {

  2. Infinite loop: wait <- true

  3. Reading loop: <-concurrent

  4. Infinite loop: concurrent <- true
