Google Cloud Pub/Sub #
Di antara semua managed message broker yang tersedia di cloud, Google Cloud Pub/Sub memiliki posisi yang unik: ia dirancang dari awal untuk skala global, bukan skala regional. Satu topic Pub/Sub bisa menerima dan mendistribusikan pesan ke subscriber di belahan dunia mana pun tanpa kamu perlu memikirkan replikasi atau geo-routing. Ini menjadikannya pilihan alami untuk aplikasi yang dibangun di atas Google Cloud Platform — mulai dari pipeline data analytics yang terintegrasi erat dengan BigQuery, hingga event-driven architecture yang menghubungkan Cloud Run, Cloud Functions, dan Dataflow. Secara model, Pub/Sub lebih mirip dengan kombinasi SNS dan SQS di AWS: satu topic bisa punya banyak subscription, dan setiap subscription bertindak seperti queue tersendiri yang menerima salinan semua pesan dari topic.
Arsitektur Pub/Sub #
Memahami model Pub/Sub sangat penting sebelum menulis kode, karena beberapa konsepnya terasa berlawanan intuisi jika kamu terbiasa dengan broker lain.
Topic dan Subscription #
Topic adalah resource tempat publisher mengirim pesan. Topic sendiri tidak menyimpan pesan — ia hanya jalur distribusi.
Subscription adalah resource yang melekat pada topic dan benar-benar menyimpan pesan sampai di-acknowledge atau sampai retention period habis. Satu topic bisa punya banyak subscription, dan setiap subscription menerima salinan semua pesan secara independen — persis seperti consumer group di Kafka, tapi dengan cara kerja yang berbeda.
flowchart LR
PUB[Publisher] --> T[Topic:\norder-events]
T --> S1[Subscription:\nanalytics-sub]
T --> S2[Subscription:\nnotification-sub]
T --> S3[Subscription:\naudit-sub]
S1 --> C1[Analytics Service\nPull subscriber]
S2 --> C2[Notification Service\nPull subscriber]
S3 --> WH[Webhook endpoint\nPush subscriber]Poin kritis yang sering membingungkan: jika belum ada subscription ketika pesan diterbitkan, pesan tersebut hilang. Pub/Sub tidak menyimpan pesan di level topic, hanya di level subscription. Buat subscription sebelum publisher mulai mengirim pesan.
Push vs Pull Delivery #
Pub/Sub mendukung dua mekanisme pengiriman pesan ke subscriber:
Pull — subscriber yang aktif meminta pesan ke Pub/Sub. Mirip seperti long polling di SQS. Cocok untuk consumer yang berjalan sebagai long-running process (Kubernetes pod, VM, Cloud Run yang selalu aktif).
Push — Pub/Sub yang aktif mengirim pesan ke endpoint HTTPS milik subscriber. Cocok untuk Cloud Functions, Cloud Run (serverless), atau webhook. Pub/Sub retry otomatis jika endpoint mengembalikan non-2xx response.
flowchart TD
T[Topic]
subgraph PULL[Pull Subscription]
S1[Subscription] -->|subscriber tarik pesan| C1[Consumer\nlong-running service]
end
subgraph PUSH[Push Subscription]
S2[Subscription] -->|Pub/Sub dorong pesan| EP[HTTPS Endpoint\nCloud Function / webhook]
end
T --> S1
T --> S2| Pull | Push | |
|---|---|---|
| Siapa yang inisiasi | Subscriber | Pub/Sub |
| Cocok untuk | Long-running service | Serverless, webhook |
| Kontrol laju | Subscriber kontrol sendiri | Pub/Sub kontrol (max burst rate) |
| Autentikasi | IAM di sisi subscriber | OIDC token di header request |
| Operasional | Perlu manage polling loop | Zero ops di sisi consumer |
Acknowledgment Deadline #
Mirip dengan visibility timeout di SQS. Ketika subscriber menerima pesan melalui pull, ia punya waktu sebesar acknowledgment deadline (default 10 detik, maks 600 detik) untuk mengirimkan acknowledge. Jika tidak, Pub/Sub mengirimkan pesan kembali.
Pesan diterima subscriber
│
├── dalam ack deadline → subscriber kirim ack → pesan dihapus dari subscription
│
└── melebihi ack deadline → Pub/Sub kirim ulang ke subscriber lain (atau subscriber yang sama)
Setup Dependencies #
Tambahkan Google Cloud Pub/Sub client library ke project Java kamu.
Untuk Maven:
<dependencyManagement>
<dependencies>
<dependency>
<groupId>com.google.cloud</groupId>
<artifactId>libraries-bom</artifactId>
<version>26.37.0</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
<dependencies>
<!-- Google Cloud Pub/Sub -->
<dependency>
<groupId>com.google.cloud</groupId>
<artifactId>google-cloud-pubsub</artifactId>
</dependency>
</dependencies>
Untuk Gradle:
dependencies {
implementation platform('com.google.cloud:libraries-bom:26.37.0')
implementation 'com.google.cloud:google-cloud-pubsub'
}
Untuk development lokal, gunakan Pub/Sub Emulator:
# Install Google Cloud SDK jika belum ada
# Kemudian jalankan emulator
gcloud beta emulators pubsub start --project=my-project --host-port=localhost:8085
# Set environment variable agar client library mengarah ke emulator
export PUBSUB_EMULATOR_HOST=localhost:8085
export GOOGLE_CLOUD_PROJECT=my-project
Atau gunakan Docker:
docker run -d \
--name pubsub-emulator \
-p 8085:8085 \
gcr.io/google.com/cloudsdktool/google-cloud-cli \
gcloud beta emulators pubsub start \
--project=my-project \
--host-port=0.0.0.0:8085
Autentikasi #
Pub/Sub menggunakan Google Cloud IAM untuk autentikasi. Client library secara otomatis mencari Application Default Credentials (ADC) dari beberapa sumber secara berurutan:
1. Environment variable GOOGLE_APPLICATION_CREDENTIALS → path ke service account JSON
2. gcloud auth application-default login → credentials dari gcloud CLI
3. Metadata server → jika berjalan di GCE, GKE, Cloud Run, atau Cloud Functions (IAM role)
import com.google.auth.oauth2.GoogleCredentials;
import com.google.auth.oauth2.ServiceAccountCredentials;
import java.io.FileInputStream;
public class PubSubAuth {
// ✓ BENAR: Application Default Credentials — otomatis di GCP, gcloud untuk dev
// Tidak perlu kode tambahan — library menanganinya sendiri
// Cukup set GOOGLE_APPLICATION_CREDENTIALS atau jalankan gcloud auth
// ✓ BENAR: Service account eksplisit jika butuh credentials tertentu
public static GoogleCredentials loadServiceAccount(String keyFilePath) throws Exception {
try (FileInputStream stream = new FileInputStream(keyFilePath)) {
return ServiceAccountCredentials.fromStream(stream)
.createScoped("https://www.googleapis.com/auth/cloud-platform");
}
}
// ✗ ANTI-PATTERN: hardcode service account key di kode sumber
// Key bisa bocor ke version control atau log aplikasi
public static final String HARDCODED_KEY = "{ \"type\": \"service_account\", ... }";
}
Jangan commit service account JSON key ke version control. Gunakan Secret Manager, environment variable yang diinjeksikan saat deploy, atau Workload Identity (untuk GKE) yang menghilangkan kebutuhan service account key sama sekali.
Manajemen Topic dan Subscription #
Pub/Sub menyediakan TopicAdminClient dan SubscriptionAdminClient untuk manajemen resource secara programatis.
import com.google.cloud.pubsub.v1.TopicAdminClient;
import com.google.cloud.pubsub.v1.SubscriptionAdminClient;
import com.google.pubsub.v1.*;
import org.threeten.bp.Duration;
public class PubSubResourceManager {
private final String projectId;
public PubSubResourceManager(String projectId) {
this.projectId = projectId;
}
// Membuat topic
public void createTopic(String topicId) throws Exception {
try (TopicAdminClient topicAdminClient = TopicAdminClient.create()) {
TopicName topicName = TopicName.of(projectId, topicId);
Topic topic = Topic.newBuilder()
.setName(topicName.toString())
// Retention pesan di topic (bukan subscription) — 1 hari
// Ini memungkinkan subscription yang dibuat setelah pesan diterbitkan
// untuk mengakses pesan dalam window ini (seek to timestamp)
.setMessageRetentionDuration(
com.google.protobuf.Duration.newBuilder()
.setSeconds(86400) // 24 jam
.build()
)
.build();
topicAdminClient.createTopic(topic);
System.out.println("Topic dibuat: " + topicName);
}
}
// Membuat Pull Subscription
public void createPullSubscription(String topicId, String subscriptionId) throws Exception {
try (SubscriptionAdminClient subscriptionAdminClient = SubscriptionAdminClient.create()) {
TopicName topicName = TopicName.of(projectId, topicId);
SubscriptionName subscriptionName = SubscriptionName.of(projectId, subscriptionId);
Subscription subscription = Subscription.newBuilder()
.setName(subscriptionName.toString())
.setTopic(topicName.toString())
.setAckDeadlineSeconds(60) // subscriber punya 60 detik untuk ack
.setRetainAckedMessages(false) // hapus pesan yang sudah di-ack
.setMessageRetentionDuration(
com.google.protobuf.Duration.newBuilder()
.setSeconds(7 * 86400) // simpan pesan yang belum di-ack selama 7 hari
.build()
)
.build();
subscriptionAdminClient.createSubscription(subscription);
System.out.println("Pull subscription dibuat: " + subscriptionName);
}
}
// Membuat Push Subscription
public void createPushSubscription(String topicId, String subscriptionId,
String pushEndpoint) throws Exception {
try (SubscriptionAdminClient subscriptionAdminClient = SubscriptionAdminClient.create()) {
TopicName topicName = TopicName.of(projectId, topicId);
SubscriptionName subscriptionName = SubscriptionName.of(projectId, subscriptionId);
PushConfig pushConfig = PushConfig.newBuilder()
.setPushEndpoint(pushEndpoint) // harus HTTPS dan bisa diakses publik
.build();
Subscription subscription = Subscription.newBuilder()
.setName(subscriptionName.toString())
.setTopic(topicName.toString())
.setPushConfig(pushConfig)
.setAckDeadlineSeconds(30)
.build();
subscriptionAdminClient.createSubscription(subscription);
System.out.println("Push subscription dibuat: " + subscriptionName);
}
}
// Membuat Subscription dengan Dead Letter Topic
public void createSubscriptionWithDLT(String topicId, String subscriptionId,
String deadLetterTopicId) throws Exception {
try (SubscriptionAdminClient subscriptionAdminClient = SubscriptionAdminClient.create()) {
TopicName topicName = TopicName.of(projectId, topicId);
TopicName deadLetterTopicName = TopicName.of(projectId, deadLetterTopicId);
SubscriptionName subscriptionName = SubscriptionName.of(projectId, subscriptionId);
DeadLetterPolicy deadLetterPolicy = DeadLetterPolicy.newBuilder()
.setDeadLetterTopic(deadLetterTopicName.toString())
.setMaxDeliveryAttempts(5) // kirim ke DLT setelah 5 kali gagal
.build();
Subscription subscription = Subscription.newBuilder()
.setName(subscriptionName.toString())
.setTopic(topicName.toString())
.setDeadLetterPolicy(deadLetterPolicy)
.setAckDeadlineSeconds(60)
.build();
subscriptionAdminClient.createSubscription(subscription);
System.out.println("Subscription dengan DLT dibuat: " + subscriptionName);
}
}
}
Publisher #
Publisher menggunakan Publisher client yang mendukung pengiriman asynchronous dengan batching otomatis.
Konfigurasi Publisher #
import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutureCallback;
import com.google.api.core.ApiFutures;
import com.google.api.gax.batching.BatchingSettings;
import com.google.api.gax.retrying.RetrySettings;
import com.google.cloud.pubsub.v1.Publisher;
import com.google.protobuf.ByteString;
import com.google.pubsub.v1.PubsubMessage;
import com.google.pubsub.v1.TopicName;
import org.threeten.bp.Duration;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
public class PubSubPublisher {
private final Publisher publisher;
public PubSubPublisher(String projectId, String topicId) throws Exception {
TopicName topicName = TopicName.of(projectId, topicId);
// Konfigurasi batching — library mengumpulkan pesan sebelum dikirim
BatchingSettings batchingSettings = BatchingSettings.newBuilder()
.setElementCountThreshold(100L) // kirim jika sudah 100 pesan
.setRequestByteThreshold(1024 * 1024L) // atau jika total sudah 1 MB
.setDelayThreshold(Duration.ofMillis(100)) // atau jika sudah 100ms sejak pesan pertama
.build();
// Konfigurasi retry untuk transient error
RetrySettings retrySettings = RetrySettings.newBuilder()
.setMaxAttempts(5)
.setInitialRetryDelay(Duration.ofMillis(100))
.setMaxRetryDelay(Duration.ofSeconds(60))
.setRetryDelayMultiplier(2.0)
.build();
this.publisher = Publisher.newBuilder(topicName)
.setBatchingSettings(batchingSettings)
.setRetrySettings(retrySettings)
.build();
}
// Publish pesan dengan attributes (metadata)
public void publish(String data, Map<String, String> attributes) {
ByteString byteData = ByteString.copyFromUtf8(data);
PubsubMessage.Builder messageBuilder = PubsubMessage.newBuilder()
.setData(byteData);
// Tambahkan attributes — key-value metadata yang bisa digunakan untuk filtering
if (attributes != null) {
messageBuilder.putAllAttributes(attributes);
}
PubsubMessage message = messageBuilder.build();
// publish() mengembalikan ApiFuture — non-blocking
ApiFuture<String> future = publisher.publish(message);
// Daftarkan callback untuk mengetahui hasilnya
ApiFutures.addCallback(future, new ApiFutureCallback<String>() {
@Override
public void onSuccess(String messageId) {
System.out.println("Pesan berhasil diterbitkan — MessageId: " + messageId);
}
@Override
public void onFailure(Throwable throwable) {
System.err.println("Gagal menerbitkan pesan: " + throwable.getMessage());
// Di sini: log error, kirim ke fallback, atau alert monitoring
}
}, Executors.newSingleThreadExecutor());
}
// Publish dengan ordering key — membutuhkan message ordering diaktifkan di subscription
public void publishOrdered(String data, String orderingKey) {
PubsubMessage message = PubsubMessage.newBuilder()
.setData(ByteString.copyFromUtf8(data))
.setOrderingKey(orderingKey) // semua pesan dengan key yang sama dikirim urut
.build();
publisher.publish(message);
}
// Wajib dipanggil saat aplikasi shutdown
// Publisher flush semua pesan yang masih di buffer sebelum berhenti
public void shutdown() throws Exception {
publisher.shutdown();
publisher.awaitTermination(30, TimeUnit.SECONDS);
}
}
Publish Batch Eksplisit #
Meski batching dilakukan otomatis oleh library, kamu bisa publish banyak pesan sekaligus dan menunggu semua selesai:
import java.util.ArrayList;
import java.util.List;
public class BatchPublisher {
private final Publisher publisher;
public BatchPublisher(Publisher publisher) {
this.publisher = publisher;
}
public void publishBatch(List<String> messages) throws Exception {
List<ApiFuture<String>> futures = new ArrayList<>();
for (String message : messages) {
PubsubMessage pubsubMessage = PubsubMessage.newBuilder()
.setData(ByteString.copyFromUtf8(message))
.build();
futures.add(publisher.publish(pubsubMessage));
}
// Tunggu semua pesan selesai diterbitkan
// ApiFutures.allAsList() gagal jika ada satu pun yang gagal
try {
List<String> messageIds = ApiFutures.allAsList(futures).get();
System.out.printf("Berhasil publish %d pesan%n", messageIds.size());
} catch (Exception e) {
System.err.println("Satu atau lebih pesan gagal diterbitkan: " + e.getMessage());
throw e;
}
}
}
Subscriber — Pull #
Pull subscriber menggunakan Subscriber client yang menangani polling, threading, dan ack secara otomatis di background.
Synchronous Pull #
Gunakan pull synchronous untuk skenario di mana kamu butuh kontrol penuh atas kapan pesan diproses — misalnya pipeline batch yang hanya berjalan pada waktu tertentu:
import com.google.cloud.pubsub.v1.stub.GrpcSubscriberStub;
import com.google.cloud.pubsub.v1.stub.SubscriberStubSettings;
import com.google.pubsub.v1.*;
public class SynchronousPullSubscriber {
private final String projectId;
private final String subscriptionId;
public SynchronousPullSubscriber(String projectId, String subscriptionId) {
this.projectId = projectId;
this.subscriptionId = subscriptionId;
}
public void pullAndProcess(int maxMessages) throws Exception {
SubscriberStubSettings subscriberStubSettings = SubscriberStubSettings.newBuilder()
.setTransportChannelProvider(
SubscriberStubSettings.defaultGrpcTransportProviderBuilder()
.setMaxInboundMessageSize(20 * 1024 * 1024) // 20 MB
.build()
)
.build();
try (GrpcSubscriberStub subscriber = GrpcSubscriberStub.create(subscriberStubSettings)) {
String subscriptionName = SubscriptionName.format(projectId, subscriptionId);
PullRequest pullRequest = PullRequest.newBuilder()
.setMaxMessages(maxMessages)
.setSubscription(subscriptionName)
.build();
PullResponse pullResponse = subscriber.pullCallable().call(pullRequest);
List<String> ackIds = new ArrayList<>();
for (ReceivedMessage receivedMessage : pullResponse.getReceivedMessagesList()) {
PubsubMessage message = receivedMessage.getMessage();
String data = message.getData().toStringUtf8();
System.out.printf("MessageId=%s | Data: %s%n",
message.getMessageId(), data);
System.out.println("Attributes: " + message.getAttributesMap());
try {
processMessage(data, message.getAttributesMap());
ackIds.add(receivedMessage.getAckId());
} catch (Exception e) {
System.err.println("Gagal proses, tidak akan di-ack: " + e.getMessage());
// Jangan tambahkan ke ackIds — Pub/Sub akan kirim ulang setelah deadline
}
}
// Acknowledge semua pesan yang berhasil diproses dalam satu request
if (!ackIds.isEmpty()) {
AcknowledgeRequest acknowledgeRequest = AcknowledgeRequest.newBuilder()
.setSubscription(subscriptionName)
.addAllAckIds(ackIds)
.build();
subscriber.acknowledgeCallable().call(acknowledgeRequest);
System.out.println("Acknowledged " + ackIds.size() + " pesan.");
}
}
}
private void processMessage(String data, Map<String, String> attributes) throws Exception {
// logika bisnis
}
}
Asynchronous Pull (Streaming) #
Untuk long-running consumer, gunakan streaming pull dengan Subscriber — ini adalah cara yang paling umum dan efisien:
import com.google.cloud.pubsub.v1.AckReplyConsumer;
import com.google.cloud.pubsub.v1.MessageReceiver;
import com.google.cloud.pubsub.v1.Subscriber;
import com.google.pubsub.v1.ProjectSubscriptionName;
import com.google.pubsub.v1.PubsubMessage;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
public class StreamingPullSubscriber {
public static void startSubscriber(String projectId, String subscriptionId) throws Exception {
ProjectSubscriptionName subscriptionName =
ProjectSubscriptionName.of(projectId, subscriptionId);
// MessageReceiver dipanggil di thread terpisah untuk setiap pesan
MessageReceiver receiver = (PubsubMessage message, AckReplyConsumer consumer) -> {
String data = message.getData().toStringUtf8();
String messageId = message.getMessageId();
Map<String, String> attributes = message.getAttributesMap();
System.out.printf("Menerima pesan — MessageId=%s%n", messageId);
try {
processMessage(data, attributes);
// ✓ Ack — Pub/Sub hapus pesan dari subscription
consumer.ack();
System.out.println("Pesan di-ack: " + messageId);
} catch (Exception e) {
System.err.printf("Gagal proses MessageId=%s: %s%n", messageId, e.getMessage());
// ✓ Nack — Pub/Sub kirim ulang setelah ack deadline
// Jika subscription punya Dead Letter Policy, pesan akan masuk DLT
// setelah maxDeliveryAttempts tercapai
consumer.nack();
}
};
Subscriber subscriber = Subscriber.newBuilder(subscriptionName, receiver)
.setMaxAckExtensionPeriod(Duration.ofSeconds(120)) // perpanjang ack deadline otomatis
.setParallelPullCount(2) // jumlah streaming pull paralel
.setExecutorProvider( // thread pool untuk memproses pesan
com.google.api.gax.core.InstantiatingExecutorProvider.newBuilder()
.setExecutorThreadCount(4)
.build()
)
.build();
// Daftarkan listener untuk error fatal
subscriber.addListener(new Subscriber.Listener() {
@Override
public void failed(Subscriber.State from, Throwable failure) {
System.err.println("Subscriber gagal dari state " + from + ": " + failure.getMessage());
// Alert ke monitoring system
}
}, Executors.newSingleThreadExecutor());
subscriber.startAsync().awaitRunning();
System.out.println("Subscriber berjalan, menunggu pesan...");
// Jalankan selamanya (atau sampai signal shutdown)
try {
subscriber.awaitTerminated(30, TimeUnit.MINUTES);
} catch (TimeoutException e) {
subscriber.stopAsync();
}
}
private static void processMessage(String data, Map<String, String> attributes) throws Exception {
System.out.println("Memproses: " + data);
// logika bisnis
}
}
Memperpanjang Acknowledgment Deadline #
Jika pemrosesan membutuhkan waktu lebih lama dari ack deadline, gunakan ModifyAckDeadline untuk memperpanjang:
// Dengan Subscriber async, library menangani ini otomatis melalui setMaxAckExtensionPeriod
// Untuk synchronous pull, kamu perlu melakukannya secara manual:
public void extendDeadline(GrpcSubscriberStub subscriber,
String subscriptionName,
List<String> ackIds,
int newDeadlineSeconds) {
ModifyAckDeadlineRequest modifyRequest = ModifyAckDeadlineRequest.newBuilder()
.setSubscription(subscriptionName)
.addAllAckIds(ackIds)
.setAckDeadlineSeconds(newDeadlineSeconds) // perpanjang hingga N detik dari sekarang
.build();
subscriber.modifyAckDeadlineCallable().call(modifyRequest);
System.out.println("Ack deadline diperpanjang " + newDeadlineSeconds + " detik.");
}
Filtering Pesan #
Pub/Sub mendukung filter berbasis CEL (Common Expression Language) di level subscription. Subscriber hanya menerima pesan yang cocok dengan filter — pesan yang tidak cocok di-acknowledge otomatis oleh Pub/Sub (tidak masuk ke subscription).
// Filter hanya pesan dengan attribute "event-type" bernilai "order.created"
// atau "order.shipped"
String filter = "attributes.\"event-type\" = \"order.created\" " +
"OR attributes.\"event-type\" = \"order.shipped\"";
Subscription subscription = Subscription.newBuilder()
.setName(subscriptionName.toString())
.setTopic(topicName.toString())
.setFilter(filter) // filter diterapkan di sisi Pub/Sub, bukan di subscriber
.setAckDeadlineSeconds(60)
.build();
subscriptionAdminClient.createSubscription(subscription);
Contoh filter lain yang umum digunakan:
// Filter berdasarkan satu attribute
attributes.region = "asia-southeast1"
// Filter berdasarkan keberadaan attribute
hasPrefix(attributes.order-id, "ORD-")
// Filter kombinasi
attributes.env = "production" AND attributes.priority = "high"
// Filter berdasarkan data pesan (harus JSON)
// Tidak didukung — filter hanya bekerja pada message attributes, bukan body
Filter Pub/Sub hanya bekerja pada message attributes, bukan pada isi body pesan. Jika kamu butuh routing berdasarkan konten pesan, lakukan di sisi consumer setelah menerima pesan, atau enkode kriteria routing sebagai attributes saat publish.
Message Ordering #
Secara default, Pub/Sub tidak menjamin urutan pengiriman pesan. Untuk menjamin ordering, aktifkan enableMessageOrdering di Subscriber dan gunakan orderingKey saat publish.
// Publisher — aktifkan message ordering di publisher
Publisher publisher = Publisher.newBuilder(topicName)
.setEnableMessageOrdering(true) // wajib jika ingin pakai ordering key
.build();
// Publish dengan ordering key — semua pesan dengan key yang sama dikirim urut
PubsubMessage message = PubsubMessage.newBuilder()
.setData(ByteString.copyFromUtf8(payload))
.setOrderingKey("order-" + orderId) // pesan untuk order yang sama selalu urut
.build();
publisher.publish(message);
// Subscription — aktifkan message ordering di subscription
Subscription subscription = Subscription.newBuilder()
.setName(subscriptionName.toString())
.setTopic(topicName.toString())
.setEnableMessageOrdering(true) // wajib agar ordering key dihormati
.setAckDeadlineSeconds(60)
.build();
Jika publisher gagal menerbitkan pesan dengan ordering key tertentu, semua pesan selanjutnya dengan ordering key yang sama akan ditolak sampai kamu memanggil publisher.resumePublish(orderingKey) secara eksplisit. Ini untuk mencegah gap yang merusak urutan.Dead Letter Topic #
Dead Letter Topic (DLT) di Pub/Sub bekerja di level subscription — pesan yang melewati batas maxDeliveryAttempts tanpa di-acknowledge otomatis dipindahkan ke topic lain.
flowchart TD
PUB[Publisher] --> T[Topic: order-events]
T --> SUB[Subscription: order-sub\nmaxDeliveryAttempts=5]
SUB --> C{Consumer\nberhasil ack?}
C -- Ya --> DONE[Pesan dihapus\ndari subscription]
C -- Tidak\nconsumer.nack / timeout --> SUB
SUB -->|Sudah 5x gagal| DLT[Dead Letter Topic:\norder-events-dlt]
DLT --> DLSUB[Subscription: dlq-sub]
DLSUB --> MON[Monitoring &\nInvestigasi]Pesan yang masuk ke DLT menyertakan attributes tambahan untuk investigasi:
CloudPubSubDeadLetterSourceSubscription → nama subscription asal
CloudPubSubDeadLetterSourceTopicPublishTime → waktu pesan pertama diterbitkan
Consumer untuk DLT:
public class DeadLetterConsumer {
public static void monitorDLT(String projectId, String dltSubscriptionId) throws Exception {
ProjectSubscriptionName subscriptionName =
ProjectSubscriptionName.of(projectId, dltSubscriptionId);
MessageReceiver receiver = (PubsubMessage message, AckReplyConsumer consumer) -> {
String data = message.getData().toStringUtf8();
Map<String, String> attributes = message.getAttributesMap();
// Ambil informasi asal pesan dari attributes yang ditambahkan Pub/Sub
String sourceSubscription = attributes.getOrDefault(
"CloudPubSubDeadLetterSourceSubscription", "unknown"
);
System.err.printf(
"[DLT] Pesan gagal dari subscription '%s': %s%n",
sourceSubscription, data
);
// Kirim alert dan log untuk investigasi
sendAlert(data, attributes);
// Ack di DLT — pesan sudah dilogging, tidak perlu diproses ulang otomatis
consumer.ack();
};
Subscriber subscriber = Subscriber.newBuilder(subscriptionName, receiver).build();
subscriber.startAsync().awaitRunning();
System.out.println("DLT consumer berjalan...");
subscriber.awaitTerminated();
}
private static void sendAlert(String data, Map<String, String> attributes) {
System.out.println("Alert dikirim ke tim ops: " + data);
}
}
Seek — Memutar Ulang atau Melewati Pesan #
Salah satu fitur yang membedakan Pub/Sub dari SQS adalah kemampuan seek — kamu bisa “memutar ulang” subscription ke titik waktu tertentu di masa lalu, atau melewati semua pesan yang ada di queue. Ini berguna untuk reprocessing setelah bug diperbaiki atau untuk skip backlog yang tidak relevan.
import com.google.cloud.pubsub.v1.SubscriptionAdminClient;
import com.google.pubsub.v1.SeekRequest;
import com.google.pubsub.v1.SubscriptionName;
import com.google.protobuf.Timestamp;
import java.time.Instant;
public class PubSubSeek {
// Seek ke titik waktu tertentu — subscription akan menerima ulang semua pesan
// yang diterbitkan setelah timestamp ini (dalam window message retention)
public static void seekToTime(String projectId, String subscriptionId,
Instant targetTime) throws Exception {
try (SubscriptionAdminClient adminClient = SubscriptionAdminClient.create()) {
SubscriptionName subscriptionName = SubscriptionName.of(projectId, subscriptionId);
Timestamp timestamp = Timestamp.newBuilder()
.setSeconds(targetTime.getEpochSecond())
.build();
SeekRequest seekRequest = SeekRequest.newBuilder()
.setSubscription(subscriptionName.toString())
.setTime(timestamp)
.build();
adminClient.seek(seekRequest);
System.out.println("Subscription di-seek ke: " + targetTime);
}
}
// Skip semua pesan yang ada — subscription hanya menerima pesan baru setelah ini
public static void skipBacklog(String projectId, String subscriptionId) throws Exception {
// Seek ke "sekarang" — semua pesan yang ada dianggap sudah diproses
seekToTime(projectId, subscriptionId, Instant.now());
System.out.println("Backlog di-skip. Hanya menerima pesan baru.");
}
}
Push Subscription — Menerima Pesan via HTTP #
Untuk push subscription, Pub/Sub mengirimkan HTTP POST ke endpoint kamu. Request body berisi pesan dalam format JSON envelope:
import com.fasterxml.jackson.databind.ObjectMapper;
import com.sun.net.httpserver.HttpExchange;
import com.sun.net.httpserver.HttpHandler;
import com.sun.net.httpserver.HttpServer;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.Base64;
public class PushSubscriptionHandler implements HttpHandler {
private static final ObjectMapper objectMapper = new ObjectMapper();
@Override
public void handle(HttpExchange exchange) throws IOException {
if (!"POST".equals(exchange.getRequestMethod())) {
exchange.sendResponseHeaders(405, -1);
return;
}
try {
// Baca body request
byte[] requestBody = exchange.getRequestBody().readAllBytes();
String body = new String(requestBody);
// Parse JSON envelope dari Pub/Sub
// Format: { "message": { "data": "<base64>", "attributes": {...}, "messageId": "..." },
// "subscription": "..." }
var envelope = objectMapper.readTree(body);
var messageNode = envelope.get("message");
// Data di-encode sebagai base64
String encodedData = messageNode.get("data").asText();
String data = new String(Base64.getDecoder().decode(encodedData));
String messageId = messageNode.get("messageId").asText();
System.out.printf("Push message diterima — MessageId=%s: %s%n", messageId, data);
// Proses pesan
processMessage(data);
// Return 2xx untuk acknowledge pesan ke Pub/Sub
// Pub/Sub menganggap pengiriman berhasil jika response 2xx dalam ack deadline
exchange.sendResponseHeaders(200, -1);
} catch (Exception e) {
System.err.println("Gagal proses push message: " + e.getMessage());
// Return non-2xx untuk memberitahu Pub/Sub agar retry
exchange.sendResponseHeaders(500, -1);
} finally {
exchange.close();
}
}
private void processMessage(String data) throws Exception {
System.out.println("Memproses: " + data);
// logika bisnis
}
// Contoh server HTTP sederhana untuk menerima push
public static void main(String[] args) throws Exception {
HttpServer server = HttpServer.create(new InetSocketAddress(8080), 0);
server.createContext("/pubsub/push", new PushSubscriptionHandler());
server.start();
System.out.println("Push endpoint berjalan di port 8080");
}
}
Kapan Menggunakan Pub/Sub dan Kapan Tidak #
GUNAKAN GOOGLE PUB/SUB JIKA:
✓ Sudah di ekosistem Google Cloud Platform
✓ Butuh fan-out native — satu topic, banyak subscriber independen
✓ Integrasi erat dengan BigQuery, Dataflow, Cloud Functions, Cloud Run
✓ Butuh skala global tanpa konfigurasi geo-routing
✓ Ingin zero ops — tidak ada broker yang perlu di-maintain
✓ Push subscriber untuk serverless atau webhook sederhana
✓ Butuh message filtering di sisi broker (berdasarkan attributes)
✓ Butuh seek — replay atau skip pesan historis
PERTIMBANGKAN ALTERNATIF JIKA:
✗ Tidak di GCP dan tidak mau vendor lock-in → Kafka atau RabbitMQ
✗ Butuh routing kompleks (exchange pattern) → RabbitMQ
✗ Butuh throughput sangat tinggi dengan latensi ultra-rendah → Kafka
✗ Butuh ordering global (bukan per key) → Kafka dengan satu partition
✗ Pesan lebih besar dari 10 MB → Pub/Sub tidak support, perlu GCS + pointer
✗ Butuh exactly-once processing end-to-end → Pub/Sub hanya at-least-once
flowchart TD
A{Sudah di\nekosistem GCP?} -- Ya --> B{Butuh fan-out\nke banyak subscriber?}
A -- Tidak --> C{Butuh managed\nservice?}
B -- Ya --> PUBSUB[Google Pub/Sub]
B -- Tidak --> D{Butuh integrasi\nGCP services?}
D -- Ya --> PUBSUB
D -- Tidak --> E{Butuh routing\nkompleks?}
E -- Ya --> RABBIT[RabbitMQ]
E -- Tidak --> SQS[Amazon SQS]
C -- Ya --> F{Di AWS?}
F -- Ya --> SQS
F -- Tidak --> PUBSUB
C -- Tidak --> G{Volume sangat\ntinggi / replay?}
G -- Ya --> KAFKA[Kafka]
G -- Tidak --> RABBITRingkasan #
- Topic adalah jalur distribusi, subscription adalah penyimpan pesan — buat subscription sebelum publisher mulai mengirim, atau pesan akan hilang permanen.
- Pull untuk long-running service, push untuk serverless — push subscription menghilangkan kebutuhan polling loop, tapi endpoint harus dapat diakses publik via HTTPS.
- Acknowledgment deadline menentukan berapa lama subscriber punya waktu untuk proses dan ack pesan. Gunakan
setMaxAckExtensionPeriodpada Subscriber async agar library memperpanjang deadline secara otomatis.- Message ordering membutuhkan
enableMessageOrdering=truedi publisher dan subscription, sertaorderingKeydi setiap pesan. Jika publish gagal, panggilresumePublish(orderingKey)sebelum melanjutkan.- Filter di subscription memungkinkan routing pesan berdasarkan attributes di sisi Pub/Sub — subscriber hanya menerima pesan yang relevan tanpa perlu filter di sisi aplikasi.
- Dead Letter Topic dikonfigurasi di level subscription via
maxDeliveryAttempts— lebih bersih dari implementasi DLQ manual karena Pub/Sub menangani pemindahan pesan secara otomatis.- Seek adalah fitur unik Pub/Sub — kamu bisa replay pesan dari timestamp tertentu atau skip seluruh backlog, tanpa perlu mengubah kode consumer.
- Pub/Sub paling cocok untuk ekosistem GCP, fan-out ke banyak subscriber, integrasi dengan layanan GCP, dan skalabilitas global — bukan untuk routing kompleks atau pesan sangat besar.