Try it out locally
Run Kafka
First, get a locally running Kafka instance by following the Apache Kafka quickstart guide. This usually boils down to:
export KAFKA_HOME=<your kafka install dir>
$KAFKA_HOME/bin/zookeeper-server-start.sh $KAFKA_HOME/config/zookeeper.properties
$KAFKA_HOME/bin/kafka-server-start.sh $KAFKA_HOME/config/server.properties
$KAFKA_HOME/bin/kafka-topics.sh --create \
--zookeeper localhost:2181 \
--replication-factor 1 \
--partitions 1 \
--topic mytopic
Next, run the Camel Kafka Connector sources and/or sinks using the connect-standalone.sh script, as shown in the examples below.
Note: in order to run more than one instance of standalone Kafka Connect on the same machine, you need to duplicate the connect-standalone.properties file and change the rest.port property (and the offset storage file) to values unique to each instance, so the instances do not conflict.
You can use these Kafka utilities to consume from or produce to a Kafka topic:
$KAFKA_HOME/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic mytopic --from-beginning
$KAFKA_HOME/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic mytopic
Try some examples
For the following examples you need to fetch the camel-kafka-connector project and build it locally by running ./mvnw package from the root of the project. Look in the config and examples directories for the configuration files (*.properties) of the examples showcased here.
First you need to set the CLASSPATH environment variable to include the jar files from the core/target/camel-kafka-connector-[version]-package/share/java/ directory. On UNIX systems this can be done by running:
export CLASSPATH="$(find core/target/ -type f -name '*.jar'| grep '\-package' | tr '\n' ':')"
Simple logger (sink)
This is an example of a sink that logs messages consumed from mytopic.
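As a reference point, a sink configuration of this shape looks roughly as follows (a sketch following the camel-kafka-connector conventions; the exact contents of the bundled config/CamelSinkConnector.properties may differ between releases):
name=CamelSinkConnector
connector.class=org.apache.camel.kafkaconnector.CamelSinkConnector
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter
topics=mytopic
camel.sink.url=log:info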
$KAFKA_HOME/bin/connect-standalone.sh $KAFKA_HOME/config/connect-standalone.properties config/CamelSinkConnector.properties
Timer (source)
This is an example of a source that produces a message every second to mytopic.
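The source side mirrors the sink configuration; a sketch of such a file (again, the bundled config/CamelSourceConnector.properties may differ; timer:tick?period=1000 fires once per second):
name=CamelSourceConnector
connector.class=org.apache.camel.kafkaconnector.CamelSourceConnector
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter
topics=mytopic
camel.source.url=timer:tick?period=1000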
$KAFKA_HOME/bin/connect-standalone.sh $KAFKA_HOME/config/connect-standalone.properties config/CamelSourceConnector.properties
AWS Kinesis (source)
This example consumes from an AWS Kinesis data stream and transfers the payload to the mytopic topic in Kafka.
Adjust the properties in examples/CamelAWSKinesisSourceConnector.properties for your environment: you need to configure the access key, secret key and region by setting camel.component.aws-kinesis.configuration.access-key=youraccesskey, camel.component.aws-kinesis.configuration.secret-key=yoursecretkey and camel.component.aws-kinesis.configuration.region=yourregion.
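In other words, the credentials section of the file ends up as the three lines below; the same pattern applies to the SQS, SNS and S3 examples that follow, with the component prefix changed accordingly:
camel.component.aws-kinesis.configuration.access-key=youraccesskey
camel.component.aws-kinesis.configuration.secret-key=yoursecretkey
camel.component.aws-kinesis.configuration.region=yourregion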
$KAFKA_HOME/bin/connect-standalone.sh $KAFKA_HOME/config/connect-standalone.properties examples/CamelAWSKinesisSourceConnector.properties
AWS SQS (sink)
This example consumes from the Kafka topic mytopic and transfers the payload to AWS SQS.
Adjust the properties in examples/CamelAWSSQSSinkConnector.properties for your environment: you need to configure the access key, secret key and region by setting camel.component.aws-sqs.configuration.access-key=youraccesskey, camel.component.aws-sqs.configuration.secret-key=yoursecretkey and camel.component.aws-sqs.configuration.region=yourregion.
$KAFKA_HOME/bin/connect-standalone.sh $KAFKA_HOME/config/connect-standalone.properties examples/CamelAWSSQSSinkConnector.properties
AWS SQS (source)
This example consumes from the AWS SQS queue mysqs and transfers the payload to the mytopic topic in Kafka.
Adjust the properties in examples/CamelAWSSQSSourceConnector.properties for your environment: you need to configure the access key, secret key and region by setting camel.component.aws-sqs.configuration.access-key=youraccesskey, camel.component.aws-sqs.configuration.secret-key=yoursecretkey and camel.component.aws-sqs.configuration.region=yourregion.
$KAFKA_HOME/bin/connect-standalone.sh $KAFKA_HOME/config/connect-standalone.properties examples/CamelAWSSQSSourceConnector.properties
AWS SNS (sink)
This example consumes from the mytopic Kafka topic and transfers the payload to the AWS SNS topic named topic.
Adjust the properties in examples/CamelAWSSNSSinkConnector.properties for your environment: you need to configure the access key, secret key and region by setting camel.component.aws-sns.configuration.access-key=youraccesskey, camel.component.aws-sns.configuration.secret-key=yoursecretkey and camel.component.aws-sns.configuration.region=yourregion.
$KAFKA_HOME/bin/connect-standalone.sh $KAFKA_HOME/config/connect-standalone.properties examples/CamelAWSSNSSinkConnector.properties
AWS S3 (source)
This example fetches objects from the camel-kafka-connector bucket in AWS S3 and transfers the payload to the mytopic Kafka topic. It also shows how to implement a custom converter that converts the bytes received from S3 to Kafka’s SchemaAndValue.
Adjust the properties in examples/CamelAWSS3SourceConnector.properties for your environment: you need to configure the access key, secret key and region by adding camel.component.aws-s3.configuration.access-key=youraccesskey, camel.component.aws-s3.configuration.secret-key=yoursecretkey and camel.component.aws-s3.configuration.region=yourregion.
$KAFKA_HOME/bin/connect-standalone.sh $KAFKA_HOME/config/connect-standalone.properties examples/CamelAWSS3SourceConnector.properties
Apache Cassandra
These examples require a running Cassandra instance. For simplicity, the steps below show how to start Cassandra using Docker. First, run a Cassandra instance:
docker run --name master_node --env MAX_HEAP_SIZE='800M' -dt oscerd/cassandra
Next, check and make sure Cassandra is running:
docker exec -ti master_node /opt/cassandra/bin/nodetool status
Datacenter: datacenter1
=======================
Status=Up/Down
|/ State=Normal/Leaving/Joining/Moving
-- Address Load Tokens Owns (effective) Host ID Rack
UN 172.17.0.2 251.32 KiB 256 100.0% 5126aaad-f143-43e9-920a-0f9540a93967 rack1
To populate the database using the cqlsh tool, you’ll need a local installation of Cassandra. Download and extract the Apache Cassandra distribution to a directory; we refer to the Cassandra installation directory as LOCAL_CASSANDRA_HOME. Here we use version 3.11.4 to connect to the Cassandra instance we started using Docker.
<LOCAL_CASSANDRA_HOME>/bin/cqlsh $(docker inspect --format='{{ .NetworkSettings.IPAddress }}' master_node)
Next, execute the following script to create the keyspace test and the table users, and to insert one row into it.
create keyspace test with replication = {'class':'SimpleStrategy', 'replication_factor':3};
use test;
create table users ( id int primary key, name text );
insert into users (id,name) values (1, 'oscerd');
quit;
In the .properties configuration files used below, the IP address of the Cassandra master node needs to be configured: replace the value 172.17.0.2 in the camel.source.url property (or localhost in camel.sink.url) with the IP of the master node obtained from Docker. Each example uses a different .properties file, shown in the command line used to run it.
docker inspect --format='{{ .NetworkSettings.IPAddress }}' master_node
Apache Cassandra (source)
This example polls Cassandra via CQL (select * from users) in the test keyspace and transfers the result to the mytopic Kafka topic.
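The endpoint line in the properties file follows the camel-cassandraql URI format cql://host/keyspace?cql=query; as a sketch, assuming that format and the IP obtained above:
camel.source.url=cql://172.17.0.2/test?cql=select * from users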
$KAFKA_HOME/bin/connect-standalone.sh $KAFKA_HOME/config/connect-standalone.properties examples/CamelCassandraQLSourceConnector.properties
Apache Cassandra (sink)
This example adds data to the users table in Cassandra from the data consumed from the mytopic Kafka topic. Notice how the name column is populated from the Kafka message using the CQL command insert into users….
$KAFKA_HOME/bin/connect-standalone.sh $KAFKA_HOME/config/connect-standalone.properties examples/CamelCassandraQLSinkConnector.properties
Elasticsearch (sink)
This example passes data from the mytopic Kafka topic to the sampleIndexName index in Elasticsearch. Adjust the properties in examples/CamelElasticSearchSinkConnector.properties to reflect your environment, for example by changing hostAddresses to a valid Elasticsearch instance hostname and port.
For the index operation, it might be necessary to provide or implement a transformer. A sample configuration would be similar to the one below:
transforms=ElasticSearchTransformer
This is the sample Transformer used in the integration test code that transforms Kafka’s ConnectRecord to a Map:
transforms.ElasticSearchTransformer.type=org.apache.camel.kafkaconnector.sink.elasticsearch.transforms.ConnectRecordValueToMapTransformer
This is a configuration for the sample transformer that defines the key used in the map:
transforms.ElasticSearchTransformer.key=MyKey
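Putting these together, the transformer section of the properties file reads:
transforms=ElasticSearchTransformer
transforms.ElasticSearchTransformer.type=org.apache.camel.kafkaconnector.sink.elasticsearch.transforms.ConnectRecordValueToMapTransformer
transforms.ElasticSearchTransformer.key=MyKey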
When the configuration is ready, run the sink with:
$KAFKA_HOME/bin/connect-standalone.sh $KAFKA_HOME/config/connect-standalone.properties examples/CamelElasticSearchSinkConnector.properties
File (sink)
This example appends data from the mytopic Kafka topic to the file /tmp/kafkaconnect.txt.
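The sink endpoint uses the Camel file component; a sketch of what the endpoint line looks like (fileExist=Append is what makes the connector append rather than overwrite; the bundled example file may differ):
camel.sink.url=file:/tmp?fileName=kafkaconnect.txt&fileExist=Append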
$KAFKA_HOME/bin/connect-standalone.sh $KAFKA_HOME/config/connect-standalone.properties examples/CamelFileSinkConnector.properties
HTTP (sink)
This example sends data from the mytopic Kafka topic to an HTTP service. Adjust the properties in examples/CamelHttpSinkConnector.properties for your environment, for example by configuring camel.sink.url.
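For instance, to point the sink at a hypothetical local endpoint (the URL below is illustrative, not part of the bundled example):
camel.sink.url=http://localhost:8080/api/messages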
$KAFKA_HOME/bin/connect-standalone.sh $KAFKA_HOME/config/connect-standalone.properties examples/CamelHttpSinkConnector.properties
JMS (source)
This example receives messages from a JMS queue named myqueue and transfers them to the mytopic Kafka topic. In this example ActiveMQ is used, configured to connect to the broker running on localhost:61616. Adjust the properties in examples/CamelJmsSourceConnector.properties for your environment: for example, configure the username and password by setting camel.component.sjms2.connection-factory.userName=yourusername and camel.component.sjms2.connection-factory.password=yourpassword, or change camel.component.sjms2.connection-factory and camel.component.sjms2.connection-factory.brokerURL to reflect your JMS implementation and broker URL.
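Taken together, the JMS-related section of the properties file has roughly this shape (a sketch: the #class: value binds a concrete ActiveMQ ConnectionFactory, and the exact bundled contents may differ):
camel.source.url=sjms2://myqueue
camel.component.sjms2.connection-factory=#class:org.apache.activemq.ActiveMQConnectionFactory
camel.component.sjms2.connection-factory.brokerURL=tcp://localhost:61616
camel.component.sjms2.connection-factory.userName=yourusername
camel.component.sjms2.connection-factory.password=yourpassword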
$KAFKA_HOME/bin/connect-standalone.sh $KAFKA_HOME/config/connect-standalone.properties examples/CamelJmsSourceConnector.properties
JMS (sink)
This example receives messages from the mytopic Kafka topic and transfers them to a JMS queue named myqueue. In this example ActiveMQ is used, configured to connect to the broker running on localhost:61616. You can adjust the properties in examples/CamelJmsSinkConnector.properties for your environment: for example, configure the username and password by adding camel.component.sjms2.connection-factory.userName=yourusername and camel.component.sjms2.connection-factory.password=yourpassword, or change camel.component.sjms2.connection-factory and camel.component.sjms2.connection-factory.brokerURL to reflect your JMS implementation and broker URL.
$KAFKA_HOME/bin/connect-standalone.sh $KAFKA_HOME/config/connect-standalone.properties examples/CamelJmsSinkConnector.properties
Telegram (source)
This example transfers messages sent to a Telegram bot to the mytopic Kafka topic. Set the Telegram bot token in examples/CamelTelegramSourceConnector.properties to reflect your bot’s token.
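The source endpoint follows the Camel telegram component format; a sketch, assuming a Camel 3 style endpoint (authorizationToken is your bot’s token):
camel.source.url=telegram:bots?authorizationToken=yourbottoken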
$KAFKA_HOME/bin/connect-standalone.sh $KAFKA_HOME/config/connect-standalone.properties examples/CamelTelegramSourceConnector.properties