RabbitMQ

RabbitMQ #

Tidak semua kebutuhan messaging butuh platform sekompleks Kafka. Ketika yang kamu perlukan adalah routing pesan yang fleksibel, task queue yang andal, atau komunikasi request-reply antar service, RabbitMQ adalah pilihan yang jauh lebih natural. RabbitMQ mengimplementasikan protokol AMQP (Advanced Message Queuing Protocol) dan menggunakan model push — broker yang secara aktif mendorong pesan ke consumer, berbeda dengan Kafka yang menggunakan model pull. Hasilnya adalah latensi yang lebih rendah untuk skenario di mana setiap pesan perlu segera diproses dan kemudian dihapus. Dalam ekosistem Java, library resmi AMQP Client dari RabbitMQ tersedia lengkap dan mudah diintegrasikan.

Arsitektur RabbitMQ #

Model kerja RabbitMQ sangat berbeda dari Kafka. Penting untuk memahami perbedaan ini sebelum menulis kode, karena model ini menentukan bagaimana kamu mendesain sistem.

Komponen Utama #

RabbitMQ memiliki empat komponen inti yang saling berinteraksi:

flowchart LR
    P[Producer] --> EX[Exchange]
    EX -->|binding key| Q1[Queue A]
    EX -->|binding key| Q2[Queue B]
    Q1 --> C1[Consumer 1]
    Q2 --> C2[Consumer 2]
    Q2 --> C3[Consumer 3]

Producer mengirim pesan ke exchange, bukan langsung ke queue. Producer tidak perlu tahu queue mana yang akan menerima pesannya.

Exchange adalah router. Ia menerima pesan dari producer dan memutuskan ke queue mana pesan diteruskan berdasarkan tipe exchange dan binding key.

Queue adalah tempat pesan menunggu sampai ada consumer yang mengambilnya. Berbeda dengan Kafka, pesan yang sudah di-consume dan di-acknowledge akan dihapus dari queue.

Binding adalah aturan yang menghubungkan exchange ke queue. Binding bisa menyertakan routing key yang digunakan exchange untuk memfilter pesan.

Virtual Host #

RabbitMQ mendukung virtual host (vhost) — namespace yang memisahkan exchange, queue, dan permission. Ini memungkinkan satu RabbitMQ instance melayani beberapa aplikasi yang terisolasi satu sama lain, mirip seperti database dalam satu server database.

RabbitMQ Instance
├── vhost: /              ← default vhost
├── vhost: /ecommerce
│   ├── exchange: order-exchange
│   └── queue: order-queue
└── vhost: /payments
    ├── exchange: payment-exchange
    └── queue: payment-queue

Setup Dependencies #

Tambahkan AMQP client ke project Java kamu. Untuk Maven:

<dependencies>
    <!-- RabbitMQ Java AMQP Client -->
    <dependency>
        <groupId>com.rabbitmq</groupId>
        <artifactId>amqp-client</artifactId>
        <version>5.21.0</version>
    </dependency>

    <!-- SLF4J untuk logging -->
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-simple</artifactId>
        <version>2.0.12</version>
    </dependency>
</dependencies>

Untuk Gradle:

dependencies {
    implementation 'com.rabbitmq:amqp-client:5.21.0'
    implementation 'org.slf4j:slf4j-simple:2.0.12'
}

Untuk menjalankan RabbitMQ secara lokal, Docker adalah cara tercepat:

# Jalankan RabbitMQ dengan Management UI di port 15672
docker run -d \
  --name rabbitmq \
  -p 5672:5672 \
  -p 15672:15672 \
  -e RABBITMQ_DEFAULT_USER=admin \
  -e RABBITMQ_DEFAULT_PASS=admin \
  rabbitmq:3.13-management

# Akses Management UI di: http://localhost:15672
# Username: admin, Password: admin

Koneksi dan Channel #

