Amazon SQS

Amazon SQS #

Ketika aplikasi kamu sudah berjalan di atas infrastruktur AWS, memilih message broker yang dikelola sepenuhnya oleh cloud provider adalah keputusan yang sangat masuk akal. Amazon Simple Queue Service (SQS) adalah layanan queue terkelola yang tidak membutuhkan provisioning server, tidak ada cluster yang perlu di-maintain, dan skalanya otomatis dari satu pesan hingga miliaran pesan per hari. Berbeda dari RabbitMQ yang membutuhkan kamu memahami exchange dan binding, atau Kafka yang membutuhkan pengelolaan partition dan consumer group, SQS dirancang untuk sesederhana mungkin: kirim pesan ke queue, ambil pesan dari queue, hapus pesan setelah selesai diproses. Kesederhanaan ini adalah kekuatan sekaligus keterbatasannya — dan memahami tradeoff tersebut adalah kunci untuk menggunakan SQS secara efektif.

Konsep Dasar SQS #

SQS menggunakan model yang berbeda dari broker tradisional. Ada beberapa konsep unik yang tidak kamu temukan di RabbitMQ atau Kafka.

Visibility Timeout #

Ini adalah konsep paling penting di SQS dan yang paling sering disalahpahami. Ketika consumer mengambil (receive) pesan dari SQS, pesan tersebut tidak langsung dihapus — ia menjadi tidak terlihat (invisible) untuk consumer lain selama periode visibility timeout. Consumer punya waktu sebesar visibility timeout untuk memproses dan menghapus pesan. Jika pesan tidak dihapus dalam waktu tersebut, SQS menganggap pemrosesan gagal dan membuat pesan terlihat kembali.

sequenceDiagram
    participant C as Consumer
    participant SQS

    C->>SQS: ReceiveMessage
    SQS-->>C: pesan (visibility timeout dimulai: 30s)
    Note over SQS: pesan tidak terlihat\noleh consumer lain

    alt Proses berhasil
        C->>SQS: DeleteMessage
        Note over SQS: pesan dihapus permanen
    else Timeout habis (consumer crash)
        Note over SQS: pesan terlihat kembali
        SQS-->>C: pesan dikirim ulang ke consumer lain
    end

Standard Queue vs FIFO Queue #

SQS punya dua tipe queue dengan karakteristik yang sangat berbeda:

Standard QueueFIFO Queue
ThroughputHampir tak terbatas300 msg/s (3.000 dengan batching)
OrderingBest-effort (tidak dijamin)Ketat — first-in, first-out
DuplikasiMungkin terjadi (at-least-once)Tepat sekali (exactly-once)
Nama queuebebasharus diakhiri .fifo
HargaLebih murahLebih mahal
Use caseThroughput tinggi, toleran duplikatTransaksi keuangan, ordering kritis
flowchart TD
    A{Ordering pesan\nharus dijamin?} -- Ya --> B{Throughput\n> 300 msg/s?}
    A -- Tidak --> C{Pesan duplikat\nbisa ditoleransi?}

    B -- Ya --> D[Pertimbangkan Kafka\natau arsitektur ulang]
    B -- Tidak --> E[FIFO Queue]

    C -- Ya --> F[Standard Queue\nlebih murah dan cepat]
    C -- Tidak --> G{Perlu deduplication\nwindow 5 menit?}

    G -- Ya --> E
    G -- Tidak --> F

Message Group (FIFO Queue) #

FIFO queue mendukung konsep message group — setiap grup diproses secara berurutan, tapi grup yang berbeda bisa diproses secara paralel. Ini memberikan skalabilitas sambil tetap menjamin ordering per entitas.

FIFO Queue — order-processing.fifo

MessageGroupId = "order-123"  → [event-1] [event-2] [event-3]  ← urut
MessageGroupId = "order-456"  → [event-1] [event-2]             ← urut, paralel dengan order-123
MessageGroupId = "order-789"  → [event-1]                        ← urut, paralel dengan kedua grup

Setup Dependencies #

Tambahkan AWS SDK v2 untuk SQS ke project kamu. AWS SDK v2 adalah versi modern yang menggunakan builder pattern dan mendukung async secara native.

Untuk Maven:

<dependencyManagement>
    <dependencies>
        <dependency>
            <groupId>software.amazon.awssdk</groupId>
            <artifactId>bom</artifactId>
            <version>2.25.60</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>
    </dependencies>
</dependencyManagement>

<dependencies>
    <!-- SQS client -->
    <dependency>
        <groupId>software.amazon.awssdk</groupId>
        <artifactId>sqs</artifactId>
    </dependency>
    <!-- URL Connection HTTP client (ringan, tidak butuh Netty) -->
    <dependency>
        <groupId>software.amazon.awssdk</groupId>
        <artifactId>url-connection-client</artifactId>
    </dependency>
