
RabbitMQ with Go: A hands-on guide to scalable and asynchronous systems
What is RabbitMQ?
RabbitMQ is a message broker that’s widely used to enable asynchronous communication between multiple services in a distributed system. It is a go-to tool due to its ease of creating and managing message queues while ensuring reliable communication and message persistence.
In modern microservices architecture, services often need to communicate asynchronously to remain decoupled and scalable. RabbitMQ acts as an intermediary by queuing messages between services. This ensures that even if a service is temporarily down, messages are not lost and will be processed once the service is available again.
In this article, we are going to implement and explain a RabbitMQ queue in our previous repository. To do that, we will implement a user-following feature into our API. When a user follows another user, a message will be sent into a RabbitMQ queue. A separate consumer will listen for these messages and notify the followed user. This simple setup is sufficient to simulate an event-driven, distributed system.
Core concepts
Before implementing a queue in our API, let’s introduce some core RabbitMQ concepts, so we can better understand what is happening in our code. RabbitMQ follows a producer-consumer model. The main components include:
- Producer -> The service that publishes messages to a queue for another service to consume;
- Consumer -> A service that listens to a queue and processes incoming messages;
- Exchange -> Exchanges define how messages are routed to queues. There are multiple routing strategies, some of which we will touch on in this article;
- Queue -> The queue stores messages until a consumer processes them; this is the component that ensures every message is eventually consumed.
- Channel -> Channels are lightweight connections that share a single TCP connection. This makes it easy to manage multiple concurrent interactions with the broker. The minimal sketch below shows how these pieces map to client API calls.
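To make these concepts concrete, here is a minimal, self-contained sketch of how they map to client API calls. It assumes the github.com/rabbitmq/amqp091-go client (the snippets in this article only show the amqp alias, so the import path is an assumption) and a broker running on localhost; it is a sketch, not the article’s actual code.

// A minimal producer + consumer in one program, for illustration only.
package main

import (
	"log"

	amqp "github.com/rabbitmq/amqp091-go"
)

func main() {
	conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/") // one TCP connection to the broker
	if err != nil {
		log.Fatal(err)
	}
	defer conn.Close()

	ch, err := conn.Channel() // a lightweight channel multiplexed on that connection
	if err != nil {
		log.Fatal(err)
	}
	defer ch.Close()

	// Producer side: declare a durable queue and publish to the default
	// exchange, which routes messages by queue name.
	// (error handling trimmed for brevity on the remaining calls)
	q, _ := ch.QueueDeclare("demo", true, false, false, false, nil)
	_ = ch.Publish("", q.Name, false, false, amqp.Publishing{Body: []byte("hello")})

	// Consumer side: receive deliveries from that same queue.
	msgs, _ := ch.Consume(q.Name, "", true, false, false, false, nil)
	log.Println(string((<-msgs).Body))
}

In the rest of the article, the producer and consumer halves of this flow live in two separate programs.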
Setting up project
Setting up RabbitMQ with Docker compose
First, let’s set up our Docker Compose file to run a RabbitMQ container and integrate it with our API using a new network called broker. It should look like this:
# compose.yaml
broker:
  image: rabbitmq:latest
  profiles: ["local", "test"]
  environment:
    - RABBITMQ_DEFAULT_USER=guest
    - RABBITMQ_DEFAULT_PASS=guest
  ports:
    - "5672:5672"
  networks:
    - broker
Here we’re simply adding a new RabbitMQ container with the necessary environment variables and exposing port 5672, allowing our Producer and Consumer APIs to connect to it successfully.
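One detail worth remembering: Compose only attaches services to networks that are also declared at the top level of the file. That section isn’t shown here; assuming the previous setup already declares the database network, it could look like this:

# compose.yaml (top-level networks section, assumed from context)
networks:
  database:
  broker: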
Now we must implement another layer in our project. Currently we only have the entities and repository (data layer), but here we’re adding business logic, so it belongs in a service layer or a usecase layer, because executing business logic isn’t the responsibility of the data layer. We’re also going to encapsulate the notification service inside a struct that implements an interface, for testing and maintenance purposes. Let’s start by adding our new usecase.
Setting up our usecase and service layer
// internal/usecase/user_usecase.go
type UserUsecase interface {
	FollowUser(followerId string, followedId string) error
}

type userUsecaseImpl struct {
	repo                repository.UserRepository
	notificationService service.NotificationService
}

func NewUserUsecase(
	repo repository.UserRepository,
	notificationService service.NotificationService,
) UserUsecase {
	return &userUsecaseImpl{
		repo,
		notificationService,
	}
}

func (u *userUsecaseImpl) FollowUser(followerId string, followedId string) error {
	err := u.repo.FollowUser(followerId, followedId)
	if err != nil {
		return err
	}
	body, err := json.Marshal(map[string]string{
		"user_id":     followedId,
		"followed_by": followerId,
		"message":     "Hey, you have a new follower!",
	})
	if err != nil {
		return fmt.Errorf("UserUsecase.FollowUser: error marshalling notification body - %w", err)
	}
	err = u.notificationService.Publish(body)
	if err != nil {
		return fmt.Errorf("UserUsecase.FollowUser: error sending notification - %w", err)
	}
	return nil
}
This is the smallest amount of code we need for now, since we’re only implementing the FollowUser functionality. The function first calls the repository to save the new follower in the database, then builds a JSON message with the necessary information and hands it to the notification service, which publishes it to the queue named by notificationTopic. This code may look odd on its own, so let’s see its other half, the notification service.
// internal/service/notification_service.go
type NotificationService interface {
	Publish(metadata []byte) error
}

type notificationServiceImpl struct {
	conn    *amqp.Connection
	channel *amqp.Channel
	queue   amqp.Queue
}

// NewNotificationService is a constructor for our notification service;
// it returns a new service that produces messages.
func NewNotificationService(queue string, conn *amqp.Connection) (NotificationService, error) {
	ch, err := conn.Channel()
	if err != nil {
		return nil, fmt.Errorf("NotificationService.NewNotificationService: error creating channel - %w", err)
	}
	// Declare the queue once, up front
	q, err := ch.QueueDeclare(
		queue, // Queue name
		true,  // Durable (persists even if RabbitMQ restarts)
		false, // Auto-delete
		false, // Exclusive
		false, // No-wait
		nil,   // Arguments
	)
	if err != nil {
		return nil, fmt.Errorf("NotificationService.NewNotificationService: error declaring queue - %w", err)
	}
	return &notificationServiceImpl{
		conn,
		ch,
		q,
	}, nil
}

func (n *notificationServiceImpl) Publish(metadata []byte) error {
	err := n.channel.Publish(
		"",           // Exchange (default exchange)
		n.queue.Name, // Routing key (the queue name)
		false,        // Mandatory
		false,        // Immediate
		amqp.Publishing{
			ContentType: "application/json",
			Body:        metadata,
		},
	)
	if err != nil {
		return fmt.Errorf("NotificationService.Publish: error publishing message - %w", err)
	}
	return nil
}
The code above is a separate service that publishes new messages to our notification queue. It consists of an interface exposing the Publish method, which notificationServiceImpl implements. We build it this way to make testing and maintenance easier.
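To give an idea of what that buys us, a test could inject a fake that satisfies NotificationService and records whatever the usecase publishes. This is only a sketch with hypothetical names, and it assumes a matching fakeUserRepository exists for the repository interface:

// internal/usecase/user_usecase_test.go (hypothetical sketch)
type fakeNotificationService struct {
	published [][]byte
}

func (f *fakeNotificationService) Publish(metadata []byte) error {
	f.published = append(f.published, metadata)
	return nil
}

func TestFollowUserPublishesNotification(t *testing.T) {
	notifications := &fakeNotificationService{}
	uc := NewUserUsecase(&fakeUserRepository{}, notifications) // fakeUserRepository is assumed to exist

	if err := uc.FollowUser("follower-id", "followed-id"); err != nil {
		t.Fatal(err)
	}
	if len(notifications.published) != 1 {
		t.Fatalf("expected 1 published message, got %d", len(notifications.published))
	}
}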
Another small thing we’re doing differently here is that we declare the queue just once, in the constructor, in this snippet:
q, err := ch.QueueDeclare(
	queue, // Queue name
	true,  // Durable (persists even if RabbitMQ restarts)
	false, // Auto-delete
	false, // Exclusive
	false, // No-wait
	nil,   // Arguments
)
if err != nil {
	return nil, fmt.Errorf("NotificationService.NewNotificationService: error declaring queue - %w", err)
}
By doing this, we avoid declaring the queue for every published message, removing overhead from this communication. We also store the channel and the declared queue in the service struct, so we can access them whenever we publish a new message.
Another good practice we could apply here is to wrap the connection and the channel in interfaces that hold the actual connection and channel. This would make them easier to test and mock, but for the sake of simplicity I won’t do that here; if you want to see an example of how to do this, you can read this post where I explain it in more detail.
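To give an idea of what that wrapper could look like, a small interface covering only the channel methods the service actually uses is enough, and *amqp.Channel already satisfies it as-is (the interface name here is hypothetical):

// A hypothetical channel wrapper: the service would depend on this
// interface instead of *amqp.Channel, so tests can inject a fake.
type AMQPChannel interface {
	QueueDeclare(name string, durable, autoDelete, exclusive, noWait bool, args amqp.Table) (amqp.Queue, error)
	Publish(exchange, key string, mandatory, immediate bool, msg amqp.Publishing) error
}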
Setting up APIs
Producer API
Now that our usecase and service are created, we need to wire everything together by injecting the repository and the notification service into the usecase in our main file.
//cmd/server/main.go
package main

const notificationTopic = "notification"

func main() {
	dbConfig, err := config.NewDBConfig()
	if err != nil {
		log.Fatal(err)
	}
	db, err := db.New(dbConfig)
	if err != nil {
		log.Fatal(err)
	}
	if err := db.Ping(); err != nil {
		log.Fatal(err)
	}
	broker, err := service.NewBrokerConnection()
	if err != nil {
		log.Fatal(err)
	}
	notificationService, err := service.NewNotificationService(notificationTopic, broker)
	if err != nil {
		log.Fatal(err)
	}
	userRepository := repository.NewUserRepository(db)
	userUsecase := usecase.NewUserUsecase(userRepository, notificationService)
}
Here we create a new database connection and a broker connection, which is our connection to RabbitMQ. The broker connection helper doesn’t live in the main file; it looks like this:
// internal/service/broker.go
func NewBrokerConnection() (*amqp.Connection, error) {
	// BROKER_DSN -> amqp://guest:guest@broker:5672/
	dsn := os.Getenv("BROKER_DSN")
	conn, err := amqp.Dial(dsn)
	if err != nil {
		return nil, fmt.Errorf("NewBrokerConnection: error dialing connection - %w", err)
	}
	return conn, nil
}
We read the DSN from the environment and dial it to create a new connection, which we return, just like our database connection. After that, we simply inject this AMQP connection into our notification service, and inject the notification service and the repository into our usecase.
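One small thing these snippets leave out is closing the connection when the process shuts down; in main we could defer that right after creating it, something like:

broker, err := service.NewBrokerConnection()
if err != nil {
	log.Fatal(err)
}
// Close the TCP connection (and the channels on top of it) on shutdown.
defer broker.Close()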
Consumer API
We’ve set up our Producer API, which inserts messages into the queue; now we need to set up our Consumer. For simplicity, I’ll create it in the same repository, under cmd/notification/main.go, and just add another Docker Compose entry to run this file when we run docker compose up. That said, the best practice is to keep these concerns in separate modules as the system grows. Here is our notification main file.
// cmd/notification/main.go
package main

const notificationTopic = "notification"

func main() {
	conn, err := service.NewBrokerConnection()
	if err != nil {
		log.Fatal(err)
	}
	ch, err := conn.Channel()
	if err != nil {
		log.Fatal(err)
	}
	msgs, err := ch.Consume(
		notificationTopic, // Queue name
		"",                // Consumer tag
		true,              // Auto-ack
		false,             // Exclusive
		false,             // No-local
		false,             // No-wait
		nil,               // Arguments
	)
	if err != nil {
		log.Fatal(err)
	}

	var forever chan struct{}

	go func() {
		for msg := range msgs {
			log.Println("Received message")
			// We can do anything with this received message;
			// for simplicity, I just want to show you how to receive
			// it in another service and process it in a different place
			err := processMessage(msg.Body)
			if err != nil {
				log.Println("Error processing message:", err)
			}
		}
	}()

	log.Println("Waiting for messages...")
	<-forever
}
We are doing the same thing as in the producer API’s main file; the only difference is the ch.Consume call, because we are now consuming from the queue named notification, but the logic for creating a connection and a channel is the same. After creating the channel, we start a goroutine and block the main goroutine, since the process needs to stay alive while we listen for messages from the queue. Meanwhile, the goroutine loops over the delivery channel, processing each new message as it arrives.
Adding Consumer API to compose.yaml
Now that we’ve created our consumer main file, let’s add it to our Docker Compose file, just like the first API. Since it doesn’t need a database connection, we can omit the Postgres-related environment variables and networks. It should look like this:
# compose.yaml
local:
  build:
    context: .
    dockerfile: Dockerfile
  networks:
    - database
    - broker
  profiles: ["local", "test"]
  environment:
    - DB_HOST=database
    - DB_PORT=5432
    - DB_USER=postgres
    - DB_PASSWORD=mysecretpassword
    - DB_NAME=local_db
    - BROKER_DSN=amqp://guest:guest@broker:5672/
  depends_on:
    - database
    - broker
  entrypoint: ["go", "run", "cmd/server/main.go"]
notification:
  build:
    context: .
    dockerfile: Dockerfile
  networks:
    - broker
  profiles: ["local"]
  environment:
    - BROKER_DSN=amqp://guest:guest@broker:5672/
  depends_on:
    - broker
  entrypoint: ["go", "run", "cmd/notification/main.go"]
Since both services need the same dependencies and files, they build from the same Docker image, but their entrypoints differ: the producer runs cmd/server/main.go while the consumer runs cmd/notification/main.go. Each performs a different function inside our system, which is another reason to eventually separate these two APIs into different repositories.
Creating handler layer
Now, to test it manually, we need to create another layer in our project: the handlers (or controllers) layer. This layer is the interface between the business logic and the client; it receives HTTP requests, parses them, passes them to the usecase, and returns a response (success or error) to the client. Our handlers should look like this:
// internal/handlers/handlers.go
func CreateUser(uc usecase.UserUsecase) func(w http.ResponseWriter, r *http.Request) {
	return func(w http.ResponseWriter, r *http.Request) {
		var user models.User
		if err := json.NewDecoder(r.Body).Decode(&user); err != nil {
			http.Error(w, "Invalid request body", http.StatusBadRequest)
			return
		}
		defer r.Body.Close()
		if err := uc.Save(user); err != nil {
			http.Error(w, "Failed to create user", http.StatusInternalServerError)
			return
		}
		w.WriteHeader(http.StatusCreated)
		json.NewEncoder(w).Encode(map[string]string{"message": "User created successfully"})
	}
}

func FollowUser(uc usecase.UserUsecase) func(w http.ResponseWriter, r *http.Request) {
	return func(w http.ResponseWriter, r *http.Request) {
		var req struct {
			FollowerID string `json:"follower_id"`
			FollowedID string `json:"followed_id"`
		}
		if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
			http.Error(w, "Invalid request body", http.StatusBadRequest)
			return
		}
		defer r.Body.Close()
		if err := uc.FollowUser(req.FollowerID, req.FollowedID); err != nil {
			http.Error(w, "Failed to follow user", http.StatusInternalServerError)
			return
		}
		w.WriteHeader(http.StatusOK)
		json.NewEncoder(w).Encode(map[string]string{"message": "User followed successfully"})
	}
}
We are creating functions that return a handler func, which is a function with this specific signature: func(w http.ResponseWriter, r *http.Request). Go’s default mux expects handlers with this signature, and we also need access to the UserUsecase inside them, so we take it as a parameter and capture it in the closure.
After that, let’s add these new handlers into our main file:
//cmd/server/main.go
func main() {
	// Previous code
	userUsecase := usecase.NewUserUsecase(userRepository, notificationService)

	http.HandleFunc("POST /users", handlers.CreateUser(userUsecase))
	http.HandleFunc("POST /users/follow", handlers.FollowUser(userUsecase))

	log.Fatal(http.ListenAndServe(":8080", nil))
}
Testing manually
Now that we’ve created our handlers, let’s test everything using curl. First, bring the Docker Compose stack up with:
docker compose --profile local up --build
Once our containers are running, apply the database migrations you can find here, and then create two new users and add a follower using the curl command-line tool:
# Creating new users
$ curl -X POST http://localhost:8080/users \
  -H "Content-Type: application/json" \
  -d '{
    "name": "Alice Johnson",
    "username": "alice_j"
  }'

$ curl -X POST http://localhost:8080/users \
  -H "Content-Type: application/json" \
  -d '{
    "name": "Bob Smith",
    "username": "bob_s"
  }'

# Following user
$ curl -X POST http://localhost:8080/users/follow \
  -H 'Content-Type: application/json' \
  -d '{
    "followed_id": "user_1_id",
    "follower_id": "user_2_id"
  }'
If you check the logs of our consumer API container, it should contain something similar to this message:
notification-1 | 2025/03/11 13:40:31 Received message
notification-1 | 2025/03/11 13:40:31 {"followed_by":"30d4971f-0677-4b99-a03a-cd8f17c8d893","message":"Hey, you have a new follower!","user_id":"f15c6c35-76b8-4d93-8971-727d99df6b4d"}
This means that the insertion into the RabbitMQ queue worked successfully and we consumed this new message in our API.
Conclusion
In this article, we explored how RabbitMQ enables scalable and decoupled systems by facilitating asynchronous communication between services. We built a simple event-driven system using RabbitMQ and Go, covering the following topics:
- RabbitMQ’s core concepts, including producers, consumers, exchanges, and queues.
- How to set up RabbitMQ using Docker Compose.
- Implementing the producer API to publish messages.
- Implementing the consumer API to process messages asynchronously.
- Manually testing the system using curl commands.
If you want to challenge yourself more, try to make this system more reliable by implementing more features, like:
- Message acknowledgments & retries to prevent data loss (a minimal sketch for this one follows the list).
- Dead-letter queues (DLQs) for handling failed messages.
- Fanout & topic exchanges to handle more complex routing scenarios.
- Monitoring RabbitMQ using tools like Prometheus and Grafana.
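As a starting point for the first item, the consumer’s Consume call can switch to manual acknowledgments (auto-ack set to false), acking each delivery only after it has been processed and requeueing it otherwise. A minimal sketch of that change, reusing the names from our consumer:

// Manual acknowledgments: with auto-ack disabled, RabbitMQ redelivers
// any message we never ack (for example, if the consumer crashes mid-way).
msgs, err := ch.Consume(notificationTopic, "", false, false, false, false, nil)
if err != nil {
	log.Fatal(err)
}
for msg := range msgs {
	if err := processMessage(msg.Body); err != nil {
		log.Println("Error processing message:", err)
		_ = msg.Nack(false, true) // negative ack; requeue for another attempt
		continue
	}
	_ = msg.Ack(false) // ack only this delivery
}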