← snippets
GoGoConcurrencyChannels

Go Concurrency Patterns

Practical patterns for goroutines, channels, and the sync package that I reach for regularly.

August 14, 2024

Fan-Out / Fan-In

Distribute work across N workers and collect results:

func fanOut[T, R any](ctx context.Context, inputs []T, workers int, fn func(T) R) []R {
    jobs := make(chan T, len(inputs))
    results := make(chan R, len(inputs))
 
    // Spawn workers
    var wg sync.WaitGroup
    for range workers {
        wg.Add(1)
        go func() {
            defer wg.Done()
            for job := range jobs {
                results <- fn(job)
            }
        }()
    }
 
    // Send work
    for _, input := range inputs {
        jobs <- input
    }
    close(jobs)
 
    // Wait then close results
    go func() {
        wg.Wait()
        close(results)
    }()
 
    // Collect
    out := make([]R, 0, len(inputs))
    for r := range results {
        out = append(out, r)
    }
    return out
}

Context Cancellation

Always propagate context; never ignore it in long-running operations:

func fetchAll(ctx context.Context, urls []string) ([][]byte, error) {
    results := make([][]byte, len(urls))
    eg, ctx := errgroup.WithContext(ctx)
 
    for i, url := range urls {
        i, url := i, url // capture loop vars
        eg.Go(func() error {
            req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
            if err != nil {
                return err
            }
            resp, err := http.DefaultClient.Do(req)
            if err != nil {
                return err
            }
            defer resp.Body.Close()
            results[i], err = io.ReadAll(resp.Body)
            return err
        })
    }
 
    return results, eg.Wait()
}

Once + Lazy Initialization

For expensive, one-time initialization:

type DB struct {
    once sync.Once
    conn *sql.DB
    err  error
}
 
func (d *DB) Get() (*sql.DB, error) {
    d.once.Do(func() {
        d.conn, d.err = sql.Open("postgres", os.Getenv("DATABASE_URL"))
    })
    return d.conn, d.err
}

Select with Timeout

func withTimeout[T any](ch <-chan T, timeout time.Duration) (T, bool) {
    select {
    case v := <-ch:
        return v, true
    case <-time.After(timeout):
        var zero T
        return zero, false
    }
}