# flowmatic

**Repository Path**: wb97/flowmatic

## Basic Information

- **Project Name**: flowmatic
- **Description**: golang All Race
- **Primary Language**: Go
- **License**: MIT
- **Default Branch**: main

## Statistics

- **Stars**: 0
- **Forks**: 0
- **Created**: 2026-02-02
- **Last Updated**: 2026-02-02

## README

# Flowmatic

[Go Reference](https://pkg.go.dev/github.com/carlmjohnson/flowmatic) | [Coverage Status](https://coveralls.io/github/carlmjohnson/flowmatic) | [Go Report Card](https://goreportcard.com/report/github.com/carlmjohnson/flowmatic) | [Mentioned in Awesome Go](https://github.com/avelino/awesome-go)

Flowmatic is a generic Go library that provides a [structured approach](https://vorpus.org/blog/notes-on-structured-concurrency-or-go-statement-considered-harmful/) to concurrent programming. It lets you manage concurrent tasks in a manner that is simple, yet effective and flexible.

Flowmatic has an easy-to-use API with functions for handling common concurrency patterns. It automatically handles spawning workers, collecting errors, and propagating panics.

Flowmatic requires Go 1.20+.

## Features

- Has a simple API that improves readability over channels/waitgroups/mutexes
- Handles a variety of concurrency problems such as heterogenous task groups, homogenous execution of a task over a slice, and dynamic work spawning
- Aggregates errors
- Properly propagates panics across goroutine boundaries
- Has helpers for context cancelation
- Few dependencies
- Good test coverage

## How to use Flowmatic

### Execute heterogenous tasks

One problem that Flowmatic solves is managing the execution of multiple tasks in parallel that are independent of each other. For example, let's say you want to send data to three different downstream APIs. If any of the sends fails, you want to return an error. With traditional Go concurrency, this can quickly become complex and difficult to manage, with goroutines, channels, and `sync.WaitGroup`s to keep track of. Flowmatic makes it simple.

To execute heterogenous tasks, just use `flowmatic.Do`:

<table>
<tr>
<th>flowmatic</th>
<th>stdlib</th>
</tr>
<tr>
<td>

```go
err := flowmatic.Do(
	func() error {
		return doThingA()
	},
	func() error {
		return doThingB()
	},
	func() error {
		return doThingC()
	})
```

</td>
<td>

```go
var wg sync.WaitGroup
var errs []error
errChan := make(chan error)

wg.Add(3)
go func() {
	defer wg.Done()
	if err := doThingA(); err != nil {
		errChan <- err
	}
}()
go func() {
	defer wg.Done()
	if err := doThingB(); err != nil {
		errChan <- err
	}
}()
go func() {
	defer wg.Done()
	if err := doThingC(); err != nil {
		errChan <- err
	}
}()

// Close the channel once every task has finished
// so that the collection loop below terminates.
go func() {
	wg.Wait()
	close(errChan)
}()

for err := range errChan {
	errs = append(errs, err)
}

err := errors.Join(errs...)
```

</td>
</tr>
</table>
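
The error-aggregation pattern in the stdlib column can also be written without a channel. The following is a minimal sketch using only the standard library, not Flowmatic's actual implementation: `doAll`, `sendA`, `sendB`, and `sendC` are hypothetical names invented for illustration. Each goroutine writes to its own slice slot, so no mutex is needed and the joined errors come out in argument order.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// doAll is a hypothetical stand-in, not flowmatic.Do itself.
// It runs every task in its own goroutine, waits for all of them,
// and joins the non-nil errors in argument order.
func doAll(tasks ...func() error) error {
	errs := make([]error, len(tasks))
	var wg sync.WaitGroup
	for i, task := range tasks {
		wg.Add(1)
		go func(i int, task func() error) {
			defer wg.Done()
			errs[i] = task() // each goroutine writes only its own slot
		}(i, task)
	}
	wg.Wait()
	// errors.Join drops nil entries and returns nil if none remain.
	return errors.Join(errs...)
}

// Hypothetical tasks standing in for three downstream API calls.
func sendA() error { return errors.New("API A: timeout") }
func sendB() error { return nil }
func sendC() error { return errors.New("API C: rejected") }

func main() {
	if err := doAll(sendA, sendB, sendC); err != nil {
		fmt.Println(err)
	}
}
```

Because `errors.Join` returns `nil` when every entry is `nil`, the caller can keep the usual `if err != nil` check.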

### Execute homogenous tasks

To run the same task over every item in a slice with a fixed number of workers, use `flowmatic.Each`:

<table>
<tr>
<th>flowmatic</th>
<th>stdlib</th>
</tr>
<tr>
<td>

```go
things := []someType{thingA, thingB, thingC}

err := flowmatic.Each(numWorkers, things,
	func(thing someType) error {
		foo := thing.Frobincate()
		return foo.DoSomething()
	})
```

</td>
<td>

```go
things := []someType{thingA, thingB, thingC}

work := make(chan someType)
errs := make(chan error)

for i := 0; i < numWorkers; i++ {
	go func() {
		for thing := range work {
			// Omitted: panic handling!
			foo := thing.Frobincate()
			errs <- foo.DoSomething()
		}
	}()
}

// Feed the workers, then close the channel so they exit.
go func() {
	for _, thing := range things {
		work <- thing
	}
	close(work)
}()

// Every item produces exactly one result (nil or an error).
var collectedErrs []error
for i := 0; i < len(things); i++ {
	collectedErrs = append(collectedErrs, <-errs)
}

err := errors.Join(collectedErrs...)
```

</td>
</tr>
</table>
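
The stdlib version above leaves out panic handling (see its `// Omitted: panic handling!` comment). A panic in a worker goroutine that is not recovered crashes the whole program, and the caller cannot intercept it. One common way to carry a panic across the goroutine boundary is to recover it in the worker and re-panic in the caller. The sketch below shows that idea with the standard library only. `runAndRepanic` is a hypothetical helper, and this is not necessarily how Flowmatic does it internally.

```go
package main

import (
	"fmt"
	"sync"
)

// runAndRepanic is a hypothetical helper, not part of Flowmatic's API.
// It runs each task in its own goroutine. If any task panics, the first
// recovered value is re-panicked in the calling goroutine after all
// tasks have finished.
func runAndRepanic(tasks ...func()) {
	var (
		wg       sync.WaitGroup
		once     sync.Once
		panicVal any
	)
	for _, task := range tasks {
		wg.Add(1)
		go func(task func()) {
			// Deferred calls run last-in, first-out: the recover below
			// runs before wg.Done, so panicVal is set before Wait returns.
			defer wg.Done()
			defer func() {
				if r := recover(); r != nil {
					once.Do(func() { panicVal = r })
				}
			}()
			task()
		}(task)
	}
	wg.Wait()
	if panicVal != nil {
		panic(panicVal)
	}
}

func main() {
	defer func() {
		fmt.Println("recovered in caller:", recover())
	}()
	runAndRepanic(
		func() {},
		func() { panic("worker failed") },
	)
}
```

With this shape, the caller's own `defer`/`recover` logic sees the worker's panic as if it had happened in the calling goroutine.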

The next example runs several searches in parallel and collects their results, comparing `flowmatic.Map` with `x/sync/errgroup`:

<table>
<tr>
<td colspan="2">

```go
func main() {
	results, err := Google(context.Background(), "golang")
	if err != nil {
		fmt.Fprintln(os.Stderr, err)
		return
	}
	for _, result := range results {
		fmt.Println(result)
	}
}
```

</td>
</tr>
<tr>
<th>flowmatic</th>
<th>x/sync/errgroup</th>
</tr>
<tr>
<td>

```go
func Google(ctx context.Context, query string) ([]Result, error) {
	searches := []Search{Web, Image, Video}
	return flowmatic.Map(ctx, flowmatic.MaxProcs, searches,
		func(ctx context.Context, search Search) (Result, error) {
			return search(ctx, query)
		})
}
```

</td>
<td>

```go
func Google(ctx context.Context, query string) ([]Result, error) {
	g, ctx := errgroup.WithContext(ctx)

	searches := []Search{Web, Image, Video}
	results := make([]Result, len(searches))
	for i, search := range searches {
		i, search := i, search // https://golang.org/doc/faq#closures_and_goroutines
		g.Go(func() error {
			result, err := search(ctx, query)
			if err == nil {
				results[i] = result
			}
			return err
		})
	}
	if err := g.Wait(); err != nil {
		return nil, err
	}
	return results, nil
}
```

</td>
</tr>
</table>

The following example builds a pipeline that walks a directory tree and computes MD5 sums with a pool of 20 digesters, comparing `flowmatic.TaskPool` and `flowmatic.All` with `x/sync/errgroup`:

<table>
<tr>
<td colspan="2">

```go
func main() {
	m, err := MD5All(context.Background(), ".")
	if err != nil {
		log.Fatal(err)
	}

	for k, sum := range m {
		fmt.Printf("%s:\t%x\n", k, sum)
	}
}
```

</td>
</tr>
<tr>
<th>flowmatic</th>
<th>x/sync/errgroup</th>
</tr>
<tr>
<td>

```go
// MD5All reads all the files in the file tree rooted at root
// and returns a map from file path to the MD5 sum of the file's contents.
// If the directory walk fails or any read operation fails,
// MD5All returns an error.
func MD5All(ctx context.Context, root string) (map[string][md5.Size]byte, error) {
	// Make a pool of 20 digesters
	in, out := flowmatic.TaskPool(20, digest)

	m := make(map[string][md5.Size]byte)
	// Open two goroutines:
	// one for reading file names by walking the filesystem
	// one for recording results from the digesters in a map
	err := flowmatic.All(ctx,
		func(ctx context.Context) error {
			return walkFilesystem(ctx, root, in)
		},
		func(ctx context.Context) error {
			for r := range out {
				if r.Err != nil {
					return r.Err
				}
				m[r.In] = *r.Out
			}
			return nil
		},
	)

	return m, err
}

func walkFilesystem(ctx context.Context, root string, in chan<- string) error {
	defer close(in)

	return filepath.Walk(root, func(path string, info os.FileInfo, err error) error {
		if err != nil {
			return err
		}
		if !info.Mode().IsRegular() {
			return nil
		}
		select {
		case in <- path:
		case <-ctx.Done():
			return ctx.Err()
		}

		return nil
	})
}

func digest(path string) (*[md5.Size]byte, error) {
	data, err := os.ReadFile(path)
	if err != nil {
		return nil, err
	}
	hash := md5.Sum(data)
	return &hash, nil
}
```

</td>
<td>

```go
type result struct {
	path string
	sum  [md5.Size]byte
}

// MD5All reads all the files in the file tree rooted at root and returns a map
// from file path to the MD5 sum of the file's contents. If the directory walk
// fails or any read operation fails, MD5All returns an error.
func MD5All(ctx context.Context, root string) (map[string][md5.Size]byte, error) {
	// ctx is canceled when g.Wait() returns. When this version of MD5All returns
	// - even in case of error! - we know that all of the goroutines have finished
	// and the memory they were using can be garbage-collected.
	g, ctx := errgroup.WithContext(ctx)
	paths := make(chan string)

	g.Go(func() error {
		defer close(paths)
		return filepath.Walk(root, func(path string, info os.FileInfo, err error) error {
			if err != nil {
				return err
			}
			if !info.Mode().IsRegular() {
				return nil
			}
			select {
			case paths <- path:
			case <-ctx.Done():
				return ctx.Err()
			}
			return nil
		})
	})

	// Start a fixed number of goroutines to read and digest files.
	c := make(chan result)
	const numDigesters = 20
	for i := 0; i < numDigesters; i++ {
		g.Go(func() error {
			for path := range paths {
				data, err := os.ReadFile(path)
				if err != nil {
					return err
				}
				select {
				case c <- result{path, md5.Sum(data)}:
				case <-ctx.Done():
					return ctx.Err()
				}
			}
			return nil
		})
	}
	go func() {
		g.Wait()
		close(c)
	}()

	m := make(map[string][md5.Size]byte)
	for r := range c {
		m[r.path] = r.sum
	}
	// Check whether any of the goroutines failed. Since g is accumulating the
	// errors, we don't need to send them (or check for them) in the individual
	// results sent on the channel.
	if err := g.Wait(); err != nil {
		return nil, err
	}
	return m, nil
}
```

</td>
</tr>
</table>
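
The errgroup version above depends on the context from `errgroup.WithContext` being canceled as soon as one goroutine returns an error. That is why every `select` also watches `ctx.Done()`. The sketch below reproduces that cancel-on-first-error behavior with the standard library only, to make the mechanism visible. `firstError`, `demo`, and `errWalk` are hypothetical names, and the code is an illustration of the pattern rather than the implementation of errgroup or Flowmatic.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"sync"
	"time"
)

// firstError is a hypothetical helper. It runs every task with a shared
// context, cancels that context as soon as one task fails, waits for all
// tasks to return, and reports the first error that was recorded.
func firstError(ctx context.Context, tasks ...func(context.Context) error) error {
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()

	var (
		wg    sync.WaitGroup
		once  sync.Once
		first error
	)
	for _, task := range tasks {
		wg.Add(1)
		go func(task func(context.Context) error) {
			defer wg.Done()
			if err := task(ctx); err != nil {
				once.Do(func() {
					first = err
					cancel() // tell the sibling tasks to stop
				})
			}
		}(task)
	}
	wg.Wait()
	return first
}

// errWalk is a hypothetical failure standing in for a directory walk error.
var errWalk = errors.New("walk failed")

// demo runs one failing task and one task that blocks until it is canceled.
func demo() error {
	return firstError(context.Background(),
		func(ctx context.Context) error { return errWalk },
		func(ctx context.Context) error {
			select {
			case <-ctx.Done():
				return ctx.Err() // stops early because the sibling failed
			case <-time.After(time.Minute):
				return nil
			}
		},
	)
}

func main() {
	fmt.Println(demo())
}
```

In `demo`, the second task can only finish early through `ctx.Done()`, which fires after the first task's error has already been recorded, so the reported error is `errWalk` rather than `context.Canceled`.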