Posted on Leave a comment

Gracefully shutting down multiple workers in Go

Managing multiple workers in Go

In this post, we’ll see how to write workers in Go, how to gracefully shut them down. We’ll also look at how to coordinate multiple workers.

Finally, I’ll introduce¬†Orchestra, a package I built which makes it a lot easier to manage and gracefully shutdown workers.

What are workers?

When we write programs, we are often in situations when we have function that runs until either it faces an error, or we explicitly order it to stop. A good example of this is the http.Server. When we start a web server in Go, the program will block until we ask it to stop or it encounters.

Here’s a simple webserver:

package main

import (
    "io"
    "net/http"
)

func main() {
    // Hello world, the web server

    helloHandler := func(w http.ResponseWriter, req *http.Request) {
        io.WriteString(w, "Hello, world!\n")
    }

    http.HandleFunc("/hello", helloHandler)
    err := http.ListenAndServe(":8080", nil)
    if err != nil {
        panic(err)
    }
}

We can write a function that behaves the same way:

package main

func main()
    err := myWorker()
    if err != nil {
        panic(err)
    }
}

func myWorker() error {
    for {
        err := doSomethingRepeatedly()
        if err != nil {
            return err
        }
    }

    return nil
}

In both these programs, we can ask them to stop by killing the main process (ctrl + C) and that will forcefully close the program.

Gracefully Shutting Down Workers

To shutdown a worker grace fully we should listen for signals passed to the program. The main ones to look out for are:

  • SIGINT: The interrupt signal. The terminal sends it to the foreground process when the user presses ctrl-c. The default behavior is to terminate the process, but it can be caught or ignored. The intention is to provide a mechanism for an orderly, graceful shutdown.
  • SIGTERM: The termination signal. The default behavior is to terminate the process, but it also can be caught or ignored. The intention is to kill the process, gracefully or not, but to first allow it a chance to cleanup.

Credit for the explanation of what the signals mean

For the webserver, since Go 1.8 (which was a long time ago), the http.Server type has a Shutdown method which gracefully closes the connections.

Here’s an example of how to use it:

package main

import (
    "context"
    "log"
    "net/http"
    "os"
    "os/signal"
    "syscall"
)

func main() {
    var srv http.Server

    idleConnsClosed := make(chan struct{})
    go func() {
        signals := make(chan os.Signal, 1)
        signal.Notify(signals, os.Interrupt, syscall.SIGTERM)
        <-signals

        // We received an interrupt signal, shut down.
        if err := srv.Shutdown(context.Background()); err != nil {
            // Error from closing listeners, or context timeout:
            log.Printf("HTTP server Shutdown: %v", err)
        }
        close(idleConnsClosed)
    }()

    if err := srv.ListenAndServe(); err != http.ErrServerClosed {
        // Error starting or closing listener:
        log.Fatalf("HTTP server ListenAndServe: %v", err)
    }

    <-idleConnsClosed
}

Let’s create something similar for our worker:

package main

import (
    "context"
    "log"
    "os"
    "os/signal"
    "syscall"
)

var shouldStop = false

func main() {
    complete := make(chan struct{})

    go func() {
        signals := make(chan os.Signal, 1)
        signal.Notify(signals, os.Interrupt, syscall.SIGTERM)
        <-signals

        shouldStop = true

        close(complete)
    }()

    if err := myWorker(); err != nil {
        // Error in our worker:
        log.Fatalf("Worker error: %v", err)
    }

    <-complete
}

func myWorker() error {
    for !shouldStop {
        err := doSomethingRepeatedly()
        if err != nil {
            return err
        }
    }

    return nil
}

Multiple Workers

This seems somewhat straightforward with just a single long running process. However, it becomes trickier.

At first glance, it seems we can just do something like this:

package main

import (
    "context"
    "log"
    "net/http"
    "os"
    "os/signal"
    "syscall"
)

var shouldStop = false

func main() {
    var srv http.Server

    complete := make(chan struct{})

    go func() {
        signals := make(chan os.Signal, 1)
        signal.Notify(signals, os.Interrupt, syscall.SIGTERM)
        <-signals

        // Ask our worker to stop
        shouldStop = true

        // Shutdown the server
        if err := srv.Shutdown(context.Background()); err != nil {
            // Error from closing listeners, or context timeout:
            log.Printf("HTTP server Shutdown: %v", err)
        }

        close(complete)
    }()

    go func() {
        if err := myWorker(); err != nil {
            // Error in our worker:
            log.Fatalf("Worker error: %v", err)
        }
    }()

    go func() {
        if err := srv.ListenAndServe(); err != http.ErrServerClosed {
            // Error starting or closing listener:
            log.Fatalf("HTTP server ListenAndServe: %v", err)
        }
    }()

    <-complete
}

func myWorker() error {
    for !shouldStop {
        err := doSomethingRepeatedly()
        if err != nil {
            return err
        }
    }

    return nil
}

