Amazon SQS #
Ketika aplikasi kamu sudah berjalan di atas infrastruktur AWS, memilih message broker yang dikelola sepenuhnya oleh cloud provider adalah keputusan yang sangat masuk akal. Amazon Simple Queue Service (SQS) adalah layanan queue terkelola yang tidak membutuhkan provisioning server, tidak ada cluster yang perlu di-maintain, dan skalanya otomatis dari satu pesan hingga miliaran pesan per hari. Berbeda dari RabbitMQ yang membutuhkan kamu memahami exchange dan binding, atau Kafka yang membutuhkan pengelolaan partition dan consumer group, SQS dirancang untuk sesederhana mungkin: kirim pesan ke queue, ambil pesan dari queue, hapus pesan setelah selesai diproses. Kesederhanaan ini adalah kekuatan sekaligus keterbatasannya — dan memahami tradeoff tersebut adalah kunci untuk menggunakan SQS secara efektif.
Konsep Dasar SQS #
SQS menggunakan model yang berbeda dari broker tradisional. Ada beberapa konsep unik yang tidak kamu temukan di RabbitMQ atau Kafka.
Visibility Timeout #
Ini adalah konsep paling penting di SQS dan yang paling sering disalahpahami. Ketika consumer mengambil (receive) pesan dari SQS, pesan tersebut tidak langsung dihapus — ia menjadi tidak terlihat (invisible) untuk consumer lain selama periode visibility timeout. Consumer punya waktu sebesar visibility timeout untuk memproses dan menghapus pesan. Jika pesan tidak dihapus dalam waktu tersebut, SQS menganggap pemrosesan gagal dan membuat pesan terlihat kembali.
sequenceDiagram
participant C as Consumer
participant SQS
C->>SQS: ReceiveMessage
SQS-->>C: pesan (visibility timeout dimulai: 30s)
Note over SQS: pesan tidak terlihat\noleh consumer lain
alt Proses berhasil
C->>SQS: DeleteMessage
Note over SQS: pesan dihapus permanen
else Timeout habis (consumer crash)
Note over SQS: pesan terlihat kembali
SQS-->>C: pesan dikirim ulang ke consumer lain
endStandard Queue vs FIFO Queue #
SQS punya dua tipe queue dengan karakteristik yang sangat berbeda:
| Standard Queue | FIFO Queue | |
|---|---|---|
| Throughput | Hampir tak terbatas | 300 msg/s (3.000 dengan batching) |
| Ordering | Best-effort (tidak dijamin) | Ketat — first-in, first-out |
| Duplikasi | Mungkin terjadi (at-least-once) | Tepat sekali (exactly-once) |
| Nama queue | bebas | harus diakhiri .fifo |
| Harga | Lebih murah | Lebih mahal |
| Use case | Throughput tinggi, toleran duplikat | Transaksi keuangan, ordering kritis |
flowchart TD
A{Ordering pesan\nharus dijamin?} -- Ya --> B{Throughput\n> 300 msg/s?}
A -- Tidak --> C{Pesan duplikat\nbisa ditoleransi?}
B -- Ya --> D[Pertimbangkan Kafka\natau arsitektur ulang]
B -- Tidak --> E[FIFO Queue]
C -- Ya --> F[Standard Queue\nlebih murah dan cepat]
C -- Tidak --> G{Perlu deduplication\nwindow 5 menit?}
G -- Ya --> E
G -- Tidak --> FMessage Group (FIFO Queue) #
FIFO queue mendukung konsep message group — setiap grup diproses secara berurutan, tapi grup yang berbeda bisa diproses secara paralel. Ini memberikan skalabilitas sambil tetap menjamin ordering per entitas.
FIFO Queue — order-processing.fifo
MessageGroupId = "order-123" → [event-1] [event-2] [event-3] ← urut
MessageGroupId = "order-456" → [event-1] [event-2] ← urut, paralel dengan order-123
MessageGroupId = "order-789" → [event-1] ← urut, paralel dengan kedua grup
Setup Dependencies #
Tambahkan AWS SDK v2 untuk SQS ke project kamu. AWS SDK v2 adalah versi modern yang menggunakan builder pattern dan mendukung async secara native.
Untuk Maven:
<dependencyManagement>
<dependencies>
<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>bom</artifactId>
<version>2.25.60</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
<dependencies>
<!-- SQS client -->
<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>sqs</artifactId>
</dependency>
<!-- URL Connection HTTP client (ringan, tidak butuh Netty) -->
<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>url-connection-client</artifactId>
</dependency>
</dependencies>
Untuk Gradle:
dependencies {
implementation platform('software.amazon.awssdk:bom:2.25.60')
implementation 'software.amazon.awssdk:sqs'
implementation 'software.amazon.awssdk:url-connection-client'
}
Untuk development lokal tanpa akun AWS, gunakan LocalStack:
# Jalankan LocalStack — emulator layanan AWS lokal
docker run -d \
--name localstack \
-p 4566:4566 \
-e SERVICES=sqs \
localstack/localstack
# Buat queue via AWS CLI (pointing ke LocalStack)
aws --endpoint-url=http://localhost:4566 sqs create-queue \
--queue-name order-queue \
--region ap-southeast-1
Membuat SQS Client #
Konfigurasi client berbeda antara environment production (AWS) dan development (LocalStack):
import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider;
import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
import software.amazon.awssdk.http.urlconnection.UrlConnectionHttpClient;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.sqs.SqsClient;
import java.net.URI;
public class SqsClientFactory {
// ✓ BENAR: production — gunakan DefaultCredentialsProvider
// Otomatis mencari credentials dari: env vars → ~/.aws/credentials → IAM role
public static SqsClient createProductionClient() {
return SqsClient.builder()
.region(Region.AP_SOUTHEAST_1)
.credentialsProvider(DefaultCredentialsProvider.create())
.httpClient(UrlConnectionHttpClient.builder().build())
.build();
}
// ✓ BENAR: development dengan LocalStack
public static SqsClient createLocalStackClient() {
return SqsClient.builder()
.region(Region.AP_SOUTHEAST_1)
.endpointOverride(URI.create("http://localhost:4566"))
.credentialsProvider(StaticCredentialsProvider.create(
AwsBasicCredentials.create("test", "test") // LocalStack tidak validasi credentials
))
.httpClient(UrlConnectionHttpClient.builder().build())
.build();
}
// ✗ ANTI-PATTERN: hardcode credentials di kode
// Credentials bisa bocor ke version control
public static SqsClient createInsecureClient() {
return SqsClient.builder()
.credentialsProvider(StaticCredentialsProvider.create(
AwsBasicCredentials.create("AKIAIOSFODNN7EXAMPLE", "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY")
))
.build();
}
}
Jangan pernah hardcode AWS credentials (Access Key ID dan Secret Access Key) di dalam kode sumber. GunakanDefaultCredentialsProviderdi production yang otomatis membaca dari environment variableAWS_ACCESS_KEY_IDdanAWS_SECRET_ACCESS_KEY, file~/.aws/credentials, atau IAM role jika berjalan di EC2/ECS/Lambda.
Manajemen Queue #
Sebelum mengirim atau menerima pesan, queue harus ada. Kamu bisa membuat dan mengelola queue secara programatis.
import software.amazon.awssdk.services.sqs.model.*;
import java.util.Map;
public class SqsQueueManager {
private final SqsClient sqsClient;
public SqsQueueManager(SqsClient sqsClient) {
this.sqsClient = sqsClient;
}
// Membuat Standard Queue
public String createStandardQueue(String queueName) {
CreateQueueRequest request = CreateQueueRequest.builder()
.queueName(queueName)
.attributes(Map.of(
QueueAttributeName.VISIBILITY_TIMEOUT, "30", // detik
QueueAttributeName.MESSAGE_RETENTION_PERIOD, "86400", // 1 hari (detik)
QueueAttributeName.RECEIVE_MESSAGE_WAIT_TIME_SECONDS, "20", // long polling
QueueAttributeName.MAX_MESSAGE_SIZE, "262144" // 256 KB (maks SQS)
))
.build();
CreateQueueResponse response = sqsClient.createQueue(request);
System.out.println("Queue dibuat: " + response.queueUrl());
return response.queueUrl();
}
// Membuat FIFO Queue
public String createFifoQueue(String queueName) {
// Nama FIFO queue harus diakhiri dengan .fifo
if (!queueName.endsWith(".fifo")) {
queueName = queueName + ".fifo";
}
CreateQueueRequest request = CreateQueueRequest.builder()
.queueName(queueName)
.attributes(Map.of(
QueueAttributeName.FIFO_QUEUE, "true",
QueueAttributeName.CONTENT_BASED_DEDUPLICATION, "false", // gunakan deduplication ID eksplisit
QueueAttributeName.VISIBILITY_TIMEOUT, "30",
QueueAttributeName.MESSAGE_RETENTION_PERIOD, "86400"
))
.build();
CreateQueueResponse response = sqsClient.createQueue(request);
return response.queueUrl();
}
// Membuat Standard Queue dengan Dead Letter Queue
public String createQueueWithDLQ(String mainQueueName, String dlqName) {
// 1. Buat DLQ terlebih dahulu
String dlqUrl = createStandardQueue(dlqName);
// 2. Ambil ARN dari DLQ
GetQueueAttributesResponse dlqAttrs = sqsClient.getQueueAttributes(
GetQueueAttributesRequest.builder()
.queueUrl(dlqUrl)
.attributeNames(QueueAttributeName.QUEUE_ARN)
.build()
);
String dlqArn = dlqAttrs.attributes().get(QueueAttributeName.QUEUE_ARN);
// 3. Buat main queue dengan redrive policy yang mengarah ke DLQ
String redrivePolicy = String.format(
"{\"deadLetterTargetArn\":\"%s\",\"maxReceiveCount\":\"3\"}",
dlqArn
);
// maxReceiveCount=3 → pesan dikirim ke DLQ setelah gagal diproses 3 kali
CreateQueueRequest request = CreateQueueRequest.builder()
.queueName(mainQueueName)
.attributes(Map.of(
QueueAttributeName.VISIBILITY_TIMEOUT, "30",
QueueAttributeName.RECEIVE_MESSAGE_WAIT_TIME_SECONDS, "20",
QueueAttributeName.REDRIVE_POLICY, redrivePolicy
))
.build();
CreateQueueResponse response = sqsClient.createQueue(request);
System.out.println("Main queue: " + response.queueUrl());
System.out.println("DLQ: " + dlqUrl);
return response.queueUrl();
}
// Mendapatkan URL queue berdasarkan nama
public String getQueueUrl(String queueName) {
return sqsClient.getQueueUrl(
GetQueueUrlRequest.builder().queueName(queueName).build()
).queueUrl();
}
}
Mengirim Pesan (Producer) #
Mengirim Pesan Tunggal #
import software.amazon.awssdk.services.sqs.model.MessageAttributeValue;
import software.amazon.awssdk.services.sqs.model.SendMessageRequest;
import software.amazon.awssdk.services.sqs.model.SendMessageResponse;
public class SqsProducer {
private final SqsClient sqsClient;
private final String queueUrl;
public SqsProducer(SqsClient sqsClient, String queueUrl) {
this.sqsClient = sqsClient;
this.queueUrl = queueUrl;
}
// Mengirim pesan sederhana ke Standard Queue
public String send(String messageBody) {
SendMessageRequest request = SendMessageRequest.builder()
.queueUrl(queueUrl)
.messageBody(messageBody)
.delaySeconds(0) // delay pengiriman (0-900 detik), default 0
.messageAttributes(Map.of(
"source-service", MessageAttributeValue.builder()
.dataType("String")
.stringValue("order-service")
.build(),
"event-type", MessageAttributeValue.builder()
.dataType("String")
.stringValue("order.created")
.build(),
"retry-count", MessageAttributeValue.builder()
.dataType("Number")
.stringValue("0")
.build()
))
.build();
SendMessageResponse response = sqsClient.sendMessage(request);
System.out.println("Pesan terkirim — MessageId: " + response.messageId());
return response.messageId();
}
// Mengirim ke FIFO Queue — butuh MessageGroupId dan MessageDeduplicationId
public String sendToFifo(String messageBody, String messageGroupId, String deduplicationId) {
SendMessageRequest request = SendMessageRequest.builder()
.queueUrl(queueUrl)
.messageBody(messageBody)
// MessageGroupId — semua pesan dalam grup yang sama diproses urut
.messageGroupId(messageGroupId)
// MessageDeduplicationId — SQS tolak pesan duplikat dalam window 5 menit
// Gunakan nilai yang unik per pesan (misal: UUID atau hash dari konten)
.messageDeduplicationId(deduplicationId)
.build();
SendMessageResponse response = sqsClient.sendMessage(request);
return response.messageId();
}
}
Batch Send — Menghemat Biaya #
SQS menagih per request API, bukan per pesan. Mengirim 10 pesan dalam satu batch request sama biayanya dengan mengirim 1 pesan. Selalu gunakan batch jika memungkinkan.
import software.amazon.awssdk.services.sqs.model.SendMessageBatchRequest;
import software.amazon.awssdk.services.sqs.model.SendMessageBatchRequestEntry;
import software.amazon.awssdk.services.sqs.model.SendMessageBatchResponse;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
public class SqsBatchProducer {
private final SqsClient sqsClient;
private final String queueUrl;
public SqsBatchProducer(SqsClient sqsClient, String queueUrl) {
this.sqsClient = sqsClient;
this.queueUrl = queueUrl;
}
// ✗ ANTI-PATTERN: kirim satu per satu dalam loop
// 100 pesan = 100 API call = 100x biaya
public void sendOneByOne(List<String> messages) {
for (String msg : messages) {
sqsClient.sendMessage(SendMessageRequest.builder()
.queueUrl(queueUrl)
.messageBody(msg)
.build());
}
}
// ✓ BENAR: batch send — maks 10 pesan per batch request
// 100 pesan = 10 API call = 10x lebih hemat
public void sendBatch(List<String> messages) {
// SQS membatasi 10 pesan per batch
int batchSize = 10;
for (int i = 0; i < messages.size(); i += batchSize) {
List<String> batch = messages.subList(i, Math.min(i + batchSize, messages.size()));
sendSingleBatch(batch);
}
}
private void sendSingleBatch(List<String> batch) {
List<SendMessageBatchRequestEntry> entries = new ArrayList<>();
for (int i = 0; i < batch.size(); i++) {
entries.add(SendMessageBatchRequestEntry.builder()
.id(String.valueOf(i)) // ID unik dalam batch (bukan message ID global)
.messageBody(batch.get(i))
.build());
}
SendMessageBatchResponse response = sqsClient.sendMessageBatch(
SendMessageBatchRequest.builder()
.queueUrl(queueUrl)
.entries(entries)
.build()
);
// Periksa pesan yang gagal terkirim dalam batch
if (!response.failed().isEmpty()) {
response.failed().forEach(failure -> {
System.err.printf(
"Gagal kirim pesan ID=%s: %s — %s%n",
failure.id(), failure.code(), failure.message()
);
// Di sini: retry pesan yang gagal atau kirim ke fallback
});
}
System.out.printf("Batch terkirim: %d berhasil, %d gagal%n",
response.successful().size(), response.failed().size());
}
}
Menerima dan Memproses Pesan (Consumer) #
Long Polling #
SQS mendukung dua mode polling: short polling dan long polling. Selalu gunakan long polling.
SHORT POLLING (WaitTimeSeconds=0):
Consumer → SQS: "Ada pesan?"
SQS → Consumer: "Tidak ada" (meski pesan mungkin sudah ada)
[ulangi setiap detik]
Akibat: banyak empty response, biaya tinggi, latensi tidak tentu
LONG POLLING (WaitTimeSeconds=1-20):
Consumer → SQS: "Ada pesan? Tunggu maks 20 detik"
SQS: [menunggu sampai ada pesan atau timeout]
SQS → Consumer: pesan (segera setelah tersedia)
Akibat: lebih sedikit empty response, biaya turun, latensi lebih rendah
import software.amazon.awssdk.services.sqs.model.DeleteMessageRequest;
import software.amazon.awssdk.services.sqs.model.Message;
import software.amazon.awssdk.services.sqs.model.ReceiveMessageRequest;
import software.amazon.awssdk.services.sqs.model.ReceiveMessageResponse;
public class SqsConsumer {
private final SqsClient sqsClient;
private final String queueUrl;
private volatile boolean running = true;
public SqsConsumer(SqsClient sqsClient, String queueUrl) {
this.sqsClient = sqsClient;
this.queueUrl = queueUrl;
}
public void start() {
System.out.println("Consumer mulai berjalan...");
while (running) {
try {
// ✓ Long polling — tunggu maks 20 detik jika tidak ada pesan
ReceiveMessageRequest receiveRequest = ReceiveMessageRequest.builder()
.queueUrl(queueUrl)
.maxNumberOfMessages(10) // maks 10 pesan per receive (maks SQS)
.waitTimeSeconds(20) // long polling
.visibilityTimeout(30) // consumer punya 30 detik untuk proses
.messageAttributeNames("All") // ambil semua message attributes
.attributeNames( // ambil queue attributes (ApproximateReceiveCount, dll)
software.amazon.awssdk.services.sqs.model.MessageSystemAttributeName.APPROXIMATE_RECEIVE_COUNT,
software.amazon.awssdk.services.sqs.model.MessageSystemAttributeName.SENT_TIMESTAMP
)
.build();
ReceiveMessageResponse response = sqsClient.receiveMessage(receiveRequest);
List<Message> messages = response.messages();
if (messages.isEmpty()) {
continue; // tidak ada pesan, poll lagi
}
for (Message message : messages) {
processAndDelete(message);
}
} catch (Exception e) {
System.err.println("Error saat polling: " + e.getMessage());
// jangan crash — tunggu sebentar lalu coba lagi
sleep(5000);
}
}
}
private void processAndDelete(Message message) {
try {
System.out.printf("Memproses MessageId=%s: %s%n",
message.messageId(), message.body());
// Periksa berapa kali pesan ini sudah diterima
String receiveCount = message.attributes().get(
software.amazon.awssdk.services.sqs.model.MessageSystemAttributeName.APPROXIMATE_RECEIVE_COUNT
);
System.out.println("Sudah diterima " + receiveCount + " kali");
doProcess(message.body());
// ✓ Hapus pesan setelah berhasil diproses
// Gunakan ReceiptHandle (bukan MessageId) untuk delete
sqsClient.deleteMessage(DeleteMessageRequest.builder()
.queueUrl(queueUrl)
.receiptHandle(message.receiptHandle())
.build());
System.out.println("Pesan berhasil diproses dan dihapus.");
} catch (Exception e) {
System.err.printf("Gagal memproses MessageId=%s: %s%n",
message.messageId(), e.getMessage());
// Jangan hapus pesan — biarkan visibility timeout habis
// SQS akan mengirimkan pesan ini kembali ke consumer lain
// Setelah maxReceiveCount tercapai, otomatis masuk DLQ
}
}
private void doProcess(String body) throws Exception {
// logika bisnis
}
public void stop() {
running = false;
}
private void sleep(long ms) {
try { Thread.sleep(ms); } catch (InterruptedException e) { Thread.currentThread().interrupt(); }
}
}
Memperpanjang Visibility Timeout #
Jika kamu tahu pemrosesan akan memakan waktu lebih lama dari visibility timeout, perpanjang sebelum habis:
import software.amazon.awssdk.services.sqs.model.ChangeMessageVisibilityRequest;
public class LongRunningProcessor {
private final SqsClient sqsClient;
private final String queueUrl;
public LongRunningProcessor(SqsClient sqsClient, String queueUrl) {
this.sqsClient = sqsClient;
this.queueUrl = queueUrl;
}
public void processHeavyTask(Message message) throws Exception {
// Jalankan timer untuk perpanjang visibility timeout setiap 25 detik
// (visibility timeout awal = 30 detik, perpanjang 5 detik sebelum habis)
java.util.concurrent.ScheduledExecutorService scheduler =
java.util.concurrent.Executors.newSingleThreadScheduledExecutor();
scheduler.scheduleAtFixedRate(() -> {
try {
sqsClient.changeMessageVisibility(
ChangeMessageVisibilityRequest.builder()
.queueUrl(queueUrl)
.receiptHandle(message.receiptHandle())
.visibilityTimeout(30) // perpanjang 30 detik lagi
.build()
);
System.out.println("Visibility timeout diperpanjang.");
} catch (Exception e) {
System.err.println("Gagal perpanjang visibility timeout: " + e.getMessage());
}
}, 25, 25, java.util.concurrent.TimeUnit.SECONDS);
try {
// Lakukan proses berat yang memakan waktu lama
doHeavyWork(message.body());
// Hapus pesan setelah selesai
sqsClient.deleteMessage(DeleteMessageRequest.builder()
.queueUrl(queueUrl)
.receiptHandle(message.receiptHandle())
.build());
} finally {
scheduler.shutdown();
}
}
private void doHeavyWork(String body) throws Exception {
// simulasi proses berat — misal: encode video, generate laporan besar
Thread.sleep(60_000); // 1 menit
}
}
Batch Delete — Menghemat Biaya saat Delete #
Sama seperti batch send, delete juga bisa dilakukan dalam batch:
import software.amazon.awssdk.services.sqs.model.DeleteMessageBatchRequest;
import software.amazon.awssdk.services.sqs.model.DeleteMessageBatchRequestEntry;
public class SqsBatchConsumer {
private final SqsClient sqsClient;
private final String queueUrl;
public SqsBatchConsumer(SqsClient sqsClient, String queueUrl) {
this.sqsClient = sqsClient;
this.queueUrl = queueUrl;
}
public void processAndDeleteBatch() {
ReceiveMessageRequest receiveRequest = ReceiveMessageRequest.builder()
.queueUrl(queueUrl)
.maxNumberOfMessages(10)
.waitTimeSeconds(20)
.build();
List<Message> messages = sqsClient.receiveMessage(receiveRequest).messages();
if (messages.isEmpty()) return;
List<DeleteMessageBatchRequestEntry> toDelete = new ArrayList<>();
List<String> failed = new ArrayList<>();
for (Message message : messages) {
try {
doProcess(message.body());
// Kumpulkan receipt handle untuk batch delete
toDelete.add(DeleteMessageBatchRequestEntry.builder()
.id(message.messageId())
.receiptHandle(message.receiptHandle())
.build());
} catch (Exception e) {
failed.add(message.messageId());
System.err.println("Gagal proses: " + message.messageId());
}
}
// ✓ Batch delete semua pesan yang berhasil dalam satu API call
if (!toDelete.isEmpty()) {
var deleteResponse = sqsClient.deleteMessageBatch(
DeleteMessageBatchRequest.builder()
.queueUrl(queueUrl)
.entries(toDelete)
.build()
);
if (!deleteResponse.failed().isEmpty()) {
deleteResponse.failed().forEach(f ->
System.err.println("Gagal delete MessageId=" + f.id() + ": " + f.message())
);
}
}
System.out.printf("Batch selesai: %d diproses, %d gagal%n",
toDelete.size(), failed.size());
}
private void doProcess(String body) throws Exception {
// logika bisnis
}
}
Dead Letter Queue #
SQS punya mekanisme DLQ bawaan melalui Redrive Policy. Ketika pesan sudah diterima lebih dari maxReceiveCount kali tanpa berhasil dihapus, SQS otomatis memindahkannya ke DLQ.
flowchart TD
P[Producer] --> Q[order-queue\nmaxReceiveCount=3]
Q --> C{Consumer\nberhasil proses?}
C -- Ya\nDeleteMessage --> DONE[Pesan dihapus]
C -- Tidak\nTimeout / Error --> Q
Q -->|Sudah 3x diterima| DLQ[dlq-order-queue]
DLQ --> MON[DLQ Monitor\nAlert & Investigasi]
MON -->|Setelah diperbaiki| REDRIVE[Redrive ke\nmain queue]Memonitor dan memproses ulang pesan dari DLQ:
public class DlqMonitor {
private final SqsClient sqsClient;
private final String dlqUrl;
private final String mainQueueUrl;
public DlqMonitor(SqsClient sqsClient, String dlqUrl, String mainQueueUrl) {
this.sqsClient = sqsClient;
this.dlqUrl = dlqUrl;
this.mainQueueUrl = mainQueueUrl;
}
// Periksa jumlah pesan di DLQ
public int getDlqMessageCount() {
GetQueueAttributesResponse response = sqsClient.getQueueAttributes(
GetQueueAttributesRequest.builder()
.queueUrl(dlqUrl)
.attributeNames(QueueAttributeName.APPROXIMATE_NUMBER_OF_MESSAGES)
.build()
);
return Integer.parseInt(
response.attributes().get(QueueAttributeName.APPROXIMATE_NUMBER_OF_MESSAGES)
);
}
// Baca pesan dari DLQ untuk investigasi
public void inspectDlq() {
ReceiveMessageRequest request = ReceiveMessageRequest.builder()
.queueUrl(dlqUrl)
.maxNumberOfMessages(10)
.waitTimeSeconds(1) // short poll untuk inspeksi manual
.attributeNames(
software.amazon.awssdk.services.sqs.model.MessageSystemAttributeName.APPROXIMATE_RECEIVE_COUNT,
software.amazon.awssdk.services.sqs.model.MessageSystemAttributeName.SENT_TIMESTAMP
)
.build();
List<Message> messages = sqsClient.receiveMessage(request).messages();
System.out.println("Pesan di DLQ: " + messages.size());
for (Message msg : messages) {
System.out.printf(
"MessageId=%s | Diterima %s kali | Body: %s%n",
msg.messageId(),
msg.attributes().get(
software.amazon.awssdk.services.sqs.model.MessageSystemAttributeName.APPROXIMATE_RECEIVE_COUNT
),
msg.body()
);
}
}
// Redrive — pindahkan pesan dari DLQ kembali ke main queue setelah bug diperbaiki
// AWS Console menyediakan fitur "Start DLQ redrive" secara built-in,
// tapi ini implementasi manual jika butuh kontrol lebih:
public void redriveMessages() {
List<Message> messages;
int redrived = 0;
do {
messages = sqsClient.receiveMessage(
ReceiveMessageRequest.builder()
.queueUrl(dlqUrl)
.maxNumberOfMessages(10)
.waitTimeSeconds(1)
.build()
).messages();
for (Message msg : messages) {
// Kirim ulang ke main queue
sqsClient.sendMessage(SendMessageRequest.builder()
.queueUrl(mainQueueUrl)
.messageBody(msg.body())
.build());
// Hapus dari DLQ
sqsClient.deleteMessage(DeleteMessageRequest.builder()
.queueUrl(dlqUrl)
.receiptHandle(msg.receiptHandle())
.build());
redrived++;
}
} while (!messages.isEmpty());
System.out.println("Total pesan di-redrive: " + redrived);
}
}
Integrasi SNS + SQS — Fan-out Pattern #
SQS sendiri tidak mendukung fan-out (satu pesan ke banyak consumer). Untuk itu, kombinasikan dengan Amazon SNS (Simple Notification Service). SNS bertindak sebagai publisher yang mendistribusikan pesan ke semua SQS queue yang berlangganan.
flowchart TD
P[Producer] --> SNS[SNS Topic\norder-events]
SNS --> Q1[SQS Queue\nanalytics-queue]
SNS --> Q2[SQS Queue\nnotification-queue]
SNS --> Q3[SQS Queue\naudit-queue]
Q1 --> C1[Analytics Service]
Q2 --> C2[Notification Service]
Q3 --> C3[Audit Service]import software.amazon.awssdk.services.sns.SnsClient;
import software.amazon.awssdk.services.sns.model.*;
public class SnsSqsFanout {
private final SnsClient snsClient;
private final SqsClient sqsClient;
public SnsSqsFanout(SnsClient snsClient, SqsClient sqsClient) {
this.snsClient = snsClient;
this.sqsClient = sqsClient;
}
public void setupFanout(String topicName, List<String> queueArns) {
// 1. Buat SNS Topic
CreateTopicResponse topicResponse = snsClient.createTopic(
CreateTopicRequest.builder().name(topicName).build()
);
String topicArn = topicResponse.topicArn();
// 2. Subscribe setiap SQS queue ke SNS topic
for (String queueArn : queueArns) {
snsClient.subscribe(SubscribeRequest.builder()
.topicArn(topicArn)
.protocol("sqs")
.endpoint(queueArn)
.build());
}
System.out.println("Fan-out setup selesai. Topic ARN: " + topicArn);
}
// Publish ke SNS — otomatis dikirim ke semua SQS queue yang subscribe
public void publish(String topicArn, String message) {
snsClient.publish(PublishRequest.builder()
.topicArn(topicArn)
.message(message)
.subject("order-event")
.build());
}
}
Ketika SQS menerima pesan dari SNS, body pesan dibungkus dalam JSON envelope SNS. Consumer SQS perlu mem-parse envelope tersebut untuk mendapatkan pesan aslinya. Gunakan atributMessagedi dalam JSON SNS notification, bukanbodylangsung dari SQS message.
Menangani Pesan Duplikat #
Standard Queue bisa mengirimkan pesan lebih dari sekali (at-least-once delivery). Consumer harus idempoten — memproses pesan yang sama dua kali tidak boleh menghasilkan efek yang berbeda.
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
public class IdempotentConsumer {
// Di production: gunakan Redis atau DynamoDB untuk menyimpan processed IDs
// dengan TTL yang sesuai retention period queue
private final Set<String> processedIds = ConcurrentHashMap.newKeySet();
public void processIdempotent(Message message) throws Exception {
String messageId = message.messageId();
// ✗ ANTI-PATTERN: langsung proses tanpa cek duplikat
// doProcess(message.body()); // bisa dieksekusi dua kali!
// ✓ BENAR: cek apakah sudah pernah diproses
if (processedIds.contains(messageId)) {
System.out.println("Duplikat diabaikan: " + messageId);
return; // langsung return, tapi tetap delete nanti
}
// Proses pesan
doProcess(message.body());
// Tandai sebagai sudah diproses
processedIds.add(messageId);
}
private void doProcess(String body) throws Exception {
System.out.println("Memproses: " + body);
// logika bisnis yang menghasilkan efek (write ke DB, kirim email, dll)
}
}
Untuk production, gunakan DynamoDB sebagai idempotency store:
import software.amazon.awssdk.services.dynamodb.DynamoDbClient;
import software.amazon.awssdk.services.dynamodb.model.*;
import java.util.Map;
public class DynamoDbIdempotencyStore {
private final DynamoDbClient dynamoDb;
private static final String TABLE = "sqs-processed-messages";
public DynamoDbIdempotencyStore(DynamoDbClient dynamoDb) {
this.dynamoDb = dynamoDb;
}
// Coba tandai pesan sebagai "sedang diproses"
// Menggunakan conditional write — hanya berhasil jika messageId belum ada
public boolean tryMarkAsProcessing(String messageId) {
try {
dynamoDb.putItem(PutItemRequest.builder()
.tableName(TABLE)
.item(Map.of(
"messageId", AttributeValue.fromS(messageId),
"status", AttributeValue.fromS("PROCESSING"),
"ttl", AttributeValue.fromN( // hapus otomatis setelah 24 jam
String.valueOf(System.currentTimeMillis() / 1000 + 86400)
)
))
.conditionExpression("attribute_not_exists(messageId)")
.build());
return true; // berhasil — pesan ini belum pernah diproses
} catch (software.amazon.awssdk.services.dynamodb.model.ConditionalCheckFailedException e) {
return false; // sudah pernah diproses — ini duplikat
}
}
public void markAsCompleted(String messageId) {
dynamoDb.updateItem(UpdateItemRequest.builder()
.tableName(TABLE)
.key(Map.of("messageId", AttributeValue.fromS(messageId)))
.updateExpression("SET #s = :completed")
.expressionAttributeNames(Map.of("#s", "status"))
.expressionAttributeValues(Map.of(":completed", AttributeValue.fromS("COMPLETED")))
.build());
}
}
Kapan Menggunakan SQS dan Kapan Tidak #
GUNAKAN SQS JIKA:
✓ Sudah di ekosistem AWS dan ingin layanan terkelola penuh
✓ Tidak mau repot maintain broker — SQS zero ops
✓ Task queue sederhana tanpa routing kompleks
✓ Volume yang sangat variatif — SQS auto-scale tanpa konfigurasi
✓ Butuh integrasi mudah dengan Lambda, SNS, S3, dan layanan AWS lain
✓ Biaya perlu dikontrol — bayar per request, tidak ada biaya idle
✓ Butuh FIFO dengan exactly-once delivery yang mudah dikonfigurasi
PERTIMBANGKAN ALTERNATIF JIKA:
✗ Butuh routing kompleks (exchange pattern) → RabbitMQ lebih fleksibel
✗ Volume sangat tinggi (jutaan msg/s) dengan latensi ultra-rendah → Kafka
✗ Butuh replay pesan yang sudah diproses → Kafka dengan retention log
✗ Tidak di AWS dan tidak mau vendor lock-in → RabbitMQ atau Kafka
✗ Butuh streaming dan agregasi real-time → Kafka Streams atau Kinesis
✗ Pesan lebih besar dari 256 KB → SQS tidak support, perlu S3 + SQS pointer
Ringkasan #
- Visibility timeout adalah konsep inti SQS — pesan tidak dihapus saat diterima, hanya menjadi tidak terlihat. Hapus pesan secara eksplisit dengan
DeleteMessagesetelah berhasil diproses.- Standard Queue untuk throughput tinggi dengan toleransi duplikat; FIFO Queue untuk ordering ketat dan exactly-once delivery — harga lebih mahal dan throughput lebih terbatas.
- Selalu gunakan long polling (
WaitTimeSeconds=20) untuk mengurangi empty response, menghemat biaya API, dan menurunkan latensi dibanding short polling.- Batch send dan batch delete menghemat biaya hingga 10x karena SQS menagih per request API, bukan per pesan. Maksimal 10 pesan per batch.
- Dead Letter Queue dikonfigurasi melalui Redrive Policy di level queue — pesan otomatis masuk DLQ setelah gagal diproses sebanyak
maxReceiveCountkali.- Perpanjang visibility timeout (
ChangeMessageVisibility) jika proses memakan waktu lebih lama dari timeout awal — ini mencegah pesan dikirim ulang ke consumer lain saat masih diproses.- Standard Queue bersifat at-least-once — consumer harus idempoten. Gunakan DynamoDB dengan conditional write sebagai idempotency store di production.
- SNS + SQS untuk fan-out pattern — SNS mendistribusikan satu pesan ke banyak SQS queue secara paralel, memberikan kemampuan yang tidak dimiliki SQS sendirian.
- SQS paling cocok untuk ekosistem AWS, zero-ops task queue, dan volume yang sangat variatif — bukan untuk routing kompleks, replay, atau streaming real-time.