Google Pub/Sub

Google Cloud Pub/Sub #

Di antara semua managed message broker yang tersedia di cloud, Google Cloud Pub/Sub memiliki posisi yang unik: ia dirancang dari awal untuk skala global, bukan skala regional. Satu topic Pub/Sub bisa menerima dan mendistribusikan pesan ke subscriber di belahan dunia mana pun tanpa kamu perlu memikirkan replikasi atau geo-routing. Ini menjadikannya pilihan alami untuk aplikasi yang dibangun di atas Google Cloud Platform — mulai dari pipeline data analytics yang terintegrasi erat dengan BigQuery, hingga event-driven architecture yang menghubungkan Cloud Run, Cloud Functions, dan Dataflow. Secara model, Pub/Sub lebih mirip dengan kombinasi SNS dan SQS di AWS: satu topic bisa punya banyak subscription, dan setiap subscription bertindak seperti queue tersendiri yang menerima salinan semua pesan dari topic.

Arsitektur Pub/Sub #

Memahami model Pub/Sub sangat penting sebelum menulis kode, karena beberapa konsepnya terasa berlawanan intuisi jika kamu terbiasa dengan broker lain.

Topic dan Subscription #

Topic adalah resource tempat publisher mengirim pesan. Topic sendiri tidak menyimpan pesan — ia hanya jalur distribusi.

Subscription adalah resource yang melekat pada topic dan benar-benar menyimpan pesan sampai di-acknowledge atau sampai retention period habis. Satu topic bisa punya banyak subscription, dan setiap subscription menerima salinan semua pesan secara independen — persis seperti consumer group di Kafka, tapi dengan cara kerja yang berbeda.

flowchart LR
    PUB[Publisher] --> T[Topic:\norder-events]

    T --> S1[Subscription:\nanalytics-sub]
    T --> S2[Subscription:\nnotification-sub]
    T --> S3[Subscription:\naudit-sub]

    S1 --> C1[Analytics Service\nPull subscriber]
    S2 --> C2[Notification Service\nPull subscriber]
    S3 --> WH[Webhook endpoint\nPush subscriber]

Poin kritis yang sering membingungkan: jika belum ada subscription ketika pesan diterbitkan, pesan tersebut hilang. Pub/Sub tidak menyimpan pesan di level topic, hanya di level subscription. Buat subscription sebelum publisher mulai mengirim pesan.

Push vs Pull Delivery #

Pub/Sub mendukung dua mekanisme pengiriman pesan ke subscriber:

Pull — subscriber yang aktif meminta pesan ke Pub/Sub. Mirip seperti long polling di SQS. Cocok untuk consumer yang berjalan sebagai long-running process (Kubernetes pod, VM, Cloud Run yang selalu aktif).

Push — Pub/Sub yang aktif mengirim pesan ke endpoint HTTPS milik subscriber. Cocok untuk Cloud Functions, Cloud Run (serverless), atau webhook. Pub/Sub retry otomatis jika endpoint mengembalikan non-2xx response.

flowchart TD
    T[Topic]

    subgraph PULL[Pull Subscription]
        S1[Subscription] -->|subscriber tarik pesan| C1[Consumer\nlong-running service]
    end

    subgraph PUSH[Push Subscription]
        S2[Subscription] -->|Pub/Sub dorong pesan| EP[HTTPS Endpoint\nCloud Function / webhook]
    end

    T --> S1
    T --> S2
PullPush
Siapa yang inisiasiSubscriberPub/Sub
Cocok untukLong-running serviceServerless, webhook
Kontrol lajuSubscriber kontrol sendiriPub/Sub kontrol (max burst rate)
AutentikasiIAM di sisi subscriberOIDC token di header request
OperasionalPerlu manage polling loopZero ops di sisi consumer

Acknowledgment Deadline #

Mirip dengan visibility timeout di SQS. Ketika subscriber menerima pesan melalui pull, ia punya waktu sebesar acknowledgment deadline (default 10 detik, maks 600 detik) untuk mengirimkan acknowledge. Jika tidak, Pub/Sub mengirimkan pesan kembali.

Pesan diterima subscriber
    │
    ├── dalam ack deadline → subscriber kirim ack → pesan dihapus dari subscription
    │
    └── melebihi ack deadline → Pub/Sub kirim ulang ke subscriber lain (atau subscriber yang sama)

Setup Dependencies #

Tambahkan Google Cloud Pub/Sub client library ke project Java kamu.

Untuk Maven:

<dependencyManagement>
    <dependencies>
        <dependency>
            <groupId>com.google.cloud</groupId>
            <artifactId>libraries-bom</artifactId>
            <version>26.37.0</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>
    </dependencies>
</dependencyManagement>

