Go Concurrency: From the Trenches
I recently finished some work in the Go programming language, and here I will share some things I learned while implementing web crawlers in this cool language.
Our Current Implementation and its Weak Points
Our goal was to improve the efficiency and performance of some web crawler scripts that have been running in production for over two years, with growing performance problems caused primarily by CPU usage. We use these crawlers to grab data from various web sites. The crawlers were written in Ruby, and the method used is straightforward: first get a page URL or account ID, parse the page content or invoke an API call, persist data to the db, and repeat. The benefit of this approach is its simplicity; anyone can easily understand the structure. However, when the task queue grows very large, weaknesses emerge. Because each job waits for its response before the next one starts, queued jobs back up, which significantly extends the execution time of a batch.
Built-in Concurrency Support
There are two possible paths to improve our code: one uses multiple threads, the other uses non-blocking IO and callbacks. After reading some code, I chose threads, since I find such code more readable and maintainable.
goroutines
Go has a built-in concurrency construct called the goroutine, based on the CSP (Communicating Sequential Processes) model. (Strictly speaking, it follows the Newsqueak-Alef-Limbo line of languages, which differs in some ways from the original CSP; please refer to this slide for more information.) Here is a brief introduction to goroutines, from the official slides:
- An independently executing function, launched by a go statement.
- Has its own call stack, which grows and shrinks as required.
- It's very cheap. It's practical to have thousands, even hundreds of thousands of goroutines.
- Not a thread.
- There might be only one thread in a program with thousands of goroutines.
- Instead, goroutines are multiplexed dynamically onto threads as needed to keep all the goroutines running.
- If you think of it as a very cheap thread, you won't be far off.
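To make the "very cheap" and "only one thread" points more concrete, here is a minimal sketch that is not from the original post. It calls runtime.GOMAXPROCS(1), which limits the number of OS threads that can execute Go code simultaneously to one, and then launches 10,000 goroutines that each increment a shared counter. The helper name countWithGoroutines is hypothetical, and it waits with sync.WaitGroup rather than a sleep.

```go
package main

import (
	"fmt"
	"runtime"
	"sync"
	"sync/atomic"
)

// countWithGoroutines (hypothetical helper) launches n goroutines, each
// incrementing a shared counter, and waits for all of them to finish.
func countWithGoroutines(n int) int64 {
	var wg sync.WaitGroup
	var counter int64
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			// Atomic increment, because goroutines may run in parallel
			// when GOMAXPROCS is greater than one.
			atomic.AddInt64(&counter, 1)
		}()
	}
	wg.Wait() // block until every goroutine has called Done
	return atomic.LoadInt64(&counter)
}

func main() {
	// Only one OS thread may execute Go code at a time; the Go scheduler
	// multiplexes all 10,000 goroutines within that limit.
	runtime.GOMAXPROCS(1)
	fmt.Println(countWithGoroutines(10000))
}
```

Launching this many goroutines typically finishes almost instantly, which is the practical meaning of "cheap" in the slides.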
Here is a simple example using goroutines:
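package main

import (
	"fmt"
	"time"
)

func main() {
	fmt.Println("START 1")
	// Plain function calls: each foo() finishes before the next one starts.
	for i := 0; i < 3; i++ {
		foo(i)
	}
	fmt.Println("END 1")

	fmt.Println("START 2")
	// go statements: each foo() runs in its own goroutine, and the loop
	// continues without waiting for it.
	for i := 0; i < 3; i++ {
		go foo(i)
	}
	fmt.Println("END 2")

	// Crude synchronization: give the goroutines time to finish, because
	// the program exits as soon as main() returns.
	time.Sleep(1 * time.Second)
}

func foo(i int) {
	time.Sleep(1) // sleeps for 1 nanosecond (the argument is a time.Duration)
	fmt.Printf("Call index: %d\n", i)
}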
And here is the output:
START 1
Call index: 0
Call index: 1
Call index: 2
END 1
START 2
END 2
Call index: 0
Call index: 2
Call index: 1
There are two loops in the code, and each invokes the function foo() three times. The difference is that the first loop calls the function directly, while the second launches it with a go statement. The first loop executes in the expected order, but the second loop's output is out of order. In fact, the output of the second loop has no guaranteed order, which you can check by launching more goroutines. By the way, the time.Sleep(1 * time.Second) call at the end of main() is there to give all goroutines time to finish, since the program exits as soon as main() returns and does not wait for other goroutines. There are, of course, better synchronisation mechanisms available, such as sync.WaitGroup. The full source can be found here.
channels
The goroutine shown above doesn't return data, so it's not a fully functional component. Now let's talk about Go's channel construct. Channels are something like a pipe: data is pumped in at one end and fetched from the other. Let's modify the code above to show how this works. I have removed the plain loop to make the code shorter.
package main

import "fmt"

func main() {
	foo_channel := make(chan string) // unbuffered channel of strings
	channel_cnt := 3
	fmt.Println("Start launching goroutines")
	for i := 0; i < channel_cnt; i++ {
		go foo(i, foo_channel)
	}
	fmt.Println("Finish launching goroutines")
	// Receive one message per goroutine; each receive blocks until
	// some goroutine sends.
	for i := 0; i < channel_cnt; i++ {
		fmt.Println(<-foo_channel)
	}
}

// foo sends a message describing its index back over the channel.
func foo(i int, foo_channel chan string) {
	s := fmt.Sprintf("Call index %d", i)
	foo_channel <- s
}
And here is the output:
Start launching goroutines
Finish launching goroutines
Call index 0
Call index 1
Call index 2
As we can see, there is no time.Sleep() call at the end. Instead, the channel foo_channel carries data from the goroutines back to the main goroutine. Because this channel is unbuffered, a send blocks until another goroutine is ready to receive, and a receive blocks until a sender is ready. This blocking is what controls the program's flow: main() cannot finish its second loop until all three goroutines have sent their messages. (As before, the order in which the messages arrive is not guaranteed.) Full source can be found here.
The channel above has no buffer (its capacity is 0), so every send must meet a matching receive. But we can also pass a positive capacity when creating a channel, which gives it a buffer. A send to a buffered channel blocks only when the buffer is full, and a receive blocks only when the buffer is empty. This feature can be used to limit the concurrency of our web crawlers. Here is another example:
package main

import (
	"fmt"
	"time"
)

func main() {
	channel_cnt := 10
	// Buffered channel with capacity 2: at most two values can sit in it,
	// so at most two goroutines can hold a "slot" at once.
	concurrency_chan := make(chan bool, 2)
	msg_chan := make(chan string)
	fmt.Println("Start launching goroutines")
	for i := 0; i < channel_cnt; i++ {
		go foo(i, concurrency_chan, msg_chan)
	}
	fmt.Println("Finish launching goroutines")
	for i := 0; i < channel_cnt; i++ {
		fmt.Println(<-msg_chan)
	}
}

func foo(i int, concurrency_chan chan bool, msg_chan chan string) {
	concurrency_chan <- true // acquire a slot; blocks while both are taken
	s := fmt.Sprintf("%s: Call index %d", time.Now(), i)
	msg_chan <- s
	time.Sleep(1 * time.Second) // hold the slot so the limit shows in the timestamps
	<-concurrency_chan          // release the slot
}
This program creates two channels, and concurrency_chan is a buffered channel of size 2. At the beginning of each goroutine we put a true value into the channel, and at the end we receive a value back out. With these two lines we can guarantee that at most two goroutines are ever past the first line at the same time; the others block until a slot is released. The time.Sleep() call makes this easy to see: the timestamps appear roughly in pairs, about one second apart. Again, the source code can be found here.
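The same pattern can be packaged for a crawler. Below is a minimal sketch under stated assumptions, not the code from our crawlers: crawl and fetch are hypothetical names, fetch only sleeps instead of making an HTTP request, and the URLs are placeholders. It uses a chan struct{} as the semaphore (an empty struct carries no data) and sync.WaitGroup to wait for every goroutine, instead of counting messages in main(). It also records the peak number of simultaneous fetches, which by construction cannot exceed limit.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

// fetch is a hypothetical stand-in for downloading one page; it only
// sleeps to simulate network latency.
func fetch(url string) string {
	time.Sleep(20 * time.Millisecond)
	return "content of " + url
}

// crawl (hypothetical) fetches every URL with at most limit fetches in
// flight, returning results in the same order as urls plus the peak
// concurrency observed.
func crawl(urls []string, limit int) ([]string, int64) {
	sem := make(chan struct{}, limit) // buffered channel used as a counting semaphore
	results := make([]string, len(urls))
	var wg sync.WaitGroup
	var active, peak int64

	for i, u := range urls {
		wg.Add(1)
		go func(i int, u string) {
			defer wg.Done()
			sem <- struct{}{}        // acquire: blocks while limit fetches are in flight
			defer func() { <-sem }() // release the slot when this goroutine returns

			// Track the highest number of goroutines holding a slot at once.
			n := atomic.AddInt64(&active, 1)
			for {
				p := atomic.LoadInt64(&peak)
				if n <= p || atomic.CompareAndSwapInt64(&peak, p, n) {
					break
				}
			}
			results[i] = fetch(u) // each goroutine writes only its own index
			atomic.AddInt64(&active, -1)
		}(i, u)
	}
	wg.Wait() // wait for all goroutines instead of sleeping
	return results, atomic.LoadInt64(&peak)
}

func main() {
	urls := make([]string, 10)
	for i := range urls {
		urls[i] = fmt.Sprintf("http://example.com/page/%d", i) // placeholder URLs
	}
	results, peak := crawl(urls, 2)
	fmt.Println(len(results), "pages fetched; peak concurrency:", peak)
}
```

Because each goroutine writes a distinct slice index and the results are read only after wg.Wait(), no mutex is needed around results.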