Sebelum mengirim atau menerima pesan, kamu perlu membuat koneksi ke RabbitMQ. RabbitMQ menggunakan konsep connection dan channel yang penting untuk dipahami.

Connection adalah koneksi TCP ke broker. Membuat connection itu mahal — libatkan TCP handshake dan negosiasi protokol. Dalam aplikasi, biasanya kamu punya satu connection per proses atau per thread pool.

Channel adalah “virtual connection” di dalam connection. Channel adalah unit kerja yang ringan. Operasi AMQP (publish, consume, declare) semua dilakukan melalui channel. Gunakan satu channel per thread untuk menghindari masalah concurrency.

import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Channel;

public class RabbitMQConnection {

    public static ConnectionFactory createFactory() {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        factory.setPort(5672);
        factory.setUsername("admin");
        factory.setPassword("admin");
        factory.setVirtualHost("/");

        // Connection recovery otomatis jika koneksi terputus
        factory.setAutomaticRecoveryEnabled(true);
        factory.setNetworkRecoveryInterval(5000); // coba reconnect tiap 5 detik

        // Heartbeat — deteksi koneksi mati lebih cepat
        factory.setRequestedHeartbeat(30); // 30 detik

        return factory;
    }

    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = createFactory();

        // try-with-resources untuk menutup connection otomatis
        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {

            System.out.println("Terhubung ke RabbitMQ!");
            System.out.println("Channel number: " + channel.getChannelNumber());
        }
    }
}
Jangan buat connection baru untuk setiap pesan yang dikirim. Ini sangat tidak efisien dan bisa menyebabkan RabbitMQ kehabisan file descriptor. Buat satu connection dan gunakan channel yang berbeda untuk setiap thread.

Tipe Exchange #

Pilihan tipe exchange adalah keputusan desain paling penting di RabbitMQ. Setiap tipe memiliki semantik routing yang berbeda.

Direct Exchange #

Direct exchange mengirimkan pesan ke queue yang binding key-nya persis sama dengan routing key pesan. Ini adalah tipe paling sederhana — cocok untuk task queue dan routing berdasarkan jenis tugas.

flowchart LR
    P[Producer] -->|routing_key=email| DE[Direct Exchange\norder-direct]
    DE -->|binding: email| QE[Queue: email-queue]
    DE -->|binding: sms| QS[Queue: sms-queue]
    QE --> C1[Email Service]
    QS --> C2[SMS Service]
public class DirectExchangeDemo {

    private static final String EXCHANGE_NAME = "notification-direct";
    private static final String EMAIL_QUEUE = "email-queue";
    private static final String SMS_QUEUE = "sms-queue";

    public static void setup(Channel channel) throws Exception {
        // Deklarasi exchange — idempoten, aman dipanggil berkali-kali
        channel.exchangeDeclare(EXCHANGE_NAME, "direct", true); // durable=true

        // Deklarasi queue — durable agar bertahan setelah restart broker
        channel.queueDeclare(EMAIL_QUEUE, true, false, false, null);
        channel.queueDeclare(SMS_QUEUE, true, false, false, null);

        // Binding: hubungkan queue ke exchange dengan routing key
        channel.queueBind(EMAIL_QUEUE, EXCHANGE_NAME, "email");
        channel.queueBind(SMS_QUEUE, EXCHANGE_NAME, "sms");
    }

    public static void publish(Channel channel, String type, String message) throws Exception {
        // Pesan akan dikirim hanya ke queue yang binding key-nya "email" atau "sms"
        channel.basicPublish(
            EXCHANGE_NAME,
            type,           // routing key
            null,           // properties
            message.getBytes()
        );
        System.out.println("Terkirim ke routing key '" + type + "': " + message);
    }
}

Fanout Exchange #

Fanout exchange mengabaikan routing key dan mengirimkan pesan ke semua queue yang terikat padanya. Cocok untuk broadcast — satu event yang perlu diketahui banyak service.

