Recently I was working on a feature to add a flag to the oc command line tool. oc has a sub-command that helps an OpenShift administrator collect a “must-gather” from their OpenShift cluster. must-gather gathers information that might help solve the problem causing an OpenShift cluster to function erratically. It is like sosreport, but focused on a Kubernetes cluster instead of an individual host.
The flag I was adding was --all-images, which would replace a command like the one below with a simple oc adm must-gather --all-images:
$ oc adm must-gather \
--image=quay.io/myrepo/image1 \
--image=quay.io/myrepo/image2 \
--image=registry.redhat.io/container-native-virtualization/cnv-must-gather-rhel9:v4.14.2
$ oc adm must-gather --all-images
To learn more about the --all-images flag, take a look at this enhancement proposal.
As mentioned in the proposal, we want to limit the number of Pods started to collect the cluster’s must-gather to four. Otherwise, the --image or --all-images flag could potentially overwhelm the cluster and cause more problems on an already erratic cluster.
Semaphore pattern
I’m not well-versed in Design Patterns, Concurrency Patterns, or any kind of Computer Science patterns. So, when I started out, I wrote the code I knew best to limit the number of goroutines. I don’t know exactly where I picked up that approach, but it’s probably from reading StackOverflow Q&As tagged goroutines or the golang subreddit.
Anyway, a simple Go program that limits the number of goroutines using the semaphore pattern looks like this:
package main

import (
    "fmt"
    "runtime"
    "sync"
    "time"
)

// number of concurrent goroutines we want to run
const workers = 4

func main() {
    var wg sync.WaitGroup
    // channel to be populated and depopulated at the start and end of each goroutine
    w := make(chan struct{}, workers)

    for i := 0; i < 10; i++ {
        w <- struct{}{}
        wg.Add(1)
        go func() {
            defer func() {
                <-w
                wg.Done()
            }()
            fmt.Println("# goroutines: ", runtime.NumGoroutine())
            time.Sleep(5 * time.Second)
        }()
    }
    wg.Wait()
}
What’s this code doing?
- First, we create a buffered channel of the size of the maximum number of goroutines we want to run concurrently. In our example it’s 4. It ensures that our code never starts more than four goroutines at once.
- A for loop that will, in total, start 10 goroutines.
- The first statement in the for loop puts a struct{}{} on the channel. If the channel is full, this operation waits until there is space available on it. Hence, it prevents further goroutines from spinning up.
- Once we put a struct{}{} on the channel, we start a goroutine that prints the number of running goroutines (using runtime.NumGoroutine()) and sleeps for 5 seconds before reading a struct{}{} from the channel and decrementing the WaitGroup counter by 1 in its defer call.
- A wg.Wait() call is required to make sure that the main goroutine doesn’t exit before all of the goroutines created in the for loop are done processing. Try removing it from the code and see whether # goroutines is printed 10 times or fewer.
This code spins up a total of 10 goroutines over its lifetime. Since creating and destroying goroutines is not very expensive, we don’t mind that. However, there are cases where it’s a better idea to create fewer goroutines.
Enter the worker pool pattern.
Worker Pool pattern
When I opened a PR that solved the problem using the Semaphore pattern, I got a review from the maintainer asking me to try using a queue of Pods instead. The code they referenced used the RateLimitingInterface interface from the Kubernetes client-go library. I spent some time looking at it, but it made no sense to me at the time, and after a brief discussion on the PR, I started learning and then implementing the Worker Pool pattern to solve the problem.
A simple Go program that limits the number of goroutines using the Worker Pool pattern looks like this:
package main

import (
    "fmt"
    "sync"
    "time"
)

var concurrentMG = 4

func main() {
    var wg sync.WaitGroup
    podChan := make(chan int)

    go func() {
        for i := 0; i < 10; i++ {
            // put the task to process on podChan
            podChan <- i
        }
        close(podChan)
    }()

    // spin up concurrentMG number of workers
    wg.Add(concurrentMG)
    for i := 0; i < concurrentMG; i++ {
        fmt.Printf("Spinning up a goroutine...\n")
        go func() {
            // read till podChan is empty
            for i := range podChan {
                fmt.Printf("%d\n", i)
                time.Sleep(1 * time.Second)
            }
            // no more pods to process
            wg.Done()
        }()
    }
    wg.Wait()
}
This code achieves the same goal of running only a certain number of goroutines concurrently, but it does so by spinning up those goroutines right away and reading the data to be processed through an unbuffered channel. Let’s look at what this code does:
- Create an unbuffered channel where we will put the data to be processed.
- A goroutine puts data onto podChan and closes podChan when it’s done. This code is invoked in a goroutine since we are putting data onto an unbuffered channel, which blocks execution until something reads from the channel.
- The wg.Add(concurrentMG) statement indicates that we will be starting concurrentMG goroutines.
- The for loop in this code starts the goroutines. The for loop within each goroutine reads from podChan. This inner loop exits once podChan has been drained and closed. At that point, we call wg.Done().
For the same task, the Semaphore pattern spins up 10 goroutines while the Worker Pool pattern spins up only 4. There might be use cases where the latter is more performant than the former, especially when the total number of tasks is larger.
Queue of tasks
Note that this is not a pattern. After I implemented the Worker Pool pattern, the maintainers asked me to use the queue approach they had recommended earlier, because the larger oc codebase uses it in a bunch of places, which makes maintenance easier.
A simple program using the queue approach is available in this repository. It uses the queue implementation from the Kubernetes code. To me, this approach is akin to using the Worker Pool pattern without the channels. It differs from the Worker Pool pattern in that the queue.Add and queue.Get() calls ensure that tasks are performed in FIFO order, whereas the Worker Pool pattern covered above doesn’t guarantee order.
An excellent in-depth explanation of the rate-limiting queue available in the Kubernetes code can be found in this blog.
That’s it
Initially, I despised the back-and-forth that happened in the process of getting this PR approved. But in the end, I’m glad it pushed me to try and learn different approaches. If you have any feedback or suggestions, please let me know via Twitter.