Broadcast Channels in Go
Go channels implement a simple way to pass data between concurrent processes. Their one-to-one nature makes them great for handling ownership of resources or allocation of work to a pool of workers. However, in some cases it’s useful to have a different type of channel – a channel in which a single message sent can be received by multiple processes simultaneously.
I ran into this issue while developing ground station software for Penn State’s OSIRIS Satellite project. Our ground station has a server which listens for packets from the satellite, but because our radio code often makes multiple connections to this service, it’s necessary to send every transmitted packet to every open connection simultaneously. In a more traditional application such as a web server, a broadcast channel would be useful to alert all connected clients of the actions of a single user, for example broadcasting when users join, leave, and talk on a chat server.
A naive approach is to maintain a slice of listener channels and then have a method which broadcasts to all of these channels. This gets complicated quickly, as the slice needs to be updated concurrently. The approach also gets more complicated when the sender wants different levels of blocking.
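To make the difficulty concrete, here is a rough sketch of that naive approach. The type `naiveBroadcaster` and its `Listen` and `Send` methods are hypothetical names chosen for illustration, not taken from any library.

```go
package main

import (
	"fmt"
	"sync"
)

// naiveBroadcaster (hypothetical) keeps one channel per listener.
type naiveBroadcaster struct {
	mu        sync.Mutex // guards listeners, since Listen and Send may race
	listeners []chan interface{}
}

// Listen registers a new listener channel and returns its receiving end.
// The buffer of 1 is an arbitrary choice so that the demo in main can run
// on a single goroutine.
func (n *naiveBroadcaster) Listen() <-chan interface{} {
	ch := make(chan interface{}, 1)
	n.mu.Lock()
	n.listeners = append(n.listeners, ch)
	n.mu.Unlock()
	return ch
}

// Send delivers msg to every registered listener, one after another.
// It blocks on any listener whose buffer is full while holding the lock.
func (n *naiveBroadcaster) Send(msg interface{}) {
	n.mu.Lock()
	defer n.mu.Unlock()
	for _, ch := range n.listeners {
		ch <- msg
	}
}

func main() {
	var n naiveBroadcaster
	a, b := n.Listen(), n.Listen()
	n.Send("hello")
	fmt.Println(<-a, <-b) // hello hello
}
```

Even this small version needs a mutex, has no way to remove a listener, and lets one slow listener stall `Send` for everyone else. Adding timeouts or non-blocking sends would mean more code per blocking mode.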
What We Need
We need a broadcast solution which satisfies several properties. On the sending side:
- A sender must be able to block on sending until a process receives the message.
- It must also be able to select on the channel, timing out the send or picking it as one of many channels, just as can be done with normal Go channels.
- The sender must be able to close the channel so that listeners know to stop listening.
On the receiving side:
- All receivers must receive a message if they are listening at the time it is sent.
- A receiver must not panic if the channel is closed.
- A receiver must also be able to select on receiving from the channel.
Interestingly, almost all of these are already traits of the built-in channel type. While most broadcast libraries wrap a fair bit of code around the built-in channels, I wanted to create a broadcast channel using as little wrapping as possible.
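As a quick check of that claim, the sketch below exercises a plain unbuffered channel: a send that times out through `select`, a blocking send that succeeds once a receiver is present, and a receive on a closed channel. The helper `trySend` and the timeout values are assumptions made for illustration.

```go
package main

import (
	"fmt"
	"time"
)

// trySend (hypothetical helper) attempts to send v on ch and gives up
// after timeout. It reports whether a receiver took the value.
func trySend(ch chan interface{}, v interface{}, timeout time.Duration) bool {
	select {
	case ch <- v:
		return true
	case <-time.After(timeout):
		return false
	}
}

func main() {
	ch := make(chan interface{})

	// Nobody is receiving, so the send times out.
	fmt.Println(trySend(ch, "lost", 10*time.Millisecond)) // false

	// With a receiver running, the same kind of send succeeds.
	got := make(chan interface{}, 1)
	go func() { got <- <-ch }()
	fmt.Println(trySend(ch, "hello", time.Second)) // true
	fmt.Println(<-got)                             // hello

	// Receiving from a closed channel yields the zero value, not a panic.
	close(ch)
	v, ok := <-ch
	fmt.Println(v, ok) // <nil> false
}
```

The one property a plain channel lacks is delivery to every listener, which is what the rest of the post adds.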
Keeping it Simple
We’ll start with an interface channel as our definition:
type Broadcast chan interface{}
Since all the sending-side properties are already present in a channel, we don’t need to change sending at all: processes can push data just as they would to any channel. The receiving side is where things get interesting. We’ll start with a naive approach:
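func (b Broadcast) Receive() (msg interface{}) {
	msg = <-b // Block until a message arrives.
	select {
	case b <- msg: // Forward to another receiver, if one is waiting.
	default: // Nobody else is listening; don't block.
	}
	return
}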
This code will receive from the channel, and if anything else is listening on the channel, the message will be forwarded to it as well. For some uses this is sufficient! However, it has a few problems in any real use.
Bringing it Together
The method returns immediately after forwarding the message to the next receiver. If enough processes are waiting, then by the time they have all received the message, this process could have begun listening again, creating an endless loop of processes forwarding the message to each other. We need a way to wait until all receivers have gotten the message before continuing. Instead of just sending the message, let’s create a struct which holds the message and a completion channel:
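type message struct {
	contents interface{}
	done     chan<- bool // Closed by the receiver once its rebroadcast completes.
}

func (b Broadcast) Receive() (msg interface{}) {
	defer func() { recover() }() // Don't panic when rebroadcasting on a closed channel.
	msg = <-b
	switch t := msg.(type) {
	case message:
		// A forwarded message: unwrap it and acknowledge on return.
		defer close(t.done)
		msg = t.contents
	}
	done := make(chan bool)
	select {
	case b <- message{msg, done}:
		<-done // Wait for the rest of the chain to finish.
	default: // No other receiver is waiting.
	}
	return
}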
There are a few points of interest here. First off, why the type switch? We want to maintain the simplicity of sending over our channel, so for the first receiver of a message, the message is the actual value and not the wrapped value. Thankfully, the only difference between the two is that for wrapped messages we need to extract the contents and report completion when we’re done. When forwarding the message, we wrap it again with a new completion channel. We send the wrapped message over the channel and, if it’s received, wait for it to be acknowledged. Each receiver then passes the acknowledgement back up the chain until there is nothing left: the message has been successfully broadcast.
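Here is a small, hedged usage sketch that puts the pieces together in one runnable program. The `collect` helper and the 50 ms sleep are assumptions for illustration: the sleep is a crude way to let every receiver block on the channel before the send, and real code would want a more robust handshake.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

type Broadcast chan interface{}

type message struct {
	contents interface{}
	done     chan<- bool
}

// Receive is the method described above, repeated so the example compiles.
func (b Broadcast) Receive() (msg interface{}) {
	defer func() { recover() }()
	msg = <-b
	switch t := msg.(type) {
	case message:
		defer close(t.done)
		msg = t.contents
	}
	done := make(chan bool)
	select {
	case b <- message{msg, done}:
		<-done
	default:
	}
	return
}

// collect (hypothetical helper) starts n receivers, sends one value with an
// ordinary channel send, and returns what each receiver got.
func collect(n int, value interface{}) []interface{} {
	b := make(Broadcast)
	results := make([]interface{}, n)
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			results[i] = b.Receive()
		}(i)
	}
	// Assumption: this is long enough for all receivers to block on b.
	time.Sleep(50 * time.Millisecond)
	b <- value // The sender needs no wrapper at all.
	wg.Wait()
	return results
}

func main() {
	fmt.Println(collect(3, "packet"))
}
```

Provided all three receivers are blocked on the channel before the send, each one ends up with the same value: the first receives it directly from the sender, and the others receive it through the forwarding chain.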
Uses
The broadcast model for this system is slightly different from what many other libraries implement, so it is not suitable for every use case. The main difference is that a process is only guaranteed to receive a message if it is actively listening on the channel when the message is sent. This is in contrast with other libraries, where a listener is constructed and is then guaranteed every message for the life of that listener. There is a tradeoff here: my broadcast implementation benchmarks better as listeners increase, since there is less overhead to listening:
BenchmarkNSubscribers1MessageBroadcast-4 200000 15286 ns/op
Benchmark1SubscriberNMessagesBroadcast-4 2000000 657 ns/op
BenchmarkTjgqNSubscribers1MessageBroadcast-4 1000000 21462 ns/op
BenchmarkTjgq1SubscriberNMessagesBroadcast-4 3000000 631 ns/op
You can find the code on GitHub, or just go get github.com/ericpauley/broadcast.