<dependencies>
    <!-- Google Cloud Pub/Sub -->
    <dependency>
        <groupId>com.google.cloud</groupId>
        <artifactId>google-cloud-pubsub</artifactId>
    </dependency>
</dependencies>

Untuk Gradle:

dependencies {
    implementation platform('com.google.cloud:libraries-bom:26.37.0')
    implementation 'com.google.cloud:google-cloud-pubsub'
}

Untuk development lokal, gunakan Pub/Sub Emulator:

# Install Google Cloud SDK jika belum ada
# Kemudian jalankan emulator
gcloud beta emulators pubsub start --project=my-project --host-port=localhost:8085

# Set environment variable agar client library mengarah ke emulator
export PUBSUB_EMULATOR_HOST=localhost:8085
export GOOGLE_CLOUD_PROJECT=my-project

Atau gunakan Docker:

docker run -d \
  --name pubsub-emulator \
  -p 8085:8085 \
  gcr.io/google.com/cloudsdktool/google-cloud-cli \
  gcloud beta emulators pubsub start \
    --project=my-project \
    --host-port=0.0.0.0:8085

Autentikasi #

Pub/Sub menggunakan Google Cloud IAM untuk autentikasi. Client library secara otomatis mencari Application Default Credentials (ADC) dari beberapa sumber secara berurutan:

1. Environment variable GOOGLE_APPLICATION_CREDENTIALS → path ke service account JSON
2. gcloud auth application-default login → credentials dari gcloud CLI
3. Metadata server → jika berjalan di GCE, GKE, Cloud Run, atau Cloud Functions (IAM role)
import com.google.auth.oauth2.GoogleCredentials;
import com.google.auth.oauth2.ServiceAccountCredentials;

import java.io.FileInputStream;

public class PubSubAuth {

    // ✓ BENAR: Application Default Credentials — otomatis di GCP, gcloud untuk dev
    // Tidak perlu kode tambahan — library menanganinya sendiri
    // Cukup set GOOGLE_APPLICATION_CREDENTIALS atau jalankan gcloud auth

    // ✓ BENAR: Service account eksplisit jika butuh credentials tertentu
    public static GoogleCredentials loadServiceAccount(String keyFilePath) throws Exception {
        try (FileInputStream stream = new FileInputStream(keyFilePath)) {
            return ServiceAccountCredentials.fromStream(stream)
                .createScoped("https://www.googleapis.com/auth/cloud-platform");
        }
    }

    // ✗ ANTI-PATTERN: hardcode service account key di kode sumber
    // Key bisa bocor ke version control atau log aplikasi
    public static final String HARDCODED_KEY = "{ \"type\": \"service_account\", ... }";
}
Jangan commit service account JSON key ke version control. Gunakan Secret Manager, environment variable yang diinjeksikan saat deploy, atau Workload Identity (untuk GKE) yang menghilangkan kebutuhan service account key sama sekali.

Manajemen Topic dan Subscription #

Pub/Sub menyediakan TopicAdminClient dan SubscriptionAdminClient untuk manajemen resource secara programatis.

import com.google.cloud.pubsub.v1.TopicAdminClient;
import com.google.cloud.pubsub.v1.SubscriptionAdminClient;
import com.google.pubsub.v1.*;

import org.threeten.bp.Duration;

public class PubSubResourceManager {

    private final String projectId;

    public PubSubResourceManager(String projectId) {
        this.projectId = projectId;
    }

    // Membuat topic
    public void createTopic(String topicId) throws Exception {
        try (TopicAdminClient topicAdminClient = TopicAdminClient.create()) {
            TopicName topicName = TopicName.of(projectId, topicId);

            Topic topic = Topic.newBuilder()
                .setName(topicName.toString())
                // Retention pesan di topic (bukan subscription) — 1 hari
                // Ini memungkinkan subscription yang dibuat setelah pesan diterbitkan
                // untuk mengakses pesan dalam window ini (seek to timestamp)
                .setMessageRetentionDuration(
                    com.google.protobuf.Duration.newBuilder()
                        .setSeconds(86400) // 24 jam
                        .build()
                )
                .build();

            topicAdminClient.createTopic(topic);
            System.out.println("Topic dibuat: " + topicName);
        }
    }

    // Membuat Pull Subscription
    public void createPullSubscription(String topicId, String subscriptionId) throws Exception {
        try (SubscriptionAdminClient subscriptionAdminClient = SubscriptionAdminClient.create()) {
            TopicName topicName = TopicName.of(projectId, topicId);
            SubscriptionName subscriptionName = SubscriptionName.of(projectId, subscriptionId);

            Subscription subscription = Subscription.newBuilder()
                .setName(subscriptionName.toString())
                .setTopic(topicName.toString())
                .setAckDeadlineSeconds(60)         // subscriber punya 60 detik untuk ack
                .setRetainAckedMessages(false)     // hapus pesan yang sudah di-ack
                .setMessageRetentionDuration(
                    com.google.protobuf.Duration.newBuilder()
                        .setSeconds(7 * 86400)     // simpan pesan yang belum di-ack selama 7 hari
                        .build()
                )
                .build();

            subscriptionAdminClient.createSubscription(subscription);
            System.out.println("Pull subscription dibuat: " + subscriptionName);
        }
    }

