
28K+ connections, zero messages

Published:  at  03:08 PM

Last month, I built a real-time leaderboard system in Go that could handle 28,232 concurrent SSE connections - pretty impressive for a Python developer’s first Go project.

Then I ran a test with actual score updates and watched 99.99% of my messages disappear into the void.

Here’s how I learned about Go’s concurrency model the hard way, and why understanding channels, goroutines, and mutexes deeply can make the difference between a system that works in testing and one that works in production.


The goal

I started building a real-time leaderboard to put into practice the Go concepts I had been reading about. Systems like this show up in plenty of real applications, most commonly in games to display real-time rankings.

This was structured purely as a learning project, but the target shifted toward scaling, benchmarking, and observability. I wanted it to cross 10k concurrent connections on a single server.

The system

Since this was a learning project, almost all of the tech used was new to me.

I tried to choose the most sensible tech stack, and ended up with:

overall architecture

The overall architecture is shown in the diagram above.

Given the use case, access patterns, and scaling concerns, I opted for SSE-based message delivery for real-time leaderboard updates. This essentially means pushing the same leaderboard to a bunch of clients (potentially 1000+), and there were a few ways I could implement it, [tradeoffs considered here].

I implemented the main SSE handling code progressively, in stages, improving and learning along the way.


Naive implementation

In this version, the SSE handler function itself contains the Redis read call and the JSON marshaling, and writes the result to the client on every tick; there is no check for whether the data actually changed.

package backend

import (
	"encoding/json"
	"fmt"
	"leaderboard/src/metrics"
	"leaderboard/src/redisclient"
	"time"

	"github.com/gin-gonic/gin"
)

func StreamLeaderboard(c *gin.Context) {
	c.Writer.Header().Set("Content-Type", "text/event-stream")
	c.Writer.Header().Set("Cache-Control", "no-cache")
	c.Writer.Header().Set("Connection", "keep-alive")
	c.Writer.Flush()

	metrics.ActiveSSEConnections.Inc()

	ticker := time.NewTicker(2 * time.Second)
	defer ticker.Stop()

	for {
		select {
		case <-ticker.C:
			results, err := redisclient.GetTopNPlayers(c, "leaderboard", 10)
			if err != nil {
				continue
			}

			data, _ := json.Marshal(results)
			fmt.Fprintf(c.Writer, "data: %s\n\n", data)
			c.Writer.Flush()

		case <-c.Request.Context().Done():
			return
		}
	}
}

This has a load of issues.

  1. Redis reads coupled with the handler function - each SSE connection makes its own read call every 2 seconds. That’s immensely wasteful, considering the same data needs to go out to everyone, and the rising queries per second to Redis quickly become a scaling bottleneck (see the arithmetic after this list).
  2. No deduplication - every single read gets marshaled and sent over the network, even when nothing has changed.
  3. Improper error handling - Redis errors are silently skipped, and the marshaling error is ignored outright.
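
To put a number on the first issue: at the 10k-connection target with this 2-second ticker, that’s roughly 10,000 / 2 = 5,000 Redis reads per second, all fetching the exact same top-10 list.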

Added deduplication, better monitoring

This version adds a basic deduplication measure: the current JSON payload is compared against the previous one, byte by byte, and an event is sent only when they differ. This reduces the data sent over the network.

There’s also the addition of more metrics to help understand how things are working and what failures are occurring in the app.

Config variables are used more widely in this update, reducing the number of hardcoded values. There are still a few places where hardcoded values remain, which may need to be changed later.
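
The config package itself isn’t shown in this post; here is a minimal sketch of its shape, inferred from the fields referenced in the code later on. How the values are loaded (file, env vars, flags) and the defaults below are assumptions, and the config.Info / config.Error logging helpers used later also live in this package (omitted here).

package config

type ServerConfig struct {
	PollingIntervalSeconds int // how often the broadcaster polls Redis
	BroadcastBufferSize    int // per-channel buffer for leaderboard updates
}

type LeaderboardConfig struct {
	TopPlayersLimit int // how many players to include in each update
}

type Config struct {
	Server      ServerConfig
	Leaderboard LeaderboardConfig
}

// AppConfig is the package-level configuration referenced throughout the handlers.
// Defaults here are illustrative only.
var AppConfig = Config{
	Server:      ServerConfig{PollingIntervalSeconds: 5, BroadcastBufferSize: 10},
	Leaderboard: LeaderboardConfig{TopPlayersLimit: 10},
}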

Changes:

 import (
 	"encoding/json"
 	"fmt"
+	"leaderboard/src/config"
 	"leaderboard/src/metrics"
 	"leaderboard/src/redisclient"
 	"time"
 )
 
 func StreamLeaderboard(c *gin.Context) {
+	metrics.ConcurrentClients.Inc()
+	defer metrics.ConcurrentClients.Dec()
+
 	c.Writer.Header().Set("Content-Type", "text/event-stream")
 	c.Writer.Header().Set("Cache-Control", "no-cache")
 	c.Writer.Header().Set("Connection", "keep-alive")
 	c.Writer.Flush()
 
 	metrics.ActiveSSEConnections.Inc()
+	defer metrics.ActiveSSEConnections.Dec()
 
-	ticker := time.NewTicker(2 * time.Second)
+	ticker := time.NewTicker(5 * time.Second)
 	defer ticker.Stop()
 
+	var lastData []byte
+
 	for {
 		select {
 		case <-ticker.C:
-			results, err := redisclient.GetTopNPlayers(c, "leaderboard", 10)
+			results, err := redisclient.GetTopNPlayers(c, "leaderboard", int64(config.AppConfig.Leaderboard.TopPlayersLimit))
+			if err != nil {
+				metrics.RedisOperationErrors.WithLabelValues("get_top_players").Inc()
+			}
+			jsonStart := time.Now()
+			data, err := json.Marshal(results)
+			metrics.JSONMarshalDuration.Observe(time.Since(jsonStart).Seconds())
+
 			if err != nil {
-				continue
+				metrics.JSONErrors.WithLabelValues("marshal").Inc()
+				return
+			}
+			if !jsonEqual(data, lastData) {
+				fmt.Fprintf(c.Writer, "data: %s\n\n", data)
+				c.Writer.Flush()
+				metrics.SSEMessagesSent.Inc()
+				lastData = data
 			}
-
-			data, _ := json.Marshal(results)
-			fmt.Fprintf(c.Writer, "data: %s\n\n", data)
-			c.Writer.Flush()
 
 		case <-c.Request.Context().Done():
+			metrics.DroppedSSEConnections.Inc()
 			return
 		}
 	}
+}
+
+func jsonEqual(a, b []byte) bool {
+	return string(a) == string(b)
 }

Issues:

  1. Byte-by-byte JSON comparison is expensive (especially for large payloads, which can show up here).
  2. Marshaling still happens on every read, and every connection still does its own Redis read, which means high QPS on Redis.

Decoupling Redis reads (separate goroutine), dedup improvements

Instead of having each client hammering Redis, I implemented a broadcast pattern using goroutines and channels (single producer, multiple consumers).

Changes:

 package backend
 
 import (
+	"context"
+	"crypto/sha256"
 	"encoding/json"
 	"fmt"
 	"leaderboard/src/config"
 	"leaderboard/src/metrics"
 	"leaderboard/src/redisclient"
+	"sync"
 	"time"
 
 	"github.com/gin-gonic/gin"
 )
 
+type LeaderboardUpdate struct {
+	Data []byte
+	Hash [32]byte
+}
+
+type LeaderboardBroadcaster struct {
+	// channel for all SSE conns to listen to get lb updates
+	broadcastChan chan LeaderboardUpdate
+
+	ctx    context.Context
+	cancel context.CancelFunc
+	wg     sync.WaitGroup
+}
+
+func CreateLeaderboardBroadcaster() *LeaderboardBroadcaster {
+	ctx, cancel := context.WithCancel(context.Background())
+
+	lb := &LeaderboardBroadcaster{
+		// make the channel buffered - clients may be slow, messages can pile up
+		broadcastChan: make(chan LeaderboardUpdate, config.AppConfig.Server.BroadcastBufferSize),
+		ctx:           ctx,
+		cancel:        cancel,
+	}
+
+	lb.wg.Add(1)
+	go lb.detectLeaderboardChanges()
+	return lb
+}
+
+func (lb *LeaderboardBroadcaster) StopBroadcast() {
+	lb.cancel()
+	lb.wg.Wait()
+	close(lb.broadcastChan)
+}
+
+func (lb *LeaderboardBroadcaster) GetBroadcastChannel() <-chan LeaderboardUpdate {
+	return lb.broadcastChan
+}
+
+// package level var
+var broadcaster *LeaderboardBroadcaster
+
+func SetBroadcaster(b *LeaderboardBroadcaster) {
+	broadcaster = b
+}
+
+func (lb *LeaderboardBroadcaster) detectLeaderboardChanges() {
+	defer lb.wg.Done()
+
+	// TODO: check this time conversion
+	ticker := time.NewTicker(time.Duration(config.AppConfig.Server.PollingIntervalSeconds) * time.Second)
+	defer ticker.Stop()
+
+	var lastHash [32]byte
+
+	for {
+		select {
+		case <-ticker.C:
+			results, err := redisclient.GetTopNPlayers(lb.ctx, "leaderboard", int64(config.AppConfig.Leaderboard.TopPlayersLimit))
+			if err != nil {
+				metrics.RedisOperationErrors.WithLabelValues("get_top_players").Inc()
+				config.Error("Failed to fetch leaderboard from Redis.", map[string]any{"Error": err, "source": "/stream-leaderboard"})
+				continue
+			}
+
+			resultString := fmt.Sprintf("%+v", results)
+			currentHash := sha256.Sum256([]byte(resultString))
+
+			if currentHash != lastHash {
+				lastHash = currentHash
+
+				jsonStart := time.Now()
+				jsonData, err := json.Marshal(results)
+				if err != nil {
+					config.Error("JSON marshaling error",
+						map[string]any{"Error": err, "source": "/stream-leaderboard", "results": results})
+					metrics.JSONErrors.WithLabelValues("marshal").Inc()
+					continue
+				}
+				metrics.JSONMarshalDuration.Observe(time.Since(jsonStart).Seconds())
+
+				update := LeaderboardUpdate{
+					Data: jsonData,
+					Hash: currentHash,
+				}
+
+				// non blocking send
+				select {
+				case lb.broadcastChan <- update:
+				default:
+				}
+			}
+		case <-lb.ctx.Done():
+			return
+		}
+	}
+}
+
 func StreamLeaderboard(c *gin.Context) {
 	metrics.ConcurrentClients.Inc()
 	defer metrics.ConcurrentClients.Dec()
 	c.Writer.Flush()
 
 	metrics.ActiveSSEConnections.Inc()
+	config.Info("New SSE conn", map[string]any{"Num active clients": metrics.ActiveSSEConnections})
 	defer metrics.ActiveSSEConnections.Dec()
 
-	ticker := time.NewTicker(5 * time.Second)
-	defer ticker.Stop()
-
-	var lastData []byte
+	broadcastChan := broadcaster.GetBroadcastChannel()
 
 	for {
 		select {
-		case <-ticker.C:
-			results, err := redisclient.GetTopNPlayers(c, "leaderboard", int64(config.AppConfig.Leaderboard.TopPlayersLimit))
-			if err != nil {
-				metrics.RedisOperationErrors.WithLabelValues("get_top_players").Inc()
-			}
-			jsonStart := time.Now()
-			data, err := json.Marshal(results)
-			metrics.JSONMarshalDuration.Observe(time.Since(jsonStart).Seconds())
-
-			if err != nil {
-				metrics.JSONErrors.WithLabelValues("marshal").Inc()
+		case update, ok := <-broadcastChan:
+			if !ok {
+				// channel closed
 				return
 			}
-			if !jsonEqual(data, lastData) {
-				fmt.Fprintf(c.Writer, "data: %s\n\n", data)
-				c.Writer.Flush()
-				metrics.SSEMessagesSent.Inc()
-				lastData = data
-			}
+			fmt.Fprintf(c.Writer, "data: %s\n\n", update.Data)
+			c.Writer.Flush()
+			metrics.SSEMessagesSent.Inc()
 
 		case <-c.Request.Context().Done():
 			metrics.DroppedSSEConnections.Inc()
+			config.Info("Closed SSE conn", map[string]any{"open": metrics.ActiveSSEConnections})
 			return
 		}
 	}
 }
-
-func jsonEqual(a, b []byte) bool {
-	return string(a) == string(b)
-}

Benchmarking this with a Go script that opens persistent SSE connections (based on Eran Yanay’s GopherCon talk) got me to 28,232 concurrent connections.
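
For illustration, a bare-bones SSE load generator can be as simple as one goroutine per connection, as in the sketch below. This is not the exact script I used (that one is modeled on Yanay’s approach); the localhost:8080 address is a placeholder, the /stream-leaderboard path matches the source label in the logging calls above, and the process needs a raised file-descriptor limit (ulimit -n) to get anywhere near 28k connections.

package main

import (
	"bufio"
	"flag"
	"fmt"
	"net/http"
	"sync/atomic"
	"time"
)

func main() {
	conns := flag.Int("conns", 1000, "number of SSE connections to hold open")
	url := flag.String("url", "http://localhost:8080/stream-leaderboard", "SSE endpoint")
	flag.Parse()

	var open, received int64

	for i := 0; i < *conns; i++ {
		go func() {
			resp, err := http.Get(*url)
			if err != nil {
				return
			}
			defer resp.Body.Close()
			atomic.AddInt64(&open, 1)
			defer atomic.AddInt64(&open, -1)

			// Count the non-blank lines of the event stream as received data.
			scanner := bufio.NewScanner(resp.Body)
			for scanner.Scan() {
				if len(scanner.Bytes()) > 0 {
					atomic.AddInt64(&received, 1)
				}
			}
		}()
	}

	for {
		time.Sleep(5 * time.Second)
		fmt.Printf("open=%d lines_received=%d\n",
			atomic.LoadInt64(&open), atomic.LoadInt64(&received))
	}
}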

Grafana dashboard with 28232 connections

I was pretty happy with these results, especially since the connections maxed out due to hitting the upper limit of outbound ports on Linux, not due to memory or CPU bottlenecks.
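
That ceiling isn’t arbitrary, by the way: on modern Linux kernels the default ephemeral port range (net.ipv4.ip_local_port_range) is 32768-60999, which is 60999 - 32768 + 1 = 28,232 usable source ports from one client IP to a single server address - exactly the number of connections the load generator managed before running out of ports.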

Then I realized I had a major bug which I forgot to test for.

Issues:

  1. Fan-out implementation is incorrect - a value sent on a Go channel is received by exactly one consumer (goroutine). Any update pushed onto the shared channel goes to whichever client goroutine happens to receive it first; every other client never sees that message. A minimal repro follows this list.
  2. Error handling gaps.
  3. No initial message is sent (messages go out only when the ticker fires and there is a change). The client should receive some leaderboard snapshot on connect, even if it’s slightly stale.
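
Here’s a minimal, standalone repro of the fan-out problem (not project code): three goroutines range over the same channel, but each value sent is received by exactly one of them.

package main

import (
	"fmt"
	"sync"
	"time"
)

func main() {
	updates := make(chan string, 2)

	var wg sync.WaitGroup
	for i := 1; i <= 3; i++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			// All three "clients" listen on the same channel...
			for msg := range updates {
				fmt.Printf("client %d got %q\n", id, msg)
			}
		}(i)
	}

	// ...but each send is delivered to exactly one of them.
	updates <- "leaderboard v1"
	updates <- "leaderboard v2"

	time.Sleep(100 * time.Millisecond)
	close(updates)
	wg.Wait()
	// Each update is printed once, by one client, never by all three.
}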

Fixed Fan Out

The previous version had a single channel for all broadcast messages, so each message would effectively be received by just one goroutine (client). That defeats the purpose of the app: every single client needs to receive the same message.
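
The shape of the fix, as a condensed standalone sketch (simplified names, no metrics, contexts, or logging - the full diff below has those): every client gets its own buffered channel, registered in a mutex-guarded map, and the broadcaster loops over the map so each update reaches every client. A slow client with a full buffer is skipped rather than allowed to block the broadcaster.

package fanout

import "sync"

type LeaderboardUpdate struct {
	Data []byte
	Hash [32]byte
}

type Client struct {
	ID      int64
	channel chan LeaderboardUpdate
}

type Broadcaster struct {
	mu      sync.RWMutex
	clients map[int64]*Client
	nextID  int64
}

func NewBroadcaster() *Broadcaster {
	return &Broadcaster{clients: make(map[int64]*Client)}
}

// AddClient registers a new client with its own buffered channel.
func (b *Broadcaster) AddClient(buffer int) *Client {
	b.mu.Lock()
	defer b.mu.Unlock()
	b.nextID++
	c := &Client{ID: b.nextID, channel: make(chan LeaderboardUpdate, buffer)}
	b.clients[c.ID] = c
	return c
}

// RemoveClient deregisters a client and closes its channel.
func (b *Broadcaster) RemoveClient(c *Client) {
	b.mu.Lock()
	defer b.mu.Unlock()
	if _, ok := b.clients[c.ID]; ok {
		delete(b.clients, c.ID)
		close(c.channel)
	}
}

// broadcast sends the update to every registered client.
func (b *Broadcaster) broadcast(u LeaderboardUpdate) {
	b.mu.RLock()
	defer b.mu.RUnlock()
	for _, c := range b.clients {
		select {
		case c.channel <- u: // delivered
		default: // slow client with a full buffer: skip rather than block everyone
		}
	}
}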

Alternative implementations might use Pub/Sub with popular providers, but it’s probably overkill for this use case.

Changes:

Grafana dashboard with fanout fix

Grafana dashboard for game submissions and SSE messages received

-type LeaderboardBroadcaster struct {
-	// channel for all SSE conns to listen to get lb updates
-	broadcastChan chan LeaderboardUpdate
+type Client struct {
+	ID      int64
+	channel chan LeaderboardUpdate
+	ctx     context.Context
+	cancel  context.CancelFunc
+}
 
-	ctx    context.Context
-	cancel context.CancelFunc
-	wg     sync.WaitGroup
+type LeaderboardBroadcaster struct {
+	clients       map[int64]*Client
+	clientsMutex  sync.RWMutex
+	ctx           context.Context
+	cancel        context.CancelFunc
+	wg            sync.WaitGroup
+	clientCounter int64
 }
 
 func CreateLeaderboardBroadcaster() *LeaderboardBroadcaster {
 	lb := &LeaderboardBroadcaster{
 		// make the channel buffered - clients may be slow, messages can pile up
-		broadcastChan: make(chan LeaderboardUpdate, config.AppConfig.Server.BroadcastBufferSize),
-		ctx:           ctx,
-		cancel:        cancel,
+		clients: make(map[int64]*Client),
+		ctx:     ctx,
+		cancel:  cancel,
 	}
 	lb.wg.Add(1)
 	return lb
 }
 
-func (lb *LeaderboardBroadcaster) GetBroadcastChannel() <-chan LeaderboardUpdate {
-	return lb.broadcastChan
+// Create new channel for client, add to map
+func (lb *LeaderboardBroadcaster) AddClient() (*Client, <-chan LeaderboardUpdate) {
+	lb.clientsMutex.Lock()
+	lb.clientCounter++
+	ctx, cancel := context.WithCancel(lb.ctx)
+
+	client := &Client{
+		ID:      lb.clientCounter,
+		ctx:     ctx,
+		cancel:  cancel,
+		channel: make(chan LeaderboardUpdate, config.AppConfig.Server.BroadcastBufferSize),
+	}
+	lb.clients[lb.clientCounter] = client
+	lb.clientsMutex.Unlock()
+
+	return client, client.channel
+}
+
+// remove specific client channel - closed connection
+func (lb *LeaderboardBroadcaster) RemoveClient(client *Client) {
+	lb.clientsMutex.Lock()
+	defer lb.clientsMutex.Unlock()
+
+	if _, exists := lb.clients[client.ID]; exists {
+		delete(lb.clients, client.ID)
+		client.cancel()
+		close(client.channel)
+	}
+}
+
+// broadcastToAllClients sends an update to all connected clients
+func (lb *LeaderboardBroadcaster) broadcastToAllClients(update LeaderboardUpdate) {
+	lb.clientsMutex.RLock()
+
+	var clientsToRemove []*Client
+
+	// if a client's channel is full, skip it for now (to be changed later: add a channel clearing mechanism + alerting)
+	for _, client := range lb.clients {
+		select {
+		case client.channel <- update:
+			// sent
+		case <-client.ctx.Done():
+			// clean
+			clientsToRemove = append(clientsToRemove, client)
+		default:
+			metrics.FilledSSEChannels.Inc()
+		}
+	}
+	lb.clientsMutex.RUnlock()
+
+	if len(clientsToRemove) > 0 {
+		lb.clientsMutex.Lock()
+		for _, client := range clientsToRemove {
+			if _, exists := lb.clients[client.ID]; exists {
+				delete(lb.clients, client.ID)
+				client.cancel()
+				close(client.channel)
+			}
+		}
+		lb.clientsMutex.Unlock()
+	}
 }
 
 // package level var
 	broadcaster = b
 
+// poll redis, dedup leaderboard values, push to broadcast to all clients
 func (lb *LeaderboardBroadcaster) detectLeaderboardChanges() {
 	defer lb.wg.Done()
-
-	// TODO: check this time conversion
 	ticker := time.NewTicker(time.Duration(config.AppConfig.Server.PollingIntervalSeconds) * time.Second)
 	defer ticker.Stop()
 					Hash: currentHash,
 				}
 
-				// non blocking send
-				select {
-				case lb.broadcastChan <- update:
-				default:
-				}
+				lb.broadcastToAllClients(update)
 			}
 		case <-lb.ctx.Done():
 			return
 	c.Writer.Flush()
 
 	metrics.ActiveSSEConnections.Inc()
-	config.Info("New SSE conn", map[string]any{"Num active clients": metrics.ActiveSSEConnections})
+	config.Info("New SSE conn", map[string]any{})
 	defer metrics.ActiveSSEConnections.Dec()
 
-	broadcastChan := broadcaster.GetBroadcastChannel()
+	client, channel := broadcaster.AddClient()
+	defer broadcaster.RemoveClient(client)
 
 	for {
 		select {
-		case update, ok := <-broadcastChan:
+		case update, ok := <-channel:
 			if !ok {
 				// channel closed
 				return
 			fmt.Fprintf(c.Writer, "data: %s\n\n", update.Data)
 			c.Writer.Flush()
 			metrics.SSEMessagesSent.Inc()
-
 		case <-c.Request.Context().Done():
 			metrics.DroppedSSEConnections.Inc()
 			config.Info("Closed SSE conn", map[string]any{"open": metrics.ActiveSSEConnections})


Other Mistakes

  1. Incorrect testing script: Bugs can occur in any piece of code, tests included. The initial load-testing script handled only about 300-400 concurrent connections and took forever to add clients; I kept waiting for it to cross a certain level, got tired, and almost gave up, thinking my server code was the bottleneck.

    Later I looked at Eran Yanay’s testing script from his million-WebSocket-connections talk (Going Infinite), took inspiration from it, and created a version for SSE. This worked: I immediately got thousands of connections, and with improvements to my code, crossed 28k.

TODOs:

  1. Reduce Grafana and Prometheus intervals
  2. Add retries to SSE sends and add jitter to prevent a thundering herd (a rough sketch of the jitter part follows this list)
  3. Throughput metrics - per time interval
  4. Buffer flush mechanism
  5. Send a leaderboard update immediately on connecting to SSE stream
  6. Validation for score updates - with proper error messages
  7. Authentication for game servers
  8. Postgres integration (leaderboard historical data - aggregation)
  9. Redis/database based id to username translation
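
For item 2 above, here is a rough sketch of what the jitter could look like (not implemented yet, and the numbers are placeholders): exponential backoff with full jitter, usable both for retrying a failed send and for spreading out client reconnects.

package main

import (
	"fmt"
	"math/rand"
	"time"
)

// reconnectDelay returns an exponential backoff with "full jitter":
// a random duration in [0, min(base*2^attempt, maxBackoff)).
// Spreading delays like this is what prevents a thundering herd when
// thousands of clients are dropped and retry at the same moment.
func reconnectDelay(attempt int) time.Duration {
	const (
		base       = 500 * time.Millisecond
		maxBackoff = 30 * time.Second
	)
	backoff := base << attempt
	if backoff <= 0 || backoff > maxBackoff { // cap growth (and guard against overflow)
		backoff = maxBackoff
	}
	return time.Duration(rand.Int63n(int64(backoff)))
}

func main() {
	for attempt := 0; attempt < 5; attempt++ {
		fmt.Printf("attempt %d: wait %v\n", attempt, reconnectDelay(attempt))
	}
}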

Where to go from here


