Skip to main content

Kafka Connector

The Kafka Producer Connector is an outbound Connector that allows you to connect your BPMN service with Kafka to produce messages.

Prerequisites​

To use the Kafka Producer Connector, you need to have a Kafka instance with configured bootstrap server. Use Camunda secrets to avoid exposing your sensitive data as plain text. Follow our documentation on managing secrets to learn more.

Create a Kafka Producer Connector task​

To use the Kafka Producer Connector in your process, either change the type of existing task by clicking on it and using the wrench-shaped Change type context menu icon, or create a new Connector task by using the Append Connector context menu. Follow our guide on using Connectors to learn more.

Make your Kafka Producer Connector for publishing messages executable​

To make your Kafka Producer Connector for publishing messages executable, take the following steps:

  1. (Optional) Set the relevant credentials in the Authentication section. For example, {{secrets.MY_KAFKA_USERNAME}}. See the relevant appendix section to find more about Kafka secure authentication.
  2. In the Kafka section, select the serialization type for your messages. Choose Default (JSON) for JSON serialization or AVRO (experimental) for Avro serialization. Read more about Kafka Avro serialization.
  3. In the Kafka section, set the URL of bootstrap server(s); comma-separated if more than one server required.
  4. In the Kafka section, set the topic name.
  5. (Optional) In the Kafka section, fill out the field Headers to set producer configuration values. Only UTF-8 strings are supported as header values.
  6. (Optional) In the Kafka section, fill out the field Additional properties to set producer configuration values. See the list of supported configurations at the official Kafka documentation page. Also check preconfigured values for the Kafka Producer Connector in the relevant appendix section.
  7. In the Message section, set the Key and the Value that will be sent to Kafka topic.
  8. (Optional for AVRO (experimental)) In the Avro schema field, input the schema that defines the message structure. Ensure this schema is in your Avro schema registry.

AVRO serialization​

note

Use Avro serialization with caution, as this is an experimental feature. Functionality may not be comprehensive and could change.

The Kafka Producer Connector supports Avro serialization, which offers a compact, fast, and binary data exchange format for Kafka messages. Avro relies on schemas for serialization and deserialization. When using Avro, each message is serialized according to a specific schema written in JSON format. This schema defines the structure of the Kafka message, ensuring the data conforms to a predefined format and enabling schema evolution strategies.

For more detailed information on Kafka Avro serialization, you may refer to the official Kafka documentation and official Apache Avro documentation.

Example Avro schema and data​

Avro schema:​

{
"doc": "Sample schema to help you get started.",
"fields": [
{
"name": "name",
"type": "string"
},
{
"name": "age",
"type": "int"
},
{
"name": "emails",
"type": {
"items": "string",
"type": "array"
}
}
],
"name": "sampleRecord",
"namespace": "com.mycorp.mynamespace",
"type": "record"
}

Kafka message​

  • Key : employee1
  • Value :
{
"name": "John Doe",
"age": 29,
"emails": ["johndoe@example.com"]
}

Kafka Producer Connector response​

The Kafka Producer Connector returns metadata for a record that has been acknowledged by the Kafka instance.

The following fields are available in the response variable:

  • timestamp: the timestamp of the message
  • offset: message offset
  • partition: message partition
  • topic: topic name

You can read about these fields at the official Kafka documentation page.

You can use an output mapping to map the response:

  1. Use Result Variable to store the response in a process variable. For example, myResultVariable.
  2. Use Result Expression to map fields from the response into process variables. For example:
= {
"messageAcknowledgedAt": response.timestamp
}

Appendix & FAQ​

What mechanism is used to authenticate against Kafka?​

If the fields Username and Password are not empty, by default the Kafka Producer Connector enables the credentials-based SASL SSL authentication and the following properties are set:

sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule   required username='<Your Username>'   password='<Your Password>';
security.protocol=SASL_SSL
sasl.mechanism=PLAIN

If any of the field is not populated, you have to configure your security method in respect to your Kafka configuration. You can do so via the field Additional properties.

What are default Kafka Producer client properties?​

  • Authentication properties (only if both Username and Password are not empty):
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule   required username='<Your Username>'   password='<Your Password>';
security.protocol=SASL_SSL
sasl.mechanism=PLAIN
  • Bootstrap server property:
bootstrap.servers=<bootstrap server(s) from BPMN>
  • Message properties:
key.serializer=org.apache.kafka.common.serialization.StringSerializer
value.serializer=org.apache.kafka.common.serialization.StringSerializer
  • Miscellaneous properties:
session.timeout.ms=45000
client.dns.lookup=use_all_dns_ips
acks=all
delivery.timeout.ms=45000

What is the precedence of client properties loading?​

Properties loading consists of three steps:

  1. Construct client properties from the BPMN diagram: authentication, bootstrap server, message properties.
  2. Load miscellaneous properties.
  3. Load and override properties from the field Additional properties.

How do I set or override additional client properties?​

The following example sets a new client property client.id and overrides SASL mechanism to SCRAM SHA-256 instead of plain text:

= {
"client.id":"MyDemoClientId",
"sasl.mechanism":"SCRAM-SHA-256"
}