    // Membuat Push Subscription
    public void createPushSubscription(String topicId, String subscriptionId,
                                        String pushEndpoint) throws Exception {
        try (SubscriptionAdminClient subscriptionAdminClient = SubscriptionAdminClient.create()) {
            TopicName topicName = TopicName.of(projectId, topicId);
            SubscriptionName subscriptionName = SubscriptionName.of(projectId, subscriptionId);

            PushConfig pushConfig = PushConfig.newBuilder()
                .setPushEndpoint(pushEndpoint) // harus HTTPS dan bisa diakses publik
                .build();

            Subscription subscription = Subscription.newBuilder()
                .setName(subscriptionName.toString())
                .setTopic(topicName.toString())
                .setPushConfig(pushConfig)
                .setAckDeadlineSeconds(30)
                .build();

            subscriptionAdminClient.createSubscription(subscription);
            System.out.println("Push subscription dibuat: " + subscriptionName);
        }
    }

    // Membuat Subscription dengan Dead Letter Topic
    public void createSubscriptionWithDLT(String topicId, String subscriptionId,
                                           String deadLetterTopicId) throws Exception {
        try (SubscriptionAdminClient subscriptionAdminClient = SubscriptionAdminClient.create()) {
            TopicName topicName = TopicName.of(projectId, topicId);
            TopicName deadLetterTopicName = TopicName.of(projectId, deadLetterTopicId);
            SubscriptionName subscriptionName = SubscriptionName.of(projectId, subscriptionId);

            DeadLetterPolicy deadLetterPolicy = DeadLetterPolicy.newBuilder()
                .setDeadLetterTopic(deadLetterTopicName.toString())
                .setMaxDeliveryAttempts(5) // kirim ke DLT setelah 5 kali gagal
                .build();

            Subscription subscription = Subscription.newBuilder()
                .setName(subscriptionName.toString())
                .setTopic(topicName.toString())
                .setDeadLetterPolicy(deadLetterPolicy)
                .setAckDeadlineSeconds(60)
                .build();

            subscriptionAdminClient.createSubscription(subscription);
            System.out.println("Subscription dengan DLT dibuat: " + subscriptionName);
        }
    }
}

Publisher #

Publisher menggunakan Publisher client yang mendukung pengiriman asynchronous dengan batching otomatis.

Konfigurasi Publisher #

import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutureCallback;
import com.google.api.core.ApiFutures;
import com.google.api.gax.batching.BatchingSettings;
import com.google.api.gax.retrying.RetrySettings;
import com.google.cloud.pubsub.v1.Publisher;
import com.google.protobuf.ByteString;
import com.google.pubsub.v1.PubsubMessage;
import com.google.pubsub.v1.TopicName;

import org.threeten.bp.Duration;

