Kafka

Apache Kafka #

Ketika sebuah sistem perlu menghubungkan puluhan service yang saling bertukar data secara real-time, pendekatan point-to-point akan menciptakan jaringan koneksi yang tidak terkendali. Apache Kafka hadir sebagai platform streaming terdistribusi yang memungkinkan pertukaran data skala besar dengan latensi rendah, tanpa producer dan consumer perlu saling mengenal. Kafka bukan sekadar message queue — ia adalah commit log terdistribusi yang dapat menyimpan data untuk waktu yang dapat dikonfigurasi, memungkinkan consumer membaca ulang pesan kapan pun dibutuhkan. Dalam konteks Java, ekosistem Kafka sangat matang dengan library resmi yang kaya fitur, mulai dari producer sederhana hingga stream processing dengan Kafka Streams.

Konsep Dasar Kafka #

Sebelum menulis satu baris kode pun, penting untuk memahami model data Kafka karena ia sangat berbeda dari message broker tradisional seperti RabbitMQ. Kafka menggunakan struktur log yang immutable — pesan yang sudah ditulis tidak bisa diubah atau dihapus secara langsung.

Topic dan Partition #

Topic adalah kategori atau nama feed tempat pesan dipublikasikan. Setiap topic dibagi menjadi satu atau lebih partition. Partition adalah unit paralelisme di Kafka — satu partition hanya bisa dikonsumsi oleh satu consumer dalam consumer group yang sama pada satu waktu.

Topic: "order-events"
├── Partition 0: [msg-0] [msg-1] [msg-4] [msg-7]
├── Partition 1: [msg-2] [msg-5] [msg-8]
└── Partition 2: [msg-3] [msg-6] [msg-9]

Urutan pesan hanya dijamin dalam satu partition. Jika kamu butuh ordering global, gunakan satu partition — tapi ini mengorbankan paralelisme. Jika ordering hanya perlu dijamin per entitas (misalnya, semua event untuk order ID yang sama harus urut), gunakan message key — Kafka akan selalu mengirim pesan dengan key yang sama ke partition yang sama.

Offset #

Setiap pesan dalam partition memiliki offset — angka integer yang meningkat secara monoton. Offset adalah “bookmark” posisi consumer. Consumer menyimpan offset terakhir yang sudah diproses, sehingga bisa melanjutkan dari posisi yang benar setelah restart.

sequenceDiagram
    participant Producer
    participant Kafka as Kafka Broker
    participant Consumer

    Producer->>Kafka: publish("order-created", key="order-123")
    Kafka-->>Producer: offset=42 (partition=1)
    Consumer->>Kafka: poll() — mulai dari offset=40
    Kafka-->>Consumer: [msg offset=40, offset=41, offset=42]
    Consumer->>Kafka: commitOffset(partition=1, offset=43)

Broker, Leader, dan Replica #

Kafka berjalan sebagai cluster dari satu atau lebih server yang disebut broker. Setiap partition memiliki satu leader dan nol atau lebih replica. Semua baca-tulis untuk partition tertentu diarahkan ke leadernya. Replica mengikuti leader untuk memberikan fault tolerance.

flowchart TD
    P[Producer] --> B0[Broker 0\nLeader: P0, P2\nReplica: P1]
    P --> B1[Broker 1\nLeader: P1\nReplica: P0, P2]
    P --> B2[Broker 2\nLeader: —\nReplica: P0, P1, P2]

    B0 --> C1[Consumer Group A\nMember 1 — P0]
    B1 --> C2[Consumer Group A\nMember 2 — P1]
    B0 --> C3[Consumer Group A\nMember 3 — P2]

Setup Dependencies #

Untuk menggunakan Kafka dengan Java, tambahkan dependency kafka-clients ke project kamu. Jika menggunakan Maven:

<dependencies>
    <!-- Kafka client resmi -->
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>3.7.0</version>
    </dependency>

    <!-- SLF4J untuk logging (diperlukan Kafka client) -->
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-simple</artifactId>
        <version>2.0.12</version>
    </dependency>
</dependencies>

Untuk Gradle:

dependencies {
    implementation 'org.apache.kafka:kafka-clients:3.7.0'
    implementation 'org.slf4j:slf4j-simple:2.0.12'
}

Jika kamu menjalankan Kafka secara lokal untuk development, cara termudah adalah menggunakan Docker:

