Try it out locally
Run Kafka
First, get a Kafka instance running locally by following the Apache Kafka quickstart guide. This usually boils down to:
export KAFKA_HOME=<your kafka install dir>
$KAFKA_HOME/bin/zookeeper-server-start.sh $KAFKA_HOME/config/zookeeper.properties
$KAFKA_HOME/bin/kafka-server-start.sh $KAFKA_HOME/config/server.properties
$KAFKA_HOME/bin/kafka-topics.sh --create \
--zookeeper localhost:2181 \
--replication-factor 1 \
--partitions 1 \
--topic mytopic
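The --zookeeper option of kafka-topics.sh was removed in Kafka 3.0; newer releases take --bootstrap-server instead (available since Kafka 2.2). Below is a sketch of the same topic creation for such a release, assuming the broker listens on localhost:9092 as in the consumer and producer commands on this page. The create_topic wrapper is a hypothetical name, and the guard only keeps the snippet harmless when KAFKA_HOME is not set.

```shell
# create_topic NAME: hypothetical wrapper around kafka-topics.sh for Kafka
# releases that no longer accept --zookeeper.
create_topic() {
  "$KAFKA_HOME/bin/kafka-topics.sh" --create \
    --bootstrap-server localhost:9092 \
    --replication-factor 1 \
    --partitions 1 \
    --topic "$1"
}

# Only call the wrapper when a Kafka installation is actually present.
if [ -x "${KAFKA_HOME:-}/bin/kafka-topics.sh" ]; then
  create_topic mytopic
else
  echo "KAFKA_HOME is not set to a Kafka install dir; skipping topic creation"
fi
```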
Next, run the Camel Kafka connector sources and/or sinks:
In order to run more than one instance of standalone Kafka Connect on the same machine, you need to duplicate
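A minimal sketch of one way to prepare a second standalone worker, assuming the thing to duplicate is the worker configuration file (connect-standalone.properties) and that each worker needs its own REST listener port and its own offset file. The helper name make_worker_config, port 8084, and the offset file path are illustrative choices, not part of the project. The demo runs against a stand-in file so it works without a Kafka install.

```shell
# make_worker_config SRC DST PORT: copy a worker config, giving the copy its
# own REST listener and offset file (hypothetical helper).
make_worker_config() {
  src=$1; dst=$2; port=$3
  # Drop existing values for the two keys that are set below.
  grep -v -e '^listeners=' -e '^offset\.storage\.file\.filename=' "$src" > "$dst"
  {
    echo "listeners=http://localhost:$port"
    echo "offset.storage.file.filename=/tmp/connect-$port.offsets"
  } >> "$dst"
}

# Stand-in for $KAFKA_HOME/config/connect-standalone.properties.
demo_dir=$(mktemp -d)
printf '%s\n' \
  'bootstrap.servers=localhost:9092' \
  'offset.storage.file.filename=/tmp/connect.offsets' \
  > "$demo_dir/connect-standalone.properties"

make_worker_config "$demo_dir/connect-standalone.properties" \
                   "$demo_dir/connect-standalone2.properties" 8084
cat "$demo_dir/connect-standalone2.properties"
```

The second worker would then be started with the copied file in place of connect-standalone.properties in the commands on this page.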
You can use these Kafka utilities to consume from or produce to a Kafka topic:
$KAFKA_HOME/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic mytopic --from-beginning
$KAFKA_HOME/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic mytopic
Try some examples
For the following examples you need to fetch the camel-kafka-connector project and build it locally by running ./mvnw package from the root of the project. Look into the config and examples directories for the configuration files (*.properties) of the examples showcased here.
First you need to set the CLASSPATH environment variable to include the jar files from the core/target/camel-kafka-connector-[version]-package/share/java/ directory. On UNIX systems this can be done by running:
export CLASSPATH="$(find core/target/ -type f -name '*.jar'| grep '\-package' | tr '\n' ':')"
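If the build has not run, the pipeline above silently yields an empty CLASSPATH and the connector classes will not be found later. The sketch below wraps the same pipeline in a function (build_classpath is a hypothetical name) and counts the resulting entries. It runs against a stand-in directory tree with made-up jar names so it can be tried without a build.

```shell
# build_classpath DIR: same find/grep/tr pipeline, parameterised on the directory.
build_classpath() {
  find "$1" -type f -name '*.jar' | grep -- '-package' | tr '\n' ':'
}

# Stand-in tree (hypothetical file names) mimicking core/target/.
demo=$(mktemp -d)
mkdir -p "$demo/demo-package/share/java" "$demo/other"
touch "$demo/demo-package/share/java/a.jar" "$demo/other/b.jar"

cp_value=$(build_classpath "$demo")
if [ -z "$cp_value" ]; then
  echo "no jars found; has ./mvnw package been run?"
else
  # Print how many jar entries ended up on the classpath.
  printf '%s' "$cp_value" | tr ':' '\n' | grep -c '\.jar$'
fi
```

Against the real build, build_classpath core/target/ would replace the stand-in directory.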
Simple logger (sink)
This is an example of a sink that logs messages consumed from mytopic.
$KAFKA_HOME/bin/connect-standalone.sh $KAFKA_HOME/config/connect-standalone.properties config/CamelSinkConnector.properties
Timer (source)
This is an example of a source that produces a message every second to mytopic.
$KAFKA_HOME/bin/connect-standalone.sh $KAFKA_HOME/config/connect-standalone.properties config/CamelSourceConnector.properties
AWS Kinesis (source)
This example consumes from an AWS Kinesis data stream and transfers the payload to the mytopic topic in Kafka.
Adjust the properties in examples/CamelAWSKinesisSourceConnector.properties for your environment. You need to configure the access key, secret key and region by setting camel.component.aws-kinesis.configuration.access-key=youraccesskey, camel.component.aws-kinesis.configuration.secret-key=yoursecretkey and camel.component.aws-kinesis.configuration.region=yourregion.
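One way to avoid hand-editing the file is a small helper that sets a property in place. The sketch below is an assumption-laden illustration: set_prop is a hypothetical helper, and reading the values from the AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY and AWS_REGION environment variables is a convenience chosen here, not something the connector requires. It operates on a stand-in file and prints only the property names, not the values.

```shell
# set_prop FILE KEY VALUE: replace KEY's line if present, otherwise append it.
# Hypothetical helper; values must not contain newlines.
set_prop() {
  file=$1; key=$2; value=$3
  tmp=$(mktemp)
  # Keep every line that does not start with "KEY=".
  awk -v k="$key=" 'index($0, k) != 1' "$file" > "$tmp"
  printf '%s=%s\n' "$key" "$value" >> "$tmp"
  mv "$tmp" "$file"
}

# Stand-in for examples/CamelAWSKinesisSourceConnector.properties.
props=$(mktemp)
printf 'camel.component.aws-kinesis.configuration.access-key=youraccesskey\n' > "$props"

prefix=camel.component.aws-kinesis.configuration
set_prop "$props" "$prefix.access-key" "${AWS_ACCESS_KEY_ID:-youraccesskey}"
set_prop "$props" "$prefix.secret-key" "${AWS_SECRET_ACCESS_KEY:-yoursecretkey}"
set_prop "$props" "$prefix.region"     "${AWS_REGION:-yourregion}"

# Show which keys are now set, without echoing the secrets.
cut -d= -f1 "$props"
```

The same helper applies to the other AWS examples on this page by changing the prefix (aws-sqs, aws-sns, aws-s3) and the file name.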
$KAFKA_HOME/bin/connect-standalone.sh $KAFKA_HOME/config/connect-standalone.properties examples/CamelAWSKinesisSourceConnector.properties
AWS SQS (sink)
This example consumes from the Kafka topic mytopic and transfers the payload to AWS SQS.
Adjust the properties in examples/CamelAWSSQSSinkConnector.properties for your environment. You need to configure the access key, secret key and region by setting camel.component.aws-sqs.configuration.access-key=youraccesskey, camel.component.aws-sqs.configuration.secret-key=yoursecretkey and camel.component.aws-sqs.configuration.region=yourregion.
$KAFKA_HOME/bin/connect-standalone.sh $KAFKA_HOME/config/connect-standalone.properties examples/CamelAWSSQSSinkConnector.properties
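Starting the connector with the sample values still in place only fails later, at connection time. A quick pre-flight check, sketched here with a hypothetical has_placeholders helper and a stand-in file, looks for the three sample values named above before the command is run. The region eu-west-1 in the demo is just an example value.

```shell
# has_placeholders FILE: succeed if FILE still contains one of the sample values.
has_placeholders() {
  grep -q -E '=(youraccesskey|yoursecretkey|yourregion)$' "$1"
}

# Stand-in for examples/CamelAWSSQSSinkConnector.properties.
demo=$(mktemp)
printf '%s\n' \
  'camel.component.aws-sqs.configuration.access-key=youraccesskey' \
  'camel.component.aws-sqs.configuration.region=eu-west-1' \
  > "$demo"

if has_placeholders "$demo"; then
  echo "sample values remain in $demo; edit it before starting the connector"
fi
```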
AWS SQS (source)
This example consumes from the AWS SQS queue mysqs and transfers the payload to the mytopic topic in Kafka.
Adjust the properties in examples/CamelAWSSQSSourceConnector.properties for your environment. You need to configure the access key, secret key and region by setting camel.component.aws-sqs.configuration.access-key=youraccesskey, camel.component.aws-sqs.configuration.secret-key=yoursecretkey and camel.component.aws-sqs.configuration.region=yourregion.
$KAFKA_HOME/bin/connect-standalone.sh $KAFKA_HOME/config/connect-standalone.properties examples/CamelAWSSQSSourceConnector.properties
AWS SNS (sink)
This example consumes from the mytopic Kafka topic and transfers the payload to the AWS SNS topic named topic.
Adjust the properties in examples/CamelAWSSNSSinkConnector.properties for your environment. You need to configure the access key, secret key and region by setting camel.component.aws-sns.configuration.access-key=youraccesskey, camel.component.aws-sns.configuration.secret-key=yoursecretkey and camel.component.aws-sns.configuration.region=yourregion.
$KAFKA_HOME/bin/connect-standalone.sh $KAFKA_HOME/config/connect-standalone.properties examples/CamelAWSSNSSinkConnector.properties
AWS S3 (source)
This example fetches objects from the camel-kafka-connector bucket in AWS S3 and transfers the payload to the mytopic Kafka topic. It also shows how to implement a custom converter that converts the bytes received from S3 to Kafka’s SchemaAndValue.
Adjust the properties in examples/CamelAWSS3SourceConnector.properties for your environment. You need to configure the access key, secret key and region by adding camel.component.aws-s3.configuration.access-key=youraccesskey, camel.component.aws-s3.configuration.secret-key=yoursecretkey and camel.component.aws-s3.configuration.region=yourregion.
$KAFKA_HOME/bin/connect-standalone.sh $KAFKA_HOME/config/connect-standalone.properties examples/CamelAWSS3SourceConnector.properties
Apache Cassandra
These examples require a running Cassandra instance. For simplicity, the steps below show how to start Cassandra using Docker. First, run a Cassandra instance:
docker run --name master_node --env MAX_HEAP_SIZE='800M' -dt oscerd/cassandra
Next, check and make sure Cassandra is running:
docker exec -ti master_node /opt/cassandra/bin/nodetool status
Datacenter: datacenter1
=======================
Status=Up/Down
|/ State=Normal/Leaving/Joining/Moving
-- Address Load Tokens Owns (effective) Host ID Rack
UN 172.17.0.2 251.32 KiB 256 100.0% 5126aaad-f143-43e9-920a-0f9540a93967 rack1
To populate the database using the cqlsh tool, you’ll need a local installation of Cassandra. Download and extract the Apache Cassandra distribution to a directory. We refer to the Cassandra installation directory as LOCAL_CASSANDRA_HOME. Here we use version 3.11.4 to connect to the Cassandra instance we started using Docker.
<LOCAL_CASSANDRA_HOME>/bin/cqlsh $(docker inspect --format='{{ .NetworkSettings.IPAddress }}' master_node)
Next, execute the following script to create the keyspace test and the table users, and to insert one row into the table.
create keyspace test with replication = {'class':'SimpleStrategy', 'replication_factor':3};
use test;
create table users ( id int primary key, name text );
insert into users (id,name) values (1, 'oscerd');
quit;
Each example uses a different .properties file, shown in the command line that runs the example. These files need the IP address of the Cassandra master node: replace the value 172.17.0.2 in the camel.source.url property, or localhost in the camel.sink.url property, with the IP of the master node, which you can obtain from Docker:
docker inspect --format='{{ .NetworkSettings.IPAddress }}' master_node
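The replacement can also be scripted. The sketch below assumes the sample host appears literally in the two URL properties as described above; point_at_cassandra is a hypothetical helper, and the cql://... value written to the stand-in file is only a placeholder for whatever URL the real example file contains.

```shell
# point_at_cassandra FILE IP: rewrite the sample host in the two URL properties.
point_at_cassandra() {
  file=$1; ip=$2
  tmp=$(mktemp)
  sed -e "/^camel\.source\.url=/s/172\.17\.0\.2/$ip/" \
      -e "/^camel\.sink\.url=/s/localhost/$ip/" "$file" > "$tmp" && mv "$tmp" "$file"
}

# Stand-in file with a placeholder URL, so the sketch runs without Docker.
demo=$(mktemp)
printf 'camel.source.url=cql://172.17.0.2/test\n' > "$demo"

point_at_cassandra "$demo" 10.0.0.5
cat "$demo"

# Against the real setup, the IP would come from the docker inspect command:
#   point_at_cassandra examples/CamelCassandraQLSourceConnector.properties \
#     "$(docker inspect --format='{{ .NetworkSettings.IPAddress }}' master_node)"
```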
Apache Cassandra (source)
This example polls Cassandra via CQL (select * from users) in the test keyspace and transfers the result to the mytopic Kafka topic.
$KAFKA_HOME/bin/connect-standalone.sh $KAFKA_HOME/config/connect-standalone.properties examples/CamelCassandraQLSourceConnector.properties
Apache Cassandra (sink)
This example adds data consumed from the mytopic Kafka topic to the users table in Cassandra. Notice how the CQL command (insert into users…) populates the name column from the Kafka message.
$KAFKA_HOME/bin/connect-standalone.sh $KAFKA_HOME/config/connect-standalone.properties examples/CamelCassandraQLSinkConnector.properties
Elasticsearch (sink)
This example passes data from the mytopic Kafka topic to the sampleIndexName index in Elasticsearch. Adjust the properties in examples/CamelElasticSearchSinkConnector.properties to reflect your environment; for example, change hostAddresses to the hostname and port of a valid Elasticsearch instance.
For the index operation, it might be necessary to provide or implement a transformer. A sample configuration would be similar to the one below:
transforms=ElasticSearchTransformer
This is the sample Transformer used in the integration test code that transforms Kafka’s ConnectRecord to a Map:
transforms.ElasticSearchTransformer.type=org.apache.camel.kafkaconnector.sink.elasticsearch.transforms.ConnectRecordValueToMapTransformer
This is a configuration for the sample transformer that defines the key used in the map:
transforms.ElasticSearchTransformer.key=MyKey
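In Kafka Connect, transforms lists one or more aliases and every transforms.<alias>.* property must use one of them, so a typo in the alias silently detaches a setting from its transformer. The sketch below writes the three lines above to a stand-in file (a temporary file, not the real example file) and checks that every transformer setting uses the declared alias. It assumes a single alias, as in this example.

```shell
# Stand-in for examples/CamelElasticSearchSinkConnector.properties.
props=$(mktemp)
cat >> "$props" <<'EOF'
transforms=ElasticSearchTransformer
transforms.ElasticSearchTransformer.type=org.apache.camel.kafkaconnector.sink.elasticsearch.transforms.ConnectRecordValueToMapTransformer
transforms.ElasticSearchTransformer.key=MyKey
EOF

# Alias declared in the transforms= line (single alias assumed).
alias_name=$(sed -n 's/^transforms=//p' "$props")

# Print any transforms.* setting that does not belong to that alias.
if grep '^transforms\.' "$props" | grep -v "^transforms\.$alias_name\."; then
  echo "the settings listed above do not match alias $alias_name"
else
  echo "all transformer settings use alias $alias_name"
fi
```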
When the configuration is ready, run the sink with:
$KAFKA_HOME/bin/connect-standalone.sh $KAFKA_HOME/config/connect-standalone.properties examples/CamelElasticSearchSinkConnector.properties
File (sink)
This example appends data from the mytopic Kafka topic to the file /tmp/kafkaconnect.txt.
$KAFKA_HOME/bin/connect-standalone.sh $KAFKA_HOME/config/connect-standalone.properties examples/CamelFileSinkConnector.properties
HTTP (sink)
This example sends data from the mytopic Kafka topic to an HTTP service. Adjust the properties in examples/CamelHttpSinkConnector.properties for your environment, for example by configuring camel.sink.url.
$KAFKA_HOME/bin/connect-standalone.sh $KAFKA_HOME/config/connect-standalone.properties examples/CamelHttpSinkConnector.properties
JMS (source)
This example receives messages from a JMS queue named myqueue and transfers them to the mytopic Kafka topic. The example uses ActiveMQ and is configured to connect to the broker running on localhost:61616. Adjust the properties in examples/CamelJmsSourceConnector.properties for your environment: for example, configure the username and password by setting camel.component.sjms2.connection-factory.userName=yourusername and camel.component.sjms2.connection-factory.password=yourpassword, or change camel.component.sjms2.connection-factory and camel.component.sjms2.connection-factory.brokerURL to reflect your JMS implementation and URL.
$KAFKA_HOME/bin/connect-standalone.sh $KAFKA_HOME/config/connect-standalone.properties examples/CamelJmsSourceConnector.properties
JMS (sink)
This example receives messages from the mytopic Kafka topic and transfers them to a JMS queue named myqueue. The example uses ActiveMQ and is configured to connect to the broker running on localhost:61616. Adjust the properties in examples/CamelJmsSinkConnector.properties for your environment: for example, configure the username and password by adding camel.component.sjms2.connection-factory.userName=yourusername and camel.component.sjms2.connection-factory.password=yourpassword, or change camel.component.sjms2.connection-factory and camel.component.sjms2.connection-factory.brokerURL to reflect your JMS implementation and URL.
$KAFKA_HOME/bin/connect-standalone.sh $KAFKA_HOME/config/connect-standalone.properties examples/CamelJmsSinkConnector.properties
Telegram (source)
This example transfers messages sent to a Telegram bot to the mytopic Kafka topic. Set the Telegram bot token in examples/CamelTelegramSourceConnector.properties to your bot’s token.
$KAFKA_HOME/bin/connect-standalone.sh $KAFKA_HOME/config/connect-standalone.properties examples/CamelTelegramSourceConnector.properties