flowchart LR
    P[Producer] -->|routing_key diabaikan| FE[Fanout Exchange\norder-fanout]
    FE --> Q1[Queue: analytics-queue]
    FE --> Q2[Queue: notification-queue]
    FE --> Q3[Queue: audit-queue]
    Q1 --> C1[Analytics Service]
    Q2 --> C2[Notification Service]
    Q3 --> C3[Audit Service]
public class FanoutExchangeDemo {

    private static final String EXCHANGE_NAME = "order-events-fanout";

    public static void setup(Channel channel) throws Exception {
        channel.exchangeDeclare(EXCHANGE_NAME, "fanout", true);

        // Setiap consumer membuat queue-nya sendiri dengan nama unik
        // exclusive=true — queue dihapus saat consumer disconnect
        String analyticsQueue = channel.queueDeclare().getQueue();
        String notifQueue = channel.queueDeclare().getQueue();
        String auditQueue = channel.queueDeclare().getQueue();

        // Binding ke fanout exchange — routing key diabaikan (bisa dikosongkan)
        channel.queueBind(analyticsQueue, EXCHANGE_NAME, "");
        channel.queueBind(notifQueue, EXCHANGE_NAME, "");
        channel.queueBind(auditQueue, EXCHANGE_NAME, "");
    }

    public static void publish(Channel channel, String orderJson) throws Exception {
        // Routing key kosong — fanout exchange mengabaikannya
        channel.basicPublish(EXCHANGE_NAME, "", null, orderJson.getBytes());
    }
}

Topic Exchange #

Topic exchange mencocokkan routing key dengan binding pattern menggunakan wildcard. Ini adalah tipe paling fleksibel dan paling sering digunakan di sistem produksi.

Aturan wildcard:

  • * — menggantikan tepat satu kata
  • # — menggantikan nol atau lebih kata
  • Kata dipisahkan oleh titik (.)
flowchart LR
    P[Producer] --> TE[Topic Exchange\napp-events]
    TE -->|order.*| Q1[Queue: order-all]
    TE -->|order.created| Q2[Queue: order-created-only]
    TE -->|#.error| Q3[Queue: all-errors]

    P -->|order.created| TE
    P -->|order.shipped| TE
    P -->|payment.error| TE
public class TopicExchangeDemo {

    private static final String EXCHANGE_NAME = "app-events";

    public static void setup(Channel channel) throws Exception {
        channel.exchangeDeclare(EXCHANGE_NAME, "topic", true);

        // Queue 1: semua event order (order.created, order.shipped, order.cancelled, ...)
        channel.queueDeclare("order-all-queue", true, false, false, null);
        channel.queueBind("order-all-queue", EXCHANGE_NAME, "order.*");

        // Queue 2: hanya order created
        channel.queueDeclare("order-created-queue", true, false, false, null);
        channel.queueBind("order-created-queue", EXCHANGE_NAME, "order.created");

        // Queue 3: semua error dari semua service (payment.error, order.error, ...)
        channel.queueDeclare("all-errors-queue", true, false, false, null);
        channel.queueBind("all-errors-queue", EXCHANGE_NAME, "#.error");
    }

    public static void publish(Channel channel, String routingKey, String message) throws Exception {
        channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes());
        System.out.println("Published dengan routing key: " + routingKey);
    }

    // Contoh penggunaan:
    // publish(channel, "order.created", "...")   → masuk ke order-all-queue DAN order-created-queue
    // publish(channel, "order.shipped", "...")   → masuk ke order-all-queue saja
    // publish(channel, "payment.error", "...")   → masuk ke all-errors-queue saja
    // publish(channel, "order.error", "...")     → masuk ke order-all-queue DAN all-errors-queue
}

Perbandingan Tipe Exchange #