# Jalankan Kafka dengan KRaft mode (tanpa Zookeeper, tersedia sejak Kafka 3.3+)
docker run -d \
  --name kafka \
  -p 9092:9092 \
  -e KAFKA_NODE_ID=1 \
  -e KAFKA_PROCESS_ROLES=broker,controller \
  -e KAFKA_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093 \
  -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9092 \
  -e KAFKA_CONTROLLER_LISTENER_NAMES=CONTROLLER \
  -e KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT \
  -e KAFKA_CONTROLLER_QUORUM_VOTERS=1@localhost:9093 \
  -e KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1 \
  apache/kafka:3.7.0

Producer #

Producer bertanggung jawab mengirim pesan ke Kafka. Konfigurasi producer mempengaruhi throughput, latensi, dan durability pengiriman pesan.

Konfigurasi Producer #

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;
import java.util.concurrent.Future;

public class OrderProducer {

    private final KafkaProducer<String, String> producer;
    private static final String TOPIC = "order-events";

    public OrderProducer() {
        Properties props = new Properties();

        // Bootstrap servers — titik masuk awal ke cluster
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        // Serializer untuk key dan value
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        // acks=all — tunggu konfirmasi dari semua in-sync replicas
        // ANTI-PATTERN: acks=0 (fire-and-forget, pesan bisa hilang)
        // ANTI-PATTERN: acks=1 (hanya leader, bisa hilang jika leader crash sebelum replikasi)
        props.put(ProducerConfig.ACKS_CONFIG, "all");

        // Retry otomatis jika gagal (transient error)
        props.put(ProducerConfig.RETRIES_CONFIG, 3);
        props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, 100);

        // Idempotent producer — mencegah duplikasi saat retry
        props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);

        // Batching untuk throughput lebih tinggi
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);    // 16 KB per batch
        props.put(ProducerConfig.LINGER_MS_CONFIG, 5);         // tunggu 5ms untuk batch lebih penuh

        this.producer = new KafkaProducer<>(props);
    }

    public void close() {
        producer.close();
    }
}

Mengirim Pesan #

Ada tiga cara mengirim pesan: fire-and-forget, synchronous, dan asynchronous. Pilih berdasarkan kebutuhan:

public class OrderProducer {

    // ... (constructor di atas)

    // ✗ ANTI-PATTERN: Fire-and-forget tanpa callback
    // Tidak ada cara tahu apakah pesan berhasil dikirim
    public void sendFireAndForget(String orderId, String payload) {
        producer.send(new ProducerRecord<>(TOPIC, orderId, payload));
    }

    // ✓ BENAR: Asynchronous dengan callback — throughput tinggi, tetap tahu hasilnya
    public void sendAsync(String orderId, String payload) {
        ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC, orderId, payload);

        producer.send(record, (RecordMetadata metadata, Exception exception) -> {
            if (exception != null) {
                System.err.println("Gagal mengirim pesan untuk order " + orderId + ": " + exception.getMessage());
                // Di sini: kirim ke dead letter queue, alert monitoring, atau retry logic
                return;
            }
            System.out.printf(
                "Pesan terkirim — topic=%s, partition=%d, offset=%d%n",
                metadata.topic(),
                metadata.partition(),
                metadata.offset()
            );
        });
    }

    // ✓ BENAR: Synchronous — digunakan saat butuh konfirmasi sebelum lanjut
    // Lebih lambat (blokir per pesan), gunakan hanya jika benar-benar perlu
    public RecordMetadata sendSync(String orderId, String payload) throws Exception {
        ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC, orderId, payload);
        Future<RecordMetadata> future = producer.send(record);
        return future.get(); // blokir sampai konfirmasi
    }
}

Mengirim ke Partition Tertentu #

Kafka menentukan partition berdasarkan hash dari message key secara default. Tapi kadang kamu perlu mengirim ke partition tertentu secara eksplisit:

// Kafka menentukan partition berdasarkan hash key — ini sudah cukup untuk kebanyakan kasus
ProducerRecord<String, String> byKey = new ProducerRecord<>(
    "order-events",
    "order-123",   // key — semua pesan dengan key ini masuk partition yang sama
    "{\"status\": \"created\"}"
);

// Menentukan partition secara eksplisit
ProducerRecord<String, String> toPartition = new ProducerRecord<>(
    "order-events",
    0,             // partition index
    "order-456",
    "{\"status\": \"created\"}"
);
Jangan tentukan partition secara eksplisit kecuali kamu punya alasan yang sangat kuat. Ini mempersulit rebalancing dan mengurangi fleksibilitas distribusi beban. Gunakan message key dan biarkan Kafka yang memutuskan partition berdasarkan hash.

