Fan-Out / Fan-In
Distribute work across N workers and collect results:
func fanOut[T, R any](ctx context.Context, inputs []T, workers int, fn func(T) R) []R {
jobs := make(chan T, len(inputs))
results := make(chan R, len(inputs))
// Spawn workers
var wg sync.WaitGroup
for range workers {
wg.Add(1)
go func() {
defer wg.Done()
for job := range jobs {
results <- fn(job)
}
}()
}
// Send work
for _, input := range inputs {
jobs <- input
}
close(jobs)
// Wait then close results
go func() {
wg.Wait()
close(results)
}()
// Collect
out := make([]R, 0, len(inputs))
for r := range results {
out = append(out, r)
}
return out
}Context Cancellation
Always propagate context; never ignore it in long-running operations:
func fetchAll(ctx context.Context, urls []string) ([][]byte, error) {
results := make([][]byte, len(urls))
eg, ctx := errgroup.WithContext(ctx)
for i, url := range urls {
i, url := i, url // capture loop vars
eg.Go(func() error {
req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
if err != nil {
return err
}
resp, err := http.DefaultClient.Do(req)
if err != nil {
return err
}
defer resp.Body.Close()
results[i], err = io.ReadAll(resp.Body)
return err
})
}
return results, eg.Wait()
}Once + Lazy Initialization
For expensive, one-time initialization:
type DB struct {
once sync.Once
conn *sql.DB
err error
}
func (d *DB) Get() (*sql.DB, error) {
d.once.Do(func() {
d.conn, d.err = sql.Open("postgres", os.Getenv("DATABASE_URL"))
})
return d.conn, d.err
}Select with Timeout
func withTimeout[T any](ch <-chan T, timeout time.Duration) (T, bool) {
select {
case v := <-ch:
return v, true
case <-time.After(timeout):
var zero T
return zero, false
}
}