RabbitMQ #
Tidak semua kebutuhan messaging butuh platform sekompleks Kafka. Ketika yang kamu perlukan adalah routing pesan yang fleksibel, task queue yang andal, atau komunikasi request-reply antar service, RabbitMQ adalah pilihan yang jauh lebih natural. RabbitMQ mengimplementasikan protokol AMQP (Advanced Message Queuing Protocol) dan menggunakan model push — broker yang secara aktif mendorong pesan ke consumer, berbeda dengan Kafka yang menggunakan model pull. Hasilnya adalah latensi yang lebih rendah untuk skenario di mana setiap pesan perlu segera diproses dan kemudian dihapus. Dalam ekosistem Java, library resmi AMQP Client dari RabbitMQ tersedia lengkap dan mudah diintegrasikan.
Arsitektur RabbitMQ #
Model kerja RabbitMQ sangat berbeda dari Kafka. Penting untuk memahami perbedaan ini sebelum menulis kode, karena model ini menentukan bagaimana kamu mendesain sistem.
Komponen Utama #
RabbitMQ memiliki empat komponen inti yang saling berinteraksi:
flowchart LR
P[Producer] --> EX[Exchange]
EX -->|binding key| Q1[Queue A]
EX -->|binding key| Q2[Queue B]
Q1 --> C1[Consumer 1]
Q2 --> C2[Consumer 2]
Q2 --> C3[Consumer 3]Producer mengirim pesan ke exchange, bukan langsung ke queue. Producer tidak perlu tahu queue mana yang akan menerima pesannya.
Exchange adalah router. Ia menerima pesan dari producer dan memutuskan ke queue mana pesan diteruskan berdasarkan tipe exchange dan binding key.
Queue adalah tempat pesan menunggu sampai ada consumer yang mengambilnya. Berbeda dengan Kafka, pesan yang sudah di-consume dan di-acknowledge akan dihapus dari queue.
Binding adalah aturan yang menghubungkan exchange ke queue. Binding bisa menyertakan routing key yang digunakan exchange untuk memfilter pesan.
Virtual Host #
RabbitMQ mendukung virtual host (vhost) — namespace yang memisahkan exchange, queue, dan permission. Ini memungkinkan satu RabbitMQ instance melayani beberapa aplikasi yang terisolasi satu sama lain, mirip seperti database dalam satu server database.
RabbitMQ Instance
├── vhost: / ← default vhost
├── vhost: /ecommerce
│ ├── exchange: order-exchange
│ └── queue: order-queue
└── vhost: /payments
├── exchange: payment-exchange
└── queue: payment-queue
Setup Dependencies #
Tambahkan AMQP client ke project Java kamu. Untuk Maven:
<dependencies>
<!-- RabbitMQ Java AMQP Client -->
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.21.0</version>
</dependency>
<!-- SLF4J untuk logging -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-simple</artifactId>
<version>2.0.12</version>
</dependency>
</dependencies>
Untuk Gradle:
dependencies {
implementation 'com.rabbitmq:amqp-client:5.21.0'
implementation 'org.slf4j:slf4j-simple:2.0.12'
}
Untuk menjalankan RabbitMQ secara lokal, Docker adalah cara tercepat:
# Jalankan RabbitMQ dengan Management UI di port 15672
docker run -d \
--name rabbitmq \
-p 5672:5672 \
-p 15672:15672 \
-e RABBITMQ_DEFAULT_USER=admin \
-e RABBITMQ_DEFAULT_PASS=admin \
rabbitmq:3.13-management
# Akses Management UI di: http://localhost:15672
# Username: admin, Password: admin
Koneksi dan Channel #
Sebelum mengirim atau menerima pesan, kamu perlu membuat koneksi ke RabbitMQ. RabbitMQ menggunakan konsep connection dan channel yang penting untuk dipahami.
Connection adalah koneksi TCP ke broker. Membuat connection itu mahal — libatkan TCP handshake dan negosiasi protokol. Dalam aplikasi, biasanya kamu punya satu connection per proses atau per thread pool.
Channel adalah “virtual connection” di dalam connection. Channel adalah unit kerja yang ringan. Operasi AMQP (publish, consume, declare) semua dilakukan melalui channel. Gunakan satu channel per thread untuk menghindari masalah concurrency.
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Channel;
public class RabbitMQConnection {
public static ConnectionFactory createFactory() {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
factory.setPort(5672);
factory.setUsername("admin");
factory.setPassword("admin");
factory.setVirtualHost("/");
// Connection recovery otomatis jika koneksi terputus
factory.setAutomaticRecoveryEnabled(true);
factory.setNetworkRecoveryInterval(5000); // coba reconnect tiap 5 detik
// Heartbeat — deteksi koneksi mati lebih cepat
factory.setRequestedHeartbeat(30); // 30 detik
return factory;
}
public static void main(String[] args) throws Exception {
ConnectionFactory factory = createFactory();
// try-with-resources untuk menutup connection otomatis
try (Connection connection = factory.newConnection();
Channel channel = connection.createChannel()) {
System.out.println("Terhubung ke RabbitMQ!");
System.out.println("Channel number: " + channel.getChannelNumber());
}
}
}
Jangan buat connection baru untuk setiap pesan yang dikirim. Ini sangat tidak efisien dan bisa menyebabkan RabbitMQ kehabisan file descriptor. Buat satu connection dan gunakan channel yang berbeda untuk setiap thread.
Tipe Exchange #
Pilihan tipe exchange adalah keputusan desain paling penting di RabbitMQ. Setiap tipe memiliki semantik routing yang berbeda.
Direct Exchange #
Direct exchange mengirimkan pesan ke queue yang binding key-nya persis sama dengan routing key pesan. Ini adalah tipe paling sederhana — cocok untuk task queue dan routing berdasarkan jenis tugas.
flowchart LR
P[Producer] -->|routing_key=email| DE[Direct Exchange\norder-direct]
DE -->|binding: email| QE[Queue: email-queue]
DE -->|binding: sms| QS[Queue: sms-queue]
QE --> C1[Email Service]
QS --> C2[SMS Service]public class DirectExchangeDemo {
private static final String EXCHANGE_NAME = "notification-direct";
private static final String EMAIL_QUEUE = "email-queue";
private static final String SMS_QUEUE = "sms-queue";
public static void setup(Channel channel) throws Exception {
// Deklarasi exchange — idempoten, aman dipanggil berkali-kali
channel.exchangeDeclare(EXCHANGE_NAME, "direct", true); // durable=true
// Deklarasi queue — durable agar bertahan setelah restart broker
channel.queueDeclare(EMAIL_QUEUE, true, false, false, null);
channel.queueDeclare(SMS_QUEUE, true, false, false, null);
// Binding: hubungkan queue ke exchange dengan routing key
channel.queueBind(EMAIL_QUEUE, EXCHANGE_NAME, "email");
channel.queueBind(SMS_QUEUE, EXCHANGE_NAME, "sms");
}
public static void publish(Channel channel, String type, String message) throws Exception {
// Pesan akan dikirim hanya ke queue yang binding key-nya "email" atau "sms"
channel.basicPublish(
EXCHANGE_NAME,
type, // routing key
null, // properties
message.getBytes()
);
System.out.println("Terkirim ke routing key '" + type + "': " + message);
}
}
Fanout Exchange #
Fanout exchange mengabaikan routing key dan mengirimkan pesan ke semua queue yang terikat padanya. Cocok untuk broadcast — satu event yang perlu diketahui banyak service.
flowchart LR
P[Producer] -->|routing_key diabaikan| FE[Fanout Exchange\norder-fanout]
FE --> Q1[Queue: analytics-queue]
FE --> Q2[Queue: notification-queue]
FE --> Q3[Queue: audit-queue]
Q1 --> C1[Analytics Service]
Q2 --> C2[Notification Service]
Q3 --> C3[Audit Service]public class FanoutExchangeDemo {
private static final String EXCHANGE_NAME = "order-events-fanout";
public static void setup(Channel channel) throws Exception {
channel.exchangeDeclare(EXCHANGE_NAME, "fanout", true);
// Setiap consumer membuat queue-nya sendiri dengan nama unik
// exclusive=true — queue dihapus saat consumer disconnect
String analyticsQueue = channel.queueDeclare().getQueue();
String notifQueue = channel.queueDeclare().getQueue();
String auditQueue = channel.queueDeclare().getQueue();
// Binding ke fanout exchange — routing key diabaikan (bisa dikosongkan)
channel.queueBind(analyticsQueue, EXCHANGE_NAME, "");
channel.queueBind(notifQueue, EXCHANGE_NAME, "");
channel.queueBind(auditQueue, EXCHANGE_NAME, "");
}
public static void publish(Channel channel, String orderJson) throws Exception {
// Routing key kosong — fanout exchange mengabaikannya
channel.basicPublish(EXCHANGE_NAME, "", null, orderJson.getBytes());
}
}
Topic Exchange #
Topic exchange mencocokkan routing key dengan binding pattern menggunakan wildcard. Ini adalah tipe paling fleksibel dan paling sering digunakan di sistem produksi.
Aturan wildcard:
*— menggantikan tepat satu kata#— menggantikan nol atau lebih kata- Kata dipisahkan oleh titik (
.)
flowchart LR
P[Producer] --> TE[Topic Exchange\napp-events]
TE -->|order.*| Q1[Queue: order-all]
TE -->|order.created| Q2[Queue: order-created-only]
TE -->|#.error| Q3[Queue: all-errors]
P -->|order.created| TE
P -->|order.shipped| TE
P -->|payment.error| TEpublic class TopicExchangeDemo {
private static final String EXCHANGE_NAME = "app-events";
public static void setup(Channel channel) throws Exception {
channel.exchangeDeclare(EXCHANGE_NAME, "topic", true);
// Queue 1: semua event order (order.created, order.shipped, order.cancelled, ...)
channel.queueDeclare("order-all-queue", true, false, false, null);
channel.queueBind("order-all-queue", EXCHANGE_NAME, "order.*");
// Queue 2: hanya order created
channel.queueDeclare("order-created-queue", true, false, false, null);
channel.queueBind("order-created-queue", EXCHANGE_NAME, "order.created");
// Queue 3: semua error dari semua service (payment.error, order.error, ...)
channel.queueDeclare("all-errors-queue", true, false, false, null);
channel.queueBind("all-errors-queue", EXCHANGE_NAME, "#.error");
}
public static void publish(Channel channel, String routingKey, String message) throws Exception {
channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes());
System.out.println("Published dengan routing key: " + routingKey);
}
// Contoh penggunaan:
// publish(channel, "order.created", "...") → masuk ke order-all-queue DAN order-created-queue
// publish(channel, "order.shipped", "...") → masuk ke order-all-queue saja
// publish(channel, "payment.error", "...") → masuk ke all-errors-queue saja
// publish(channel, "order.error", "...") → masuk ke order-all-queue DAN all-errors-queue
}
Perbandingan Tipe Exchange #
| Direct | Fanout | Topic | Headers | |
|---|---|---|---|---|
| Routing berdasarkan | Exact match | Broadcast ke semua | Pattern wildcard | Header pesan |
| Fleksibilitas | Rendah | Tidak ada routing | Tinggi | Sangat tinggi |
| Use case | Task queue, notifikasi per tipe | Event broadcast | Event dengan hierarki | Routing kompleks tanpa naming |
| Performa | Cepat | Paling cepat | Sedang | Paling lambat |
Producer dengan Message Properties #
Pesan RabbitMQ bisa menyertakan properties — metadata yang dikirim bersama payload tanpa harus dimasukkan ke dalam body pesan.
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.MessageProperties;
public class ProducerWithProperties {
public static void publishWithProperties(Channel channel, String queue, String body) throws Exception {
// ✗ ANTI-PATTERN: pesan tidak persistent — hilang jika broker restart
channel.basicPublish("", queue, null, body.getBytes());
// ✓ BENAR: pesan persistent dengan MessageProperties.PERSISTENT_TEXT_PLAIN
channel.basicPublish("", queue, MessageProperties.PERSISTENT_TEXT_PLAIN, body.getBytes());
// Untuk properti lebih lengkap, gunakan builder
AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
.contentType("application/json")
.deliveryMode(2) // 2 = persistent
.priority(5) // 0-9, butuh queue dengan x-max-priority
.correlationId("req-123") // untuk request-reply pattern
.replyTo("reply-queue") // untuk request-reply pattern
.expiration("60000") // TTL pesan: 60 detik (ms sebagai string)
.messageId(java.util.UUID.randomUUID().toString())
.timestamp(new java.util.Date())
.headers(java.util.Map.of(
"source-service", "order-service",
"retry-count", 0
))
.build();
channel.basicPublish("order-exchange", "order.created", properties, body.getBytes());
}
}
deliveryMode=2(persistent) hanya benar-benar menjamin durability jika queue juga dideklarasikan sebagaidurable=true. Keduanya harus aktif bersamaan. Jika queue tidak durable, pesan persistent tetap akan hilang saat broker restart.
Consumer dan Acknowledgment #
Consumer di RabbitMQ berbasis push — broker mendorong pesan ke consumer secara aktif. Acknowledgment (ack) adalah mekanisme yang memberitahu broker bahwa pesan sudah berhasil diproses.
Manual Acknowledgment #
import com.rabbitmq.client.DeliverCallback;
import com.rabbitmq.client.CancelCallback;
public class ManualAckConsumer {
public static void startConsuming(Channel channel, String queueName) throws Exception {
// ✗ ANTI-PATTERN: auto ack — pesan dianggap selesai begitu dikirim ke consumer
// Jika consumer crash setelah terima tapi sebelum proses selesai, pesan hilang
boolean autoAck = false; // selalu false untuk production
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
long deliveryTag = delivery.getEnvelope().getDeliveryTag();
try {
System.out.println("Memproses: " + message);
processMessage(message);
// ✓ Ack setelah proses berhasil — broker hapus pesan dari queue
// multiple=false → hanya ack pesan ini, bukan semua pesan sebelumnya
channel.basicAck(deliveryTag, false);
} catch (Exception e) {
System.err.println("Gagal memproses: " + e.getMessage());
// Nack — beritahu broker bahwa pesan gagal diproses
// requeue=true → kembalikan ke queue (hati-hati: bisa infinite loop!)
// requeue=false → buang pesan (atau kirim ke DLX jika dikonfigurasi)
boolean requeue = isTransientError(e); // hanya requeue jika error sementara
channel.basicNack(deliveryTag, false, requeue);
}
};
CancelCallback cancelCallback = consumerTag -> {
System.out.println("Consumer dibatalkan: " + consumerTag);
};
channel.basicConsume(queueName, autoAck, deliverCallback, cancelCallback);
}
private static void processMessage(String message) throws Exception {
// logika bisnis
}
private static boolean isTransientError(Exception e) {
// Contoh: error network adalah transient, error parsing JSON bukan
return e instanceof java.io.IOException;
}
}
Prefetch — Mengontrol Beban Consumer #
Prefetch count menentukan berapa banyak pesan yang boleh dikirim broker ke consumer sebelum consumer mengirimkan ack. Ini adalah pengaturan kritis untuk distribusi beban yang adil.
// ✗ ANTI-PATTERN: tanpa prefetch (default) — broker kirim semua pesan ke consumer pertama
// Consumer yang cepat dan lambat mendapat beban yang tidak seimbang
// channel.basicQos(0); // 0 = tidak ada limit
// ✓ BENAR: prefetch=1 — broker hanya kirim 1 pesan baru setelah consumer ack pesan sebelumnya
// Beban terdistribusi berdasarkan kecepatan masing-masing consumer
channel.basicQos(1);
// Untuk throughput lebih tinggi dengan beberapa consumer, naikkan prefetch
// tapi pertimbangkan memory usage
channel.basicQos(10); // kirim max 10 pesan sebelum butuh ack
sequenceDiagram
participant Broker
participant C1 as Consumer 1 (lambat)
participant C2 as Consumer 2 (cepat)
Note over Broker,C2: TANPA prefetch — tidak adil
Broker->>C1: msg 1, 2, 3, 4, 5 (semua sekaligus)
Broker->>C2: msg 6, 7, 8, 9, 10
Note over Broker,C2: DENGAN prefetch=1 — adil
Broker->>C1: msg 1
Broker->>C2: msg 2
C2-->>Broker: ack msg 2
Broker->>C2: msg 3
C2-->>Broker: ack msg 3
Broker->>C2: msg 4
C1-->>Broker: ack msg 1
Broker->>C1: msg 5Dead Letter Exchange (DLX) #
Dead Letter Exchange adalah mekanisme resmi RabbitMQ untuk menangani pesan yang gagal diproses. Pesan menjadi “dead letter” ketika:
- Di-nack dengan
requeue=false - TTL pesan habis
- Queue penuh (jika ada
x-max-length)
public class DLXSetup {
public static void setupWithDLX(Channel channel) throws Exception {
// 1. Buat Dead Letter Exchange
String dlxExchange = "dlx.order-events";
String dlxQueue = "dlq.order-events";
channel.exchangeDeclare(dlxExchange, "direct", true);
channel.queueDeclare(dlxQueue, true, false, false, null);
channel.queueBind(dlxQueue, dlxExchange, "order-events"); // routing key harus cocok
// 2. Buat queue utama dengan referensi ke DLX
java.util.Map<String, Object> args = new java.util.HashMap<>();
args.put("x-dead-letter-exchange", dlxExchange); // arahkan pesan mati ke DLX
args.put("x-dead-letter-routing-key", "order-events"); // routing key di DLX
args.put("x-message-ttl", 300_000); // TTL 5 menit (opsional)
channel.queueDeclare("order-events-queue", true, false, false, args);
channel.queueBind("order-events-queue", "order-exchange", "order.*");
}
}
flowchart TD
P[Producer] --> EX[order-exchange]
EX --> Q[order-events-queue\nx-dead-letter-exchange: dlx]
Q --> C{Consumer\nberhasil proses?}
C -- Ya\nbasicAck --> DONE[Pesan dihapus]
C -- Tidak\nbasicNack requeue=false --> DLX[dlx.order-events]
DLX --> DLQ[dlq.order-events]
DLQ --> OPS[Tim Ops / Monitoring]Consumer untuk DLQ biasanya dijalankan terpisah untuk investigasi dan reprocessing manual:
public class DLQConsumer {
public static void monitorDLQ(Channel channel) throws Exception {
channel.basicQos(1);
channel.basicConsume("dlq.order-events", false,
(consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
long deliveryTag = delivery.getEnvelope().getDeliveryTag();
// Log detail kegagalan
var headers = delivery.getProperties().getHeaders();
System.out.printf(
"[DLQ] Pesan gagal — reason: %s, message: %s%n",
headers != null ? headers.get("x-death") : "unknown",
message
);
// Kirim alert ke sistem monitoring (Slack, PagerDuty, dll)
sendAlert(message, headers);
// Ack di DLQ agar tidak diproses ulang tanpa disengaja
channel.basicAck(deliveryTag, false);
},
consumerTag -> {}
);
}
private static void sendAlert(String message, java.util.Map<String, Object> headers) {
// implementasi notifikasi ke tim ops
System.out.println("ALERT: pesan masuk ke DLQ, perlu investigasi manual.");
}
}
Pola Request-Reply (RPC over RabbitMQ) #
RabbitMQ sangat cocok untuk implementasi pola request-reply — caller mengirim request dan menunggu response. Ini yang membedakan RabbitMQ dari Kafka: Kafka tidak dirancang untuk pola ini.
sequenceDiagram
participant Client
participant RabbitMQ
participant Server
Client->>RabbitMQ: publish ke "rpc-queue"\ncorrelationId=uuid-123\nreplyTo="amq.rabbitmq.reply-to"
RabbitMQ->>Server: deliver request
Server->>Server: proses request
Server->>RabbitMQ: publish response ke replyTo\ncorrelationId=uuid-123
RabbitMQ->>Client: deliver response
Client->>Client: cocokkan correlationId → selesaiimport java.util.UUID;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
public class RPCClient {
private final Channel channel;
public RPCClient(Channel channel) throws Exception {
this.channel = channel;
}
public String call(String request, int timeoutSeconds) throws Exception {
String correlationId = UUID.randomUUID().toString();
// BlockingQueue untuk menunggu response dari consumer callback (thread berbeda)
BlockingQueue<String> responseQueue = new ArrayBlockingQueue<>(1);
// "amq.rabbitmq.reply-to" adalah pseudo-queue bawaan RabbitMQ untuk Direct Reply-to
// Lebih efisien dari membuat temporary queue baru setiap request
String replyTo = "amq.rabbitmq.reply-to";
// Subscribe ke reply queue sebelum publish request
channel.basicConsume(replyTo, true, // auto ack untuk reply queue
(consumerTag, delivery) -> {
if (correlationId.equals(delivery.getProperties().getCorrelationId())) {
responseQueue.offer(new String(delivery.getBody(), "UTF-8"));
}
},
consumerTag -> {}
);
// Publish request
AMQP.BasicProperties props = new AMQP.BasicProperties.Builder()
.correlationId(correlationId)
.replyTo(replyTo)
.build();
channel.basicPublish("", "rpc-queue", props, request.getBytes());
// Tunggu response dengan timeout
String response = responseQueue.poll(timeoutSeconds, TimeUnit.SECONDS);
if (response == null) {
throw new RuntimeException("RPC timeout setelah " + timeoutSeconds + " detik");
}
return response;
}
}
public class RPCServer {
public static void start(Channel channel) throws Exception {
channel.queueDeclare("rpc-queue", false, false, false, null);
channel.basicQos(1); // proses satu request dalam satu waktu
System.out.println("RPC Server menunggu request...");
channel.basicConsume("rpc-queue", false,
(consumerTag, delivery) -> {
String request = new String(delivery.getBody(), "UTF-8");
String correlationId = delivery.getProperties().getCorrelationId();
String replyTo = delivery.getProperties().getReplyTo();
String response;
try {
response = handleRequest(request);
} catch (Exception e) {
response = "{\"error\": \"" + e.getMessage() + "\"}";
}
// Kirim response kembali ke reply queue
AMQP.BasicProperties replyProps = new AMQP.BasicProperties.Builder()
.correlationId(correlationId)
.build();
channel.basicPublish("", replyTo, replyProps, response.getBytes());
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
},
consumerTag -> {}
);
}
private static String handleRequest(String request) {
// logika bisnis — proses request dan kembalikan response
return "{\"status\": \"ok\", \"result\": \"processed: " + request + "\"}";
}
}
Retry dengan Exponential Backoff #
Ketika pesan gagal diproses karena error sementara (database sedang down, service lain tidak responsif), kamu butuh retry dengan delay — bukan langsung requeue yang akan menciptakan busy loop.
RabbitMQ tidak punya built-in delay, tapi kamu bisa mensimulasikannya dengan TTL queue:
public class RetryWithBackoff {
// Setup: buat hierarchy queue untuk delay
public static void setupRetryQueues(Channel channel) throws Exception {
String mainExchange = "order-exchange";
String mainQueue = "order-queue";
String retryExchange = "retry-exchange";
// Exchange utama
channel.exchangeDeclare(mainExchange, "direct", true);
// Queue retry dengan TTL — pesan akan "mati" kembali ke main queue setelah delay
String[] retryQueues = {"retry-5s", "retry-30s", "retry-5m"};
int[] delays = {5_000, 30_000, 300_000};
for (int i = 0; i < retryQueues.length; i++) {
java.util.Map<String, Object> args = new java.util.HashMap<>();
args.put("x-message-ttl", delays[i]);
args.put("x-dead-letter-exchange", mainExchange); // kembalikan ke main setelah TTL
args.put("x-dead-letter-routing-key", "order");
channel.queueDeclare(retryQueues[i], true, false, false, args);
channel.queueBind(retryQueues[i], retryExchange, retryQueues[i]);
}
// DLQ untuk pesan yang sudah habis retry
channel.queueDeclare("dlq.order-queue", true, false, false, null);
// Main queue dengan DLX
java.util.Map<String, Object> mainArgs = new java.util.HashMap<>();
mainArgs.put("x-dead-letter-exchange", retryExchange);
channel.queueDeclare(mainQueue, true, false, false, mainArgs);
channel.queueBind(mainQueue, mainExchange, "order");
}
// Consumer yang menentukan ke retry queue mana pesan harus dikirim
public static void consumeWithRetry(Channel channel) throws Exception {
channel.basicQos(1);
channel.basicConsume("order-queue", false,
(consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
long deliveryTag = delivery.getEnvelope().getDeliveryTag();
// Hitung berapa kali pesan ini sudah di-retry
var headers = delivery.getProperties().getHeaders();
int retryCount = getRetryCount(headers);
try {
processMessage(message);
channel.basicAck(deliveryTag, false);
} catch (Exception e) {
if (retryCount >= 3) {
// Sudah retry 3x — kirim ke DLQ
forwardToDLQ(channel, delivery, message, e.getMessage());
channel.basicAck(deliveryTag, false); // ack agar tidak diproses lagi
} else {
// Nack tanpa requeue → masuk ke DLX → retry queue
channel.basicNack(deliveryTag, false, false);
}
}
},
consumerTag -> {}
);
}
private static int getRetryCount(java.util.Map<String, Object> headers) {
if (headers == null) return 0;
var xDeath = headers.get("x-death");
if (xDeath == null) return 0;
// x-death berisi list of death record
@SuppressWarnings("unchecked")
var deathList = (java.util.List<?>) xDeath;
return deathList.size();
}
private static void processMessage(String message) throws Exception {
System.out.println("Memproses: " + message);
}
private static void forwardToDLQ(Channel channel, com.rabbitmq.client.Delivery delivery,
String message, String reason) throws Exception {
System.err.println("Pesan masuk DLQ setelah 3 retry: " + reason);
channel.basicPublish("", "dlq.order-queue",
MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
}
}
Kapan Menggunakan RabbitMQ dan Kapan Tidak #
GUNAKAN RABBITMQ JIKA:
✓ Butuh routing fleksibel berdasarkan content (topic exchange)
✓ Task queue — pekerjaan yang harus dieksekusi tepat sekali
✓ Request-reply / RPC pattern
✓ Volume moderat dengan kompleksitas routing tinggi
✓ Butuh priority queue (x-max-priority)
✓ Pesan harus dihapus setelah diproses (tidak butuh replay)
✓ Integrasi dengan protokol selain AMQP (STOMP, MQTT via plugin)
PERTIMBANGKAN ALTERNATIF JIKA:
✗ Volume sangat tinggi (jutaan pesan/detik) → Kafka lebih tepat
✗ Butuh replay pesan yang sudah lama → Kafka punya retention log
✗ Banyak consumer independen membaca data yang sama → Kafka consumer group
✗ Stream processing dan agregasi real-time → Kafka Streams
✗ Audit log yang tidak boleh dihapus → Kafka dengan retention panjang
flowchart TD
A{Butuh routing\nfleksibel?} -- Ya --> B{Task dieksekusi\ntepat sekali?}
A -- Tidak --> C{Volume sangat\ntinggi?}
B -- Ya --> RABBIT[RabbitMQ]
B -- Tidak --> D{Butuh\nrequest-reply?}
D -- Ya --> RABBIT
D -- Tidak --> C
C -- Ya --> KAFKA[Kafka]
C -- Tidak --> E{Butuh\nreplay?}
E -- Ya --> KAFKA
E -- Tidak --> RABBITRingkasan #
- Exchange adalah router — producer tidak mengirim langsung ke queue. Pilih tipe exchange yang tepat:
directuntuk exact match,fanoutuntuk broadcast,topicuntuk routing berbasis pattern.- Selalu gunakan
durable=trueuntuk queue dan exchange di production, dandeliveryMode=2(persistent) untuk pesan yang tidak boleh hilang saat broker restart.- Matikan auto ack dan lakukan
basicAcksecara manual setelah pemrosesan berhasil — ini mencegah pesan hilang jika consumer crash di tengah proses.- Atur prefetch count (
basicQos) untuk distribusi beban yang adil antar consumer. Tanpa prefetch, consumer yang lambat akan mendapat tumpukan pesan yang belum bisa diproses.- Dead Letter Exchange (DLX) adalah cara resmi RabbitMQ untuk menangani pesan gagal — konfigurasi di level queue agar pesan yang di-nack otomatis masuk ke DLQ.
- Pattern request-reply adalah keunggulan RabbitMQ dibanding Kafka — gunakan
correlationIddanreplyTo(amq.rabbitmq.reply-to) untuk komunikasi sinkron di atas messaging layer.- Retry dengan exponential backoff bisa diimplementasikan menggunakan TTL queue yang mengembalikan pesan ke main queue setelah delay tertentu.
- RabbitMQ paling cocok untuk task queue, routing kompleks, dan request-reply — bukan untuk event streaming volume tinggi atau skenario yang butuh replay pesan.