Consumer #

Consumer membaca pesan dari Kafka. Berbeda dengan message broker tradisional, membaca pesan dari Kafka tidak menghapusnya — pesan tetap ada sampai retention period habis.

Konfigurasi Consumer #

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class OrderConsumer {

    private final KafkaConsumer<String, String> consumer;
    private static final String TOPIC = "order-events";
    private volatile boolean running = true;

    public OrderConsumer(String groupId) {
        Properties props = new Properties();

        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        // Group ID — consumer dengan group ID yang sama berbagi partisi
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);

        // Auto offset reset — apa yang dilakukan saat tidak ada offset yang tersimpan?
        // "earliest" = mulai dari awal, "latest" = hanya pesan baru
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        // ANTI-PATTERN: enable.auto.commit=true — offset di-commit secara otomatis
        // Ini bisa menyebabkan pesan terlewat jika consumer crash setelah commit tapi sebelum proses selesai
        // BENAR: nonaktifkan auto commit, lakukan manual commit setelah proses berhasil
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);

        // Heartbeat interval — seberapa sering consumer mengirim sinyal hidup ke broker
        props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 3000);

        // Session timeout — jika tidak ada heartbeat selama ini, consumer dianggap mati
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 30000);

        // Max poll records — berapa banyak record per poll()
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 100);

        this.consumer = new KafkaConsumer<>(props);
    }
}

Poll Loop #

Consumer bekerja dalam loop poll — ia terus bertanya ke broker apakah ada pesan baru:

public void start() {
    // Subscribe ke satu atau lebih topic
    consumer.subscribe(Collections.singletonList(TOPIC));

    try {
        while (running) {
            // Poll dengan timeout — berapa lama menunggu jika tidak ada pesan baru
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));

            for (var record : records) {
                try {
                    processRecord(record.key(), record.value());

                    // ✓ Commit offset setelah proses berhasil
                    // commitSync() — lebih lambat tapi lebih aman (blokir sampai konfirmasi)
                    consumer.commitSync();

                } catch (Exception e) {
                    System.err.printf(
                        "Gagal memproses record di partition=%d offset=%d: %s%n",
                        record.partition(),
                        record.offset(),
                        e.getMessage()
                    );
                    // Jangan commit offset — pesan akan diproses ulang setelah restart
                }
            }
        }
    } finally {
        consumer.close();
    }
}

private void processRecord(String key, String value) {
    System.out.printf("Memproses order %s: %s%n", key, value);
    // implementasi logika bisnis
}

public void stop() {
    running = false;
}

Commit Offset: Sync vs Async #

Pilihan cara commit offset mempengaruhi throughput dan keandalan:

// commitSync() — blokir sampai broker konfirmasi commit
// KAPAN: di akhir batch, saat aplikasi mau shutdown, saat butuh jaminan penuh
consumer.commitSync();

// commitAsync() — tidak blokir, cocok untuk throughput tinggi
// KAPAN: saat memproses batch besar dan kamu bisa toleransi at-least-once
consumer.commitAsync((offsets, exception) -> {
    if (exception != null) {
        System.err.println("Commit gagal: " + exception.getMessage());
        // commitAsync tidak retry otomatis untuk hindari out-of-order commit
        // retry manual di sini jika diperlukan
    }
});

// ✓ PATTERN TERBAIK: kombinasi — async selama proses, sync di akhir atau saat shutdown
while (running) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
    for (var record : records) {
        processRecord(record.key(), record.value());
    }
    consumer.commitAsync(); // async untuk throughput
}
consumer.commitSync(); // sync saat shutdown untuk keamanan

Consumer Group #

Consumer group adalah mekanisme utama Kafka untuk skalabilitas horizontal. Beberapa consumer dalam group yang sama secara kolektif mengonsumsi semua partition dari sebuah topic.

flowchart TD
    T[Topic: order-events\nPartition 0, 1, 2, 3]

    subgraph CGA[Consumer Group A — order-processor]
        C1[Consumer 1\nP0, P1]
        C2[Consumer 2\nP2, P3]
    end

    subgraph CGB[Consumer Group B — order-analytics]
        C3[Consumer 3\nP0, P1, P2, P3]
    end

    T --> C1
    T --> C2
    T --> C3