</dependencies>

Untuk Gradle:

dependencies {
    implementation platform('software.amazon.awssdk:bom:2.25.60')
    implementation 'software.amazon.awssdk:sqs'
    implementation 'software.amazon.awssdk:url-connection-client'
}

Untuk development lokal tanpa akun AWS, gunakan LocalStack:

# Jalankan LocalStack — emulator layanan AWS lokal
docker run -d \
  --name localstack \
  -p 4566:4566 \
  -e SERVICES=sqs \
  localstack/localstack

# Buat queue via AWS CLI (pointing ke LocalStack)
aws --endpoint-url=http://localhost:4566 sqs create-queue \
  --queue-name order-queue \
  --region ap-southeast-1

Membuat SQS Client #

Konfigurasi client berbeda antara environment production (AWS) dan development (LocalStack):

import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider;
import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
import software.amazon.awssdk.http.urlconnection.UrlConnectionHttpClient;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.sqs.SqsClient;

import java.net.URI;

public class SqsClientFactory {

    // ✓ BENAR: production — gunakan DefaultCredentialsProvider
    // Otomatis mencari credentials dari: env vars → ~/.aws/credentials → IAM role
    public static SqsClient createProductionClient() {
        return SqsClient.builder()
            .region(Region.AP_SOUTHEAST_1)
            .credentialsProvider(DefaultCredentialsProvider.create())
            .httpClient(UrlConnectionHttpClient.builder().build())
            .build();
    }

    // ✓ BENAR: development dengan LocalStack
    public static SqsClient createLocalStackClient() {
        return SqsClient.builder()
            .region(Region.AP_SOUTHEAST_1)
            .endpointOverride(URI.create("http://localhost:4566"))
            .credentialsProvider(StaticCredentialsProvider.create(
                AwsBasicCredentials.create("test", "test") // LocalStack tidak validasi credentials
            ))
            .httpClient(UrlConnectionHttpClient.builder().build())
            .build();
    }

    // ✗ ANTI-PATTERN: hardcode credentials di kode
    // Credentials bisa bocor ke version control
    public static SqsClient createInsecureClient() {
        return SqsClient.builder()
            .credentialsProvider(StaticCredentialsProvider.create(
                AwsBasicCredentials.create("AKIAIOSFODNN7EXAMPLE", "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY")
            ))
            .build();
    }
}
Jangan pernah hardcode AWS credentials (Access Key ID dan Secret Access Key) di dalam kode sumber. Gunakan DefaultCredentialsProvider di production yang otomatis membaca dari environment variable AWS_ACCESS_KEY_ID dan AWS_SECRET_ACCESS_KEY, file ~/.aws/credentials, atau IAM role jika berjalan di EC2/ECS/Lambda.

Manajemen Queue #

Sebelum mengirim atau menerima pesan, queue harus ada. Kamu bisa membuat dan mengelola queue secara programatis.

import software.amazon.awssdk.services.sqs.model.*;

import java.util.Map;

public class SqsQueueManager {

    private final SqsClient sqsClient;

    public SqsQueueManager(SqsClient sqsClient) {
        this.sqsClient = sqsClient;
    }

    // Membuat Standard Queue
    public String createStandardQueue(String queueName) {
        CreateQueueRequest request = CreateQueueRequest.builder()
            .queueName(queueName)
            .attributes(Map.of(
                QueueAttributeName.VISIBILITY_TIMEOUT, "30",          // detik
                QueueAttributeName.MESSAGE_RETENTION_PERIOD, "86400", // 1 hari (detik)
                QueueAttributeName.RECEIVE_MESSAGE_WAIT_TIME_SECONDS, "20", // long polling
                QueueAttributeName.MAX_MESSAGE_SIZE, "262144"         // 256 KB (maks SQS)
            ))
            .build();

        CreateQueueResponse response = sqsClient.createQueue(request);
        System.out.println("Queue dibuat: " + response.queueUrl());
        return response.queueUrl();
    }

    // Membuat FIFO Queue
    public String createFifoQueue(String queueName) {
        // Nama FIFO queue harus diakhiri dengan .fifo
        if (!queueName.endsWith(".fifo")) {
            queueName = queueName + ".fifo";
        }

        CreateQueueRequest request = CreateQueueRequest.builder()
            .queueName(queueName)
            .attributes(Map.of(
                QueueAttributeName.FIFO_QUEUE, "true",
                QueueAttributeName.CONTENT_BASED_DEDUPLICATION, "false", // gunakan deduplication ID eksplisit
                QueueAttributeName.VISIBILITY_TIMEOUT, "30",
                QueueAttributeName.MESSAGE_RETENTION_PERIOD, "86400"
            ))
            .build();

        CreateQueueResponse response = sqsClient.createQueue(request);
        return response.queueUrl();
    }

