Stream processing

[!TIP] This document was machine translated by Google. If you find grammatical or semantic errors, or descriptions that are unclear, please open a PR.

Stream processing is a programming paradigm in which a sequence of data (the stream) flows through a series of operations (functions), each applied to every element of the stream. Stream processing tools can significantly improve development efficiency, letting programmers write effective, clean, and concise code.

Streaming data processing is very common in daily work. For example, in business development we often record many business logs. These logs are usually sent to Kafka first, then a job consumes Kafka and writes them to Elasticsearch. During this stream processing, the logs are often transformed along the way, for example by filtering out invalid logs, doing some calculations, or recombining logs. The schematic diagram is as follows: fx_log.png

fx

go-zero is a full-featured microservice framework with many useful tools built in, including the stream processing tool fx. Let's use a simple example to get to know it:

```go
package main

import (
	"fmt"
	"os"
	"os/signal"
	"syscall"
	"time"

	"github.com/zeromicro/go-zero/core/fx"
)

func main() {
	ch := make(chan int)

	go inputStream(ch)
	go outputStream(ch)

	c := make(chan os.Signal, 1)
	signal.Notify(c, syscall.SIGTERM, syscall.SIGINT)
	<-c
}

// inputStream writes an increasing counter into the channel every 500ms.
func inputStream(ch chan int) {
	count := 0
	for {
		ch <- count
		time.Sleep(time.Millisecond * 500)
		count++
	}
}

// outputStream consumes the channel as a stream, keeps the even numbers
// and prints them.
func outputStream(ch chan int) {
	fx.From(func(source chan<- interface{}) {
		for c := range ch {
			source <- c
		}
	}).Walk(func(item interface{}, pipe chan<- interface{}) {
		count := item.(int)
		pipe <- count
	}).Filter(func(item interface{}) bool {
		itemInt := item.(int)
		return itemInt%2 == 0
	}).ForEach(func(item interface{}) {
		fmt.Println(item)
	})
}
```

The inputStream function simulates the generation of stream data, and outputStream simulates processing that stream. The From function is the input of the stream; Walk acts on each item concurrently; Filter keeps items for which its function returns true and discards those for which it returns false; ForEach traverses and prints each item.

Intermediate operations of streaming data processing

A stream may pass through many intermediate operations, each of which acts on the stream. Like workers on an assembly line, each worker operates on a part and hands a new part down the line; in the same way, each intermediate operation returns a new stream after it completes. 7715f4b6-8739-41ac-8c8c-04d187172e9d.png Intermediate operations of fx stream processing:

| Operation | Description | Input |
| --- | --- | --- |
| Distinct | Remove duplicate items | KeyFunc returning the key used for deduplication |
| Filter | Filter out items that do not meet the condition | FilterFunc; Option controls concurrency |
| Group | Group items by key | KeyFunc |
| Head | Take the first n items and return a new stream | int64, number of items to keep |
| Map | Convert each object | MapFunc; Option controls concurrency |
| Merge | Merge items into a slice and generate a new stream | |
| Reverse | Reverse the items | |
| Sort | Sort the items | LessFunc implementing the comparison |
| Tail | Like Head, but the last n items form the new stream | int64, number of items to keep |
| Walk | Apply a function to each item | WalkFunc; Option controls concurrency |

The following figure shows each step and the result of each step:

3aefec98-56eb-45a6-a4b2-9adbdf4d63c0.png

Usage and principle analysis

From

The From function constructs a stream and returns the Stream; the stream data is stored in a channel:

```go
// Example
s := []int{1, 2, 3, 4, 5, 6, 7, 8, 9, 0}
fx.From(func(source chan<- interface{}) {
	for _, v := range s {
		source <- v
	}
})

// Source code
func From(generate GenerateFunc) Stream {
	source := make(chan interface{})
	threading.GoSafe(func() {
		defer close(source)
		generate(source)
	})
	return Range(source)
}
```

Filter

The Filter function filters items: FilterFunc defines the filtering logic, returning true to keep the item and false to discard it:

```go
// Example: keep the even numbers
s := []int{1, 2, 3, 4, 5, 6, 7, 8, 9, 0}
fx.From(func(source chan<- interface{}) {
	for _, v := range s {
		source <- v
	}
}).Filter(func(item interface{}) bool {
	return item.(int)%2 == 0
})

// Source code
func (p Stream) Filter(fn FilterFunc, opts ...Option) Stream {
	return p.Walk(func(item interface{}, pipe chan<- interface{}) {
		// run the filter function: true keeps the item, false discards it
		if fn(item) {
			pipe <- item
		}
	}, opts...)
}
```

Group

Group groups the stream data by a user-defined key. After the data is grouped, each group is written to the channel as a slice:

