mr 文档

概述

mr 包提供了一个在 Go 语言中执行 MapReduce 操作的框架。它支持并发执行映射和归约函数,并且可以自定义设置。

错误

  1. var (
  2. ErrCancelWithNil = errors.New("mapreduce cancelled with nil")
  3. ErrReduceNoOutput = errors.New("reduce not writing value")
  4. )
  • ErrCancelWithNil:表示 MapReduce 操作被取消并且未返回错误。
  • ErrReduceNoOutput:表示归约函数没有输出任何值。

类型

ForEachFunc

  1. type ForEachFunc[T any] func(item T)

用于处理每个元素但没有输出的函数类型。

GenerateFunc

  1. type GenerateFunc[T any] func(source chan<- T)

用于生成要处理的元素的函数类型。

MapFunc

  1. type MapFunc[T, U any] func(item T, writer Writer[U])

用于处理元素并通过 writer 写出结果的函数类型。

MapperFunc

  1. type MapperFunc[T, U any] func(item T, writer Writer[U], cancel func(error))

用于处理元素并支持取消功能的函数类型。

ReducerFunc

  1. type ReducerFunc[U, V any] func(pipe <-chan U, writer Writer[V], cancel func(error))

用于将映射阶段的输出元素归约为最终结果的函数类型。

VoidReducerFunc

  1. type VoidReducerFunc[U any] func(pipe <-chan U, cancel func(error))

用于归约输出元素但不产生最终结果的函数类型。

Option

  1. type Option func(opts *mapReduceOptions)

自定义 MapReduce 选项的方法。

Writer

  1. type Writer[T any] interface {
  2. Write(v T)
  3. }

封装写入方法的接口。

函数

Finish

  1. func Finish(fns ...func() error) error

并行运行函数,如果有任何错误则取消。

FinishVoid

  1. func FinishVoid(fns ...func())

并行运行函数,不产生输出。

ForEach

  1. func ForEach[T any](generate GenerateFunc[T], mapper ForEachFunc[T], opts ...Option)

映射所有由 generate 函数生成的元素,但不产生输出。

MapReduce

  1. func MapReduce[T, U, V any](generate GenerateFunc[T], mapper MapperFunc[T, U], reducer ReducerFunc[U, V],
  2. opts ...Option) (V, error)

使用给定的 generate 函数、mapper 和 reducer 执行 MapReduce 操作。

MapReduceChan

  1. func MapReduceChan[T, U, V any](source <-chan T, mapper MapperFunc[T, U], reducer ReducerFunc[U, V],
  2. opts ...Option) (V, error)

使用给定的源 channel、mapper 和 reducer 执行 MapReduce 操作。

MapReduceVoid

  1. func MapReduceVoid[T, U any](generate GenerateFunc[T], mapper MapperFunc[T, U],
  2. reducer VoidReducerFunc[U], opts ...Option) error

使用给定的 generate 函数和 mapper 执行 MapReduce 操作,但不产生最终结果。

WithContext

  1. func WithContext(ctx context.Context) Option

自定义 MapReduce 操作以使用给定的上下文(context)。

WithWorkers

  1. func WithWorkers(workers int) Option

自定义 MapReduce 操作以使用指定数量的 worker。

下面是一些使用 mr 包的示例代码来演示各种功能:

示例1:处理每个元素(ForEach)

  1. package main
  2. import (
  3. "fmt"
  4. "github.com/zeromicro/go-zero/core/mr"
  5. )
  6. func main() {
  7. generateFunc := func(source chan<- int) {
  8. for i := 0; i < 10; i++ {
  9. source <- i
  10. }
  11. }
  12. mapperFunc := func(item int) {
  13. fmt.Println("Processing item:", item)
  14. }
  15. mr.ForEach(generateFunc, mapperFunc, mr.WithWorkers(4))
  16. }

示例2:简单的 MapReduce 操作

  1. package main
  2. import (
  3. "fmt"
  4. "github.com/zeromicro/go-zero/core/mr"
  5. )
  6. func main() {
  7. generateFunc := func(source chan<- int) {
  8. for i := 0; i < 10; i++ {
  9. source <- i
  10. }
  11. }
  12. mapperFunc := func(item int, writer mr.Writer[int], cancel func(error)) {
  13. writer.Write(item * 2)
  14. }
  15. reducerFunc := func(pipe <-chan int, writer mr.Writer[int], cancel func(error)) {
  16. sum := 0
  17. for v := range pipe {
  18. sum += v
  19. }
  20. writer.Write(sum)
  21. }
  22. result, err := mr.MapReduce(generateFunc, mapperFunc, reducerFunc, mr.WithWorkers(4))
  23. if err != nil {
  24. fmt.Println("Error:", err)
  25. } else {
  26. fmt.Println("Result:", result) // Output: Result: 90
  27. }
  28. }

示例3:带有取消功能的 MapReduce 操作

  1. package main
  2. import (
  3. "context"
  4. "fmt"
  5. "time"
  6. "github.com/zeromicro/go-zero/core/mr"
  7. )
  8. func main() {
  9. ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
  10. defer cancel()
  11. generateFunc := func(source chan<- int) {
  12. for i := 0; i < 100; i++ {
  13. source <- i
  14. time.Sleep(100 * time.Millisecond)
  15. }
  16. }
  17. mapperFunc := func(item int, writer mr.Writer[int], cancel func(error)) {
  18. writer.Write(item * 2)
  19. }
  20. reducerFunc := func(pipe <-chan int, writer mr.Writer[int], cancel func(error)) {
  21. sum := 0
  22. for v := range pipe {
  23. sum += v
  24. }
  25. writer.Write(sum)
  26. }
  27. result, err := mr.MapReduce(generateFunc, mapperFunc, reducerFunc, mr.WithContext(ctx), mr.WithWorkers(4))
  28. if err != nil {
  29. fmt.Println("Error:", err) // Expected to timeout
  30. } else {
  31. fmt.Println("Result:", result)
  32. }
  33. }

示例4:并行执行多个函数(Finish 和 FinishVoid)

  1. package main
  2. import (
  3. "fmt"
  4. "errors"
  5. "github.com/zeromicro/go-zero/core/mr"
  6. )
  7. func main() {
  8. funcs := []func() error{
  9. func() error {
  10. fmt.Println("Function 1 executed")
  11. return nil
  12. },
  13. func() error {
  14. fmt.Println("Function 2 executed")
  15. return errors.New("error in function 2")
  16. },
  17. }
  18. err := mr.Finish(funcs...)
  19. if err != nil {
  20. fmt.Println("Finish encountered an error:", err)
  21. }
  22. voidFuncs := []func(){
  23. func() {
  24. fmt.Println("Void Function 1 executed")
  25. },
  26. func() {
  27. fmt.Println("Void Function 2 executed")
  28. },
  29. }
  30. mr.FinishVoid(voidFuncs...)
  31. }

这些示例展示了 go-zeromr 包的不同用法,包括基础的元素处理,基本的 MapReduce 操作,带有取消功能的 MapReduce 操作,以及并行执行多个函数。根据你的具体需求来选择和修改这些示例代码。