About us

At sparkgeo we make maps, a lot of maps. We also have a product called maptiks, which is like Google Analytics but for your web maps. During the development of maptiks we needed to capture, process and store a lot of data generated by user activity. golang along with AWS emerged as a natural choice, as we shall see in the sections to come.

About this article

In this article we discuss how one can take advantage of the rich concurrency support offered by golang to implement a simple worker architecture which can operate with AWS. We build a server-like application which accepts requests and puts them onto a queue (SQS); the worker architecture then reads, processes and stores them in a database (DynamoDB).

Some other aspects discussed in this article are:

  1. concurrency in golang,
  2. reducing AWS cost,
  3. using a channel as a buffer for the worker architecture, and
  4. implementing a basic worker architecture in golang.

Golang

Let us begin by talking about the language in question: go. It was designed by people who had worked on Unix and C. First released in 2009, golang has become one of the most popular languages.

A starting point would be An Introduction To Programming in Go and The Little Go Book.

Some of the characteristics of golang are listed below:

  1. Type inference in the style of a dynamic language:
x := 0  
  2. Concurrency primitives such as:

    2.1 goroutines: light-weight processes,

    2.2 channels: provide a way for two goroutines to communicate with one another and synchronize their execution, and

    2.3 select statement: works like a switch, but for channels.

  3. Deliberate omissions to keep the language simple.

Most of the concurrency features are based on the CSP style of programming introduced by Tony Hoare in his paper of the same name, which can be found here.

In general, communicating sequential processes (CSP) is a formal language for describing patterns of interaction in concurrent systems.
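
To make goroutines and channels concrete before we build on them, here is a minimal, self-contained example (not part of the application itself) in which one goroutine sends a message over a channel and main blocks until it arrives:

package main

import "fmt"

func main() {
    messages := make(chan string) // an unbuffered channel of strings

    go func() { // a goroutine running concurrently with main
        messages <- "ping" // send; blocks until a receiver is ready
    }()

    msg := <-messages // receive; blocks until the goroutine sends
    fmt.Println(msg)  // prints "ping"
}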

A generic implementation

The code snippet below provides a generic skeleton for a goroutine working with channels.

// function_name takes whatever parameters are needed; here we assume the two
// channels it selects over.
func function_name(channel_1, channel_2 chan string) {

    go func() { // goroutine
        for { // loops forever, like a "while true"
            select { // switch case, but for channels

            case msg1 := <-channel_1: // if a message is received on channel_1 then ...
                _ = msg1 // perform work

            case msg2 := <-channel_2: // if a message is received on channel_2 then ...
                _ = msg2 // perform work
            }
        }
    }()

}

This is the basic skeleton for most of the components in the Application architecture.

Application architecture

Figure 1. Application architecture

Figure 1 shows the architecture of the application and how the components interact with one another.

AWS components

In this tutorial we use the following services:

SQS

Simple Queue Service (SQS) is a fast, reliable, scalable, fully managed message queuing service. We use it to store the large amounts of data captured by the catcher.

(Tip: Enable long polling on SQS to reduce cost. More information can be found here and here.)
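
As a rough sketch only: assuming the goamz sqs package used later in this article exposes a ReceiveMessageWithParameters helper (the parameter names below are the standard SQS request parameters), a long-polling receive could look like the following, with SQS waiting up to 20 seconds for a message instead of returning, and billing, an empty response immediately.

package main

import "github.com/goamz/goamz/sqs"

// receive_long_poll is a sketch of a long-polling receive; the helper name is
// an assumption about the goamz sqs package.
func receive_long_poll(q *sqs.Queue) (*sqs.ReceiveMessageResponse, error) {
    return q.ReceiveMessageWithParameters(map[string]string{
        "MaxNumberOfMessages": "10", // batch up to 10 messages per call
        "WaitTimeSeconds":     "20", // wait up to 20 s before returning empty
    })
}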

DynamoDB

Amazon DynamoDB is a fast and flexible NoSQL database service for all applications that need consistent, single-digit millisecond latency at any scale. It is a fully managed cloud database and supports both document and key-value store models. We use it to store the processed data generated by the worker architecture.

The basics of interacting with AWS in golang can be found here.
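
Purely as an illustration of the shape of a DynamoDB write from golang, here is a heavily hedged sketch using goamz's dynamodb package; the table name, key and attribute names are made up for this example and are not the ones used by maptiks.

package main

import (
    "log"

    "github.com/goamz/goamz/aws"
    "github.com/goamz/goamz/dynamodb"
)

// store_result sketches writing one processed record to DynamoDB.
// The "processed_events" table and its "id" hash key are hypothetical.
func store_result(id string, payload string) {
    auth, err := aws.EnvAuth() // credentials from the environment
    if err != nil {
        log.Fatal(err)
    }
    server := dynamodb.Server{Auth: auth, Region: aws.USEast}
    pk := dynamodb.PrimaryKey{KeyAttribute: dynamodb.NewStringAttribute("id", "")}
    table := server.NewTable("processed_events", pk)

    attrs := []dynamodb.Attribute{
        *dynamodb.NewStringAttribute("payload", payload),
    }
    if _, err := table.PutItem(id, "", attrs); err != nil {
        log.Println("PutItem failed:", err)
    }
}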

Catcher

The catcher listens on a particular endpoint and handles incoming requests, which in this case means storing each message in SQS. listen_and_serve exposes the endpoint to which requests are sent; the catcher captures and formats them and eventually pushes them onto the queue.

package main

import (  
    // "fmt"
    "net/http"
)

// SQS
var q = queue_init()

func main() {  
    http_handler()
}

func http_handler() {  
    http.HandleFunc("/work", handle_func)
    listen_and_serve()
}

func handle_func(rs http.ResponseWriter, rq *http.Request) {  
    input_message_json, _ := messageCreation(rq)
    send_msg(input_message_json, &q)
    rs.WriteHeader(http.StatusOK)
}

func listen_and_serve() {  
    http.ListenAndServe("127.0.0.1:7001", nil)
}
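
The snippet above leaves queue_init, messageCreation and send_msg undefined. A minimal sketch of what they might look like, assuming the goamz packages and a hypothetical queue name, is shown below; these functions would live alongside the catcher code, and the exact implementations used by maptiks will differ.

package main

import (
    "encoding/json"
    "log"
    "net/http"

    "github.com/goamz/goamz/aws"
    "github.com/goamz/goamz/sqs"
)

// queue_init connects to SQS and returns a handle to the work queue.
// The queue name "work-queue" is hypothetical.
func queue_init() sqs.Queue {
    auth, err := aws.EnvAuth() // credentials from the environment
    if err != nil {
        log.Fatal(err)
    }
    conn := sqs.New(auth, aws.USEast)
    q, err := conn.GetQueue("work-queue")
    if err != nil {
        log.Fatal(err)
    }
    return *q
}

// messageCreation turns an incoming request into a JSON string for SQS.
func messageCreation(rq *http.Request) (string, error) {
    payload := map[string]string{
        "path":   rq.URL.Path,
        "remote": rq.RemoteAddr,
    }
    b, err := json.Marshal(payload)
    return string(b), err
}

// send_msg pushes a formatted message onto the queue.
func send_msg(body string, q *sqs.Queue) {
    if _, err := q.SendMessage(body); err != nil {
        log.Println("SendMessage failed:", err)
    }
}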

Worker architecture

The worker architecture consists of these components:

Dispatcher

The dispatcher pulls work requests off SQS and puts them onto a channel for the workers to consume.

var WorkerQueue chan chan sqs.ReceiveMessageResponse

var BufferQueue chan sqs.ReceiveMessageResponse

func dispatcher_init(worker_count int, q *sqs.Queue, dequeuer *Dequeuer) {  
    // First, initialize the channel we are going to put the workers' work channels into.
    WorkerQueue = make(chan chan sqs.ReceiveMessageResponse, worker_count)

    // Buffer between SQS and the dispatcher.
    BufferQueue = make(chan sqs.ReceiveMessageResponse)

    // Now, create all of our workers.

    // Read off the SQS and place requests on the Buffer queue for each worker.

}

Since the work requests are of type sqs.ReceiveMessageResponse, our channels carry values of that type as well.

We read work requests off SQS and publish them to a buffer queue; from there the dispatcher distributes them over the workers, as in the sketch below.
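
One possible way to fill in the skeleton above, sketched here under the same assumptions (goamz types, the new_worker and worker_init functions shown in the next section, and the Dequeuer described later); the exact implementation is left for the follow-up articles.

func dispatcher_init(worker_count int, q *sqs.Queue, dequeuer *Dequeuer) {
    // Channel of worker channels, and the buffer between SQS and the dispatcher.
    WorkerQueue = make(chan chan sqs.ReceiveMessageResponse, worker_count)
    BufferQueue = make(chan sqs.ReceiveMessageResponse)

    // Create the workers; each registers its work channel on WorkerQueue.
    for i := 0; i < worker_count; i++ {
        new_worker(i).worker_init(dequeuer)
    }

    // Poll SQS and push every non-empty response onto the buffer.
    go func() {
        for {
            resp, err := q.ReceiveMessage(10)
            if err != nil || len(resp.Messages) == 0 {
                continue
            }
            BufferQueue <- *resp
        }
    }()

    // Dispatch: hand each buffered response to the next free worker.
    go func() {
        for work := range BufferQueue {
            worker_channel := <-WorkerQueue // blocks until a worker is free
            worker_channel <- work
        }
    }()
}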

Worker pool

The worker pool is a nested channel structure. The inner channels hold work requests for each worker, and the distribution over them is done by the Dispatcher. These channels are then stored in an outer channel, i.e. the channel of workers. Figure 2 shows the structure of the worker pool.

Figure 2. Worker pool structure

The workers read off their individual queues, process the work requests and write the results to DynamoDB. The worker then signals the Dequeuer to remove each processed request from SQS. To recap, the worker has to:

  1. read a work request off its channel,
  2. process it,
  3. write the result to DynamoDB,
  4. signal to the Dequeuer to dequeue the request, and
  5. repeat.

If its channel is empty, the worker blocks on the channel until a work request is received.

package main

import (  
    "fmt"
    "github.com/goamz/goamz/sqs"
)

type Worker struct {  
    ID         int                             // Worker ID
    Work_Queue chan sqs.ReceiveMessageResponse // Individual Queue for each worker where the dispatcher will put work. The work request will be of type ReceiveMessageResponse
}

func new_worker(id int) *Worker {  
    worker := &Worker{
        ID:         id,
        Work_Queue: make(chan sqs.ReceiveMessageResponse)}

    return worker
}


func (w Worker) worker_init(dequeuer *Dequeuer) {

    go func() {
        fmt.Println("Worker", w.ID)
        for {
            // Add ourselves into the worker queue.
            WorkerQueue <- w.Work_Queue
            select {
            case work_request := <-w.Work_Queue: // get a work request

                go func(wr sqs.ReceiveMessageResponse) {
                    // Perform work on wr.

                    // Write the result to DynamoDB.

                    // Signal the Dequeuer to remove wr from SQS.
                }(work_request)
            }
        }
    }()
}

Dequeuer

Once a request has been successfully processed, it is dequeued from SQS. The main reason for having a Dequeuer is to prevent the same work request from being processed multiple times.
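
The Dequeuer itself is not shown in this article. As a rough sketch of the idea, assuming the goamz sqs package (Queue.DeleteMessage) and a simple channel-based design, it might look like the following; the actual implementation is left for the articles to come.

package main

import (
    "log"

    "github.com/goamz/goamz/sqs"
)

// Dequeuer removes successfully processed messages from SQS so that no
// worker sees them again. The channel-based shape here is an assumption.
type Dequeuer struct {
    Delete_Queue chan sqs.Message // workers send processed messages here
}

func dequeuer_init(q *sqs.Queue) *Dequeuer {
    d := &Dequeuer{Delete_Queue: make(chan sqs.Message)}

    go func() {
        for msg := range d.Delete_Queue {
            // Delete the processed message from SQS.
            if _, err := q.DeleteMessage(&msg); err != nil {
                log.Println("DeleteMessage failed:", err)
            }
        }
    }()

    return d
}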

Concluding

This article provides functioning code for the catcher and for the AWS components used in this implementation. The worker pool is described with an architecture and rough code skeletons.

The performance of this implementation was around 300 requests per second on a very small EC2 instance, which costs about $5/month.

The details of each component will be discussed in the articles to come.