Pipelines in Golang
There's a really good article on the Golang blog called Go Concurrency Patterns: Pipelines and cancellation.
I came across the article while learning Go; more specifically, I was building an image processing pipeline for my home security cameras. The examples are great, but I felt they could be improved a little.
First, it would be nice to create Pipelines programmatically by submitting an array of Pipes and having the API "fit" those pipes together (rather than wrapping the calls in functions). Second, I felt the examples lacked a clean mechanism for closing the channel and giving the Pipeline a chance to perform cleanup. Finally, I preferred a more OOP-style API built on interfaces rather than on functions with matching signatures.
I'm quite happy with the results and thought I would share.
First, my Pipe definition is quite simple:
```go
// Pipe is a single stage: it reads from in and returns the channel it writes to.
type Pipe interface {
	Process(in chan int) chan int
}
```
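If you still want to write an occasional stage as a plain function, an adapter in the style of net/http's HandlerFunc lets that function satisfy the Pipe interface above. This is a minimal sketch, not part of the post's API; PipeFunc and negate are hypothetical names used only for illustration.

```go
package main

import "fmt"

// Pipe is the interface from the post.
type Pipe interface {
	Process(in chan int) chan int
}

// PipeFunc is a hypothetical adapter (modelled on net/http's HandlerFunc)
// that lets an ordinary function satisfy the Pipe interface.
type PipeFunc func(in chan int) chan int

// Process simply calls the underlying function, so PipeFunc implements Pipe.
func (f PipeFunc) Process(in chan int) chan int { return f(in) }

// negate is an illustrative stage written as a plain function.
func negate(in chan int) chan int {
	out := make(chan int)
	go func() {
		defer close(out) // close once in is drained so consumers can finish
		for i := range in {
			out <- -i
		}
	}()
	return out
}

func main() {
	var p Pipe = PipeFunc(negate)
	in := make(chan int)
	go func() {
		defer close(in)
		for i := 1; i <= 3; i++ {
			in <- i
		}
	}()
	for v := range p.Process(in) {
		fmt.Println(v)
	}
}
```

The interface stays the single contract that pipelines are built from; the adapter only changes how a particular stage gets written.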
Sq (Square) and Add (which adds each value to itself) are example implementations of the Pipe interface.
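```go
// Sq squares every value it receives.
type Sq struct{}

func (sq Sq) Process(in chan int) chan int {
	out := make(chan int)
	go func() {
		for i := range in {
			out <- i * i
		}
		// Close out once in is drained so the next stage's range loop ends.
		close(out)
	}()
	return out
}

// Add adds each value to itself (i.e. doubles it).
type Add struct{}

func (add Add) Process(in chan int) chan int {
	out := make(chan int)
	go func() {
		for i := range in {
			out <- i + i
		}
		close(out)
	}()
	return out
}
```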
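To see one stage in isolation, the sketch below feeds a few values into Sq and drains its output. It relies on the property the implementations above depend on: a stage's goroutine only exits, closing its output, after its input channel has been closed. runStage is a hypothetical helper for illustration, and this copy of Sq uses defer close(out), which behaves the same as closing after the loop.

```go
package main

import "fmt"

// Sq squares each value it receives, as in the post.
type Sq struct{}

func (sq Sq) Process(in chan int) chan int {
	out := make(chan int)
	go func() {
		defer close(out) // runs after in is closed and fully drained
		for i := range in {
			out <- i * i
		}
	}()
	return out
}

// runStage (hypothetical) pushes values through a single Sq stage and
// collects whatever comes out, in order.
func runStage(values []int) []int {
	in := make(chan int)
	go func() {
		defer close(in) // closing in is what lets the stage's goroutine finish
		for _, v := range values {
			in <- v
		}
	}()
	var results []int
	// Parentheses avoid the composite-literal ambiguity in a for header.
	for v := range (Sq{}).Process(in) {
		results = append(results, v)
	}
	return results
}

func main() {
	fmt.Println(runStage([]int{1, 2, 3, 4})) // [1 4 9 16]
}
```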
Instead of requiring developers to manually instantiate the Pipeline, I would prefer to let them pass a variable number of Pipes to a function that automatically "fits" each pipe to the next one:
```go
func NewPipeline(pipes ...Pipe) Pipeline {
	head := make(chan int)
	// Feed each pipe's output into the next pipe's input. Starting from head
	// also means a pipeline with no pipes passes items through unchanged
	// (rather than leaving tail as a nil channel that blocks forever).
	next := head
	for _, pipe := range pipes {
		next = pipe.Process(next)
	}
	return Pipeline{head: head, tail: next}
}
```
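The "fitting" loop is essentially a fold over the stages: each Process call receives the previous stage's output channel. The sketch below checks that fitting Sq{} and Add{} in order yields the same values as wrapping the calls by hand, Add{}.Process(Sq{}.Process(in)), which is the wrapping style the post sets out to avoid. source, fit and drain are hypothetical helpers, not part of the post's API.

```go
package main

import "fmt"

type Pipe interface {
	Process(in chan int) chan int
}

type Sq struct{}

func (Sq) Process(in chan int) chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for i := range in {
			out <- i * i
		}
	}()
	return out
}

type Add struct{}

func (Add) Process(in chan int) chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for i := range in {
			out <- i + i
		}
	}()
	return out
}

// source emits the given values on a fresh channel, then closes it.
func source(values ...int) chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for _, v := range values {
			out <- v
		}
	}()
	return out
}

// fit threads in through each pipe in order, the same folding that
// NewPipeline performs, but returns only the tail channel.
func fit(in chan int, pipes ...Pipe) chan int {
	next := in
	for _, p := range pipes {
		next = p.Process(next)
	}
	return next
}

// drain reads a channel until it is closed.
func drain(c chan int) []int {
	var out []int
	for v := range c {
		out = append(out, v)
	}
	return out
}

func main() {
	fitted := drain(fit(source(1, 2, 3), Sq{}, Add{}))
	nested := drain(Add{}.Process(Sq{}.Process(source(1, 2, 3))))
	fmt.Println(fitted, nested) // [2 8 18] [2 8 18]
}
```

Order matters: Sq then Add computes 2·i², whereas Add then Sq would compute 4·i².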
Another improvement was to stop handing the caller a raw channel and instead wrap both ends in a Pipeline type whose methods make the caller's responsibilities explicit:
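```go
// Pipeline holds the first stage's input (head) and the last stage's output (tail).
type Pipeline struct {
	head chan int
	tail chan int
}

// Enqueue sends an item into the first stage; it blocks until that stage receives it.
func (p *Pipeline) Enqueue(item int) {
	p.head <- item
}

// Dequeue calls handler for every result and returns once tail is closed.
func (p *Pipeline) Dequeue(handler func(int)) {
	for i := range p.tail {
		handler(i)
	}
}

// Close closes head; each stage then closes its own output in turn,
// which eventually ends Dequeue.
func (p *Pipeline) Close() {
	close(p.head)
}
```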
You can now instantiate the Pipeline like so:
```go
pipeline := NewPipeline(Sq{}, Add{})
```
Then enqueue new items at the head of the Pipeline:
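```go
// Enqueue blocks until the first stage receives each item, so produce
// from a separate goroutine while the caller consumes the output.
go func() {
	for i := 0; i < 10; i++ {
		log.Infof("Sending: %v", i)
		pipeline.Enqueue(i)
	}
	log.Info("Closing Pipeline.")
	pipeline.Close()
}()
```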
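Finally, consuming the output is just a matter of registering a handler function. Dequeue blocks until the last stage closes its output, which is why the producer above runs in its own goroutine: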
```go
pipeline.Dequeue(func(i int) {
	log.Infof("Received: %v", i)
})
```
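For reference, here is a minimal sketch that stitches the snippets above into one runnable program. It assumes the standard library log package (log.Printf and log.Println) in place of the leveled logger's Infof/Info calls used above, and NewPipeline threads channels through the stages starting from head. run is a hypothetical helper that collects the results so they can be checked.

```go
package main

import "log"

// Pipe, Sq, Add, Pipeline and NewPipeline mirror the snippets in this post.
type Pipe interface {
	Process(in chan int) chan int
}

type Sq struct{}

func (Sq) Process(in chan int) chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for i := range in {
			out <- i * i
		}
	}()
	return out
}

type Add struct{}

func (Add) Process(in chan int) chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for i := range in {
			out <- i + i
		}
	}()
	return out
}

type Pipeline struct {
	head chan int
	tail chan int
}

func NewPipeline(pipes ...Pipe) Pipeline {
	head := make(chan int)
	next := head
	for _, pipe := range pipes {
		next = pipe.Process(next)
	}
	return Pipeline{head: head, tail: next}
}

func (p *Pipeline) Enqueue(item int) { p.head <- item }

func (p *Pipeline) Dequeue(handler func(int)) {
	for i := range p.tail {
		handler(i)
	}
}

func (p *Pipeline) Close() { close(p.head) }

// run (hypothetical) sends 0..n-1 through Sq then Add and returns
// everything that comes out of the tail, in order.
func run(n int) []int {
	pipeline := NewPipeline(Sq{}, Add{})
	go func() {
		for i := 0; i < n; i++ {
			log.Printf("Sending: %v", i)
			pipeline.Enqueue(i)
		}
		log.Println("Closing Pipeline.")
		pipeline.Close()
	}()
	var received []int
	// Dequeue runs on this goroutine and returns once the tail is closed.
	pipeline.Dequeue(func(i int) {
		log.Printf("Received: %v", i)
		received = append(received, i)
	})
	return received
}

func main() {
	log.Println(run(10))
}
```

Because every stage is a single goroutine connected by unbuffered channels, results arrive in the same order the items were enqueued.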
The pipeline pattern is incredibly powerful and simple to construct in Go. Unfortunately, without generics as a language construct, this pattern will most likely need to be reimplemented for each use case.
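Go 1.18 added type parameters, so the same shape can now be written once for any element type. The following is a minimal sketch under that assumption rather than the post's code; MapPipe and shout are hypothetical names, and every stage here keeps the element type unchanged.

```go
package main

import (
	"fmt"
	"strings"
)

// Pipe is a generic variant of the post's interface (requires Go 1.18+).
type Pipe[T any] interface {
	Process(in chan T) chan T
}

// MapPipe is a hypothetical stage that applies F to every item.
type MapPipe[T any] struct {
	F func(T) T
}

func (m MapPipe[T]) Process(in chan T) chan T {
	out := make(chan T)
	go func() {
		defer close(out)
		for v := range in {
			out <- m.F(v)
		}
	}()
	return out
}

type Pipeline[T any] struct {
	head chan T
	tail chan T
}

func NewPipeline[T any](pipes ...Pipe[T]) *Pipeline[T] {
	head := make(chan T)
	next := head
	for _, p := range pipes {
		next = p.Process(next)
	}
	return &Pipeline[T]{head: head, tail: next}
}

func (p *Pipeline[T]) Enqueue(item T) { p.head <- item }

func (p *Pipeline[T]) Close() { close(p.head) }

func (p *Pipeline[T]) Dequeue(handler func(T)) {
	for v := range p.tail {
		handler(v)
	}
}

// shout upper-cases each word and appends "!" using two string stages.
func shout(words []string) []string {
	// The explicit [string] is needed: T cannot be inferred from MapPipe values.
	p := NewPipeline[string](
		MapPipe[string]{F: strings.ToUpper},
		MapPipe[string]{F: func(s string) string { return s + "!" }},
	)
	go func() {
		for _, w := range words {
			p.Enqueue(w)
		}
		p.Close()
	}()
	var out []string
	p.Dequeue(func(s string) { out = append(out, s) })
	return out
}

func main() {
	fmt.Println(shout([]string{"go", "pipes"})) // [GO! PIPES!]
}
```

Because Go methods cannot declare their own type parameters, a stage that turns, say, ints into strings does not fit this single-T interface directly; it would need a separate two-parameter type or a standalone generic function.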