import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class PubSubPublisher {

    private final Publisher publisher;

    public PubSubPublisher(String projectId, String topicId) throws Exception {
        TopicName topicName = TopicName.of(projectId, topicId);

        // Konfigurasi batching — library mengumpulkan pesan sebelum dikirim
        BatchingSettings batchingSettings = BatchingSettings.newBuilder()
            .setElementCountThreshold(100L)         // kirim jika sudah 100 pesan
            .setRequestByteThreshold(1024 * 1024L)  // atau jika total sudah 1 MB
            .setDelayThreshold(Duration.ofMillis(100)) // atau jika sudah 100ms sejak pesan pertama
            .build();

        // Konfigurasi retry untuk transient error
        RetrySettings retrySettings = RetrySettings.newBuilder()
            .setMaxAttempts(5)
            .setInitialRetryDelay(Duration.ofMillis(100))
            .setMaxRetryDelay(Duration.ofSeconds(60))
            .setRetryDelayMultiplier(2.0)
            .build();

        this.publisher = Publisher.newBuilder(topicName)
            .setBatchingSettings(batchingSettings)
            .setRetrySettings(retrySettings)
            .build();
    }

    // Publish pesan dengan attributes (metadata)
    public void publish(String data, Map<String, String> attributes) {
        ByteString byteData = ByteString.copyFromUtf8(data);

        PubsubMessage.Builder messageBuilder = PubsubMessage.newBuilder()
            .setData(byteData);

        // Tambahkan attributes — key-value metadata yang bisa digunakan untuk filtering
        if (attributes != null) {
            messageBuilder.putAllAttributes(attributes);
        }

        PubsubMessage message = messageBuilder.build();

        // publish() mengembalikan ApiFuture — non-blocking
        ApiFuture<String> future = publisher.publish(message);

        // Daftarkan callback untuk mengetahui hasilnya
        ApiFutures.addCallback(future, new ApiFutureCallback<String>() {
            @Override
            public void onSuccess(String messageId) {
                System.out.println("Pesan berhasil diterbitkan — MessageId: " + messageId);
            }

            @Override
            public void onFailure(Throwable throwable) {
                System.err.println("Gagal menerbitkan pesan: " + throwable.getMessage());
                // Di sini: log error, kirim ke fallback, atau alert monitoring
            }
        }, Executors.newSingleThreadExecutor());
    }

    // Publish dengan ordering key — membutuhkan message ordering diaktifkan di subscription
    public void publishOrdered(String data, String orderingKey) {
        PubsubMessage message = PubsubMessage.newBuilder()
            .setData(ByteString.copyFromUtf8(data))
            .setOrderingKey(orderingKey) // semua pesan dengan key yang sama dikirim urut
            .build();

        publisher.publish(message);
    }

    // Wajib dipanggil saat aplikasi shutdown
    // Publisher flush semua pesan yang masih di buffer sebelum berhenti
    public void shutdown() throws Exception {
        publisher.shutdown();
        publisher.awaitTermination(30, TimeUnit.SECONDS);
    }
}

Publish Batch Eksplisit #

Meski batching dilakukan otomatis oleh library, kamu bisa publish banyak pesan sekaligus dan menunggu semua selesai:

import java.util.ArrayList;
import java.util.List;

public class BatchPublisher {

    private final Publisher publisher;

    public BatchPublisher(Publisher publisher) {
        this.publisher = publisher;
    }

    public void publishBatch(List<String> messages) throws Exception {
        List<ApiFuture<String>> futures = new ArrayList<>();

        for (String message : messages) {
            PubsubMessage pubsubMessage = PubsubMessage.newBuilder()
                .setData(ByteString.copyFromUtf8(message))
                .build();

            futures.add(publisher.publish(pubsubMessage));
        }

        // Tunggu semua pesan selesai diterbitkan
        // ApiFutures.allAsList() gagal jika ada satu pun yang gagal
        try {
            List<String> messageIds = ApiFutures.allAsList(futures).get();
            System.out.printf("Berhasil publish %d pesan%n", messageIds.size());
        } catch (Exception e) {
            System.err.println("Satu atau lebih pesan gagal diterbitkan: " + e.getMessage());
            throw e;
        }
    }
}

Subscriber — Pull #

Pull subscriber menggunakan Subscriber client yang menangani polling, threading, dan ack secara otomatis di background.

Synchronous Pull #

Gunakan pull synchronous untuk skenario di mana kamu butuh kontrol penuh atas kapan pesan diproses — misalnya pipeline batch yang hanya berjalan pada waktu tertentu:

import com.google.cloud.pubsub.v1.stub.GrpcSubscriberStub;
import com.google.cloud.pubsub.v1.stub.SubscriberStubSettings;
import com.google.pubsub.v1.*;

public class SynchronousPullSubscriber {

    private final String projectId;
    private final String subscriptionId;

    public SynchronousPullSubscriber(String projectId, String subscriptionId) {
        this.projectId = projectId;
        this.subscriptionId = subscriptionId;
    }

    public void pullAndProcess(int maxMessages) throws Exception {
        SubscriberStubSettings subscriberStubSettings = SubscriberStubSettings.newBuilder()
            .setTransportChannelProvider(
                SubscriberStubSettings.defaultGrpcTransportProviderBuilder()
                    .setMaxInboundMessageSize(20 * 1024 * 1024) // 20 MB
                    .build()
            )
            .build();

        try (GrpcSubscriberStub subscriber = GrpcSubscriberStub.create(subscriberStubSettings)) {
            String subscriptionName = SubscriptionName.format(projectId, subscriptionId);

            PullRequest pullRequest = PullRequest.newBuilder()
                .setMaxMessages(maxMessages)
                .setSubscription(subscriptionName)
                .build();

            PullResponse pullResponse = subscriber.pullCallable().call(pullRequest);
            List<String> ackIds = new ArrayList<>();

            for (ReceivedMessage receivedMessage : pullResponse.getReceivedMessagesList()) {
                PubsubMessage message = receivedMessage.getMessage();
                String data = message.getData().toStringUtf8();

                System.out.printf("MessageId=%s | Data: %s%n",
                    message.getMessageId(), data);
                System.out.println("Attributes: " + message.getAttributesMap());

                try {
                    processMessage(data, message.getAttributesMap());
                    ackIds.add(receivedMessage.getAckId());
                } catch (Exception e) {
                    System.err.println("Gagal proses, tidak akan di-ack: " + e.getMessage());
                    // Jangan tambahkan ke ackIds — Pub/Sub akan kirim ulang setelah deadline
                }
            }

            // Acknowledge semua pesan yang berhasil diproses dalam satu request
            if (!ackIds.isEmpty()) {
                AcknowledgeRequest acknowledgeRequest = AcknowledgeRequest.newBuilder()
                    .setSubscription(subscriptionName)
                    .addAllAckIds(ackIds)
                    .build();

                subscriber.acknowledgeCallable().call(acknowledgeRequest);
                System.out.println("Acknowledged " + ackIds.size() + " pesan.");
            }
        }
    }