Poin kunci tentang consumer group:

  • Satu partition hanya bisa dikonsumsi oleh satu consumer dalam group yang sama — ini menjamin ordering per partition.
  • Beberapa consumer group bisa membaca topic yang sama secara independen — Group B di atas mendapat semua pesan yang sama dengan Group A, tapi masing-masing punya offset sendiri.
  • Jika jumlah consumer > jumlah partition, consumer berlebih akan idle — ini bukan error, tapi membuang resource.

Rebalancing #

Rebalancing terjadi ketika:

  • Consumer baru bergabung ke group
  • Consumer meninggalkan group (crash atau shutdown)
  • Partition baru ditambahkan ke topic
  • Subscribe pattern berubah

Selama rebalancing, semua consumer di group berhenti mengonsumsi. Ini adalah jeda yang biasa disebut “stop the world”. Untuk meminimalkan dampaknya, implementasikan ConsumerRebalanceListener:

import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.common.TopicPartition;

import java.util.Collection;

public class OrderConsumerRebalanceListener implements ConsumerRebalanceListener {

    private final KafkaConsumer<String, String> consumer;

    public OrderConsumerRebalanceListener(KafkaConsumer<String, String> consumer) {
        this.consumer = consumer;
    }

    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        // Dipanggil SEBELUM rebalancing — commit offset untuk semua partition yang akan diambil
        System.out.println("Partition akan diambil, commit offset terlebih dahulu...");
        consumer.commitSync(); // pastikan tidak ada pesan yang terlewat
    }

    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        // Dipanggil SETELAH rebalancing — partition baru sudah di-assign
        System.out.println("Partition baru di-assign: " + partitions);
    }
}

// Gunakan listener saat subscribe
consumer.subscribe(
    Collections.singletonList("order-events"),
    new OrderConsumerRebalanceListener(consumer)
);

Serialisasi dengan JSON #

Di production, pesan Kafka hampir selalu berupa JSON atau Avro. Berikut cara menggunakan Jackson untuk serialisasi JSON:

<!-- Tambahkan ke pom.xml -->
<dependency>
    <groupId>com.fasterxml.jackson.core</groupId>
    <artifactId>jackson-databind</artifactId>
    <version>2.17.1</version>
</dependency>
import com.fasterxml.jackson.databind.ObjectMapper;

// Model untuk event
public record OrderEvent(
    String orderId,
    String status,
    double totalAmount,
    long timestamp
) {}

// Serializer kustom untuk producer
public class JsonSerializer<T> implements org.apache.kafka.common.serialization.Serializer<T> {

    private final ObjectMapper objectMapper = new ObjectMapper();

    @Override
    public byte[] serialize(String topic, T data) {
        if (data == null) return null;
        try {
            return objectMapper.writeValueAsBytes(data);
        } catch (Exception e) {
            throw new RuntimeException("Gagal serialize ke JSON: " + e.getMessage(), e);
        }
    }
}

// Deserializer kustom untuk consumer
public class JsonDeserializer<T> implements org.apache.kafka.common.serialization.Deserializer<T> {

    private final ObjectMapper objectMapper = new ObjectMapper();
    private final Class<T> targetClass;

    public JsonDeserializer(Class<T> targetClass) {
        this.targetClass = targetClass;
    }

    @Override
    public T deserialize(String topic, byte[] data) {
        if (data == null) return null;
        try {
            return objectMapper.readValue(data, targetClass);
        } catch (Exception e) {
            throw new RuntimeException("Gagal deserialize dari JSON: " + e.getMessage(), e);
        }
    }
}

Gunakan serializer kustom ini di konfigurasi producer dan consumer:

// Di producer
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class.getName());

// Di consumer
// Untuk generic class, perlu pendekatan berbeda karena type erasure Java
KafkaConsumer<String, OrderEvent> consumer = new KafkaConsumer<>(props,
    new StringDeserializer(),
    new JsonDeserializer<>(OrderEvent.class)
);

Error Handling dan Retry #

Kafka tidak punya built-in dead letter queue (DLQ), tapi kamu bisa membangunnya sendiri. Pattern yang umum adalah mengirim pesan yang gagal ke topic terpisah:

public class ResilientConsumer {

    private final KafkaConsumer<String, String> consumer;
    private final KafkaProducer<String, String> dlqProducer;
    private static final String TOPIC = "order-events";
    private static final String DLQ_TOPIC = "order-events.DLQ";
    private static final int MAX_RETRY = 3;

