Last month, I built a real-time leaderboard system in Go that could handle 28,232 concurrent SSE connections - pretty impressive for a Python developer’s first Go project.
Then I ran a test with actual score updates and watched 99.99% of my messages disappear into the void.
Here’s how I learned about Go’s concurrency model the hard way, and why a deep understanding of channels, goroutines, and mutexes can make the difference between a system that works in testing and one that works in production.
The goal
I started building a real-time leaderboard to put into practice the Go concepts I had been reading about. This kind of system shows up in plenty of real applications, most commonly in games to show real-time rankings.
This was structured purely as a learning project, but the focus shifted toward scaling, benchmarking, and observability. I wanted it to cross 10k concurrent connections on a single server.
The system
Since this was structured as a learning project, almost all the tech used was new to me.
I tried to choose the most sensible tech stack, and ended up with:
- Go for the backend (high concurrency, more fine-grained control, and lower resource usage than Python)
- Redis for its sorted set data structure (a short sketch follows this list). I could have implemented one in pure Go, but that’s overkill at this stage.
- Prometheus and Grafana for observability - I wanted to learn how observability works and get hands-on experience with it.
- Docker and Docker Compose for coordinating everything. I wrote a blog post on Docker optimization for this project.
- PostgreSQL for persistent storage and historical queries (in the works).
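Redis sorted sets map neatly onto a leaderboard: ZADD inserts or updates a player’s score, and ZREVRANGE (with scores) pulls the top N in ranking order. A minimal sketch of the idea, assuming the go-redis v9 client (the key and player names here are just illustrative):

```go
package main

import (
	"context"
	"fmt"

	"github.com/redis/go-redis/v9"
)

func main() {
	ctx := context.Background()
	rdb := redis.NewClient(&redis.Options{Addr: "localhost:6379"})

	// ZADD: insert or update a player's score in the sorted set.
	rdb.ZAdd(ctx, "leaderboard", redis.Z{Score: 420, Member: "alice"})
	rdb.ZAdd(ctx, "leaderboard", redis.Z{Score: 377, Member: "bob"})

	// ZREVRANGE ... WITHSCORES: top 10 players, highest score first.
	top, err := rdb.ZRevRangeWithScores(ctx, "leaderboard", 0, 9).Result()
	if err != nil {
		panic(err)
	}
	for rank, entry := range top {
		fmt.Printf("#%d %v (%.0f)\n", rank+1, entry.Member, entry.Score)
	}
}
```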
The overall architecture is as described in the diagram.
Given the use case, access patterns, and scaling concerns, I decided to opt for SSE (Server-Sent Events) to push real-time leaderboard updates. The job is essentially to send the same leaderboard to a large number of clients (potentially 1000+), so there were a few ways I could implement this, [tradeoffs considered here].
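For anyone unfamiliar with SSE: it’s just a long-lived HTTP response with Content-Type: text/event-stream, where the server keeps writing "data: <payload>" frames separated by blank lines. A client can be as small as this sketch (the endpoint path is a placeholder, not necessarily the project’s actual route):

```go
package main

import (
	"bufio"
	"fmt"
	"net/http"
	"strings"
)

func main() {
	// Placeholder endpoint - point this at wherever the stream is exposed.
	resp, err := http.Get("http://localhost:8080/stream-leaderboard")
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()

	// Each event arrives as a "data: ..." line followed by a blank line.
	scanner := bufio.NewScanner(resp.Body)
	for scanner.Scan() {
		line := scanner.Text()
		if strings.HasPrefix(line, "data: ") {
			fmt.Println("leaderboard update:", strings.TrimPrefix(line, "data: "))
		}
	}
}
```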
I started implementing the main SSE handling code, which I did progressively in stages, improving and learning along the way.
Naive implementation
In this version, the SSE handler function itself contains the Redis read call and the JSON marshaling on every tick - there is no check for whether the data changed since the previous read (no change detection / dedup).
Click to view code
package backend

import (
    "encoding/json"
    "fmt"
    "leaderboard/src/metrics"
    "leaderboard/src/redisclient"
    "time"

    "github.com/gin-gonic/gin"
)

func StreamLeaderboard(c *gin.Context) {
    c.Writer.Header().Set("Content-Type", "text/event-stream")
    c.Writer.Header().Set("Cache-Control", "no-cache")
    c.Writer.Header().Set("Connection", "keep-alive")
    c.Writer.Flush()

    metrics.ActiveSSEConnections.Inc()

    ticker := time.NewTicker(2 * time.Second)
    defer ticker.Stop()

    for {
        select {
        case <-ticker.C:
            results, err := redisclient.GetTopNPlayers(c, "leaderboard", 10)
            if err != nil {
                continue
            }
            data, _ := json.Marshal(results)
            fmt.Fprintf(c.Writer, "data: %s\n\n", data)
            c.Writer.Flush()
        case <-c.Request.Context().Done():
            return
        }
    }
}
This has a load of issues.
- Redis reads are coupled to the handler - every SSE connection issues its own read every 2 seconds, even though the same data goes out to everyone. At the 10k-connection target, that’s roughly 5,000 identical Redis queries per second, which quickly turns into a scaling bottleneck.
- No deduplication - every single read gets marshaled and sent over the network, even when nothing changed.
- Poor error handling - Redis errors are silently skipped and the marshaling error is ignored entirely.
Added deduplication, better monitoring
This version adds a basic deduplication measure - the freshly marshaled JSON is compared against the last payload sent, and nothing is written if they match. This cuts down the data sent and helps reduce network usage.
There’s also the addition of other metrics to help understand how things are working and what failures are occurring in the app.
Config variables are used more widely in this update, reducing the number of hardcoded values. A few hardcoded values still remain and may need to be moved into config later.
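The config package itself isn’t shown in this post. The snippets below reference fields like config.AppConfig.Leaderboard.TopPlayersLimit, config.AppConfig.Server.BroadcastBufferSize and config.AppConfig.Server.PollingIntervalSeconds, so here is a hypothetical sketch of the shape they assume (names inferred from the call sites, not copied from the real project):

```go
// Hypothetical sketch of leaderboard/src/config - field names are guessed
// from how they're used in the handlers, not taken from the actual repo.
package config

type ServerConfig struct {
	BroadcastBufferSize    int // buffer length for broadcast/client channels
	PollingIntervalSeconds int // how often the broadcaster polls Redis
}

type LeaderboardConfig struct {
	TopPlayersLimit int // how many top players to fetch and stream
}

type Config struct {
	Server      ServerConfig
	Leaderboard LeaderboardConfig
}

// AppConfig would be populated from a file or environment at startup.
var AppConfig Config
```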
Changes:
- Deduplication by comparing JSON objects
- Replace hardcoded values with config entries
- Improved Prometheus metrics
Click to view code
import (
"encoding/json"
"fmt"
+ "leaderboard/src/config"
"leaderboard/src/metrics"
"leaderboard/src/redisclient"
"time"
)
func StreamLeaderboard(c *gin.Context) {
+ metrics.ConcurrentClients.Inc()
+ defer metrics.ConcurrentClients.Dec()
+
c.Writer.Header().Set("Content-Type", "text/event-stream")
c.Writer.Header().Set("Cache-Control", "no-cache")
c.Writer.Header().Set("Connection", "keep-alive")
c.Writer.Flush()
metrics.ActiveSSEConnections.Inc()
+ defer metrics.ActiveSSEConnections.Dec()
- ticker := time.NewTicker(2 * time.Second)
+ ticker := time.NewTicker(5 * time.Second)
defer ticker.Stop()
+ var lastData []byte
+
for {
select {
case <-ticker.C:
- results, err := redisclient.GetTopNPlayers(c, "leaderboard", 10)
+ results, err := redisclient.GetTopNPlayers(c, "leaderboard", int64(config.AppConfig.Leaderboard.TopPlayersLimit))
+ if err != nil {
+     metrics.RedisOperationErrors.WithLabelValues("get_top_players").Inc()
+ }
+ jsonStart := time.Now()
+ data, err := json.Marshal(results)
+ metrics.JSONMarshalDuration.Observe(time.Since(jsonStart).Seconds())
+
if err != nil {
- continue
+ metrics.JSONErrors.WithLabelValues("marshal").Inc()
+ return
+ }
+ if !jsonEqual(data, lastData) {
+ fmt.Fprintf(c.Writer, "data: %s\n\n", data)
+ c.Writer.Flush()
+ metrics.SSEMessagesSent.Inc()
+ lastData = data
}
-
- data, _ := json.Marshal(results)
- fmt.Fprintf(c.Writer, "data: %s\n\n", data)
- c.Writer.Flush()
case <-c.Request.Context().Done():
+ metrics.DroppedSSEConnections.Inc()
return
}
}
+}
+
+func jsonEqual(a, b []byte) bool {
+ return string(a) == string(b)
}
Issues:
- Comparing full JSON payloads byte by byte is expensive (and the blobs here can get large).
- Marshaling still happens on every tick, and every connection still does its own Redis read, which means high QPS on Redis.
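A side note before moving on: the metrics package is used throughout but never defined in this post. The counters, gauges and histograms referenced above would be declared with promauto roughly like this (a sketch with guessed metric names, labels and buckets):

```go
// Hypothetical sketch of leaderboard/src/metrics - names and labels are
// guesses that mirror the call sites, not the project's actual definitions.
package metrics

import (
	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promauto"
)

var (
	ActiveSSEConnections = promauto.NewGauge(prometheus.GaugeOpts{
		Name: "sse_active_connections",
		Help: "Currently open SSE connections.",
	})

	SSEMessagesSent = promauto.NewCounter(prometheus.CounterOpts{
		Name: "sse_messages_sent_total",
		Help: "Total SSE messages written to clients.",
	})

	RedisOperationErrors = promauto.NewCounterVec(prometheus.CounterOpts{
		Name: "redis_operation_errors_total",
		Help: "Redis errors, labeled by operation.",
	}, []string{"operation"})

	JSONErrors = promauto.NewCounterVec(prometheus.CounterOpts{
		Name: "json_errors_total",
		Help: "JSON errors, labeled by stage (marshal/unmarshal).",
	}, []string{"stage"})

	JSONMarshalDuration = promauto.NewHistogram(prometheus.HistogramOpts{
		Name:    "json_marshal_duration_seconds",
		Help:    "Time spent marshaling the leaderboard payload.",
		Buckets: prometheus.DefBuckets,
	})

	// ConcurrentClients, DroppedSSEConnections, FilledSSEChannels, etc.
	// would follow the same pattern.
)
```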
Decoupling Redis reads (separate goroutine), dedup improvements
Instead of having every client hammer Redis, I implemented a broadcast pattern using goroutines and channels (single producer, multiple consumers).
Changes:
- Separate goroutine for Redis polling
- Deduplication using hashing (SHA256)
- JSON marshaling done only when data is changed
Click to view code
package backend
import (
+ "context"
+ "crypto/sha256"
"encoding/json"
"fmt"
"leaderboard/src/config"
"leaderboard/src/metrics"
"leaderboard/src/redisclient"
+ "sync"
"time"
"github.com/gin-gonic/gin"
)
+type LeaderboardUpdate struct {
+ Data []byte
+ Hash [32]byte
+}
+
+type LeaderboardBroadcaster struct {
+ // channel for all SSE conns to listen to get lb updates
+ broadcastChan chan LeaderboardUpdate
+
+ ctx context.Context
+ cancel context.CancelFunc
+ wg sync.WaitGroup
+}
+
+func CreateLeaderboardBroadcaster() *LeaderboardBroadcaster {
+ ctx, cancel := context.WithCancel(context.Background())
+
+ lb := &LeaderboardBroadcaster{
+ // make the channel buffered - clients may be slow, messages can pile up
+ broadcastChan: make(chan LeaderboardUpdate, config.AppConfig.Server.BroadcastBufferSize),
+ ctx: ctx,
+ cancel: cancel,
+ }
+
+ lb.wg.Add(1)
+ go lb.detectLeaderboardChanges()
+ return lb
+}
+
+func (lb *LeaderboardBroadcaster) StopBroadcast() {
+ lb.cancel()
+ lb.wg.Wait()
+ close(lb.broadcastChan)
+}
+
+func (lb *LeaderboardBroadcaster) GetBroadcastChannel() <-chan LeaderboardUpdate {
+ return lb.broadcastChan
+}
+
+// package level var
+var broadcaster *LeaderboardBroadcaster
+
+func SetBroadcaster(b *LeaderboardBroadcaster) {
+ broadcaster = b
+}
+
+func (lb *LeaderboardBroadcaster) detectLeaderboardChanges() {
+ defer lb.wg.Done()
+
+ // TODO: check this time conversion
+ ticker := time.NewTicker(time.Duration(config.AppConfig.Server.PollingIntervalSeconds) * time.Second)
+ defer ticker.Stop()
+
+ var lastHash [32]byte
+
+ for {
+ select {
+ case <-ticker.C:
+ results, err := redisclient.GetTopNPlayers(lb.ctx, "leaderboard", int64(config.AppConfig.Leaderboard.TopPlayersLimit))
+ if err != nil {
+ metrics.RedisOperationErrors.WithLabelValues("get_top_players").Inc()
+ config.Error("Failed to fetch leaderboard from Redis.", map[string]any{"Error": err, "source": "/stream-leaderboard"})
+ continue
+ }
+
+ resultString := fmt.Sprintf("%+v", results)
+ currentHash := sha256.Sum256([]byte(resultString))
+
+ if currentHash != lastHash {
+ lastHash = currentHash
+
+ jsonStart := time.Now()
+ jsonData, err := json.Marshal(results)
+ if err != nil {
+ config.Error("JSON marshaling error",
+ map[string]any{"Error": err, "source": "/stream-leaderboard", "results": results})
+ metrics.JSONErrors.WithLabelValues("marshal").Inc()
+ continue
+ }
+ metrics.JSONMarshalDuration.Observe(time.Since(jsonStart).Seconds())
+
+ update := LeaderboardUpdate{
+ Data: jsonData,
+ Hash: currentHash,
+ }
+
+ // non blocking send
+ select {
+ case lb.broadcastChan <- update:
+ default:
+ }
+ }
+ case <-lb.ctx.Done():
+ return
+ }
+ }
+}
+
func StreamLeaderboard(c *gin.Context) {
metrics.ConcurrentClients.Inc()
defer metrics.ConcurrentClients.Dec()
c.Writer.Flush()
metrics.ActiveSSEConnections.Inc()
+ config.Info("New SSE conn", map[string]any{"Num active clients": metrics.ActiveSSEConnections})
defer metrics.ActiveSSEConnections.Dec()
- ticker := time.NewTicker(5 * time.Second)
- defer ticker.Stop()
-
- var lastData []byte
+ broadcastChan := broadcaster.GetBroadcastChannel()
for {
select {
- case <-ticker.C:
- results, err := redisclient.GetTopNPlayers(c, "leaderboard", int64(config.AppConfig.Leaderboard.TopPlayersLimit))
- if err != nil {
- metrics.RedisOperationErrors.WithLabelValues("get_top_players").Inc()
- }
- jsonStart := time.Now()
- data, err := json.Marshal(results)
- metrics.JSONMarshalDuration.Observe(time.Since(jsonStart).Seconds())
-
- if err != nil {
- metrics.JSONErrors.WithLabelValues("marshal").Inc()
+ case update, ok := <-broadcastChan:
+ if !ok {
+ // channel closed
return
}
- if !jsonEqual(data, lastData) {
- fmt.Fprintf(c.Writer, "data: %s\n\n", data)
- c.Writer.Flush()
- metrics.SSEMessagesSent.Inc()
- lastData = data
- }
+ fmt.Fprintf(c.Writer, "data: %s\n\n", update.Data)
+ c.Writer.Flush()
+ metrics.SSEMessagesSent.Inc()
case <-c.Request.Context().Done():
metrics.DroppedSSEConnections.Inc()
+ config.Info("Closed SSE conn", map[string]any{"open": metrics.ActiveSSEConnections})
return
}
}
}
-
-func jsonEqual(a, b []byte) bool {
- return string(a) == string(b)
-}
Benchmarking this with a Go script that opens persistent SSE connections (based on Eran Yanay’s GopherCon talk) got me to 28,232 concurrent connections.
I was pretty happy with these results, especially since the connections maxed out due to hitting the Linux limit on outbound (ephemeral) ports (the default range, 32768-60999, is exactly 28,232 ports), not due to memory or CPU bottlenecks.
Then I realized I had a major bug which I forgot to test for.
Issues:
- The fan-out implementation is incorrect - a value sent on a Go channel is delivered to exactly one receiver, not to all of them. So any update pushed onto the shared channel is consumed by a single client goroutine, and every other client never gets the message (the small demo after this list makes this concrete).
- Error handling gaps.
- No initial message is sent (messages only go out when the ticker fires and the data has changed). The client should receive some leaderboard snapshot on connect, even if it’s slightly stale.
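To make the fan-out bug concrete: a Go channel is a queue, not a broadcast bus. In this toy example, three "clients" read from the same channel, but the single value sent reaches exactly one of them:

```go
package main

import (
	"fmt"
	"sync"
)

func main() {
	updates := make(chan string)
	var wg sync.WaitGroup

	// Three "clients" all reading from the same channel.
	for i := 1; i <= 3; i++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			for msg := range updates {
				fmt.Printf("client %d received: %s\n", id, msg)
			}
		}(i)
	}

	// One "broadcast": this value is delivered to exactly ONE receiver,
	// not to all three - channels give queue semantics, not fan-out.
	updates <- "leaderboard update #1"

	close(updates)
	wg.Wait()
}
```

Only one of the three clients prints the update; the other two never see it - which is exactly what happened to 99.99% of my messages.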
Fixed Fan Out
The previous version pushed all broadcast messages onto a single channel, so each message was effectively received by just one goroutine (one client). That defeats the purpose of the app: every single client needs to receive the same message.
Changes:
- One buffered channel per client, stored in a map
- Mutex applied whenever map is modified, preventing race conditions
- Leaderboard change leads to broadcasting message to all channels
Alternative implementations might use a pub/sub system (Redis Pub/Sub, or a managed provider), but that’s probably overkill for this use case.
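For reference, a Redis Pub/Sub variant would look roughly like this with go-redis (a sketch only - every subscriber gets its own copy of each published message, which is the fan-out behaviour a plain Go channel doesn’t give you):

```go
package main

import (
	"context"
	"fmt"
	"time"

	"github.com/redis/go-redis/v9"
)

func main() {
	ctx := context.Background()
	rdb := redis.NewClient(&redis.Options{Addr: "localhost:6379"})

	// Producer side (e.g. the change-detection goroutine): publish each new
	// leaderboard payload once; Redis delivers it to every subscriber.
	go func() {
		ticker := time.NewTicker(2 * time.Second)
		defer ticker.Stop()
		for range ticker.C {
			rdb.Publish(ctx, "leaderboard_updates", `[{"player":"alice","score":420}]`)
		}
	}()

	// Consumer side (one subscription per SSE handler, or one shared).
	sub := rdb.Subscribe(ctx, "leaderboard_updates")
	defer sub.Close()

	for msg := range sub.Channel() {
		fmt.Println("got update:", msg.Payload)
	}
}
```

I stuck with the in-process map of per-client channels instead, shown in the diff below.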
Click to view code
-type LeaderboardBroadcaster struct {
- // channel for all SSE conns to listen to get lb updates
- broadcastChan chan LeaderboardUpdate
+type Client struct {
+ ID int64
+ channel chan LeaderboardUpdate
+ ctx context.Context
+ cancel context.CancelFunc
+}
- ctx context.Context
- cancel context.CancelFunc
- wg sync.WaitGroup
+type LeaderboardBroadcaster struct {
+ clients map[int64]*Client
+ clientsMutex sync.RWMutex
+ ctx context.Context
+ cancel context.CancelFunc
+ wg sync.WaitGroup
+ clientCounter int64
}
func CreateLeaderboardBroadcaster() *LeaderboardBroadcaster {
lb := &LeaderboardBroadcaster{
// make the channel buffered - clients may be slow, messages can pile up
- broadcastChan: make(chan LeaderboardUpdate, config.AppConfig.Server.BroadcastBufferSize),
- ctx: ctx,
- cancel: cancel,
+ clients: make(map[int64]*Client),
+ ctx: ctx,
+ cancel: cancel,
}
lb.wg.Add(1)
return lb
}
-func (lb *LeaderboardBroadcaster) GetBroadcastChannel() <-chan LeaderboardUpdate {
- return lb.broadcastChan
+// Create new channel for client, add to map
+func (lb *LeaderboardBroadcaster) AddClient() (*Client, <-chan LeaderboardUpdate) {
+ lb.clientsMutex.Lock()
+ lb.clientCounter++
+ ctx, cancel := context.WithCancel(lb.ctx)
+
+ client := &Client{
+ ID: lb.clientCounter,
+ ctx: ctx,
+ cancel: cancel,
+ channel: make(chan LeaderboardUpdate, config.AppConfig.Server.BroadcastBufferSize),
+ }
+ lb.clients[lb.clientCounter] = client
+ lb.clientsMutex.Unlock()
+
+ return client, client.channel
+}
+
+// remove specific client channel - closed connection
+func (lb *LeaderboardBroadcaster) RemoveClient(client *Client) {
+ lb.clientsMutex.Lock()
+ defer lb.clientsMutex.Unlock()
+
+ if _, exists := lb.clients[client.ID]; exists {
+ delete(lb.clients, client.ID)
+ client.cancel()
+ close(client.channel)
+ }
+}
+
+// broadcastToAllClients sends an update to all connected clients
+func (lb *LeaderboardBroadcaster) broadcastToAllClients(update LeaderboardUpdate) {
+ lb.clientsMutex.RLock()
+
+ var clientsToRemove []*Client
+
+ // what to do in case Client channel is full, skip this client (to be changed later - add channel clearing mechanism + alerting)
+ for _, client := range lb.clients {
+ select {
+ case client.channel <- update:
+ // sent
+ case <-client.ctx.Done():
+ // clean
+ clientsToRemove = append(clientsToRemove, client)
+ default:
+ metrics.FilledSSEChannels.Inc()
+ }
+ }
+ lb.clientsMutex.RUnlock()
+
+ if len(clientsToRemove) > 0 {
+ lb.clientsMutex.Lock()
+ for _, client := range clientsToRemove {
+ if _, exists := lb.clients[client.ID]; exists {
+ delete(lb.clients, client.ID)
+ client.cancel()
+ close(client.channel)
+ }
+ }
+ lb.clientsMutex.Unlock()
+ }
}
// package level var
broadcaster = b
+// poll redis, dedup leaderboard values, push to broadcast to all clients
func (lb *LeaderboardBroadcaster) detectLeaderboardChanges() {
defer lb.wg.Done()
-
- // TODO: check this time conversion
ticker := time.NewTicker(time.Duration(config.AppConfig.Server.PollingIntervalSeconds) * time.Second)
defer ticker.Stop()
Hash: currentHash,
}
- // non blocking send
- select {
- case lb.broadcastChan <- update:
- default:
- }
+ lb.broadcastToAllClients(update)
}
case <-lb.ctx.Done():
return
c.Writer.Flush()
metrics.ActiveSSEConnections.Inc()
- config.Info("New SSE conn", map[string]any{"Num active clients": metrics.ActiveSSEConnections})
+ config.Info("New SSE conn", map[string]any{})
defer metrics.ActiveSSEConnections.Dec()
- broadcastChan := broadcaster.GetBroadcastChannel()
+ client, channel := broadcaster.AddClient()
+ defer broadcaster.RemoveClient(client)
for {
select {
- case update, ok := <-broadcastChan:
+ case update, ok := <-channel:
if !ok {
// channel closed
return
fmt.Fprintf(c.Writer, "data: %s\n\n", update.Data)
c.Writer.Flush()
metrics.SSEMessagesSent.Inc()
-
case <-c.Request.Context().Done():
metrics.DroppedSSEConnections.Inc()
config.Info("Closed SSE conn", map[string]any{"open": metrics.ActiveSSEConnections})
Other Mistakes
- Incorrect testing script: bugs can occur in any piece of code, tests included. My initial load-testing script handled only about 300-400 concurrent connections and took forever to add clients; I got tired of waiting for it to cross a certain level and almost gave up, thinking my server code was the bottleneck.
Later I looked at Eran Yanay’s implementation of the testing script from his million-WebSocket-connections talk (Going Infinite), took inspiration from it, and created a version for SSE. This worked: I immediately got thousands of connections, and with improvements to my code, crossed 28k.
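The load-testing script itself isn’t reproduced here, but the core idea is small: open as many long-lived SSE connections as possible from goroutines and keep counting what comes back. A stripped-down sketch (endpoint and connection count are placeholders, and client-side limits like ulimit and ephemeral ports still apply):

```go
package main

import (
	"bufio"
	"fmt"
	"net/http"
	"strings"
	"sync/atomic"
	"time"
)

func main() {
	const target = "http://localhost:8080/stream-leaderboard" // placeholder
	const numConns = 1000                                      // placeholder

	var open, events int64

	for i := 0; i < numConns; i++ {
		go func() {
			resp, err := http.Get(target)
			if err != nil {
				return
			}
			defer resp.Body.Close()
			atomic.AddInt64(&open, 1)
			defer atomic.AddInt64(&open, -1)

			// Count every "data:" frame this connection receives.
			scanner := bufio.NewScanner(resp.Body)
			for scanner.Scan() {
				if strings.HasPrefix(scanner.Text(), "data:") {
					atomic.AddInt64(&events, 1)
				}
			}
		}()
	}

	// Periodically report open connections and events received so far.
	for {
		time.Sleep(5 * time.Second)
		fmt.Printf("open=%d events=%d\n", atomic.LoadInt64(&open), atomic.LoadInt64(&events))
	}
}
```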
TODOs:
- Reduce Grafana and Prometheus intervals
- Add retries to SSE send, add jitter to prevent thundering herd
- Throughput metrics - per time interval
- Buffer flush mechanism
- Send a leaderboard update immediately on connecting to the SSE stream (rough sketch after this list)
- Validation for score updates - with proper error messages
- Authentication for game servers
- Postgres integration (leaderboard historical data - aggregation)
- Redis/database based id to username translation
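On the item about sending a leaderboard update immediately on connect, the rough idea is to cache the most recent update in the broadcaster and replay it into each new client’s channel when it registers. A standalone toy sketch of that idea (not the project’s actual broadcaster types):

```go
package main

import (
	"fmt"
	"sync"
)

type Update struct{ Data []byte }

// Minimal broadcaster demonstrating "replay last snapshot on connect".
type Broadcaster struct {
	mu      sync.RWMutex
	clients map[int]chan Update
	nextID  int
	last    *Update // cached most-recent update
}

func NewBroadcaster() *Broadcaster {
	return &Broadcaster{clients: make(map[int]chan Update)}
}

func (b *Broadcaster) Broadcast(u Update) {
	b.mu.Lock()
	b.last = &u // remember for future subscribers
	for _, ch := range b.clients {
		select {
		case ch <- u:
		default: // skip slow clients
		}
	}
	b.mu.Unlock()
}

func (b *Broadcaster) AddClient() (int, <-chan Update) {
	b.mu.Lock()
	defer b.mu.Unlock()
	b.nextID++
	ch := make(chan Update, 8)
	b.clients[b.nextID] = ch
	// Replay the cached snapshot so a new client sees data immediately.
	if b.last != nil {
		ch <- *b.last // buffered, so this won't block
	}
	return b.nextID, ch
}

func main() {
	b := NewBroadcaster()
	b.Broadcast(Update{Data: []byte(`[{"player":"alice","score":420}]`)})

	// A client that connects *after* the broadcast still gets the snapshot.
	_, ch := b.AddClient()
	fmt.Printf("on connect: %s\n", (<-ch).Data)
}
```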