mr Package Documentation

Overview

The mr package provides a framework for running map-reduce operations in Go. Mapping runs concurrently across a configurable number of workers, and an operation can be bound to a context.Context or cancelled from inside a mapper or reducer.

Errors

    var (
        ErrCancelWithNil  = errors.New("mapreduce cancelled with nil")
        ErrReduceNoOutput = errors.New("reduce not writing value")
    )

  • ErrCancelWithNil: returned when the cancel function is called with a nil error.
  • ErrReduceNoOutput: returned when the reduce function finishes without writing a value.
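
Callers usually want to tell these two sentinel errors apart from ordinary failures. The sketch below shows one way to do that with errors.Is. It declares local stand-ins for the two errors so that it runs with the standard library alone; real code would compare against mr.ErrCancelWithNil and mr.ErrReduceNoOutput instead. The describe helper is hypothetical and not part of the package.

```go
package main

import (
	"errors"
	"fmt"
)

// Local stand-ins mirroring the sentinel errors declared above.
// In real code, use mr.ErrCancelWithNil and mr.ErrReduceNoOutput.
var (
	ErrCancelWithNil  = errors.New("mapreduce cancelled with nil")
	ErrReduceNoOutput = errors.New("reduce not writing value")
)

// describe is a hypothetical helper that classifies an error
// returned from a map-reduce call.
func describe(err error) string {
	switch {
	case err == nil:
		return "ok"
	case errors.Is(err, ErrCancelWithNil):
		return "cancelled without a cause"
	case errors.Is(err, ErrReduceNoOutput):
		return "reducer wrote nothing"
	default:
		return "other error: " + err.Error()
	}
}

func main() {
	// errors.Is also matches a sentinel that has been wrapped with %w.
	wrapped := fmt.Errorf("nightly job: %w", ErrReduceNoOutput)
	fmt.Println(describe(wrapped))
	fmt.Println(describe(ErrCancelWithNil))
	fmt.Println(describe(nil))
}
```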

Types

ForEachFunc

    type ForEachFunc[T any] func(item T)

Function type for processing each element without output.

GenerateFunc

    type GenerateFunc[T any] func(source chan<- T)

Function type that generates the elements to be processed by sending them to source.

MapFunc

    type MapFunc[T, U any] func(item T, writer Writer[U])

Function type for processing an element and writing the output using a writer.

MapperFunc

    type MapperFunc[T, U any] func(item T, writer Writer[U], cancel func(error))

Function type for processing an element and writing the output using a writer. Calling cancel stops the whole operation with the given error.

ReducerFunc

    type ReducerFunc[U, V any] func(pipe <-chan U, writer Writer[V], cancel func(error))

Function type for reducing output elements from the mapping stage into a final result.

VoidReducerFunc

    type VoidReducerFunc[U any] func(pipe <-chan U, cancel func(error))

Function type for reducing output elements without producing a final result.

Option

    type Option func(opts *mapReduceOptions)

Function type for customizing map-reduce options.

Writer

    type Writer[T any] interface {
        Write(v T)
    }

Interface for writing values.
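
To make the role of Writer concrete, here is a minimal sketch of a channel-backed implementation and a function with the same shape as MapFunc[int, int] that writes to it. The chanWriter type and the collectDoubled helper are hypothetical names invented for this illustration; the writer the package uses internally may differ.

```go
package main

import "fmt"

// Writer mirrors the interface shown above.
type Writer[T any] interface {
	Write(v T)
}

// chanWriter is a hypothetical Writer that forwards each value to a channel.
type chanWriter[T any] struct {
	ch chan<- T
}

// Write sends v on the underlying channel.
func (w chanWriter[T]) Write(v T) { w.ch <- v }

// double has the same shape as MapFunc[int, int]: it takes one item
// and emits its result through the writer instead of returning it.
func double(item int, writer Writer[int]) {
	writer.Write(item * 2)
}

// collectDoubled runs double over items sequentially and gathers the output.
func collectDoubled(items []int) []int {
	// Buffered to len(items) so Write never blocks in this single-goroutine demo.
	out := make(chan int, len(items))
	w := chanWriter[int]{ch: out}
	for _, it := range items {
		double(it, w)
	}
	close(out)

	var res []int
	for v := range out {
		res = append(res, v)
	}
	return res
}

func main() {
	fmt.Println(collectDoubled([]int{1, 2, 3})) // prints [2 4 6]
}
```

Passing a writer rather than returning a value lets a mapper emit zero, one, or several outputs per input.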

Functions

Finish

    func Finish(fns ...func() error) error

Runs the given functions in parallel. If any function returns an error, the operation is cancelled and that error is returned.
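
The idea of running functions in parallel and reporting the first error can be sketched with the standard library alone. The runAll function below is a hypothetical stand-in, not the package's implementation: it starts every function immediately and waits for all of them, so unlike a real cancellation it does not skip functions that have not started yet.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// errBoom is a sample error used by the demo below.
var errBoom = errors.New("boom")

// runAll is a hypothetical helper: it runs every fn in its own goroutine,
// waits for all of them, and returns the first error that was recorded.
func runAll(fns ...func() error) error {
	var (
		wg    sync.WaitGroup
		once  sync.Once
		first error
	)
	for _, fn := range fns {
		wg.Add(1)
		go func(f func() error) {
			defer wg.Done()
			if err := f(); err != nil {
				// Only the first failing function gets to set the result.
				once.Do(func() { first = err })
			}
		}(fn)
	}
	wg.Wait()
	return first
}

func main() {
	err := runAll(
		func() error { return nil },
		func() error { return errBoom },
	)
	fmt.Println(err) // prints boom
}
```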

FinishVoid

    func FinishVoid(fns ...func())

Runs the given functions in parallel. The functions return nothing, so no error is reported.

ForEach

    func ForEach[T any](generate GenerateFunc[T], mapper ForEachFunc[T], opts ...Option)

Maps all elements from the generate function but produces no output.

MapReduce

    func MapReduce[T, U, V any](generate GenerateFunc[T], mapper MapperFunc[T, U],
        reducer ReducerFunc[U, V], opts ...Option) (V, error)

Performs a map-reduce operation using the provided generate function, mapper, and reducer, and returns the value written by the reducer.
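
The three stages can be pictured as goroutines connected by channels. The following is a minimal standard-library sketch of that flow, assuming at least one worker; sumDoubled and its channel names are hypothetical and do not reproduce the package's implementation, which also handles cancellation and contexts.

```go
package main

import (
	"fmt"
	"sync"
)

// sumDoubled is a hypothetical stand-in for a generate/map/reduce flow:
// it doubles the integers 0..n-1 on `workers` goroutines and sums the results.
// It assumes workers >= 1.
func sumDoubled(n, workers int) int {
	source := make(chan int)             // generate -> map
	collector := make(chan int, workers) // map -> reduce

	// Generate stage: feed items, then close source to signal the end.
	go func() {
		defer close(source)
		for i := 0; i < n; i++ {
			source <- i
		}
	}()

	// Map stage: a fixed pool of workers consumes source concurrently.
	var wg sync.WaitGroup
	for w := 0; w < workers; w++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for item := range source {
				collector <- item * 2
			}
		}()
	}

	// Close collector once every worker has finished writing.
	go func() {
		wg.Wait()
		close(collector)
	}()

	// Reduce stage: fold all mapped values into a single result.
	sum := 0
	for v := range collector {
		sum += v
	}
	return sum
}

func main() {
	fmt.Println(sumDoubled(10, 4)) // prints 90
}
```

Because addition is commutative, the result does not depend on the order in which workers deliver values.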

MapReduceChan

    func MapReduceChan[T, U, V any](source <-chan T, mapper MapperFunc[T, U],
        reducer ReducerFunc[U, V], opts ...Option) (V, error)

Performs a map-reduce operation using the provided source channel, mapper, and reducer.

MapReduceVoid

    func MapReduceVoid[T, U any](generate GenerateFunc[T], mapper MapperFunc[T, U],
        reducer VoidReducerFunc[U], opts ...Option) error

Performs a map-reduce operation using the provided generate function, mapper, and reducer, but produces no final result.

WithContext

    func WithContext(ctx context.Context) Option

Customizes a map-reduce operation to use a given context.
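
Binding an operation to a context generally means the caller stops waiting as soon as the context ends. The sketch below illustrates that idea with a select statement. The names await, slowResult, and errNoOutput are hypothetical, and the code is an assumption about the general technique rather than the package's internals.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// errNoOutput is a hypothetical error for a channel closed without a value.
var errNoOutput = errors.New("no output")

// await waits for one value on output or for ctx to end, whichever comes first.
func await(ctx context.Context, output <-chan int) (int, error) {
	select {
	case <-ctx.Done():
		return 0, ctx.Err()
	case v, ok := <-output:
		if !ok {
			return 0, errNoOutput
		}
		return v, nil
	}
}

// slowResult is a hypothetical job that delivers 42 after delay and is
// awaited with the given timeout.
func slowResult(delay, timeout time.Duration) (int, error) {
	ctx, cancel := context.WithTimeout(context.Background(), timeout)
	defer cancel()

	// Buffered so the producer can finish even if nobody is waiting any more.
	output := make(chan int, 1)
	go func() {
		time.Sleep(delay)
		output <- 42
	}()
	return await(ctx, output)
}

func main() {
	fmt.Println(slowResult(10*time.Millisecond, time.Second)) // prints 42 <nil>
	fmt.Println(slowResult(time.Second, 10*time.Millisecond)) // prints 0 context deadline exceeded
}
```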

WithWorkers

    func WithWorkers(workers int) Option

Customizes a map-reduce operation to use a specified number of workers.
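
Option, WithContext, and WithWorkers follow the functional options pattern. The sketch below shows how such options are typically defined and applied. The options struct, the buildOptions helper, the default of 16 workers, and the clamping to a minimum of 1 are all assumptions made for this illustration, since mapReduceOptions is unexported and not described here.

```go
package main

import (
	"context"
	"fmt"
)

// options is a hypothetical stand-in for the unexported mapReduceOptions.
type options struct {
	ctx     context.Context
	workers int
}

// Option mirrors the shape of the Option type shown above.
type Option func(opts *options)

// WithContext returns an Option that sets the context.
func WithContext(ctx context.Context) Option {
	return func(opts *options) { opts.ctx = ctx }
}

// WithWorkers returns an Option that sets the worker count.
// Clamping to at least 1 is an assumption of this sketch.
func WithWorkers(workers int) Option {
	return func(opts *options) {
		if workers < 1 {
			workers = 1
		}
		opts.workers = workers
	}
}

// buildOptions starts from defaults (assumed values) and applies each
// Option in the order given, so later options override earlier ones.
func buildOptions(opts ...Option) *options {
	o := &options{ctx: context.Background(), workers: 16}
	for _, opt := range opts {
		opt(o)
	}
	return o
}

func main() {
	o := buildOptions(WithWorkers(4), WithContext(context.TODO()))
	fmt.Println(o.workers) // prints 4
}
```

The pattern keeps call sites short: callers pass only the settings they want to change, and new settings can be added later without breaking existing calls.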

Examples

The following examples demonstrate the main features of the mr package.

Example 1: Processing Each Element (ForEach)

    package main

    import (
        "fmt"

        "github.com/zeromicro/go-zero/core/mr"
    )

    func main() {
        // Send the integers 0..9 into the source channel.
        generateFunc := func(source chan<- int) {
            for i := 0; i < 10; i++ {
                source <- i
            }
        }

        // Items are handled concurrently, so the print order may vary.
        mapperFunc := func(item int) {
            fmt.Println("Processing item:", item)
        }

        mr.ForEach(generateFunc, mapperFunc, mr.WithWorkers(4))
    }

Example 2: Simple MapReduce Operation

    package main

    import (
        "fmt"

        "github.com/zeromicro/go-zero/core/mr"
    )

    func main() {
        // Send the integers 0..9 into the source channel.
        generateFunc := func(source chan<- int) {
            for i := 0; i < 10; i++ {
                source <- i
            }
        }

        // Double each item and pass it on to the reducer.
        mapperFunc := func(item int, writer mr.Writer[int], cancel func(error)) {
            writer.Write(item * 2)
        }

        // Sum all mapped values and write the total exactly once.
        reducerFunc := func(pipe <-chan int, writer mr.Writer[int], cancel func(error)) {
            sum := 0
            for v := range pipe {
                sum += v
            }
            writer.Write(sum)
        }

        result, err := mr.MapReduce(generateFunc, mapperFunc, reducerFunc, mr.WithWorkers(4))
        if err != nil {
            fmt.Println("Error:", err)
        } else {
            fmt.Println("Result:", result) // Output: Result: 90
        }
    }

Example 3: MapReduce Operation with Cancellation

    package main

    import (
        "context"
        "fmt"
        "time"

        "github.com/zeromicro/go-zero/core/mr"
    )

    func main() {
        // The operation is bound to a context that expires after 2 seconds.
        ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
        defer cancel()

        // Generating 100 items at 100 ms each takes about 10 seconds,
        // which is longer than the timeout.
        generateFunc := func(source chan<- int) {
            for i := 0; i < 100; i++ {
                source <- i
                time.Sleep(100 * time.Millisecond)
            }
        }

        mapperFunc := func(item int, writer mr.Writer[int], cancel func(error)) {
            writer.Write(item * 2)
        }

        reducerFunc := func(pipe <-chan int, writer mr.Writer[int], cancel func(error)) {
            sum := 0
            for v := range pipe {
                sum += v
            }
            writer.Write(sum)
        }

        result, err := mr.MapReduce(generateFunc, mapperFunc, reducerFunc, mr.WithContext(ctx), mr.WithWorkers(4))
        if err != nil {
            fmt.Println("Error:", err) // Expected to time out
        } else {
            fmt.Println("Result:", result)
        }
    }

Example 4: Parallel Execution of Multiple Functions (Finish and FinishVoid)

    package main

    import (
        "errors"
        "fmt"

        "github.com/zeromicro/go-zero/core/mr"
    )

    func main() {
        // Functions that may fail: Finish reports the error from function 2.
        funcs := []func() error{
            func() error {
                fmt.Println("Function 1 executed")
                return nil
            },
            func() error {
                fmt.Println("Function 2 executed")
                return errors.New("error in function 2")
            },
        }
        err := mr.Finish(funcs...)
        if err != nil {
            fmt.Println("Finish encountered an error:", err)
        }

        // Functions without a return value: FinishVoid only waits for them.
        voidFuncs := []func(){
            func() {
                fmt.Println("Void Function 1 executed")
            },
            func() {
                fmt.Println("Void Function 2 executed")
            },
        }
        mr.FinishVoid(voidFuncs...)
    }

These examples cover the main uses of the mr package from go-zero: basic element processing, a simple map-reduce operation, a map-reduce operation cancelled by a context timeout, and parallel execution of multiple functions. Adapt them to your own requirements.