    // ... constructor

    public void startWithDLQ() {
        consumer.subscribe(Collections.singletonList(TOPIC));

        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));

            for (var record : records) {
                boolean processed = false;

                for (int attempt = 1; attempt <= MAX_RETRY; attempt++) {
                    try {
                        processRecord(record.key(), record.value());
                        processed = true;
                        break; // keluar dari retry loop jika berhasil

                    } catch (Exception e) {
                        System.err.printf(
                            "Percobaan %d/%d gagal untuk key=%s: %s%n",
                            attempt, MAX_RETRY, record.key(), e.getMessage()
                        );

                        if (attempt < MAX_RETRY) {
                            try {
                                Thread.sleep(100L * attempt); // exponential-like backoff
                            } catch (InterruptedException ie) {
                                Thread.currentThread().interrupt();
                            }
                        }
                    }
                }

                if (!processed) {
                    // Kirim ke DLQ untuk investigasi manual
                    sendToDLQ(record.key(), record.value(), "Gagal setelah " + MAX_RETRY + " percobaan");
                }
            }

            consumer.commitSync();
        }
    }

    private void sendToDLQ(String key, String value, String reason) {
        // Tambahkan konteks error sebagai header atau prefix value
        String dlqPayload = String.format("{\"original\": %s, \"dlqReason\": \"%s\", \"timestamp\": %d}",
            value, reason, System.currentTimeMillis());

        dlqProducer.send(
            new ProducerRecord<>(DLQ_TOPIC, key, dlqPayload),
            (metadata, ex) -> {
                if (ex != null) {
                    System.err.println("KRITIS: Gagal mengirim ke DLQ! Key=" + key);
                }
            }
        );
    }

    private void processRecord(String key, String value) {
        // logika bisnis
    }
}
flowchart TD
    A[Consumer poll\npesan baru] --> B{Proses berhasil?}
    B -- Ya --> C[Commit offset]
    B -- Tidak --> D{Sudah retry\nmaks?}
    D -- Belum --> E[Tunggu backoff] --> B
    D -- Sudah --> F[Kirim ke DLQ topic]
    F --> G[Commit offset\nmeski gagal]
    G --> H[Alert ke tim ops]
Jangan biarkan consumer crash terus-menerus tanpa DLQ. Consumer yang terus gagal dan tidak commit offset akan membaca pesan yang sama berulang kali, memblokir seluruh partition dari pemrosesan pesan baru.

Manajemen Topic Secara Programatis #

Selain menggunakan command line, kamu bisa membuat dan mengelola topic dari kode Java menggunakan AdminClient:

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.admin.CreateTopicsResult;

import java.util.Collections;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutionException;

public class KafkaTopicManager {

    private final AdminClient adminClient;

    public KafkaTopicManager() {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        this.adminClient = AdminClient.create(props);
    }