    // Membuat Standard Queue dengan Dead Letter Queue
    public String createQueueWithDLQ(String mainQueueName, String dlqName) {
        // 1. Buat DLQ terlebih dahulu
        String dlqUrl = createStandardQueue(dlqName);

        // 2. Ambil ARN dari DLQ
        GetQueueAttributesResponse dlqAttrs = sqsClient.getQueueAttributes(
            GetQueueAttributesRequest.builder()
                .queueUrl(dlqUrl)
                .attributeNames(QueueAttributeName.QUEUE_ARN)
                .build()
        );
        String dlqArn = dlqAttrs.attributes().get(QueueAttributeName.QUEUE_ARN);

        // 3. Buat main queue dengan redrive policy yang mengarah ke DLQ
        String redrivePolicy = String.format(
            "{\"deadLetterTargetArn\":\"%s\",\"maxReceiveCount\":\"3\"}",
            dlqArn
        );
        // maxReceiveCount=3 → pesan dikirim ke DLQ setelah gagal diproses 3 kali

        CreateQueueRequest request = CreateQueueRequest.builder()
            .queueName(mainQueueName)
            .attributes(Map.of(
                QueueAttributeName.VISIBILITY_TIMEOUT, "30",
                QueueAttributeName.RECEIVE_MESSAGE_WAIT_TIME_SECONDS, "20",
                QueueAttributeName.REDRIVE_POLICY, redrivePolicy
            ))
            .build();

        CreateQueueResponse response = sqsClient.createQueue(request);
        System.out.println("Main queue: " + response.queueUrl());
        System.out.println("DLQ: " + dlqUrl);
        return response.queueUrl();
    }

    // Mendapatkan URL queue berdasarkan nama
    public String getQueueUrl(String queueName) {
        return sqsClient.getQueueUrl(
            GetQueueUrlRequest.builder().queueName(queueName).build()
        ).queueUrl();
    }
}

Mengirim Pesan (Producer) #

Mengirim Pesan Tunggal #

import software.amazon.awssdk.services.sqs.model.MessageAttributeValue;
import software.amazon.awssdk.services.sqs.model.SendMessageRequest;
import software.amazon.awssdk.services.sqs.model.SendMessageResponse;

public class SqsProducer {

    private final SqsClient sqsClient;
    private final String queueUrl;

    public SqsProducer(SqsClient sqsClient, String queueUrl) {
        this.sqsClient = sqsClient;
        this.queueUrl = queueUrl;
    }

    // Mengirim pesan sederhana ke Standard Queue
    public String send(String messageBody) {
        SendMessageRequest request = SendMessageRequest.builder()
            .queueUrl(queueUrl)
            .messageBody(messageBody)
            .delaySeconds(0) // delay pengiriman (0-900 detik), default 0
            .messageAttributes(Map.of(
                "source-service", MessageAttributeValue.builder()
                    .dataType("String")
                    .stringValue("order-service")
                    .build(),
                "event-type", MessageAttributeValue.builder()
                    .dataType("String")
                    .stringValue("order.created")
                    .build(),
                "retry-count", MessageAttributeValue.builder()
                    .dataType("Number")
                    .stringValue("0")
                    .build()
            ))
            .build();

        SendMessageResponse response = sqsClient.sendMessage(request);
        System.out.println("Pesan terkirim — MessageId: " + response.messageId());
        return response.messageId();
    }

    // Mengirim ke FIFO Queue — butuh MessageGroupId dan MessageDeduplicationId
    public String sendToFifo(String messageBody, String messageGroupId, String deduplicationId) {
        SendMessageRequest request = SendMessageRequest.builder()
            .queueUrl(queueUrl)
            .messageBody(messageBody)
            // MessageGroupId — semua pesan dalam grup yang sama diproses urut
            .messageGroupId(messageGroupId)
            // MessageDeduplicationId — SQS tolak pesan duplikat dalam window 5 menit
            // Gunakan nilai yang unik per pesan (misal: UUID atau hash dari konten)
            .messageDeduplicationId(deduplicationId)
            .build();

        SendMessageResponse response = sqsClient.sendMessage(request);
        return response.messageId();
    }
}

Batch Send — Menghemat Biaya #

SQS menagih per request API, bukan per pesan. Mengirim 10 pesan dalam satu batch request sama biayanya dengan mengirim 1 pesan. Selalu gunakan batch jika memungkinkan.

import software.amazon.awssdk.services.sqs.model.SendMessageBatchRequest;
import software.amazon.awssdk.services.sqs.model.SendMessageBatchRequestEntry;
import software.amazon.awssdk.services.sqs.model.SendMessageBatchResponse;

