
Kafka Connector built on top of Connect SDK

How-to Guide

Author: Fluent Commerce

Changed on: 30 Oct 2023

Key Points

  • You can configure a Kafka connector on top of the Connect SDK.
  • Using Kafka as a queue listener requires changes to the connector's configuration files.

Steps

Kafka Connector built on top of Connect SDK

We have extended our queue listener support beyond SQS, and it now also includes Kafka.

Kafka and SQS can be used interchangeably as queue listeners, providing more options to our users.

The provided reference solutions show how to apply these changes to the configuration files and use either queue listener. With these examples, you can switch between SQS and Kafka depending on your needs and requirements.

For a more detailed explanation of Kafka configuration, see Fluent Connector Configuration | Kafka Listener Configuration.

Prerequisites

  • At least one running Kafka broker and a ZooKeeper container (Kafdrop is optional)
  • A running localstack container
  • Access to a Fluent account
  • An API key/secret for communicating with the external network

Create a Docker Compose file named `docker-compose.yml` with the content below to quickly set up a Kafka broker:

version: "3.9"
services:
  localstack:
    container_name: localstack
    image: localstack/localstack
    restart: unless-stopped
    ports:
      - "4510-4559:4510-4559"  # external service port range
      - "4566:4566"
    environment:
      - AWS_DEFAULT_REGION=us-east-1
      - SERVICES=sqs,secretsmanager
      - HOSTNAME=localstack
      - HOSTNAME_EXTERNAL=localstack
    volumes:
      - ./docker/localstack:/docker-entrypoint-initaws.d

  kafdrop:
    image: obsidiandynamics/kafdrop
    container_name: kafkadrop
    restart: "unless-stopped"
    ports:
      - "9000:9000"
    environment:
      KAFKA_BROKERCONNECT: "kafka:29092"
      JVM_OPTS: "-Xms16M -Xmx48M -Xss180K -XX:-TieredCompilation -XX:+UseStringDeduplication -noverify"
    depends_on:
      - "kafka"

  zookeeper:
    image: wurstmeister/zookeeper
    ports:
      - "2181:2181"
    restart: unless-stopped

  kafka:
    image: wurstmeister/kafka
    ports:
      - "9092:9092"
    environment:
      KAFKA_ADVERTISED_HOST_NAME: kafka
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true"
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_LISTENERS: "INTERNAL://:29092,EXTERNAL://:9092"
      KAFKA_ADVERTISED_LISTENERS: "INTERNAL://kafka:29092,EXTERNAL://localhost:9092"
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: "INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT"
      KAFKA_INTER_BROKER_LISTENER_NAME: "INTERNAL"
      KAFKA_ZOOKEEPER_SESSION_TIMEOUT: "6000"
      KAFKA_RESTART_ATTEMPTS: "10"
      KAFKA_RESTART_DELAY: "5"
    restart: unless-stopped
    depends_on:
      - zookeeper
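
After starting the stack (for example with `docker compose up -d`), it can help to confirm that the published ports accept connections before pointing the connector at them. The following is a minimal Python sketch, not part of the Connect SDK: the helper names `is_port_open` and `check_services` are hypothetical, and the port numbers are taken from the `docker-compose.yml` above. A successful TCP connection only shows that the port is open; it does not prove that Kafka itself is healthy.

```python
import socket

# Host ports published by the docker-compose.yml above.
PUBLISHED_PORTS = {
    "kafka": 9092,
    "zookeeper": 2181,
    "kafdrop": 9000,
    "localstack": 4566,
}


def is_port_open(host: str, port: int, timeout: float = 2.0) -> bool:
    """Return True if a TCP connection to host:port succeeds within timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False


def check_services(host: str = "localhost") -> dict:
    """Map each service name to whether its published port accepts connections."""
    return {name: is_port_open(host, port) for name, port in PUBLISHED_PORTS.items()}


if __name__ == "__main__":
    for name, ok in check_services().items():
        print(f"{name}: {'reachable' if ok else 'NOT reachable'}")
```

If Kafdrop is running, the same topics and brokers can also be inspected in a browser on port 9000.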


Configuration Changes

This example uses Kafka for the `messages` and `batch` listeners and SQS for the `notification` listener:

fluent-connect:
  connector-name: ${connectorName}
  listeners:
  # Internal listener:
    messages:
      name: "KAFKA_EVENTS"
      type: "kafka"
      retry: 5
      props:
        retryIntervalInSec: 10
    batch:
      name: "KAFKA_BATCH"
      retry: 1
      type: "kafka"
      props:
        retryIntervalInSec: 10
    notification:
      name: "KAFKA_NOTIFICATION"
      retry: 5
      type: "sqs"
      poll-wait: 1
      props:
        retryIntervalInSec: 10
kafka:
  props:
    bootstrap.servers: localhost:9092


| Property | Mandatory | Description | Default Value |
|---|---|---|---|
| `fluent-connect.listeners.<listener_name>.type` | Yes | The queue type of the listener: `"sqs"` or `"kafka"`. | |
| `fluent-connect.listeners.<listener_name>.retry` | No | The number of retries for a topic determines the number of corresponding retry topics that will be created. These retry topics follow a common naming convention: `{listenerId}-retry-{iterationCount}`. For instance, if the "messages" listener is configured with three retries, the Kafka container should include the topics `messages`, `messages-retry-1`, `messages-retry-2`, and `messages-retry-3`. The maximum number of retries is 5. The last failed retry is moved to the Dead Letter Queue topic. | n/a |
| `fluent-connect.listeners.<listener_name>.props`: `retryIntervalInSec` | No | The delay, in seconds, between retry attempts. The delay should not be greater than the max poll interval (`max.poll.interval.ms`) used with the current Kafka broker. | n/a |
| `kafka.props` | No | The connection configuration for the Kafka producer and consumer. If this property is not set, the producer and consumer connect to a local Kafka at `localhost:9092`. | `bootstrap.servers: localhost:9092` |
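
To make the retry topic naming convention concrete, the following Python sketch lists the topics a listener would need for a given `retry` value. It is an illustration only and not SDK code: the function name `expected_topics` is hypothetical, and treating `retry: 0` as "no retry topics" is an assumption. The Dead Letter Queue topic is left out because its name is not stated here.

```python
MAX_RETRIES = 5  # maximum number of retries stated in the table above


def expected_topics(listener_id: str, retry: int) -> list:
    """Return the main topic followed by its retry topics for one listener."""
    if not 0 <= retry <= MAX_RETRIES:
        raise ValueError(f"retry must be between 0 and {MAX_RETRIES}, got {retry}")
    # Retry topics follow {listenerId}-retry-{iterationCount}, counting from 1.
    retry_topics = [f"{listener_id}-retry-{i}" for i in range(1, retry + 1)]
    return [listener_id, *retry_topics]


if __name__ == "__main__":
    for topic in expected_topics("messages", 3):
        print(topic)
```

For the "messages" listener with three retries this yields `messages`, `messages-retry-1`, `messages-retry-2`, and `messages-retry-3`, matching the example in the table.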

`kafka.props` examples:

Integrate with a local Kafka:

kafka:
  props:
    # Required connection configs for Kafka Local
    bootstrap.servers: localhost:9092

    # Best practice for higher availability in Apache Kafka clients prior to 3.0
    session.timeout.ms: 45000

    # Best practice for Kafka producer to prevent data loss
    acks: all

Integrate with external Kafka (e.g. Confluent Cloud):

kafka:
  props:
    # Required connection configs for Confluent Cloud
    bootstrap.servers: pkc-6ojv2.us-west4.gcp.confluent.cloud:9092
    security.protocol: SASL_SSL
    sasl.jaas.config: org.apache.kafka.common.security.plain.PlainLoginModule required username='{{ CLUSTER_API_KEY }}' password='{{ CLUSTER_API_SECRET }}';
    sasl.mechanism: PLAIN
    client.dns.lookup: use_all_dns_ips

    # Best practice for higher availability in Apache Kafka clients prior to 3.0
    session.timeout.ms: 45000

    # Best practice for Kafka producer to prevent data loss
    acks: all
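
The `{{ CLUSTER_API_KEY }}` and `{{ CLUSTER_API_SECRET }}` placeholders in the Confluent Cloud example must be replaced with real credentials, and how they are injected depends on your deployment. As a rough illustration, the Python sketch below assembles the same set of properties from values supplied at runtime. The function names and the environment variable names are hypothetical and are not part of the Connect SDK.

```python
import os


def jaas_config(api_key: str, api_secret: str) -> str:
    """Build the sasl.jaas.config value used with the PLAIN mechanism."""
    return (
        "org.apache.kafka.common.security.plain.PlainLoginModule required "
        f"username='{api_key}' password='{api_secret}';"
    )


def confluent_props(bootstrap_servers: str, api_key: str, api_secret: str) -> dict:
    """Return the kafka.props entries from the Confluent Cloud example."""
    return {
        "bootstrap.servers": bootstrap_servers,
        "security.protocol": "SASL_SSL",
        "sasl.jaas.config": jaas_config(api_key, api_secret),
        "sasl.mechanism": "PLAIN",
        "client.dns.lookup": "use_all_dns_ips",
        "session.timeout.ms": 45000,
        "acks": "all",
    }


if __name__ == "__main__":
    # Hypothetical environment variable names; adapt them to your deployment.
    props = confluent_props(
        os.environ.get("KAFKA_BOOTSTRAP_SERVERS", "localhost:9092"),
        os.environ.get("CLUSTER_API_KEY", "<key>"),
        os.environ.get("CLUSTER_API_SECRET", "<secret>"),
    )
    for key, value in props.items():
        # Mask the line that contains the credentials when printing.
        print(f"{key}: {'***' if key == 'sasl.jaas.config' else value}")
```

Keeping the key and secret out of the configuration file, as sketched here, avoids committing credentials to source control.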