But that does not work as expected. Here’s why:

First, we did not stop the workers asynchronously. It does not seem really needed in the particular example, so let’s leave that problem for now.

More importantly, in the example above, if any of the workers encounters an error, the program rudely exits! Our entire shutdown sequence is ignored.

Coordinating with Context

In idiomatic Go, the ideal way would be to pass a context.Context to our worker, and ask it to shut itself down once the context is done.

Using this method, instead of calling log.Fatalf() when a worker encounters an error, we can simply cancel the context, and it will then initiate the shutdown sequence.

Since we are not calling the shutdown functions directly, let’s use sync.WaitGroup to know when all our workers have exited.

This means we’d have to rework our code to look like this:

package main

import (
    "context"
    "fmt"
    "log"
    "net/http"
    "os"
    "os/signal"
    "sync"
    "syscall"
    "time"
)

func main() {
    ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)

    var wg sync.WaitGroup

    go func() {
        signals := make(chan os.Signal, 1)
        signal.Notify(signals, os.Interrupt, syscall.SIGTERM)
        <-signals

        cancel()
    }()

    wg.Add(1)
    go func() {
        if err := myWorker(ctx); err != nil {
            cancel()
        }
        wg.Done()
    }()

    wg.Add(1)
    go func() {
        if err := startServer(ctx); err != nil {
            cancel()
        }
        wg.Done()
    }()

    wg.Wait()
}

func myWorker(ctx context.Context) error {

    var shouldStop = false

    go func() {
        <-ctx.Done()
        shouldStop = true
    }()

    for !shouldStop {
        err := doSomethingRepeatedly()
        if err != nil {
            return err
        }
    }

    return nil
}

func startServer(ctx context.Context) error {

    var srv http.Server

    go func() {
        <-ctx.Done() // Wait for the context to be done

        // Shutdown the server
        if err := srv.Shutdown(context.Background()); err != nil {
            // Error from closing listeners, or context timeout:
            log.Printf("HTTP server Shutdown: %v", err)
        }
    }()

    if err := srv.ListenAndServe(); err != http.ErrServerClosed {
        // Error starting or closing listener:
        return fmt.Errorf("HTTP server ListenAndServe: %w", err)
    }

    return nil
}

This neatly satisfies all our conditions. Yay!

Using Orchestra

To reduce all this boilerplate, I created a package to do most of this.

Orchestra is a library to manage long running go processes.

At the heart of the library is an interface called Player

// Player is a long running background worker
type Player interface {
    Play(context.Context) error
}

All a type needs to do to satisfy the interface is to have a Play method that will gracefully shutdown when the context is done.

It can also return an error if it encounters a problem when playing.

Next, there’s the Conductor type (which itself is a Player)

// Conductor is a group of workers. It is also a Player itself **evil laugh**
type Conductor struct {
    Timeout time.Duration
    Players map[string]Player
}

With the conductor, you add Players to it, and when you call the Play method on the conductor, it will start the Players under it and gracefully shut them all down when the main context is done.

The timeout is there incase there is a Player that refused to stop.

It also contains a few helper functions:

  • PlayUntilSignal(p Player, sig ...os.Signal): This will start a player with a context, and close the context once it receives any of the signals provided.
  • PlayerFunc(func(context.Context) error): This is a quick way to convert a standalone function into a type that satisfies the Player interface.
  • ServerPlayer{*http.Server}: This is a type that embeds the *http.Server and extends it to satisfy the Player interface.

More details can be found in the README.

With these, we can simplify our example code into this:

package main

import (
    "context"
    "net/http"
    "os"
    "syscall"
    "time"

    "github.com/stephenafamo/orchestra"
)

func main() {
    var srv = &http.Server{}

    // creates a player from a myWorker function
    workerPlayer := orchestra.PlayerFunc(myWorker)
    // A player from a server
    serverPlayer := orchestra.ServerPlayer{srv}

    // A conductor to control them all
    conductor := &orchestra.Conductor{
        Timeout: 5 * time.Second,
        Players: map[string]orchestra.Player{
            // the names are used to identify the players
            // both in logs and the returned errors
            "worker": workerPlayer,
            "server":   serverPlayer,
        },
    }

    // Use the conductor as a Player
    err := orchestra.PlayUntilSignal(conductor, os.Interrupt, syscall.SIGTERM)
    if err != nil {
        panic(err)
    }
}

func myWorker(ctx context.Context) error {

    var shouldStop = false

    go func() {
        <-ctx.Done()
        shouldStop = true
    }()

    for !shouldStop {
        err := doSomethingRepeatedly()
        if err != nil {
            return err
        }
    }

    return nil
}

Conclusion

As stated in the first paragraph, we’ve seen how to write workers in Go, how to gracefully shut them down and I’ve introduced Orchestra, a package I built which makes it a lot easier to manage and gracefully shutdown workers.

Spread the love
Posted on Leave a comment
Leave a Reply

Your email address will not be published. Required fields are marked *