DirectFanoutTopicHeaders
Routing berdasarkanExact matchBroadcast ke semuaPattern wildcardHeader pesan
FleksibilitasRendahTidak ada routingTinggiSangat tinggi
Use caseTask queue, notifikasi per tipeEvent broadcastEvent dengan hierarkiRouting kompleks tanpa naming
PerformaCepatPaling cepatSedangPaling lambat

Producer dengan Message Properties #

Pesan RabbitMQ bisa menyertakan properties — metadata yang dikirim bersama payload tanpa harus dimasukkan ke dalam body pesan.

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.MessageProperties;

public class ProducerWithProperties {

    public static void publishWithProperties(Channel channel, String queue, String body) throws Exception {
        // ✗ ANTI-PATTERN: pesan tidak persistent — hilang jika broker restart
        channel.basicPublish("", queue, null, body.getBytes());

        // ✓ BENAR: pesan persistent dengan MessageProperties.PERSISTENT_TEXT_PLAIN
        channel.basicPublish("", queue, MessageProperties.PERSISTENT_TEXT_PLAIN, body.getBytes());

        // Untuk properti lebih lengkap, gunakan builder
        AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
            .contentType("application/json")
            .deliveryMode(2)                    // 2 = persistent
            .priority(5)                        // 0-9, butuh queue dengan x-max-priority
            .correlationId("req-123")           // untuk request-reply pattern
            .replyTo("reply-queue")             // untuk request-reply pattern
            .expiration("60000")                // TTL pesan: 60 detik (ms sebagai string)
            .messageId(java.util.UUID.randomUUID().toString())
            .timestamp(new java.util.Date())
            .headers(java.util.Map.of(
                "source-service", "order-service",
                "retry-count", 0
            ))
            .build();

        channel.basicPublish("order-exchange", "order.created", properties, body.getBytes());
    }
}
deliveryMode=2 (persistent) hanya benar-benar menjamin durability jika queue juga dideklarasikan sebagai durable=true. Keduanya harus aktif bersamaan. Jika queue tidak durable, pesan persistent tetap akan hilang saat broker restart.

Consumer dan Acknowledgment #

Consumer di RabbitMQ berbasis push — broker mendorong pesan ke consumer secara aktif. Acknowledgment (ack) adalah mekanisme yang memberitahu broker bahwa pesan sudah berhasil diproses.

Manual Acknowledgment #

import com.rabbitmq.client.DeliverCallback;
import com.rabbitmq.client.CancelCallback;

public class ManualAckConsumer {

    public static void startConsuming(Channel channel, String queueName) throws Exception {

        // ✗ ANTI-PATTERN: auto ack — pesan dianggap selesai begitu dikirim ke consumer
        // Jika consumer crash setelah terima tapi sebelum proses selesai, pesan hilang
        boolean autoAck = false; // selalu false untuk production

        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            long deliveryTag = delivery.getEnvelope().getDeliveryTag();

            try {
                System.out.println("Memproses: " + message);
                processMessage(message);

                // ✓ Ack setelah proses berhasil — broker hapus pesan dari queue
                // multiple=false → hanya ack pesan ini, bukan semua pesan sebelumnya
                channel.basicAck(deliveryTag, false);

            } catch (Exception e) {
                System.err.println("Gagal memproses: " + e.getMessage());

                // Nack — beritahu broker bahwa pesan gagal diproses
                // requeue=true  → kembalikan ke queue (hati-hati: bisa infinite loop!)
                // requeue=false → buang pesan (atau kirim ke DLX jika dikonfigurasi)
                boolean requeue = isTransientError(e); // hanya requeue jika error sementara
                channel.basicNack(deliveryTag, false, requeue);
            }
        };

        CancelCallback cancelCallback = consumerTag -> {
            System.out.println("Consumer dibatalkan: " + consumerTag);
        };

        channel.basicConsume(queueName, autoAck, deliverCallback, cancelCallback);
    }

    private static void processMessage(String message) throws Exception {
        // logika bisnis
    }

    private static boolean isTransientError(Exception e) {
        // Contoh: error network adalah transient, error parsing JSON bukan
        return e instanceof java.io.IOException;
    }
}

