一个通道的输出,作为下一个通道的输入,连绵不绝
下面实现了加法 乘法的流水线
// 流水线通道
package main
import "fmt"
func main() {
done := make(chan interface{})
defer close(done)
// 数据源
numStream := generate(done, 1, 2, 3, 4, 5)
// 乘法 加法 乘法
pipeline := multi(done, add(done, multi(done, numStream, 2), 1), 2)
for num := range pipeline {
fmt.Println(num)
}
}
// 接收一个中止信号,防止泄露
// 返回只读通道
func generate(done <-chan interface{}, num ...int) <-chan int {
numStream := make(chan int)
go func() {
defer close(numStream)
for _, i := range num {
select {
case <-done:
return
// 不断的向通道中写入数据
case numStream <- i:
}
}
}()
// 注意:隐式将通道转换成只读通道
return numStream
}
// 乘法器,从一个通道中接收数据,然后 乘以factor,将结果写入另一个通道中
//仍然要接收一个终止信号
func multi(done <-chan interface{}, numStream <-chan int, factor int) <-chan int {
multiStream := make(chan int)
go func() {
defer close(multiStream)
for i := range numStream {
select {
case <-done:
return
case multiStream <- (factor * i):
}
}
}()
return multiStream
}
// 加法器, 同上
func add(done <-chan interface{}, numStream <-chan int, factor int) <-chan int {
addStream := make(chan int)
go func() {
defer close(addStream)
for i := range numStream {
select {
case <-done:
return
case addStream <- (factor + i):
}
}
}()
return addStream
}