import java.util.ArrayList;
import java.util.List;
import java.util.UUID;

public class SqsBatchProducer {

    private final SqsClient sqsClient;
    private final String queueUrl;

    public SqsBatchProducer(SqsClient sqsClient, String queueUrl) {
        this.sqsClient = sqsClient;
        this.queueUrl = queueUrl;
    }

    // ✗ ANTI-PATTERN: kirim satu per satu dalam loop
    // 100 pesan = 100 API call = 100x biaya
    public void sendOneByOne(List<String> messages) {
        for (String msg : messages) {
            sqsClient.sendMessage(SendMessageRequest.builder()
                .queueUrl(queueUrl)
                .messageBody(msg)
                .build());
        }
    }

    // ✓ BENAR: batch send — maks 10 pesan per batch request
    // 100 pesan = 10 API call = 10x lebih hemat
    public void sendBatch(List<String> messages) {
        // SQS membatasi 10 pesan per batch
        int batchSize = 10;

        for (int i = 0; i < messages.size(); i += batchSize) {
            List<String> batch = messages.subList(i, Math.min(i + batchSize, messages.size()));
            sendSingleBatch(batch);
        }
    }

    private void sendSingleBatch(List<String> batch) {
        List<SendMessageBatchRequestEntry> entries = new ArrayList<>();

        for (int i = 0; i < batch.size(); i++) {
            entries.add(SendMessageBatchRequestEntry.builder()
                .id(String.valueOf(i))   // ID unik dalam batch (bukan message ID global)
                .messageBody(batch.get(i))
                .build());
        }

        SendMessageBatchResponse response = sqsClient.sendMessageBatch(
            SendMessageBatchRequest.builder()
                .queueUrl(queueUrl)
                .entries(entries)
                .build()
        );

        // Periksa pesan yang gagal terkirim dalam batch
        if (!response.failed().isEmpty()) {
            response.failed().forEach(failure -> {
                System.err.printf(
                    "Gagal kirim pesan ID=%s: %s — %s%n",
                    failure.id(), failure.code(), failure.message()
                );
                // Di sini: retry pesan yang gagal atau kirim ke fallback
            });
        }

        System.out.printf("Batch terkirim: %d berhasil, %d gagal%n",
            response.successful().size(), response.failed().size());
    }
}

Menerima dan Memproses Pesan (Consumer) #

Long Polling #

SQS mendukung dua mode polling: short polling dan long polling. Selalu gunakan long polling.

SHORT POLLING (WaitTimeSeconds=0):
  Consumer → SQS: "Ada pesan?"
  SQS → Consumer: "Tidak ada" (meski pesan mungkin sudah ada)
  [ulangi setiap detik]
  Akibat: banyak empty response, biaya tinggi, latensi tidak tentu

LONG POLLING (WaitTimeSeconds=1-20):
  Consumer → SQS: "Ada pesan? Tunggu maks 20 detik"
  SQS: [menunggu sampai ada pesan atau timeout]
  SQS → Consumer: pesan (segera setelah tersedia)
  Akibat: lebih sedikit empty response, biaya turun, latensi lebih rendah
import software.amazon.awssdk.services.sqs.model.DeleteMessageRequest;
import software.amazon.awssdk.services.sqs.model.Message;
import software.amazon.awssdk.services.sqs.model.ReceiveMessageRequest;
import software.amazon.awssdk.services.sqs.model.ReceiveMessageResponse;

public class SqsConsumer {

    private final SqsClient sqsClient;
    private final String queueUrl;
    private volatile boolean running = true;

    public SqsConsumer(SqsClient sqsClient, String queueUrl) {
        this.sqsClient = sqsClient;
        this.queueUrl = queueUrl;
    }

    public void start() {
        System.out.println("Consumer mulai berjalan...");

        while (running) {
            try {
                // ✓ Long polling — tunggu maks 20 detik jika tidak ada pesan
                ReceiveMessageRequest receiveRequest = ReceiveMessageRequest.builder()
                    .queueUrl(queueUrl)
                    .maxNumberOfMessages(10)      // maks 10 pesan per receive (maks SQS)
                    .waitTimeSeconds(20)           // long polling
                    .visibilityTimeout(30)         // consumer punya 30 detik untuk proses
                    .messageAttributeNames("All")  // ambil semua message attributes
                    .attributeNames(              // ambil queue attributes (ApproximateReceiveCount, dll)
                        software.amazon.awssdk.services.sqs.model.MessageSystemAttributeName.APPROXIMATE_RECEIVE_COUNT,
                        software.amazon.awssdk.services.sqs.model.MessageSystemAttributeName.SENT_TIMESTAMP
                    )
                    .build();

                ReceiveMessageResponse response = sqsClient.receiveMessage(receiveRequest);
                List<Message> messages = response.messages();

                if (messages.isEmpty()) {
                    continue; // tidak ada pesan, poll lagi
                }

                for (Message message : messages) {
                    processAndDelete(message);
                }

            } catch (Exception e) {
                System.err.println("Error saat polling: " + e.getMessage());
                // jangan crash — tunggu sebentar lalu coba lagi
                sleep(5000);
            }
        }
    }