Prefetch — Mengontrol Beban Consumer #

Prefetch count menentukan berapa banyak pesan yang boleh dikirim broker ke consumer sebelum consumer mengirimkan ack. Ini adalah pengaturan kritis untuk distribusi beban yang adil.

// ✗ ANTI-PATTERN: tanpa prefetch (default) — broker kirim semua pesan ke consumer pertama
// Consumer yang cepat dan lambat mendapat beban yang tidak seimbang
// channel.basicQos(0); // 0 = tidak ada limit

// ✓ BENAR: prefetch=1 — broker hanya kirim 1 pesan baru setelah consumer ack pesan sebelumnya
// Beban terdistribusi berdasarkan kecepatan masing-masing consumer
channel.basicQos(1);

// Untuk throughput lebih tinggi dengan beberapa consumer, naikkan prefetch
// tapi pertimbangkan memory usage
channel.basicQos(10); // kirim max 10 pesan sebelum butuh ack
sequenceDiagram
    participant Broker
    participant C1 as Consumer 1 (lambat)
    participant C2 as Consumer 2 (cepat)

    Note over Broker,C2: TANPA prefetch — tidak adil
    Broker->>C1: msg 1, 2, 3, 4, 5 (semua sekaligus)
    Broker->>C2: msg 6, 7, 8, 9, 10

    Note over Broker,C2: DENGAN prefetch=1 — adil
    Broker->>C1: msg 1
    Broker->>C2: msg 2
    C2-->>Broker: ack msg 2
    Broker->>C2: msg 3
    C2-->>Broker: ack msg 3
    Broker->>C2: msg 4
    C1-->>Broker: ack msg 1
    Broker->>C1: msg 5

Dead Letter Exchange (DLX) #

Dead Letter Exchange adalah mekanisme resmi RabbitMQ untuk menangani pesan yang gagal diproses. Pesan menjadi “dead letter” ketika:

  • Di-nack dengan requeue=false
  • TTL pesan habis
  • Queue penuh (jika ada x-max-length)
public class DLXSetup {

    public static void setupWithDLX(Channel channel) throws Exception {
        // 1. Buat Dead Letter Exchange
        String dlxExchange = "dlx.order-events";
        String dlxQueue = "dlq.order-events";

        channel.exchangeDeclare(dlxExchange, "direct", true);
        channel.queueDeclare(dlxQueue, true, false, false, null);
        channel.queueBind(dlxQueue, dlxExchange, "order-events"); // routing key harus cocok

        // 2. Buat queue utama dengan referensi ke DLX
        java.util.Map<String, Object> args = new java.util.HashMap<>();
        args.put("x-dead-letter-exchange", dlxExchange);      // arahkan pesan mati ke DLX
        args.put("x-dead-letter-routing-key", "order-events"); // routing key di DLX
        args.put("x-message-ttl", 300_000);                   // TTL 5 menit (opsional)

        channel.queueDeclare("order-events-queue", true, false, false, args);
        channel.queueBind("order-events-queue", "order-exchange", "order.*");
    }
}
flowchart TD
    P[Producer] --> EX[order-exchange]
    EX --> Q[order-events-queue\nx-dead-letter-exchange: dlx]
    Q --> C{Consumer\nberhasil proses?}
    C -- Ya\nbasicAck --> DONE[Pesan dihapus]
    C -- Tidak\nbasicNack requeue=false --> DLX[dlx.order-events]
    DLX --> DLQ[dlq.order-events]
    DLQ --> OPS[Tim Ops / Monitoring]

Consumer untuk DLQ biasanya dijalankan terpisah untuk investigasi dan reprocessing manual:

public class DLQConsumer {

