Tropical Software Observations

01 July 2014

Posted by Anonymous

at 6:38 PM

Go Concurrency: From the Trenches

I recently finished a project in the Go programming language, and here I will share some things I learned while implementing web crawlers in this cool language.

Our Current Implementation and its Weak Points

Our goal was to improve the efficiency and performance of some web crawler scripts that have been running in production for over two years with increasing performance problems, primarily due to CPU usage. We use these crawlers to collect data from various web sites. The crawlers were written in Ruby, and the method used is straightforward: first get a page URL or account ID, parse the page content or invoke an API call, persist the data to the db, and repeat. The benefit of this approach is its simplicity; anyone can easily understand the structure. However, when the task queue grows very large, weaknesses emerge. Queued jobs get backed up waiting for responses, which in turn extends the execution time of a batch significantly.

Built-in Concurrency Support

There are two possible paths to improve our code: one using multiple threads, the other using non-blocking IO and callbacks. After reading some code, I chose the thread-style approach, since I find such code to be more readable and maintainable.

goroutines

Golang has a built-in concurrency construct called a goroutine, based on the CSP model. (The model is said to follow the Newsqueak-Alef-Limbo lineage, which differs in some ways from the original CSP; please refer to this slide for some extra information.) Here is a brief introduction to goroutines, from the official slides.
  • An independently executing function, launched by a go statement.
  • Has its own call stack, which grows and shrinks as required.
  • It's very cheap. It's practical to have thousands, even hundreds of thousands of goroutines.
  • Not a thread.
  • There might be only one thread in a program with thousands of goroutines.
  • Instead, goroutines are multiplexed dynamically onto threads as needed to keep all the goroutines running.
  • If you think of it as a very cheap thread, you won't be far off.
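To get a feel for how cheap goroutines are, here is a minimal sketch that starts one hundred thousand of them. The function name spawn is made up for this illustration, and sync.WaitGroup from the standard library is used only to wait until they have all finished.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// spawn starts n goroutines that each increment a shared counter,
// waits for all of them, and returns the final count.
func spawn(n int) int64 {
	var wg sync.WaitGroup
	var count int64
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			atomic.AddInt64(&count, 1) // atomic, so concurrent increments are safe
		}()
	}
	wg.Wait()
	return count
}

func main() {
	fmt.Println(spawn(100000)) // prints 100000
}
```

Doing the same with one operating system thread per task would be far more expensive, which is the point of the list above.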
Here is a simple sample program with goroutines:

package main

import "fmt"
import "time"

func main() {
     fmt.Println("START 1")
     for i := 0; i < 3; i++ {
          foo(i)
     }
     fmt.Println("END 1")

     fmt.Println("START 2")
     for i := 0; i < 3; i++ {
          go foo(i)
     }
     fmt.Println("END 2")

     time.Sleep(1 * time.Second)
}

func foo(i int) {
     time.Sleep(1) // a bare 1 is a time.Duration of one nanosecond
     fmt.Printf("Call index: %d\n", i)
}

And here is the output:

START 1
Call index: 0
Call index: 1
Call index: 2
END 1
START 2
END 2
Call index: 0
Call index: 2
Call index: 1

There are two loops in the code, and each invokes function foo() three times. The difference is that the function is called directly in the first loop, while it is launched via a go statement in the second. The first loop executes in the normal order, but the second loop's output is out of order. In fact, the output from the second loop has an unpredictable order, and you can check this by launching more goroutines. By the way, the sleep on Line 19 gives the goroutines time to finish, since the program exits as soon as main returns, without waiting for them. A sleep is not a real guarantee, and there are, of course, better synchronisation mechanisms available. The full source can be found here.
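One such mechanism is sync.WaitGroup from the standard library. The following is a minimal sketch rather than the code we ran in production; the helper name runAll is invented for this example. It waits for exactly as long as the goroutines need instead of sleeping for a fixed time.

```go
package main

import (
	"fmt"
	"sync"
)

// runAll launches n goroutines and blocks until every one has finished.
// It returns the indices in the order the goroutines completed.
func runAll(n int) []int {
	var (
		wg   sync.WaitGroup
		mu   sync.Mutex
		done []int
	)
	for i := 0; i < n; i++ {
		wg.Add(1) // register the goroutine before starting it
		go func(i int) {
			defer wg.Done() // signal completion when the function returns
			mu.Lock()       // the slice is shared, so guard it
			done = append(done, i)
			mu.Unlock()
		}(i)
	}
	wg.Wait() // takes the place of the time.Sleep at the end of main
	return done
}

func main() {
	fmt.Println("START")
	for _, i := range runAll(3) {
		fmt.Printf("Call index: %d\n", i)
	}
	fmt.Println("END")
}
```

The order of the indices still varies from run to run, but all three lines are always printed before END.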

channels

The goroutine shown above doesn't return data, so it is not yet a fully functional component. Now let's talk about Go's channel construct. Channels are something like a pipe: data is pumped in from one side, and fetched from the other. Let's modify the code above to show how this works. I have removed the sequential loop to make the code shorter.

package main

import "fmt"

func main() {

    foo_channel := make(chan string)
    channel_cnt := 3

    fmt.Println("Start launching goroutines")
    for i := 0; i < channel_cnt; i++ {
        go foo(i, foo_channel)
    }
    fmt.Println("Finish launching goroutines")

    for i := 0; i < channel_cnt; i++ {
        fmt.Println(<-foo_channel)
    }

}

func foo(i int, foo_channel chan string) {
    s := fmt.Sprintf("Call index %d", i)
    foo_channel <- s
}

And here is the output:

Start launching goroutines
Finish launching goroutines
Call index 0
Call index 1
Call index 2


As we can see, there is no sleep() call at the end; instead, the variable foo_channel pipes data from the goroutines back to the main goroutine. Because the channel is created without a buffer, a send blocks until another goroutine is ready to receive, and a receive blocks until a value is sent. This blocking is what controls the program's flow: main cannot exit before it has received all three messages. The messages arrive in index order in this run, but that order is not guaranteed. Full source can be found here.

The channel above is unbuffered, so its capacity is 0 and every send must pair up with a receive, which makes it act as a synchronisation point. We can instead pass a positive integer as the second argument to make(), which creates a buffered channel. A send to a buffered channel blocks only when the buffer is full, and a receive blocks only when the buffer is empty. This feature can be used to control the concurrency level of our web crawlers. Here is another example:
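Before that example, here is a small sketch for checking channel capacity by experiment. The helper sendsBeforeBlocking is a name invented for this illustration; it uses select with a default case to attempt sends that never block, and counts how many succeed when nothing is receiving.

```go
package main

import "fmt"

// sendsBeforeBlocking counts how many values a channel accepts
// while no goroutine is receiving from it.
func sendsBeforeBlocking(ch chan int) int {
	n := 0
	for {
		select {
		case ch <- n:
			n++
		default: // the next send would block, so stop counting
			return n
		}
	}
}

func main() {
	fmt.Println(sendsBeforeBlocking(make(chan int)))    // 0: unbuffered
	fmt.Println(sendsBeforeBlocking(make(chan int, 2))) // 2: buffer of two

	ch := make(chan int, 2)
	ch <- 1
	ch <- 2
	fmt.Println(len(ch), cap(ch)) // 2 2: the buffer is full
	<-ch                          // a receive frees one slot
	fmt.Println(len(ch), cap(ch)) // 1 2
}
```

A channel made without a size accepts nothing until a receiver is waiting, while a channel of size 2 accepts two values and then blocks.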

package main

import "fmt"
import "time"

func main() {

    channel_cnt := 10
    concurrency_chan := make(chan bool, 2)
    msg_chan := make(chan string)

    fmt.Println("Start launching goroutines")
    for i := 0; i < channel_cnt; i++ {
        go foo(i, concurrency_chan, msg_chan)
    }
    fmt.Println("Finish launching goroutines")

    for i := 0; i < channel_cnt; i++ {
        fmt.Println(<-msg_chan)
    }

}

func foo(i int, concurrency_chan chan bool, msg_chan chan string) {
    concurrency_chan <- true
    s := fmt.Sprintf("%s: Call index %d", time.Now(), i)
    msg_chan <- s
    time.Sleep(1 * time.Second)
    <-concurrency_chan
}

This program creates two channels; concurrency_chan is a buffered channel of size 2. At the beginning of each goroutine we put a true value into the channel, and at the end we take one back out. All ten goroutines are started immediately, but with these two lines at most 2 of them can be between the send and the receive at any time; the rest block on the send until a slot frees up. The sleep() makes this easier to see: the timestamps in the output advance in pairs, roughly one second apart. Again, source code can be found here.
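To check the limit rather than read it off timestamps, the sketch below records the highest number of goroutines that were inside the guarded section at the same moment. The function name maxInFlight and the 10 ms sleep standing in for real work are assumptions of this example, not part of our crawler.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// maxInFlight runs `jobs` goroutines limited by a semaphore of size `limit`
// and returns the largest number that held a slot at the same time.
func maxInFlight(jobs, limit int) int {
	sem := make(chan struct{}, limit)
	var (
		wg      sync.WaitGroup
		mu      sync.Mutex
		current int
		peak    int
	)
	for i := 0; i < jobs; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			sem <- struct{}{}        // acquire a slot; blocks while `limit` are held
			defer func() { <-sem }() // release the slot on return

			mu.Lock()
			current++
			if current > peak {
				peak = current
			}
			mu.Unlock()

			time.Sleep(10 * time.Millisecond) // stand-in for real work

			mu.Lock()
			current--
			mu.Unlock()
		}()
	}
	wg.Wait()
	return peak
}

func main() {
	fmt.Println("peak concurrency:", maxInFlight(10, 2))
}
```

The reported peak never exceeds the semaphore size, however many goroutines are launched.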

Web Crawlers in Golang

With goroutines, we can implement concurrent web crawlers more easily than with traditional threads. As in our original architecture, the new approach has three basic steps: 1) fetch a task from the queue server, 2) process the URL or account ID by parsing the page or calling the API, then 3) save to the database. The difference is in the second and third steps, where goroutines are now used to inexpensively multiplex our calls to external services. Since a goroutine is just a regular function launched with a go statement, the changes to our original logic are minor, even trivial. However, the improvement is great: during testing, CPU utilisation on the hosts running our crawlers dropped by up to 90%. That figure is an upper bound, though, and multiple hosts are still needed for backup and to segregate API calls to multiple services.
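The three steps can be sketched roughly as follows. This is not our production code: the function crawl, the fetch callback, and the in-memory map standing in for the database are all hypothetical, and the sketch uses a fixed pool of worker goroutines as one possible way to bound concurrency.

```go
package main

import (
	"fmt"
	"strings"
	"sync"
	"time"
)

// crawl processes every task with at most `workers` fetches in flight.
// fetch stands in for the page download or API call, and the returned
// map stands in for the database; both are placeholders.
func crawl(tasks []string, workers int, fetch func(string) string) map[string]string {
	queue := make(chan string) // step 1: tasks arrive on a queue
	var (
		wg sync.WaitGroup
		mu sync.Mutex
	)
	store := make(map[string]string)

	for w := 0; w < workers; w++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for task := range queue {
				result := fetch(task) // step 2: blocking call to an external service
				mu.Lock()
				store[task] = result // step 3: persist the result
				mu.Unlock()
			}
		}()
	}

	for _, t := range tasks {
		queue <- t
	}
	close(queue) // lets the workers' range loops end
	wg.Wait()
	return store
}

func main() {
	slowFetch := func(task string) string {
		time.Sleep(100 * time.Millisecond) // simulated network latency
		return strings.ToUpper(task)
	}
	tasks := []string{"a", "b", "c", "d", "e", "f", "g", "h"}

	start := time.Now()
	results := crawl(tasks, 4, slowFetch)
	fmt.Printf("%d results in %v\n", len(results), time.Since(start))
}
```

With four workers the eight simulated fetches overlap, so the batch finishes in a fraction of the time a one-at-a-time loop would need, while each worker still reads like ordinary sequential code.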