    private void processAndDelete(Message message) {
        try {
            System.out.printf("Memproses MessageId=%s: %s%n",
                message.messageId(), message.body());

            // Periksa berapa kali pesan ini sudah diterima
            String receiveCount = message.attributes().get(
                software.amazon.awssdk.services.sqs.model.MessageSystemAttributeName.APPROXIMATE_RECEIVE_COUNT
            );
            System.out.println("Sudah diterima " + receiveCount + " kali");

            doProcess(message.body());

            // ✓ Hapus pesan setelah berhasil diproses
            // Gunakan ReceiptHandle (bukan MessageId) untuk delete
            sqsClient.deleteMessage(DeleteMessageRequest.builder()
                .queueUrl(queueUrl)
                .receiptHandle(message.receiptHandle())
                .build());

            System.out.println("Pesan berhasil diproses dan dihapus.");

        } catch (Exception e) {
            System.err.printf("Gagal memproses MessageId=%s: %s%n",
                message.messageId(), e.getMessage());
            // Jangan hapus pesan — biarkan visibility timeout habis
            // SQS akan mengirimkan pesan ini kembali ke consumer lain
            // Setelah maxReceiveCount tercapai, otomatis masuk DLQ
        }
    }

    private void doProcess(String body) throws Exception {
        // logika bisnis
    }

    public void stop() {
        running = false;
    }

    private void sleep(long ms) {
        try { Thread.sleep(ms); } catch (InterruptedException e) { Thread.currentThread().interrupt(); }
    }
}

Memperpanjang Visibility Timeout #

Jika kamu tahu pemrosesan akan memakan waktu lebih lama dari visibility timeout, perpanjang sebelum habis:

import software.amazon.awssdk.services.sqs.model.ChangeMessageVisibilityRequest;

public class LongRunningProcessor {

    private final SqsClient sqsClient;
    private final String queueUrl;

    public LongRunningProcessor(SqsClient sqsClient, String queueUrl) {
        this.sqsClient = sqsClient;
        this.queueUrl = queueUrl;
    }

    public void processHeavyTask(Message message) throws Exception {
        // Jalankan timer untuk perpanjang visibility timeout setiap 25 detik
        // (visibility timeout awal = 30 detik, perpanjang 5 detik sebelum habis)
        java.util.concurrent.ScheduledExecutorService scheduler =
            java.util.concurrent.Executors.newSingleThreadScheduledExecutor();

        scheduler.scheduleAtFixedRate(() -> {
            try {
                sqsClient.changeMessageVisibility(
                    ChangeMessageVisibilityRequest.builder()
                        .queueUrl(queueUrl)
                        .receiptHandle(message.receiptHandle())
                        .visibilityTimeout(30) // perpanjang 30 detik lagi
                        .build()
                );
                System.out.println("Visibility timeout diperpanjang.");
            } catch (Exception e) {
                System.err.println("Gagal perpanjang visibility timeout: " + e.getMessage());
            }
        }, 25, 25, java.util.concurrent.TimeUnit.SECONDS);

        try {
            // Lakukan proses berat yang memakan waktu lama
            doHeavyWork(message.body());

            // Hapus pesan setelah selesai
            sqsClient.deleteMessage(DeleteMessageRequest.builder()
                .queueUrl(queueUrl)
                .receiptHandle(message.receiptHandle())
                .build());

        } finally {
            scheduler.shutdown();
        }
    }

    private void doHeavyWork(String body) throws Exception {
        // simulasi proses berat — misal: encode video, generate laporan besar
        Thread.sleep(60_000); // 1 menit
    }
}

Batch Delete — Menghemat Biaya saat Delete #

Sama seperti batch send, delete juga bisa dilakukan dalam batch:

import software.amazon.awssdk.services.sqs.model.DeleteMessageBatchRequest;
import software.amazon.awssdk.services.sqs.model.DeleteMessageBatchRequestEntry;

public class SqsBatchConsumer {

    private final SqsClient sqsClient;
    private final String queueUrl;

    public SqsBatchConsumer(SqsClient sqsClient, String queueUrl) {
        this.sqsClient = sqsClient;
        this.queueUrl = queueUrl;
    }