    public static void monitorDLQ(Channel channel) throws Exception {
        channel.basicQos(1);

        channel.basicConsume("dlq.order-events", false,
            (consumerTag, delivery) -> {
                String message = new String(delivery.getBody(), "UTF-8");
                long deliveryTag = delivery.getEnvelope().getDeliveryTag();

                // Log detail kegagalan
                var headers = delivery.getProperties().getHeaders();
                System.out.printf(
                    "[DLQ] Pesan gagal — reason: %s, message: %s%n",
                    headers != null ? headers.get("x-death") : "unknown",
                    message
                );

                // Kirim alert ke sistem monitoring (Slack, PagerDuty, dll)
                sendAlert(message, headers);

                // Ack di DLQ agar tidak diproses ulang tanpa disengaja
                channel.basicAck(deliveryTag, false);
            },
            consumerTag -> {}
        );
    }

    private static void sendAlert(String message, java.util.Map<String, Object> headers) {
        // implementasi notifikasi ke tim ops
        System.out.println("ALERT: pesan masuk ke DLQ, perlu investigasi manual.");
    }
}

Pola Request-Reply (RPC over RabbitMQ) #

RabbitMQ sangat cocok untuk implementasi pola request-reply — caller mengirim request dan menunggu response. Ini yang membedakan RabbitMQ dari Kafka: Kafka tidak dirancang untuk pola ini.

sequenceDiagram
    participant Client
    participant RabbitMQ
    participant Server

    Client->>RabbitMQ: publish ke "rpc-queue"\ncorrelationId=uuid-123\nreplyTo="amq.rabbitmq.reply-to"
    RabbitMQ->>Server: deliver request
    Server->>Server: proses request
    Server->>RabbitMQ: publish response ke replyTo\ncorrelationId=uuid-123
    RabbitMQ->>Client: deliver response
    Client->>Client: cocokkan correlationId → selesai
import java.util.UUID;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

public class RPCClient {

    private final Channel channel;

    public RPCClient(Channel channel) throws Exception {
        this.channel = channel;
    }

    public String call(String request, int timeoutSeconds) throws Exception {
        String correlationId = UUID.randomUUID().toString();

        // BlockingQueue untuk menunggu response dari consumer callback (thread berbeda)
        BlockingQueue<String> responseQueue = new ArrayBlockingQueue<>(1);

        // "amq.rabbitmq.reply-to" adalah pseudo-queue bawaan RabbitMQ untuk Direct Reply-to
        // Lebih efisien dari membuat temporary queue baru setiap request
        String replyTo = "amq.rabbitmq.reply-to";

        // Subscribe ke reply queue sebelum publish request
        channel.basicConsume(replyTo, true, // auto ack untuk reply queue
            (consumerTag, delivery) -> {
                if (correlationId.equals(delivery.getProperties().getCorrelationId())) {
                    responseQueue.offer(new String(delivery.getBody(), "UTF-8"));
                }
            },
            consumerTag -> {}
        );

        // Publish request
        AMQP.BasicProperties props = new AMQP.BasicProperties.Builder()
            .correlationId(correlationId)
            .replyTo(replyTo)
            .build();

        channel.basicPublish("", "rpc-queue", props, request.getBytes());

        // Tunggu response dengan timeout
        String response = responseQueue.poll(timeoutSeconds, TimeUnit.SECONDS);
        if (response == null) {
            throw new RuntimeException("RPC timeout setelah " + timeoutSeconds + " detik");
        }
        return response;
    }
}

public class RPCServer {

    public static void start(Channel channel) throws Exception {
        channel.queueDeclare("rpc-queue", false, false, false, null);
        channel.basicQos(1); // proses satu request dalam satu waktu

        System.out.println("RPC Server menunggu request...");

        channel.basicConsume("rpc-queue", false,
            (consumerTag, delivery) -> {
                String request = new String(delivery.getBody(), "UTF-8");
                String correlationId = delivery.getProperties().getCorrelationId();
                String replyTo = delivery.getProperties().getReplyTo();

                String response;
                try {
                    response = handleRequest(request);
                } catch (Exception e) {
                    response = "{\"error\": \"" + e.getMessage() + "\"}";
                }

                // Kirim response kembali ke reply queue
                AMQP.BasicProperties replyProps = new AMQP.BasicProperties.Builder()
                    .correlationId(correlationId)
                    .build();

                channel.basicPublish("", replyTo, replyProps, response.getBytes());
                channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
            },
            consumerTag -> {}
        );
    }

