通道流水线工作模式

一个通道的输出,作为下一个通道的输入,连绵不绝

下面实现了加法 乘法的流水线

// 流水线通道
package main

import "fmt"

func main() {
	done := make(chan interface{})
	defer close(done)
	// 数据源
	numStream := generate(done, 1, 2, 3, 4, 5)
	// 乘法 加法 乘法
	pipeline := multi(done, add(done, multi(done, numStream, 2), 1), 2)
	for num := range pipeline {
		fmt.Println(num)
	}
}

// 接收一个中止信号,防止泄露
// 返回只读通道
func generate(done <-chan interface{}, num ...int) <-chan int {
	numStream := make(chan int)
	go func() {
		defer close(numStream)
		for _, i := range num {
			select {
			case <-done:
				return
				// 不断的向通道中写入数据
			case numStream <- i:
			}
		}
	}()
	// 注意:隐式将通道转换成只读通道
	return numStream
}

// 乘法器,从一个通道中接收数据,然后 乘以factor,将结果写入另一个通道中
//仍然要接收一个终止信号
func multi(done <-chan interface{}, numStream <-chan int, factor int) <-chan int {
	multiStream := make(chan int)
	go func() {
		defer close(multiStream)
		for i := range numStream {
			select {
			case <-done:
				return
			case multiStream <- (factor * i):
			}
		}
	}()
	return multiStream
}

// 加法器, 同上
func add(done <-chan interface{}, numStream <-chan int, factor int) <-chan int {
	addStream := make(chan int)
	go func() {
		defer close(addStream)
		for i := range numStream {
			select {
			case <-done:
				return
			case addStream <- (factor + i):
			}
		}
	}()
	return addStream
}