    public void processAndDeleteBatch() {
        ReceiveMessageRequest receiveRequest = ReceiveMessageRequest.builder()
            .queueUrl(queueUrl)
            .maxNumberOfMessages(10)
            .waitTimeSeconds(20)
            .build();

        List<Message> messages = sqsClient.receiveMessage(receiveRequest).messages();
        if (messages.isEmpty()) return;

        List<DeleteMessageBatchRequestEntry> toDelete = new ArrayList<>();
        List<String> failed = new ArrayList<>();

        for (Message message : messages) {
            try {
                doProcess(message.body());
                // Kumpulkan receipt handle untuk batch delete
                toDelete.add(DeleteMessageBatchRequestEntry.builder()
                    .id(message.messageId())
                    .receiptHandle(message.receiptHandle())
                    .build());
            } catch (Exception e) {
                failed.add(message.messageId());
                System.err.println("Gagal proses: " + message.messageId());
            }
        }

        // ✓ Batch delete semua pesan yang berhasil dalam satu API call
        if (!toDelete.isEmpty()) {
            var deleteResponse = sqsClient.deleteMessageBatch(
                DeleteMessageBatchRequest.builder()
                    .queueUrl(queueUrl)
                    .entries(toDelete)
                    .build()
            );

            if (!deleteResponse.failed().isEmpty()) {
                deleteResponse.failed().forEach(f ->
                    System.err.println("Gagal delete MessageId=" + f.id() + ": " + f.message())
                );
            }
        }

        System.out.printf("Batch selesai: %d diproses, %d gagal%n",
            toDelete.size(), failed.size());
    }

    private void doProcess(String body) throws Exception {
        // logika bisnis
    }
}

Dead Letter Queue #

SQS punya mekanisme DLQ bawaan melalui Redrive Policy. Ketika pesan sudah diterima lebih dari maxReceiveCount kali tanpa berhasil dihapus, SQS otomatis memindahkannya ke DLQ.

flowchart TD
    P[Producer] --> Q[order-queue\nmaxReceiveCount=3]
    Q --> C{Consumer\nberhasil proses?}
    C -- Ya\nDeleteMessage --> DONE[Pesan dihapus]
    C -- Tidak\nTimeout / Error --> Q
    Q -->|Sudah 3x diterima| DLQ[dlq-order-queue]
    DLQ --> MON[DLQ Monitor\nAlert & Investigasi]
    MON -->|Setelah diperbaiki| REDRIVE[Redrive ke\nmain queue]

Memonitor dan memproses ulang pesan dari DLQ:

public class DlqMonitor {

    private final SqsClient sqsClient;
    private final String dlqUrl;
    private final String mainQueueUrl;

    public DlqMonitor(SqsClient sqsClient, String dlqUrl, String mainQueueUrl) {
        this.sqsClient = sqsClient;
        this.dlqUrl = dlqUrl;
        this.mainQueueUrl = mainQueueUrl;
    }

    // Periksa jumlah pesan di DLQ
    public int getDlqMessageCount() {
        GetQueueAttributesResponse response = sqsClient.getQueueAttributes(
            GetQueueAttributesRequest.builder()
                .queueUrl(dlqUrl)
                .attributeNames(QueueAttributeName.APPROXIMATE_NUMBER_OF_MESSAGES)
                .build()
        );
        return Integer.parseInt(
            response.attributes().get(QueueAttributeName.APPROXIMATE_NUMBER_OF_MESSAGES)
        );
    }

    // Baca pesan dari DLQ untuk investigasi
    public void inspectDlq() {
        ReceiveMessageRequest request = ReceiveMessageRequest.builder()
            .queueUrl(dlqUrl)
            .maxNumberOfMessages(10)
            .waitTimeSeconds(1)  // short poll untuk inspeksi manual
            .attributeNames(
                software.amazon.awssdk.services.sqs.model.MessageSystemAttributeName.APPROXIMATE_RECEIVE_COUNT,
                software.amazon.awssdk.services.sqs.model.MessageSystemAttributeName.SENT_TIMESTAMP
            )
            .build();

        List<Message> messages = sqsClient.receiveMessage(request).messages();
        System.out.println("Pesan di DLQ: " + messages.size());

        for (Message msg : messages) {
            System.out.printf(
                "MessageId=%s | Diterima %s kali | Body: %s%n",
                msg.messageId(),
                msg.attributes().get(
                    software.amazon.awssdk.services.sqs.model.MessageSystemAttributeName.APPROXIMATE_RECEIVE_COUNT
                ),
                msg.body()
            );
        }
    }