    private static String handleRequest(String request) {
        // logika bisnis — proses request dan kembalikan response
        return "{\"status\": \"ok\", \"result\": \"processed: " + request + "\"}";
    }
}

Retry dengan Exponential Backoff #

Ketika pesan gagal diproses karena error sementara (database sedang down, service lain tidak responsif), kamu butuh retry dengan delay — bukan langsung requeue yang akan menciptakan busy loop.

RabbitMQ tidak punya built-in delay, tapi kamu bisa mensimulasikannya dengan TTL queue:

public class RetryWithBackoff {

    // Setup: buat hierarchy queue untuk delay
    public static void setupRetryQueues(Channel channel) throws Exception {
        String mainExchange = "order-exchange";
        String mainQueue = "order-queue";
        String retryExchange = "retry-exchange";

        // Exchange utama
        channel.exchangeDeclare(mainExchange, "direct", true);

        // Queue retry dengan TTL — pesan akan "mati" kembali ke main queue setelah delay
        String[] retryQueues = {"retry-5s", "retry-30s", "retry-5m"};
        int[] delays = {5_000, 30_000, 300_000};

        for (int i = 0; i < retryQueues.length; i++) {
            java.util.Map<String, Object> args = new java.util.HashMap<>();
            args.put("x-message-ttl", delays[i]);
            args.put("x-dead-letter-exchange", mainExchange); // kembalikan ke main setelah TTL
            args.put("x-dead-letter-routing-key", "order");

            channel.queueDeclare(retryQueues[i], true, false, false, args);
            channel.queueBind(retryQueues[i], retryExchange, retryQueues[i]);
        }

        // DLQ untuk pesan yang sudah habis retry
        channel.queueDeclare("dlq.order-queue", true, false, false, null);

        // Main queue dengan DLX
        java.util.Map<String, Object> mainArgs = new java.util.HashMap<>();
        mainArgs.put("x-dead-letter-exchange", retryExchange);
        channel.queueDeclare(mainQueue, true, false, false, mainArgs);
        channel.queueBind(mainQueue, mainExchange, "order");
    }

    // Consumer yang menentukan ke retry queue mana pesan harus dikirim
    public static void consumeWithRetry(Channel channel) throws Exception {
        channel.basicQos(1);

        channel.basicConsume("order-queue", false,
            (consumerTag, delivery) -> {
                String message = new String(delivery.getBody(), "UTF-8");
                long deliveryTag = delivery.getEnvelope().getDeliveryTag();

                // Hitung berapa kali pesan ini sudah di-retry
                var headers = delivery.getProperties().getHeaders();
                int retryCount = getRetryCount(headers);

                try {
                    processMessage(message);
                    channel.basicAck(deliveryTag, false);

                } catch (Exception e) {
                    if (retryCount >= 3) {
                        // Sudah retry 3x — kirim ke DLQ
                        forwardToDLQ(channel, delivery, message, e.getMessage());
                        channel.basicAck(deliveryTag, false); // ack agar tidak diproses lagi
                    } else {
                        // Nack tanpa requeue → masuk ke DLX → retry queue
                        channel.basicNack(deliveryTag, false, false);
                    }
                }
            },
            consumerTag -> {}
        );
    }

    private static int getRetryCount(java.util.Map<String, Object> headers) {
        if (headers == null) return 0;
        var xDeath = headers.get("x-death");
        if (xDeath == null) return 0;
        // x-death berisi list of death record
        @SuppressWarnings("unchecked")
        var deathList = (java.util.List<?>) xDeath;
        return deathList.size();
    }