    private void processMessage(String data, Map<String, String> attributes) throws Exception {
        // logika bisnis
    }
}

Asynchronous Pull (Streaming) #

Untuk long-running consumer, gunakan streaming pull dengan Subscriber — ini adalah cara yang paling umum dan efisien:

import com.google.cloud.pubsub.v1.AckReplyConsumer;
import com.google.cloud.pubsub.v1.MessageReceiver;
import com.google.cloud.pubsub.v1.Subscriber;
import com.google.pubsub.v1.ProjectSubscriptionName;
import com.google.pubsub.v1.PubsubMessage;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class StreamingPullSubscriber {

    public static void startSubscriber(String projectId, String subscriptionId) throws Exception {
        ProjectSubscriptionName subscriptionName =
            ProjectSubscriptionName.of(projectId, subscriptionId);

        // MessageReceiver dipanggil di thread terpisah untuk setiap pesan
        MessageReceiver receiver = (PubsubMessage message, AckReplyConsumer consumer) -> {
            String data = message.getData().toStringUtf8();
            String messageId = message.getMessageId();
            Map<String, String> attributes = message.getAttributesMap();

            System.out.printf("Menerima pesan — MessageId=%s%n", messageId);

            try {
                processMessage(data, attributes);

                // ✓ Ack — Pub/Sub hapus pesan dari subscription
                consumer.ack();
                System.out.println("Pesan di-ack: " + messageId);

            } catch (Exception e) {
                System.err.printf("Gagal proses MessageId=%s: %s%n", messageId, e.getMessage());

                // ✓ Nack — Pub/Sub kirim ulang setelah ack deadline
                // Jika subscription punya Dead Letter Policy, pesan akan masuk DLT
                // setelah maxDeliveryAttempts tercapai
                consumer.nack();
            }
        };

        Subscriber subscriber = Subscriber.newBuilder(subscriptionName, receiver)
            .setMaxAckExtensionPeriod(Duration.ofSeconds(120)) // perpanjang ack deadline otomatis
            .setParallelPullCount(2)      // jumlah streaming pull paralel
            .setExecutorProvider(         // thread pool untuk memproses pesan
                com.google.api.gax.core.InstantiatingExecutorProvider.newBuilder()
                    .setExecutorThreadCount(4)
                    .build()
            )
            .build();

        // Daftarkan listener untuk error fatal
        subscriber.addListener(new Subscriber.Listener() {
            @Override
            public void failed(Subscriber.State from, Throwable failure) {
                System.err.println("Subscriber gagal dari state " + from + ": " + failure.getMessage());
                // Alert ke monitoring system
            }
        }, Executors.newSingleThreadExecutor());

        subscriber.startAsync().awaitRunning();
        System.out.println("Subscriber berjalan, menunggu pesan...");

        // Jalankan selamanya (atau sampai signal shutdown)
        try {
            subscriber.awaitTerminated(30, TimeUnit.MINUTES);
        } catch (TimeoutException e) {
            subscriber.stopAsync();
        }
    }

    private static void processMessage(String data, Map<String, String> attributes) throws Exception {
        System.out.println("Memproses: " + data);
        // logika bisnis
    }
}

Memperpanjang Acknowledgment Deadline #

Jika pemrosesan membutuhkan waktu lebih lama dari ack deadline, gunakan ModifyAckDeadline untuk memperpanjang:

// Dengan Subscriber async, library menangani ini otomatis melalui setMaxAckExtensionPeriod
// Untuk synchronous pull, kamu perlu melakukannya secara manual:

public void extendDeadline(GrpcSubscriberStub subscriber,
                            String subscriptionName,
                            List<String> ackIds,
                            int newDeadlineSeconds) {
    ModifyAckDeadlineRequest modifyRequest = ModifyAckDeadlineRequest.newBuilder()
        .setSubscription(subscriptionName)
        .addAllAckIds(ackIds)
        .setAckDeadlineSeconds(newDeadlineSeconds) // perpanjang hingga N detik dari sekarang
        .build();

    subscriber.modifyAckDeadlineCallable().call(modifyRequest);
    System.out.println("Ack deadline diperpanjang " + newDeadlineSeconds + " detik.");
}