    // Redrive — pindahkan pesan dari DLQ kembali ke main queue setelah bug diperbaiki
    // AWS Console menyediakan fitur "Start DLQ redrive" secara built-in,
    // tapi ini implementasi manual jika butuh kontrol lebih:
    public void redriveMessages() {
        List<Message> messages;
        int redrived = 0;

        do {
            messages = sqsClient.receiveMessage(
                ReceiveMessageRequest.builder()
                    .queueUrl(dlqUrl)
                    .maxNumberOfMessages(10)
                    .waitTimeSeconds(1)
                    .build()
            ).messages();

            for (Message msg : messages) {
                // Kirim ulang ke main queue
                sqsClient.sendMessage(SendMessageRequest.builder()
                    .queueUrl(mainQueueUrl)
                    .messageBody(msg.body())
                    .build());

                // Hapus dari DLQ
                sqsClient.deleteMessage(DeleteMessageRequest.builder()
                    .queueUrl(dlqUrl)
                    .receiptHandle(msg.receiptHandle())
                    .build());

                redrived++;
            }
        } while (!messages.isEmpty());

        System.out.println("Total pesan di-redrive: " + redrived);
    }
}

Integrasi SNS + SQS — Fan-out Pattern #

SQS sendiri tidak mendukung fan-out (satu pesan ke banyak consumer). Untuk itu, kombinasikan dengan Amazon SNS (Simple Notification Service). SNS bertindak sebagai publisher yang mendistribusikan pesan ke semua SQS queue yang berlangganan.

flowchart TD
    P[Producer] --> SNS[SNS Topic\norder-events]
    SNS --> Q1[SQS Queue\nanalytics-queue]
    SNS --> Q2[SQS Queue\nnotification-queue]
    SNS --> Q3[SQS Queue\naudit-queue]
    Q1 --> C1[Analytics Service]
    Q2 --> C2[Notification Service]
    Q3 --> C3[Audit Service]
import software.amazon.awssdk.services.sns.SnsClient;
import software.amazon.awssdk.services.sns.model.*;

public class SnsSqsFanout {

    private final SnsClient snsClient;
    private final SqsClient sqsClient;

    public SnsSqsFanout(SnsClient snsClient, SqsClient sqsClient) {
        this.snsClient = snsClient;
        this.sqsClient = sqsClient;
    }

    public void setupFanout(String topicName, List<String> queueArns) {
        // 1. Buat SNS Topic
        CreateTopicResponse topicResponse = snsClient.createTopic(
            CreateTopicRequest.builder().name(topicName).build()
        );
        String topicArn = topicResponse.topicArn();

        // 2. Subscribe setiap SQS queue ke SNS topic
        for (String queueArn : queueArns) {
            snsClient.subscribe(SubscribeRequest.builder()
                .topicArn(topicArn)
                .protocol("sqs")
                .endpoint(queueArn)
                .build());
        }

        System.out.println("Fan-out setup selesai. Topic ARN: " + topicArn);
    }

    // Publish ke SNS — otomatis dikirim ke semua SQS queue yang subscribe
    public void publish(String topicArn, String message) {
        snsClient.publish(PublishRequest.builder()
            .topicArn(topicArn)
            .message(message)
            .subject("order-event")
            .build());
    }
}
Ketika SQS menerima pesan dari SNS, body pesan dibungkus dalam JSON envelope SNS. Consumer SQS perlu mem-parse envelope tersebut untuk mendapatkan pesan aslinya. Gunakan atribut Message di dalam JSON SNS notification, bukan body langsung dari SQS message.

Menangani Pesan Duplikat #

Standard Queue bisa mengirimkan pesan lebih dari sekali (at-least-once delivery). Consumer harus idempoten — memproses pesan yang sama dua kali tidak boleh menghasilkan efek yang berbeda.

import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

public class IdempotentConsumer {

    // Di production: gunakan Redis atau DynamoDB untuk menyimpan processed IDs
    // dengan TTL yang sesuai retention period queue
    private final Set<String> processedIds = ConcurrentHashMap.newKeySet();

    public void processIdempotent(Message message) throws Exception {
        String messageId = message.messageId();

        // ✗ ANTI-PATTERN: langsung proses tanpa cek duplikat
        // doProcess(message.body()); // bisa dieksekusi dua kali!

        // ✓ BENAR: cek apakah sudah pernah diproses
        if (processedIds.contains(messageId)) {
            System.out.println("Duplikat diabaikan: " + messageId);
            return; // langsung return, tapi tetap delete nanti
        }

        // Proses pesan
        doProcess(message.body());

        // Tandai sebagai sudah diproses
        processedIds.add(messageId);
    }

    private void doProcess(String body) throws Exception {
        System.out.println("Memproses: " + body);
        // logika bisnis yang menghasilkan efek (write ke DB, kirim email, dll)
    }
}

Untuk production, gunakan DynamoDB sebagai idempotency store:

import software.amazon.awssdk.services.dynamodb.DynamoDbClient;
import software.amazon.awssdk.services.dynamodb.model.*;

import java.util.Map;

public class DynamoDbIdempotencyStore {

    private final DynamoDbClient dynamoDb;
    private static final String TABLE = "sqs-processed-messages";