    public void createTopic(String topicName, int partitions, short replicationFactor) {
        NewTopic newTopic = new NewTopic(topicName, partitions, replicationFactor);

        // Konfigurasi retention: 7 hari (dalam milidetik)
        newTopic.configs(Map.of(
            "retention.ms", String.valueOf(7 * 24 * 60 * 60 * 1000L),
            "compression.type", "snappy",
            "cleanup.policy", "delete"
        ));

        CreateTopicsResult result = adminClient.createTopics(Collections.singletonList(newTopic));

        try {
            result.all().get(); // tunggu sampai selesai
            System.out.println("Topic berhasil dibuat: " + topicName);
        } catch (ExecutionException e) {
            if (e.getCause() instanceof org.apache.kafka.common.errors.TopicExistsException) {
                System.out.println("Topic sudah ada: " + topicName);
            } else {
                throw new RuntimeException("Gagal membuat topic: " + e.getMessage(), e);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public void listTopics() throws Exception {
        var topics = adminClient.listTopics().names().get();
        topics.forEach(System.out::println);
    }

    public void close() {
        adminClient.close();
    }
}

Menjalankan Bersama — Contoh Lengkap #

Berikut adalah contoh lengkap producer dan consumer yang bekerja bersama dalam satu aplikasi:

public class KafkaDemo {

    public static void main(String[] args) throws Exception {
        String topic = "demo-orders";
        String groupId = "order-processor";

        // Buat topic terlebih dahulu
        KafkaTopicManager topicManager = new KafkaTopicManager();
        topicManager.createTopic(topic, 3, (short) 1);
        topicManager.close();

        // Jalankan consumer di thread terpisah
        Thread consumerThread = new Thread(() -> {
            OrderConsumer consumer = new OrderConsumer(groupId);
            consumer.start();
        });
        consumerThread.setDaemon(true);
        consumerThread.start();

        // Producer mengirim beberapa pesan
        OrderProducer producer = new OrderProducer();
        for (int i = 1; i <= 10; i++) {
            String orderId = "order-" + i;
            String payload = String.format("{\"id\": \"%s\", \"status\": \"created\", \"amount\": %.2f}",
                orderId, Math.random() * 1000);
            producer.sendAsync(orderId, payload);
        }

        // Tunggu sebentar agar consumer sempat memproses
        Thread.sleep(5000);
        producer.close();
    }
}

Kapan Menggunakan Kafka dan Kapan Tidak #

Kafka bukan solusi untuk semua kebutuhan messaging. Memilih Kafka di situasi yang tidak tepat menambah kompleksitas operasional tanpa manfaat yang sepadan.

GUNAKAN KAFKA JIKA:
  ✓ Volume pesan sangat tinggi (ratusan ribu per detik)
  ✓ Butuh replay — consumer perlu membaca ulang pesan lama
  ✓ Banyak consumer independen yang perlu membaca data yang sama
  ✓ Event sourcing — event sebagai sumber kebenaran sistem
  ✓ Stream processing — analitik real-time, agregasi, transformasi
  ✓ Audit log — butuh riwayat semua kejadian dalam sistem
  ✓ Decoupling antara banyak service dalam arsitektur microservices

PERTIMBANGKAN ALTERNATIF JIKA:
  ✗ Volume rendah dan kompleksitas routing tinggi → RabbitMQ lebih cocok
  ✗ Butuh request-reply (RPC pattern) → gRPC atau REST lebih natural
  ✗ Task queue sederhana (job yang harus dieksekusi sekali) → RabbitMQ atau Amazon SQS
  ✗ Tim kecil tanpa kapasitas ops untuk maintain Kafka cluster → kelola biaya operasional dulu
  ✗ Latency sub-millisecond sangat kritis → Kafka bukan pilihan terbaik
flowchart TD
    A{Volume pesan\nsangat tinggi?} -- Ya --> B{Butuh\nreplay?}
    A -- Tidak --> C{Banyak consumer\nindependen?}

    B -- Ya --> KAFKA[Kafka]
    B -- Tidak --> D{Butuh durability\ndan ordering?}

    C -- Ya --> KAFKA
    C -- Tidak --> E{Task queue\nsederhana?}

    D -- Ya --> KAFKA
    D -- Tidak --> RABBIT[RabbitMQ atau SQS]

    E -- Ya --> RABBIT
    E -- Tidak --> REST[REST atau gRPC]

Ringkasan #

  • Topic dan partition adalah fondasi Kafka — partition adalah unit paralelisme, dan ordering hanya dijamin dalam satu partition. Gunakan message key untuk routing deterministik ke partition yang sama.
  • Producer async dengan callback adalah pilihan terbaik untuk throughput tinggi — gunakan acks=all dan enable.idempotence=true untuk durability tanpa duplikasi.
  • Matikan auto commit offset (enable.auto.commit=false) dan lakukan commit manual setelah pemrosesan berhasil untuk menghindari pesan yang terlewat.
  • Consumer group memungkinkan skalabilitas horizontal — tambah consumer untuk meningkatkan throughput, tapi jumlah consumer yang efektif dibatasi oleh jumlah partition.
  • Implementasikan Dead Letter Queue (DLQ) sebagai topic terpisah untuk menangani pesan yang gagal diproses — jangan biarkan consumer loop selamanya pada pesan yang rusak.
  • ConsumerRebalanceListener penting untuk commit offset sebelum rebalancing dimulai, mencegah pesan diproses dua kali setelah partition berpindah consumer.
  • AdminClient memungkinkan manajemen topic dari kode Java — ideal untuk memastikan topic sudah ada sebelum producer mulai mengirim.
  • Kafka paling cocok untuk volume tinggi, event sourcing, stream processing, dan banyak consumer independen — bukan untuk task queue sederhana atau request-reply pattern.

← Sebelumnya: Elasticsearch   Berikutnya: RabbitMQ →

About | Author | Content Scope | Editorial Policy | Privacy Policy | Disclaimer | Contact