Filtering Pesan #

Pub/Sub mendukung filter berbasis CEL (Common Expression Language) di level subscription. Subscriber hanya menerima pesan yang cocok dengan filter — pesan yang tidak cocok di-acknowledge otomatis oleh Pub/Sub (tidak masuk ke subscription).

// Filter hanya pesan dengan attribute "event-type" bernilai "order.created"
// atau "order.shipped"
String filter = "attributes.\"event-type\" = \"order.created\" " +
                "OR attributes.\"event-type\" = \"order.shipped\"";

Subscription subscription = Subscription.newBuilder()
    .setName(subscriptionName.toString())
    .setTopic(topicName.toString())
    .setFilter(filter) // filter diterapkan di sisi Pub/Sub, bukan di subscriber
    .setAckDeadlineSeconds(60)
    .build();

subscriptionAdminClient.createSubscription(subscription);

Contoh filter lain yang umum digunakan:

// Filter berdasarkan satu attribute
attributes.region = "asia-southeast1"

// Filter berdasarkan keberadaan attribute
hasPrefix(attributes.order-id, "ORD-")

// Filter kombinasi
attributes.env = "production" AND attributes.priority = "high"

// Filter berdasarkan data pesan (harus JSON)
// Tidak didukung — filter hanya bekerja pada message attributes, bukan body
Filter Pub/Sub hanya bekerja pada message attributes, bukan pada isi body pesan. Jika kamu butuh routing berdasarkan konten pesan, lakukan di sisi consumer setelah menerima pesan, atau enkode kriteria routing sebagai attributes saat publish.

Message Ordering #

Secara default, Pub/Sub tidak menjamin urutan pengiriman pesan. Untuk menjamin ordering, aktifkan enableMessageOrdering di Subscriber dan gunakan orderingKey saat publish.

// Publisher — aktifkan message ordering di publisher
Publisher publisher = Publisher.newBuilder(topicName)
    .setEnableMessageOrdering(true) // wajib jika ingin pakai ordering key
    .build();

// Publish dengan ordering key — semua pesan dengan key yang sama dikirim urut
PubsubMessage message = PubsubMessage.newBuilder()
    .setData(ByteString.copyFromUtf8(payload))
    .setOrderingKey("order-" + orderId) // pesan untuk order yang sama selalu urut
    .build();

publisher.publish(message);
// Subscription — aktifkan message ordering di subscription
Subscription subscription = Subscription.newBuilder()
    .setName(subscriptionName.toString())
    .setTopic(topicName.toString())
    .setEnableMessageOrdering(true) // wajib agar ordering key dihormati
    .setAckDeadlineSeconds(60)
    .build();
Jika publisher gagal menerbitkan pesan dengan ordering key tertentu, semua pesan selanjutnya dengan ordering key yang sama akan ditolak sampai kamu memanggil publisher.resumePublish(orderingKey) secara eksplisit. Ini untuk mencegah gap yang merusak urutan.

Dead Letter Topic #

Dead Letter Topic (DLT) di Pub/Sub bekerja di level subscription — pesan yang melewati batas maxDeliveryAttempts tanpa di-acknowledge otomatis dipindahkan ke topic lain.

flowchart TD
    PUB[Publisher] --> T[Topic: order-events]
    T --> SUB[Subscription: order-sub\nmaxDeliveryAttempts=5]
    SUB --> C{Consumer\nberhasil ack?}

    C -- Ya --> DONE[Pesan dihapus\ndari subscription]
    C -- Tidak\nconsumer.nack / timeout --> SUB
    SUB -->|Sudah 5x gagal| DLT[Dead Letter Topic:\norder-events-dlt]
    DLT --> DLSUB[Subscription: dlq-sub]
    DLSUB --> MON[Monitoring &\nInvestigasi]

Pesan yang masuk ke DLT menyertakan attributes tambahan untuk investigasi:

CloudPubSubDeadLetterSourceSubscription → nama subscription asal
CloudPubSubDeadLetterSourceTopicPublishTime → waktu pesan pertama diterbitkan

Consumer untuk DLT:

public class DeadLetterConsumer {

