fx
Package Documentation
The fx package provides a flexible API for stream processing. It allows users to perform various operations on streams, including filtering, mapping, reducing, grouping, and more.
Function Types
FilterFunc
Defines the method to filter a Stream.
type FilterFunc func(item any) bool
ForAllFunc
Defines the method to handle all elements in a Stream.
type ForAllFunc func(pipe <-chan any)
ForEachFunc
Defines the method to handle each element in a Stream.
type ForEachFunc func(item any)
GenerateFunc
Defines the method to send elements into a Stream.
type GenerateFunc func(source chan<- any)
KeyFunc
Defines the method to generate keys for the elements in a Stream.
type KeyFunc func(item any) any
LessFunc
Defines the method to compare the elements in a Stream.
type LessFunc func(a, b any) bool
MapFunc
Defines the method to map each element to another object in a Stream.
type MapFunc func(item any) any
Option
Defines the method to customize a Stream.
type Option func(opts *rxOptions)
ParallelFunc
Defines the method to handle elements in parallel.
type ParallelFunc func(item any)
ReduceFunc
Defines the method to reduce all the elements in a Stream.
type ReduceFunc func(pipe <-chan any) (any, error)
WalkFunc
Defines the method to walk through all the elements in a Stream.
type WalkFunc func(item any, pipe chan<- any)
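The callback types above are plain functions over channels, so they can be exercised without the package itself. The following is a minimal stdlib-only sketch of how a GenerateFunc producer and a ReduceFunc consumer could be wired together. The two type declarations are copied from the signatures above; `runPipeline` and `sumOneToThree` are hypothetical helpers for illustration and are not part of fx.

```go
package main

import "fmt"

// Callback types copied from the signatures listed above.
type GenerateFunc func(source chan<- any)
type ReduceFunc func(pipe <-chan any) (any, error)

// runPipeline is a hypothetical helper (not part of fx). It runs the producer
// in its own goroutine, closes the channel once the producer returns, and
// hands the receive-only end to the reducer.
func runPipeline(generate GenerateFunc, reduce ReduceFunc) (any, error) {
	source := make(chan any)
	go func() {
		defer close(source)
		generate(source)
	}()
	return reduce(source)
}

// sumOneToThree sends 1, 2, 3 into the channel and adds them up.
func sumOneToThree() (any, error) {
	return runPipeline(
		func(source chan<- any) {
			for i := 1; i <= 3; i++ {
				source <- i
			}
		},
		func(pipe <-chan any) (any, error) {
			sum := 0
			for item := range pipe {
				sum += item.(int)
			}
			return sum, nil
		},
	)
}

func main() {
	result, err := sumOneToThree()
	fmt.Println(result, err) // 6 <nil>
}
```

The reducer must drain the channel until it is closed; otherwise the producer goroutine in this sketch would block forever on its next send.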
Stream
A Stream wraps a receive-only channel of items and is the type on which all stream operations are defined.
type Stream struct {
source <-chan any
}
Functions
Concat
Returns a concatenated Stream.
func Concat(s Stream, others ...Stream) Stream
From
Constructs a Stream from the given GenerateFunc.
func From(generate GenerateFunc) Stream
Just
Converts the given arbitrary items to a Stream.
func Just(items ...any) Stream
Range
Converts the given channel to a Stream.
func Range(source <-chan any) Stream
Stream Methods
AllMatch
Returns whether all elements of this stream match the provided predicate.
func (s Stream) AllMatch(predicate func(item any) bool) bool
AnyMatch
Returns whether any elements of this stream match the provided predicate.
func (s Stream) AnyMatch(predicate func(item any) bool) bool
Buffer
Buffers the items into a queue with size n.
func (s Stream) Buffer(n int) Stream
Concat
Returns a Stream that concatenates this stream with the other streams.
func (s Stream) Concat(others ...Stream) Stream
Count
Counts the number of elements in the result.
func (s Stream) Count() (count int)
Distinct
Removes duplicate items based on the given KeyFunc.
func (s Stream) Distinct(fn KeyFunc) Stream
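As an illustration of key-based de-duplication, here is a stdlib-only sketch over a slice. It assumes that the first item seen for each key is the one kept, and that keys are comparable so they can be used as map keys. `distinctBy` is a hypothetical helper and not the package's implementation; only the KeyFunc type is taken from the signatures above.

```go
package main

import "fmt"

// KeyFunc is copied from the signature listed above.
type KeyFunc func(item any) any

// distinctBy is a hypothetical stand-in for key-based de-duplication.
// It keeps the first item seen for each key and preserves input order.
// Keys must be comparable, because they are used as map keys.
func distinctBy(items []any, fn KeyFunc) []any {
	seen := make(map[any]struct{})
	var out []any
	for _, item := range items {
		key := fn(item)
		if _, ok := seen[key]; ok {
			continue // a previous item already produced this key
		}
		seen[key] = struct{}{}
		out = append(out, item)
	}
	return out
}

func main() {
	words := []any{"apple", "avocado", "banana", "blueberry", "cherry"}
	firstLetter := func(item any) any { return item.(string)[0] }
	fmt.Println(distinctBy(words, firstLetter)) // [apple banana cherry]
}
```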
Done
Waits for all upstream operations to finish.
func (s Stream) Done()
Filter
Filters the items by the given FilterFunc.
func (s Stream) Filter(fn FilterFunc, opts ...Option) Stream
First
Returns the first item, or nil if there are no items.
func (s Stream) First() any
ForAll
Handles the streaming elements from the source; no further stream operations can follow.
func (s Stream) ForAll(fn ForAllFunc)
ForEach
Seals the Stream by applying the ForEachFunc to each item; no further operations can follow.
func (s Stream) ForEach(fn ForEachFunc)
Group
Groups the elements into different groups based on their keys.
func (s Stream) Group(fn KeyFunc) Stream
Head
Returns the first n elements of the stream.
func (s Stream) Head(n int64) Stream
Last
Returns the last item, or nil if there are no items.
func (s Stream) Last() (item any)
Map
Converts each item to another corresponding item, which means it’s a 1:1 model.
func (s Stream) Map(fn MapFunc, opts ...Option) Stream
Max
Returns the maximum item from the underlying source.
func (s Stream) Max(less LessFunc) any
Merge
Merges all the items into a slice and generates a new stream.
func (s Stream) Merge() Stream
Min
Returns the minimum item from the underlying source.
func (s Stream) Min(less LessFunc) any
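Max and Min both take the same LessFunc, so one comparison function serves for both. The stdlib-only sketch below shows one way this could work over a slice. `minOf`, `maxOf`, and `intLess` are hypothetical names for illustration, and returning nil for empty input is an assumption of this sketch.

```go
package main

import "fmt"

// LessFunc is copied from the signature listed above.
type LessFunc func(a, b any) bool

// minOf is a hypothetical helper: it returns the smallest item according to
// less, or nil when items is empty.
func minOf(items []any, less LessFunc) any {
	var lo any
	for i, item := range items {
		if i == 0 || less(item, lo) {
			lo = item
		}
	}
	return lo
}

// maxOf is a hypothetical helper: it reuses the same less function with the
// arguments swapped, and returns nil when items is empty.
func maxOf(items []any, less LessFunc) any {
	var hi any
	for i, item := range items {
		if i == 0 || less(hi, item) {
			hi = item
		}
	}
	return hi
}

// intLess orders items that are known to be ints.
func intLess(a, b any) bool { return a.(int) < b.(int) }

func main() {
	items := []any{4, 2, 5, 1, 3}
	fmt.Println(minOf(items, intLess), maxOf(items, intLess)) // 1 5
}
```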
NoneMatch
Returns whether no elements of this stream match the provided predicate.
func (s Stream) NoneMatch(predicate func(item any) bool) bool
Parallel
Applies the given ParallelFunc to each item concurrently with the given number of workers.
func (s Stream) Parallel(fn ParallelFunc, opts ...Option)
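To make "concurrently with the given number of workers" concrete, here is a stdlib-only sketch of a fixed-size worker pool draining a shared channel. It illustrates the general technique and is not the package's implementation. `parallelEach` and `sumParallel` are hypothetical helpers, and clamping the worker count to at least 1 is an assumption of this sketch.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// ParallelFunc is copied from the signature listed above.
type ParallelFunc func(item any)

// parallelEach is a hypothetical helper: it starts `workers` goroutines that
// drain one shared channel, and returns only after every item was handled.
func parallelEach(items []any, workers int, fn ParallelFunc) {
	if workers < 1 {
		workers = 1 // assumption: always run at least one worker
	}
	source := make(chan any)
	var wg sync.WaitGroup
	for i := 0; i < workers; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for item := range source {
				fn(item)
			}
		}()
	}
	for _, item := range items {
		source <- item
	}
	close(source) // lets the workers' range loops terminate
	wg.Wait()
}

// sumParallel adds the items concurrently. The callback runs on several
// goroutines, so the shared total is updated atomically.
func sumParallel(items []any, workers int) int64 {
	var total int64
	parallelEach(items, workers, func(item any) {
		atomic.AddInt64(&total, int64(item.(int)))
	})
	return total
}

func main() {
	fmt.Println(sumParallel([]any{1, 2, 3, 4, 5}, 3)) // 15
}
```

Because the callback runs on multiple goroutines, any state it shares must be synchronized, and the order in which items are processed is not defined.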
Reduce
A utility method that lets the caller process the underlying channel directly.
func (s Stream) Reduce(fn ReduceFunc) (any, error)
Reverse
Reverses the elements in the stream.
func (s Stream) Reverse() Stream
Skip
Returns a Stream that skips n elements.
func (s Stream) Skip(n int64) Stream
Sort
Sorts the items from the underlying source.
func (s Stream) Sort(less LessFunc) Stream
Split
Splits the elements into chunks with size up to n.
func (s Stream) Split(n int) Stream
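"Chunks with size up to n" means every chunk holds n items except possibly the last. A stdlib-only sketch over a slice shows the expected shape. `splitChunks` is a hypothetical helper, and returning nil for n < 1 is an assumption of this sketch, not a statement about how the package handles invalid sizes.

```go
package main

import "fmt"

// splitChunks is a hypothetical helper: it cuts items into consecutive chunks
// of at most n elements. The final chunk may be shorter than n.
// Assumption for this sketch: n < 1 yields nil.
func splitChunks(items []any, n int) [][]any {
	if n < 1 {
		return nil
	}
	var chunks [][]any
	for start := 0; start < len(items); start += n {
		end := start + n
		if end > len(items) {
			end = len(items)
		}
		chunks = append(chunks, items[start:end])
	}
	return chunks
}

func main() {
	fmt.Println(splitChunks([]any{1, 2, 3, 4, 5}, 2)) // [[1 2] [3 4] [5]]
}
```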
Tail
Returns the last n elements of the stream.
func (s Stream) Tail(n int64) Stream
Walk
Lets the callers handle each item. The caller may write zero, one, or more items based on the given item.
func (s Stream) Walk(fn WalkFunc, opts ...Option) Stream
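The "zero, one, or more items" contract can be seen with a small stdlib-only sketch. It uses a single worker so the output order is deterministic, which is a simplification chosen for this example. `walkSequential` and `repeat` are hypothetical helpers; only the WalkFunc type is taken from the signatures above.

```go
package main

import "fmt"

// WalkFunc is copied from the signature listed above.
type WalkFunc func(item any, pipe chan<- any)

// walkSequential is a hypothetical single-worker helper: it calls fn once per
// item and collects everything fn writes to pipe.
func walkSequential(items []any, fn WalkFunc) []any {
	pipe := make(chan any)
	go func() {
		defer close(pipe)
		for _, item := range items {
			fn(item, pipe)
		}
	}()
	var out []any
	for v := range pipe {
		out = append(out, v)
	}
	return out
}

// repeat writes the int n to pipe n-1 times:
// 1 produces nothing, 2 produces one item, 3 produces two items.
func repeat(item any, pipe chan<- any) {
	n := item.(int)
	for i := 1; i < n; i++ {
		pipe <- n
	}
}

func main() {
	fmt.Println(walkSequential([]any{1, 2, 3}, repeat)) // [2 3 3]
}
```

This is what distinguishes a walk from a map: a map always yields exactly one output per input, while a walk callback decides how many outputs to write.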
Options
UnlimitedWorkers
Lets the caller use as many workers as the tasks.
func UnlimitedWorkers() Option
WithWorkers
Lets the caller customize the concurrent workers.
func WithWorkers(workers int) Option
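Both options follow the functional options pattern implied by the Option signature: each option is a function that mutates an options struct. The sketch below shows the pattern with stdlib only. The fields of `rxOptions`, the default of 16 workers, the minimum of 1, and the lowercase helper names are all assumptions made for illustration; the source only confirms the Option type and the two constructor signatures.

```go
package main

import "fmt"

// rxOptions stands in for the unexported options type named in the Option
// signature. Its fields are assumptions made for this sketch.
type rxOptions struct {
	unlimited bool
	workers   int
}

// Option is copied from the signature listed above.
type Option func(opts *rxOptions)

const (
	defaultWorkers = 16 // assumption: default used when no option is given
	minWorkers     = 1  // assumption: lower bound applied by withWorkers
)

// withWorkers is a local stand-in that sets the worker count, clamped to
// minWorkers.
func withWorkers(workers int) Option {
	return func(opts *rxOptions) {
		if workers < minWorkers {
			opts.workers = minWorkers
		} else {
			opts.workers = workers
		}
	}
}

// unlimitedWorkers is a local stand-in that removes the worker limit.
func unlimitedWorkers() Option {
	return func(opts *rxOptions) {
		opts.unlimited = true
	}
}

// buildOptions starts from the defaults and applies each option in order.
func buildOptions(opts ...Option) *rxOptions {
	options := &rxOptions{workers: defaultWorkers}
	for _, opt := range opts {
		opt(options)
	}
	return options
}

func main() {
	fmt.Println(buildOptions(withWorkers(3)).workers)          // 3
	fmt.Println(buildOptions(unlimitedWorkers()).unlimited)    // true
}
```

The pattern keeps method signatures stable: operations such as Filter, Map, Walk, and Parallel accept `opts ...Option`, so new settings can be added without changing their parameter lists.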
Examples
Here are some examples demonstrating how to use the fx package for stream processing operations.
Example 1: Creating a Stream from a Slice and Filtering Elements
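package main

import (
	"fmt"

	"github.com/zeromicro/go-zero/core/fx"
)

func main() {
	items := []any{1, 2, 3, 4, 5}

	// Keep only the even numbers, then print each remaining item.
	// ForEach is used because the Stream's source field is unexported
	// and cannot be ranged over from outside the fx package.
	fx.Just(items...).
		Filter(func(item any) bool {
			return item.(int)%2 == 0
		}).
		ForEach(func(item any) {
			fmt.Println(item)
		})
}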
Explanation:
- Use the Just function to convert a slice to a stream.
- Use the Filter method to keep only the even numbers.
- Iterate over the stream and print the results.
Example 2: Processing Stream Elements in Parallel
package main

import (
	"fmt"

	"github.com/zeromicro/go-zero/core/fx"
)

func main() {
	items := []any{1, 2, 3, 4, 5}

	// Handle each item with up to 3 concurrent workers.
	// The order of the printed lines is not defined.
	fx.Just(items...).
		Parallel(func(item any) {
			fmt.Printf("Processing %v\n", item)
		}, fx.WithWorkers(3))
}
Explanation:
- Use the Just function to convert a slice to a stream.
- Use the Parallel method to process each element in parallel, specifying 3 workers.
- Print each element being processed.
Example 3: Sorting Elements in a Stream
package main

import (
	"fmt"

	"github.com/zeromicro/go-zero/core/fx"
)

func main() {
	items := []any{4, 2, 5, 1, 3}

	// Sort in ascending order, then print each item.
	// ForEach is used because the Stream's source field is unexported.
	fx.Just(items...).
		Sort(func(a, b any) bool {
			return a.(int) < b.(int)
		}).
		ForEach(func(item any) {
			fmt.Println(item)
		})
}
Explanation:
- Use the Just function to convert a slice to a stream.
- Use the Sort method to sort the elements.
- Iterate over the stream and print the sorted results.
Example 4: Reducing Elements in a Stream
package main

import (
	"fmt"

	"github.com/zeromicro/go-zero/core/fx"
)

func main() {
	items := []any{1, 2, 3, 4, 5}

	// Drain the underlying channel and sum the items.
	result, err := fx.Just(items...).
		Reduce(func(pipe <-chan any) (any, error) {
			sum := 0
			for item := range pipe {
				sum += item.(int)
			}
			return sum, nil
		})
	if err != nil {
		fmt.Println("reduce failed:", err)
		return
	}
	fmt.Println(result) // Output: 15
}
Explanation:
- Use the Just function to convert a slice to a stream.
- Use the Reduce method to calculate the sum of all elements in the stream.
- Print the reduction result.
Example 5: Grouping Elements in a Stream
package main

import (
	"fmt"

	"github.com/zeromicro/go-zero/core/fx"
)

func main() {
	items := []any{"apple", "banana", "avocado", "blueberry"}

	// Group by the first letter (the first byte of each string),
	// then print each group. The order of the groups is not defined.
	fx.Just(items...).
		Group(func(item any) any {
			return item.(string)[0]
		}).
		ForEach(func(group any) {
			fmt.Println(group)
		})
}
Explanation:
- Use the Just function to convert a slice to a stream.
- Use the Group method to group elements by their first letter.
- Iterate over the stream and print each group.
These examples demonstrate how the fx package can simplify data processing through stream operations. You can combine these methods to suit the requirements of your own code.