mr Package Documentation
Overview
The mr
package provides a framework for performing map-reduce operations in Go. It supports concurrent execution of mapping and reducing functions with customizable settings.
Errors
var (
ErrCancelWithNil = errors.New("mapreduce cancelled with nil")
ErrReduceNoOutput = errors.New("reduce not writing value")
)
- ErrCancelWithNil: Error indicating that the map-reduce operation was canceled with nil.
- ErrReduceNoOutput: Error indicating that the reduce function did not produce any output.
Types
ForEachFunc
type ForEachFunc[T any] func(item T)
Function type for processing each element without output.
GenerateFunc
type GenerateFunc[T any] func(source chan<- T)
Function type for generating elements to be processed.
MapFunc
type MapFunc[T, U any] func(item T, writer Writer[U])
Function type for processing an element and writing the output using a writer.
MapperFunc
type MapperFunc[T, U any] func(item T, writer Writer[U], cancel func(error))
Function type for processing an element with support for cancellation.
ReducerFunc
type ReducerFunc[U, V any] func(pipe <-chan U, writer Writer[V], cancel func(error))
Function type for reducing output elements from the mapping stage into a final result.
VoidReducerFunc
type VoidReducerFunc[U any] func(pipe <-chan U, cancel func(error))
Function type for reducing output elements without producing a final result.
Option
type Option func(opts *mapReduceOptions)
Function type for customizing map-reduce options.
Writer
type Writer[T any] interface {
Write(v T)
}
Interface for writing values.
Functions
Finish
func Finish(fns ...func() error) error
Runs functions in parallel and cancels on any error.
FinishVoid
func FinishVoid(fns ...func())
Runs functions in parallel without output.
ForEach
func ForEach[T any](generate GenerateFunc[T], mapper ForEachFunc[T], opts ...Option)
Maps all elements from the generate function but produces no output.
MapReduce
func MapReduce[T, U, V any](generate GenerateFunc[T], mapper MapperFunc[T, U], reducer ReducerFunc[U, V],
opts ...Option) (V, error)
Performs map-reduce operation using the provided generate function, mapper, and reducer.
MapReduceChan
func MapReduceChan[T, U, V any](source <-chan T, mapper MapperFunc[T, U], reducer ReducerFunc[U, V],
opts ...Option) (V, error)
Performs map-reduce operation using the provided source channel, mapper, and reducer.
MapReduceVoid
func MapReduceVoid[T, U any](generate GenerateFunc[T], mapper MapperFunc[T, U],
reducer VoidReducerFunc[U], opts ...Option) error
Performs map-reduce operation using the provided generate function and mapper, but produces no final result.
WithContext
func WithContext(ctx context.Context) Option
Customizes a map-reduce operation to use a given context.
WithWorkers
func WithWorkers(workers int) Option
Customizes a map-reduce operation to use a specified number of workers.
Below are some examples demonstrating various functionalities of the mr package:
Example 1: Processing Each Element (ForEach)
package main
import (
"fmt"
"github.com/zeromicro/go-zero/core/mr"
)
func main() {
generateFunc := func(source chan<- int) {
for i := 0; i < 10; i++ {
source <- i
}
}
mapperFunc := func(item int) {
fmt.Println("Processing item:", item)
}
mr.ForEach(generateFunc, mapperFunc, mr.WithWorkers(4))
}
Example 2: Simple MapReduce Operation
package main
import (
"fmt"
"github.com/zeromicro/go-zero/core/mr"
)
func main() {
generateFunc := func(source chan<- int) {
for i := 0; i < 10; i++ {
source <- i
}
}
mapperFunc := func(item int, writer mr.Writer[int], cancel func(error)) {
writer.Write(item * 2)
}
reducerFunc := func(pipe <-chan int, writer mr.Writer[int], cancel func(error)) {
sum := 0
for v := range pipe {
sum += v
}
writer.Write(sum)
}
result, err := mr.MapReduce(generateFunc, mapperFunc, reducerFunc, mr.WithWorkers(4))
if err != nil {
fmt.Println("Error:", err)
} else {
fmt.Println("Result:", result) // Output: Result: 90
}
}
Example 3: MapReduce Operation with Cancellation
package main
import (
"context"
"fmt"
"time"
"github.com/zeromicro/go-zero/core/mr"
)
func main() {
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()
generateFunc := func(source chan<- int) {
for i := 0; i < 100; i++ {
source <- i
time.Sleep(100 * time.Millisecond)
}
}
mapperFunc := func(item int, writer mr.Writer[int], cancel func(error)) {
writer.Write(item * 2)
}
reducerFunc := func(pipe <-chan int, writer mr.Writer[int], cancel func(error)) {
sum := 0
for v := range pipe {
sum += v
}
writer.Write(sum)
}
result, err := mr.MapReduce(generateFunc, mapperFunc, reducerFunc, mr.WithContext(ctx), mr.WithWorkers(4))
if err != nil {
fmt.Println("Error:", err) // Expected to timeout
} else {
fmt.Println("Result:", result)
}
}
Example 4: Parallel Execution of Multiple Functions (Finish and FinishVoid)
package main
import (
"fmt"
"errors"
"github.com/zeromicro/go-zero/core/mr"
)
func main() {
funcs := []func() error{
func() error {
fmt.Println("Function 1 executed")
return nil
},
func() error {
fmt.Println("Function 2 executed")
return errors.New("error in function 2")
},
}
err := mr.Finish(funcs...)
if err != nil {
fmt.Println("Finish encountered an error:", err)
}
voidFuncs := []func(){
func() {
fmt.Println("Void Function 1 executed")
},
func() {
fmt.Println("Void Function 2 executed")
},
}
mr.FinishVoid(voidFuncs...)
}
These examples showcase different usages of the mr
package from go-zero
, including basic element processing, simple MapReduce operations, MapReduce operations with cancellation, and parallel execution of multiple functions. Choose and modify these examples according to your specific requirements.