    public static void monitorDLT(String projectId, String dltSubscriptionId) throws Exception {
        ProjectSubscriptionName subscriptionName =
            ProjectSubscriptionName.of(projectId, dltSubscriptionId);

        MessageReceiver receiver = (PubsubMessage message, AckReplyConsumer consumer) -> {
            String data = message.getData().toStringUtf8();
            Map<String, String> attributes = message.getAttributesMap();

            // Ambil informasi asal pesan dari attributes yang ditambahkan Pub/Sub
            String sourceSubscription = attributes.getOrDefault(
                "CloudPubSubDeadLetterSourceSubscription", "unknown"
            );

            System.err.printf(
                "[DLT] Pesan gagal dari subscription '%s': %s%n",
                sourceSubscription, data
            );

            // Kirim alert dan log untuk investigasi
            sendAlert(data, attributes);

            // Ack di DLT — pesan sudah dilogging, tidak perlu diproses ulang otomatis
            consumer.ack();
        };

        Subscriber subscriber = Subscriber.newBuilder(subscriptionName, receiver).build();
        subscriber.startAsync().awaitRunning();
        System.out.println("DLT consumer berjalan...");
        subscriber.awaitTerminated();
    }

    private static void sendAlert(String data, Map<String, String> attributes) {
        System.out.println("Alert dikirim ke tim ops: " + data);
    }
}

Seek — Memutar Ulang atau Melewati Pesan #

Salah satu fitur yang membedakan Pub/Sub dari SQS adalah kemampuan seek — kamu bisa “memutar ulang” subscription ke titik waktu tertentu di masa lalu, atau melewati semua pesan yang ada di queue. Ini berguna untuk reprocessing setelah bug diperbaiki atau untuk skip backlog yang tidak relevan.

import com.google.cloud.pubsub.v1.SubscriptionAdminClient;
import com.google.pubsub.v1.SeekRequest;
import com.google.pubsub.v1.SubscriptionName;

import com.google.protobuf.Timestamp;
import java.time.Instant;

public class PubSubSeek {

    // Seek ke titik waktu tertentu — subscription akan menerima ulang semua pesan
    // yang diterbitkan setelah timestamp ini (dalam window message retention)
    public static void seekToTime(String projectId, String subscriptionId,
                                   Instant targetTime) throws Exception {
        try (SubscriptionAdminClient adminClient = SubscriptionAdminClient.create()) {
            SubscriptionName subscriptionName = SubscriptionName.of(projectId, subscriptionId);

            Timestamp timestamp = Timestamp.newBuilder()
                .setSeconds(targetTime.getEpochSecond())
                .build();

            SeekRequest seekRequest = SeekRequest.newBuilder()
                .setSubscription(subscriptionName.toString())
                .setTime(timestamp)
                .build();

            adminClient.seek(seekRequest);
            System.out.println("Subscription di-seek ke: " + targetTime);
        }
    }

    // Skip semua pesan yang ada — subscription hanya menerima pesan baru setelah ini
    public static void skipBacklog(String projectId, String subscriptionId) throws Exception {
        // Seek ke "sekarang" — semua pesan yang ada dianggap sudah diproses
        seekToTime(projectId, subscriptionId, Instant.now());
        System.out.println("Backlog di-skip. Hanya menerima pesan baru.");
    }
}

Push Subscription — Menerima Pesan via HTTP #

Untuk push subscription, Pub/Sub mengirimkan HTTP POST ke endpoint kamu. Request body berisi pesan dalam format JSON envelope:

import com.fasterxml.jackson.databind.ObjectMapper;
import com.sun.net.httpserver.HttpExchange;
import com.sun.net.httpserver.HttpHandler;
import com.sun.net.httpserver.HttpServer;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.Base64;

public class PushSubscriptionHandler implements HttpHandler {

    private static final ObjectMapper objectMapper = new ObjectMapper();

    @Override
    public void handle(HttpExchange exchange) throws IOException {
        if (!"POST".equals(exchange.getRequestMethod())) {
            exchange.sendResponseHeaders(405, -1);
            return;
        }

        try {
            // Baca body request
            byte[] requestBody = exchange.getRequestBody().readAllBytes();
            String body = new String(requestBody);

            // Parse JSON envelope dari Pub/Sub
            // Format: { "message": { "data": "<base64>", "attributes": {...}, "messageId": "..." },
            //           "subscription": "..." }
            var envelope = objectMapper.readTree(body);
            var messageNode = envelope.get("message");

            // Data di-encode sebagai base64
            String encodedData = messageNode.get("data").asText();
            String data = new String(Base64.getDecoder().decode(encodedData));

            String messageId = messageNode.get("messageId").asText();

            System.out.printf("Push message diterima — MessageId=%s: %s%n", messageId, data);

            // Proses pesan
            processMessage(data);

            // Return 2xx untuk acknowledge pesan ke Pub/Sub
            // Pub/Sub menganggap pengiriman berhasil jika response 2xx dalam ack deadline
            exchange.sendResponseHeaders(200, -1);

        } catch (Exception e) {
            System.err.println("Gagal proses push message: " + e.getMessage());

            // Return non-2xx untuk memberitahu Pub/Sub agar retry
            exchange.sendResponseHeaders(500, -1);
        } finally {
            exchange.close();
        }
    }