    private static void processMessage(String message) throws Exception {
        System.out.println("Memproses: " + message);
    }

    private static void forwardToDLQ(Channel channel, com.rabbitmq.client.Delivery delivery,
                                      String message, String reason) throws Exception {
        System.err.println("Pesan masuk DLQ setelah 3 retry: " + reason);
        channel.basicPublish("", "dlq.order-queue",
            MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
    }
}

Kapan Menggunakan RabbitMQ dan Kapan Tidak #

GUNAKAN RABBITMQ JIKA:
  ✓ Butuh routing fleksibel berdasarkan content (topic exchange)
  ✓ Task queue — pekerjaan yang harus dieksekusi tepat sekali
  ✓ Request-reply / RPC pattern
  ✓ Volume moderat dengan kompleksitas routing tinggi
  ✓ Butuh priority queue (x-max-priority)
  ✓ Pesan harus dihapus setelah diproses (tidak butuh replay)
  ✓ Integrasi dengan protokol selain AMQP (STOMP, MQTT via plugin)

PERTIMBANGKAN ALTERNATIF JIKA:
  ✗ Volume sangat tinggi (jutaan pesan/detik) → Kafka lebih tepat
  ✗ Butuh replay pesan yang sudah lama → Kafka punya retention log
  ✗ Banyak consumer independen membaca data yang sama → Kafka consumer group
  ✗ Stream processing dan agregasi real-time → Kafka Streams
  ✗ Audit log yang tidak boleh dihapus → Kafka dengan retention panjang
flowchart TD
    A{Butuh routing\nfleksibel?} -- Ya --> B{Task dieksekusi\ntepat sekali?}
    A -- Tidak --> C{Volume sangat\ntinggi?}

    B -- Ya --> RABBIT[RabbitMQ]
    B -- Tidak --> D{Butuh\nrequest-reply?}

    D -- Ya --> RABBIT
    D -- Tidak --> C

    C -- Ya --> KAFKA[Kafka]
    C -- Tidak --> E{Butuh\nreplay?}

    E -- Ya --> KAFKA
    E -- Tidak --> RABBIT

Ringkasan #

  • Exchange adalah router — producer tidak mengirim langsung ke queue. Pilih tipe exchange yang tepat: direct untuk exact match, fanout untuk broadcast, topic untuk routing berbasis pattern.
  • Selalu gunakan durable=true untuk queue dan exchange di production, dan deliveryMode=2 (persistent) untuk pesan yang tidak boleh hilang saat broker restart.
  • Matikan auto ack dan lakukan basicAck secara manual setelah pemrosesan berhasil — ini mencegah pesan hilang jika consumer crash di tengah proses.
  • Atur prefetch count (basicQos) untuk distribusi beban yang adil antar consumer. Tanpa prefetch, consumer yang lambat akan mendapat tumpukan pesan yang belum bisa diproses.
  • Dead Letter Exchange (DLX) adalah cara resmi RabbitMQ untuk menangani pesan gagal — konfigurasi di level queue agar pesan yang di-nack otomatis masuk ke DLQ.
  • Pattern request-reply adalah keunggulan RabbitMQ dibanding Kafka — gunakan correlationId dan replyTo (amq.rabbitmq.reply-to) untuk komunikasi sinkron di atas messaging layer.
  • Retry dengan exponential backoff bisa diimplementasikan menggunakan TTL queue yang mengembalikan pesan ke main queue setelah delay tertentu.
  • RabbitMQ paling cocok untuk task queue, routing kompleks, dan request-reply — bukan untuk event streaming volume tinggi atau skenario yang butuh replay pesan.

← Sebelumnya: Kafka   Berikutnya: Amazon SQS →

About | Author | Content Scope | Editorial Policy | Privacy Policy | Disclaimer | Contact