```go
// Example: group by leading character "g" or "p";
// everything else goes into a third group
ss := []string{"golang", "google", "php", "python", "java", "c++"}
fx.From(func(source chan<- interface{}) {
	for _, s := range ss {
		source <- s
	}
}).Group(func(item interface{}) interface{} {
	if strings.HasPrefix(item.(string), "g") {
		return "g"
	} else if strings.HasPrefix(item.(string), "p") {
		return "p"
	}
	return ""
}).ForEach(func(item interface{}) {
	fmt.Println(item)
})

// Source code
func (p Stream) Group(fn KeyFunc) Stream {
	// map that stores the groups
	groups := make(map[interface{}][]interface{})
	for item := range p.source {
		// user-defined group key
		key := fn(item)
		// items with the same key go into the same group
		groups[key] = append(groups[key], item)
	}
	source := make(chan interface{})
	go func() {
		for _, group := range groups {
			// each group (a slice of items sharing a key) is written to the channel
			source <- group
		}
		close(source)
	}()
	return Range(source)
}
```

Reverse

Reverse reverses the elements in the stream:

7e0fd2b8-d4c1-4130-a216-a7d3d4301116.png

```go
// Example
fx.Just(1, 2, 3, 4, 5).Reverse().ForEach(func(item interface{}) {
	fmt.Println(item)
})

// Source code
func (p Stream) Reverse() Stream {
	var items []interface{}
	// drain the stream
	for item := range p.source {
		items = append(items, item)
	}
	// reverse in place
	for i := len(items)/2 - 1; i >= 0; i-- {
		opp := len(items) - 1 - i
		items[i], items[opp] = items[opp], items[i]
	}
	// write the reversed items into a new stream
	return Just(items...)
}
```

Distinct

Distinct de-duplicates elements in the stream. Deduplication is common in business development, for example de-duplicating user IDs:

```go
// Example
fx.Just(1, 2, 2, 2, 3, 3, 4, 5, 6).Distinct(func(item interface{}) interface{} {
	return item
}).ForEach(func(item interface{}) {
	fmt.Println(item)
})
// Output: 1, 2, 3, 4, 5, 6

// Source code
func (p Stream) Distinct(fn KeyFunc) Stream {
	source := make(chan interface{})
	threading.GoSafe(func() {
		defer close(source)
		// deduplicate by key: only the first item with a given key is kept
		keys := make(map[interface{}]lang.PlaceholderType)
		for item := range p.source {
			key := fn(item)
			// skip items whose key has already been seen
			if _, ok := keys[key]; !ok {
				source <- item
				keys[key] = lang.Placeholder
			}
		}
	})
	return Range(source)
}
```

Walk

The Walk function acts concurrently on each item in the stream. The number of workers can be set with WithWorkers; it defaults to 16, with a minimum of 1. If unlimitedWorkers is set to true, the number of concurrent goroutines is unbounded, but concurrent writes into the output stream are still limited by defaultWorkers. In WalkFunc, the user decides what gets written to the resulting stream: each input item may produce no element, one element, or multiple elements:

```go
// Example
fx.Just("aaa", "bbb", "ccc").Walk(func(item interface{}, pipe chan<- interface{}) {
	newItem := strings.ToUpper(item.(string))
	pipe <- newItem
}).ForEach(func(item interface{}) {
	fmt.Println(item)
})

// Source code
func (p Stream) walkLimited(fn WalkFunc, option *rxOptions) Stream {
	pipe := make(chan interface{}, option.workers)
	go func() {
		var wg sync.WaitGroup
		pool := make(chan lang.PlaceholderType, option.workers)
		for {
			// limit the number of concurrent workers
			pool <- lang.Placeholder
			item, ok := <-p.source
			if !ok {
				<-pool
				break
			}
			wg.Add(1)
			go func() {
				defer func() {
					wg.Done()
					<-pool
				}()
				// apply the function to each element
				fn(item, pipe)
			}()
		}
		// wait until all workers finish
		wg.Wait()
		close(pipe)
	}()
	return Range(pipe)
}
```

Concurrent processing

In addition to stream processing, the fx tool also provides function-level concurrency. Implementing a feature in a microservice often requires calling multiple services; processing these dependencies concurrently can effectively reduce the total dependency time and improve service performance.

b97bf7df-1781-436e-bf04-f1dd90c60537.png

```go
fx.Parallel(func() {
	userRPC()
}, func() {
	accountRPC()
}, func() {
	orderRPC()
})
```

Note that fx.Parallel processes the dependencies in parallel without returning an error. If you need an error returned, or want a failing dependency to end the remaining dependency requests immediately, please use the MapReduce tool instead.

Summary

This article introduced the basic concepts of stream processing and the stream processing tool fx in go-zero. There are many stream processing scenarios in real production systems; I hope this article gives you some inspiration and helps you handle the stream processing scenarios in your own work.