    private void processMessage(String data) throws Exception {
        System.out.println("Memproses: " + data);
        // logika bisnis
    }

    // Contoh server HTTP sederhana untuk menerima push
    public static void main(String[] args) throws Exception {
        HttpServer server = HttpServer.create(new InetSocketAddress(8080), 0);
        server.createContext("/pubsub/push", new PushSubscriptionHandler());
        server.start();
        System.out.println("Push endpoint berjalan di port 8080");
    }
}

Kapan Menggunakan Pub/Sub dan Kapan Tidak #

GUNAKAN GOOGLE PUB/SUB JIKA:
  ✓ Sudah di ekosistem Google Cloud Platform
  ✓ Butuh fan-out native — satu topic, banyak subscriber independen
  ✓ Integrasi erat dengan BigQuery, Dataflow, Cloud Functions, Cloud Run
  ✓ Butuh skala global tanpa konfigurasi geo-routing
  ✓ Ingin zero ops — tidak ada broker yang perlu di-maintain
  ✓ Push subscriber untuk serverless atau webhook sederhana
  ✓ Butuh message filtering di sisi broker (berdasarkan attributes)
  ✓ Butuh seek — replay atau skip pesan historis

PERTIMBANGKAN ALTERNATIF JIKA:
  ✗ Tidak di GCP dan tidak mau vendor lock-in → Kafka atau RabbitMQ
  ✗ Butuh routing kompleks (exchange pattern) → RabbitMQ
  ✗ Butuh throughput sangat tinggi dengan latensi ultra-rendah → Kafka
  ✗ Butuh ordering global (bukan per key) → Kafka dengan satu partition
  ✗ Pesan lebih besar dari 10 MB → Pub/Sub tidak support, perlu GCS + pointer
  ✗ Butuh exactly-once processing end-to-end → Pub/Sub hanya at-least-once
flowchart TD
    A{Sudah di\nekosistem GCP?} -- Ya --> B{Butuh fan-out\nke banyak subscriber?}
    A -- Tidak --> C{Butuh managed\nservice?}

    B -- Ya --> PUBSUB[Google Pub/Sub]
    B -- Tidak --> D{Butuh integrasi\nGCP services?}
    D -- Ya --> PUBSUB
    D -- Tidak --> E{Butuh routing\nkompleks?}
    E -- Ya --> RABBIT[RabbitMQ]
    E -- Tidak --> SQS[Amazon SQS]

    C -- Ya --> F{Di AWS?}
    F -- Ya --> SQS
    F -- Tidak --> PUBSUB

    C -- Tidak --> G{Volume sangat\ntinggi / replay?}
    G -- Ya --> KAFKA[Kafka]
    G -- Tidak --> RABBIT

Ringkasan #

  • Topic adalah jalur distribusi, subscription adalah penyimpan pesan — buat subscription sebelum publisher mulai mengirim, atau pesan akan hilang permanen.
  • Pull untuk long-running service, push untuk serverless — push subscription menghilangkan kebutuhan polling loop, tapi endpoint harus dapat diakses publik via HTTPS.
  • Acknowledgment deadline menentukan berapa lama subscriber punya waktu untuk proses dan ack pesan. Gunakan setMaxAckExtensionPeriod pada Subscriber async agar library memperpanjang deadline secara otomatis.
  • Message ordering membutuhkan enableMessageOrdering=true di publisher dan subscription, serta orderingKey di setiap pesan. Jika publish gagal, panggil resumePublish(orderingKey) sebelum melanjutkan.
  • Filter di subscription memungkinkan routing pesan berdasarkan attributes di sisi Pub/Sub — subscriber hanya menerima pesan yang relevan tanpa perlu filter di sisi aplikasi.
  • Dead Letter Topic dikonfigurasi di level subscription via maxDeliveryAttempts — lebih bersih dari implementasi DLQ manual karena Pub/Sub menangani pemindahan pesan secara otomatis.
  • Seek adalah fitur unik Pub/Sub — kamu bisa replay pesan dari timestamp tertentu atau skip seluruh backlog, tanpa perlu mengubah kode consumer.
  • Pub/Sub paling cocok untuk ekosistem GCP, fan-out ke banyak subscriber, integrasi dengan layanan GCP, dan skalabilitas global — bukan untuk routing kompleks atau pesan sangat besar.

← Sebelumnya: Amazon SQS   Berikutnya: Redis →

About | Author | Content Scope | Editorial Policy | Privacy Policy | Disclaimer | Contact