    public DynamoDbIdempotencyStore(DynamoDbClient dynamoDb) {
        this.dynamoDb = dynamoDb;
    }

    // Coba tandai pesan sebagai "sedang diproses"
    // Menggunakan conditional write — hanya berhasil jika messageId belum ada
    public boolean tryMarkAsProcessing(String messageId) {
        try {
            dynamoDb.putItem(PutItemRequest.builder()
                .tableName(TABLE)
                .item(Map.of(
                    "messageId", AttributeValue.fromS(messageId),
                    "status", AttributeValue.fromS("PROCESSING"),
                    "ttl", AttributeValue.fromN(            // hapus otomatis setelah 24 jam
                        String.valueOf(System.currentTimeMillis() / 1000 + 86400)
                    )
                ))
                .conditionExpression("attribute_not_exists(messageId)")
                .build());
            return true; // berhasil — pesan ini belum pernah diproses

        } catch (software.amazon.awssdk.services.dynamodb.model.ConditionalCheckFailedException e) {
            return false; // sudah pernah diproses — ini duplikat
        }
    }

    public void markAsCompleted(String messageId) {
        dynamoDb.updateItem(UpdateItemRequest.builder()
            .tableName(TABLE)
            .key(Map.of("messageId", AttributeValue.fromS(messageId)))
            .updateExpression("SET #s = :completed")
            .expressionAttributeNames(Map.of("#s", "status"))
            .expressionAttributeValues(Map.of(":completed", AttributeValue.fromS("COMPLETED")))
            .build());
    }
}

Kapan Menggunakan SQS dan Kapan Tidak #

GUNAKAN SQS JIKA:
  ✓ Sudah di ekosistem AWS dan ingin layanan terkelola penuh
  ✓ Tidak mau repot maintain broker — SQS zero ops
  ✓ Task queue sederhana tanpa routing kompleks
  ✓ Volume yang sangat variatif — SQS auto-scale tanpa konfigurasi
  ✓ Butuh integrasi mudah dengan Lambda, SNS, S3, dan layanan AWS lain
  ✓ Biaya perlu dikontrol — bayar per request, tidak ada biaya idle
  ✓ Butuh FIFO dengan exactly-once delivery yang mudah dikonfigurasi

PERTIMBANGKAN ALTERNATIF JIKA:
  ✗ Butuh routing kompleks (exchange pattern) → RabbitMQ lebih fleksibel
  ✗ Volume sangat tinggi (jutaan msg/s) dengan latensi ultra-rendah → Kafka
  ✗ Butuh replay pesan yang sudah diproses → Kafka dengan retention log
  ✗ Tidak di AWS dan tidak mau vendor lock-in → RabbitMQ atau Kafka
  ✗ Butuh streaming dan agregasi real-time → Kafka Streams atau Kinesis
  ✗ Pesan lebih besar dari 256 KB → SQS tidak support, perlu S3 + SQS pointer

Ringkasan #

  • Visibility timeout adalah konsep inti SQS — pesan tidak dihapus saat diterima, hanya menjadi tidak terlihat. Hapus pesan secara eksplisit dengan DeleteMessage setelah berhasil diproses.
  • Standard Queue untuk throughput tinggi dengan toleransi duplikat; FIFO Queue untuk ordering ketat dan exactly-once delivery — harga lebih mahal dan throughput lebih terbatas.
  • Selalu gunakan long polling (WaitTimeSeconds=20) untuk mengurangi empty response, menghemat biaya API, dan menurunkan latensi dibanding short polling.
  • Batch send dan batch delete menghemat biaya hingga 10x karena SQS menagih per request API, bukan per pesan. Maksimal 10 pesan per batch.
  • Dead Letter Queue dikonfigurasi melalui Redrive Policy di level queue — pesan otomatis masuk DLQ setelah gagal diproses sebanyak maxReceiveCount kali.
  • Perpanjang visibility timeout (ChangeMessageVisibility) jika proses memakan waktu lebih lama dari timeout awal — ini mencegah pesan dikirim ulang ke consumer lain saat masih diproses.
  • Standard Queue bersifat at-least-once — consumer harus idempoten. Gunakan DynamoDB dengan conditional write sebagai idempotency store di production.
  • SNS + SQS untuk fan-out pattern — SNS mendistribusikan satu pesan ke banyak SQS queue secara paralel, memberikan kemampuan yang tidak dimiliki SQS sendirian.
  • SQS paling cocok untuk ekosistem AWS, zero-ops task queue, dan volume yang sangat variatif — bukan untuk routing kompleks, replay, atau streaming real-time.

← Sebelumnya: RabbitMQ   Berikutnya: Google Pub/Sub →

About | Author | Content Scope | Editorial Policy | Privacy Policy | Disclaimer | Contact