Skip to content

Latest commit

 

History

History
336 lines (300 loc) · 18.4 KB

README.md

File metadata and controls

336 lines (300 loc) · 18.4 KB

Third Party connectors for Apache Pulsar

This project provides simple templates and instructions to build Apache Pulsar connectors on the base of existing Apache Kafka connectors.

Apache Pulsar's current acceptance criteria for connectors requires a developer brave and experienced enough with both Pulsar and third party systems to contribute the connector and required integration tests.

This project relaxes the criteria to allow developers to quickly move connectors they used with their Apache Kafka infrastructure into Apache Pulsar's, and incrementally work on improvements.

The project uses Apache Pulsar's Kafka Connect Adaptor (KCA). More information about KCA is available in this blog post. KCA is used for such popular Pulsar connectors as Pulsar Debezium Source Connectors and Pulsar Snowflake Sink Connector.

The connectors built with this project require Datastax Pulsar Luna 2.8+ or Apache Pulsar 2.9+.

For the details of the status of the specific connector and available connectors, navigate to pulsar-connector/<connector name> and check the readme provided by the contributor.

Added connectors, so far:

  1. Azure Data Explorer (Kusto)
  2. Azure DocumentDB
  3. Apache Geode
  4. Apache Kudu
  5. Apache Phoenix
  6. Apache PLC4X
  7. CoAP
  8. Couchbase
  9. DataDog Logs
  10. Diffusion
  11. Google BigQuery
  12. Hazelcast Jet
  13. Humio HEC
  14. JMS
  15. Kinetica
  16. MarkLogic
  17. MQTT
  18. Neo4J
  19. New Relic
  20. OrientDB
  21. Redis
  22. SAP HANA
  23. SingleStore
  24. Splunk
  25. XTDB
  26. Zeebe
  27. camel-aws-cloudwatch-sink
  28. camel-aws-ddb-sink
  29. camel-aws-ddb-streams-source
  30. camel-aws-ec2-sink
  31. camel-aws-eventbridge-sink
  32. camel-aws-kinesis-firehose-sink
  33. camel-aws-kinesis-sink
  34. camel-aws-kinesis-source
  35. camel-aws-lambda-sink
  36. camel-aws-redshift-sink
  37. camel-aws-redshift-source
  38. camel-aws-s3-sink
  39. camel-aws-s3-source
  40. camel-aws-s3-streaming-upload-sink
  41. camel-aws-secrets-manager-sink
  42. camel-aws-ses-sink
  43. camel-aws-sns-fifo-sink
  44. camel-aws-sns-sink
  45. camel-aws-sqs-batch-sink
  46. camel-aws-sqs-fifo-sink
  47. camel-aws-sqs-sink
  48. camel-aws-sqs-source
  49. camel-aws2-iam
  50. camel-aws2-kms
  51. camel-azure-cosmosdb-source
  52. camel-azure-eventhubs-sink
  53. camel-azure-eventhubs-source
  54. camel-azure-functions-sink
  55. camel-azure-servicebus-sink
  56. camel-azure-servicebus-source
  57. camel-azure-storage-blob-changefeed-source
  58. camel-azure-storage-blob-sink
  59. camel-azure-storage-blob-source
  60. camel-azure-storage-queue-sink
  61. camel-azure-storage-queue-source
  62. camel-beer-source
  63. camel-bitcoin-source
  64. camel-cassandra-sink
  65. camel-cassandra-source
  66. camel-ceph-sink
  67. camel-ceph-source
  68. camel-chuck-norris-source
  69. camel-couchbase-sink
  70. camel-cron-source
  71. camel-cxf
  72. camel-cxfrs
  73. camel-dropbox-sink
  74. camel-dropbox-source
  75. camel-earthquake-source
  76. camel-elasticsearch-index-sink
  77. camel-elasticsearch-search-source
  78. camel-exec-sink
  79. camel-fhir-source
  80. camel-file
  81. camel-file-watch-source
  82. camel-ftp-sink
  83. camel-ftp-source
  84. camel-ftps-sink
  85. camel-ftps-source
  86. camel-github-commit-source
  87. camel-github-event-source
  88. camel-github-pullrequest-comment-source
  89. camel-github-pullrequest-source
  90. camel-github-tag-source
  91. camel-google-bigquery-sink
  92. camel-google-calendar-source
  93. camel-google-functions-sink
  94. camel-google-mail-source
  95. camel-google-pubsub-sink
  96. camel-google-pubsub-source
  97. camel-google-sheets-source
  98. camel-google-storage-sink
  99. camel-google-storage-source
  100. camel-hdfs
  101. camel-http-secured-sink
  102. camel-http-secured-source
  103. camel-http-sink
  104. camel-http-source
  105. camel-https
  106. camel-infinispan-sink
  107. camel-infinispan-source
  108. camel-jdbc
  109. camel-jira-add-comment-sink
  110. camel-jira-add-issue-sink
  111. camel-jira-oauth-source
  112. camel-jira-source
  113. camel-jira-transition-issue-sink
  114. camel-jira-update-issue-sink
  115. camel-jms-amqp-10-sink
  116. camel-jms-amqp-10-source
  117. camel-jms-apache-activemq-sink
  118. camel-jms-apache-activemq-source
  119. camel-jms-apache-artemis-sink
  120. camel-jms-apache-artemis-source
  121. camel-jms-ibm-mq-sink
  122. camel-jms-ibm-mq-source
  123. camel-kafka-not-secured-sink
  124. camel-kafka-not-secured-source
  125. camel-kafka-sink
  126. camel-kafka-source
  127. camel-kafka-ssl-sink
  128. camel-kafka-ssl-source
  129. camel-kubernetes-namespaces-source
  130. camel-kubernetes-nodes-source
  131. camel-kubernetes-pods-source
  132. camel-log-sink
  133. camel-mail-imap-source
  134. camel-mail-sink
  135. camel-mariadb-sink
  136. camel-mariadb-source
  137. camel-minio-sink
  138. camel-minio-source
  139. camel-mongodb-changes-stream-source
  140. camel-mongodb-sink
  141. camel-mongodb-source
  142. camel-mqtt-sink
  143. camel-mqtt-source
  144. camel-mqtt5-sink
  145. camel-mqtt5-source
  146. camel-mysql-sink
  147. camel-mysql-source
  148. camel-nats-sink
  149. camel-nats-source
  150. camel-netty-http
  151. camel-netty
  152. camel-oracle-database-sink
  153. camel-oracle-database-source
  154. camel-postgresql-sink
  155. camel-postgresql-source
  156. camel-pulsar-sink
  157. camel-pulsar-source
  158. camel-rabbitmq-source
  159. camel-redis-sink
  160. camel-redis-source
  161. camel-rest-openapi-sink
  162. camel-salesforce-create-sink
  163. camel-salesforce-delete-sink
  164. camel-salesforce-source
  165. camel-salesforce-update-sink
  166. camel-scp-sink
  167. camel-sftp-sink
  168. camel-sftp-source
  169. camel-sjms2
  170. camel-slack-sink
  171. camel-slack-source
  172. camel-solr-sink
  173. camel-solr-source
  174. camel-splunk-hec-sink
  175. camel-splunk-sink
  176. camel-splunk-source
  177. camel-sqlserver-sink
  178. camel-sqlserver-source
  179. camel-ssh-sink
  180. camel-ssh-source
  181. camel-syslog
  182. camel-telegram-sink
  183. camel-telegram-source
  184. camel-timer-source
  185. camel-twitter-directmessage-source
  186. camel-twitter-search-source
  187. camel-twitter-timeline-source
  188. camel-webhook-source
  189. camel-websocket-source
  190. camel-wttrin-source

The rest of this documentation will dive into details of:

  • How to build connectors
  • How to use connectors
  • How to add a connector

Building the connectors

Ensure you have JDK 11+ and Maven 3.8 installed.

Clone the connector's repo and run mvn clean install from the root.

The connector's .nar files can be found at pulsar-connectors/<connector name>>/target/pulsar-3rdparty-pulsar-connectors-<connector name>-0.1.0-SNAPSHOT.nar

Using the connectors

Follow Pulsar's documentation to use the packaged connector.

Providing configuration for connectors

Sink Connectors

Follow the example below to create a config yaml file:

# Pulsar KCA Sink expects "processingGuarantees" to be "EFFECTIVELY_ONCE"`
processingGuarantees: "EFFECTIVELY_ONCE"
configs:
  # Size of messages in bytes the sink will attempt to batch messages together before flush.
  # batchSize: 16384
  # Time interval in milliseconds the sink will attempt to batch messages together before flush.
  # lingerTimeMs: 2147483647
  # In case of Record<KeyValue<>> data use key from KeyValue<> instead of one from Record.
  # unwrapKeyValueIfAvailable: "true"
  # The Kafka topic name that passed to Kafka sink.
  topic: "my-topic"
  # Pulsar topic to store offsets at.
  offsetStorageTopic: "kafka-connect-sink-offsets"
  # A Kafka connector sink class to use.
  kafkaConnectorSinkClass: "com.third.party.CoolSinkConnector"
  # Config properties to pass to the Kafka connector.
  kafkaConnectorConfigProperties:
    # The following properties passed directly to Kafka Connect Sink and defined by it or by
    # https://github.com/apache/kafka/blob/2.7/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java
    name: "test-sink"
    connector.class: "com.third.party.CoolSinkConnector"
    tasks.max: "1"
    topics: "my-topic"
    ...

Source Connectors

Follow the example below to create a config yaml file:

tenant: "public"
namespace: "default"
name: "test-source"
topicName: "test-topic"
parallelism: 1
# A Kafka connector source class to use.
className: "com.third.party.CoolSourceConnector"
configs:
  # Present the message only consist of payload.
  # json-with-envelope: "false"

  # Pulsar topic to store Kafka connector offsets at
  offset.storage.topic: "kafka-connect-source-offsets"
  # Pulsar namespace to store the output topics
  topic.namespace: "public/default"
  
  # Config properties to pass to the Kafka connector.
  # The following properties passed directly to Kafka Connect Sink and defined by it or by
  # https://github.com/apache/kafka/blob/2.7/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java

  # A Kafka connector source class to use.
  task.class: "com.third.party.CoolSourceConnector"
  # The converter provided by Kafka Connect to convert record value.
  value.converter: "org.apache.kafka.connect.json.JsonConverter"
  # The converter provided by Kafka Connect to convert record key.
  key.converter: "org.apache.kafka.connect.json.JsonConverter"
  ...

Adding a new connector

These steps help avoid some common problems encountered while using KCA to create a new connector:

  • Ensure the connector's license allows its use and redistribution. A helpful starting point is here.
  • Maven dependency conflict of transitive dependencies in build time.
  • Dependency conflict in runtime caused by third-party dependencies packaged with the connector.

1. Decide if you need to shade the original connector

Check the content of the Kafka connector's jar file. If it includes third-party dependencies, you may need to "shade" it (rename some classes).

To do so, copy shaded-dependencies/template-shaded/ to shaded-dependencies/<connector name> and add the new module into shaded-dependencies/pom.xml.

Ensure that third-party dependencies are renamed as specified in shaded-dependencies/<connector name>/pom.xmland build (mvn clean install).

2. Add new subproject

  1. Copy pulsar-connectors/template/ to pulsar-connectors/<connector name>/
  2. Add the new module into pulsar-connectors/pom.xml
  3. Update connector's name and description in pulsar-connectors/<connector name>/src/main/resources/META-INF/services/pulsar-io.yaml
  4. Update the pulsar-connectors/<connector name>/README.md
  5. Update the root README.md
  6. Update LICENSE and NOTICE files
  7. Build (mvn clean install).
  8. Run mvn dependency:tree -Dverbose to review how Maven auto-resolved potential dependency conflicts and fix as needed

To check the connector for CVEs:

mvn clean install verify -Powasp-dependency-check -DskipTests -f pulsar-connectors/<connector dir>/pom.xml

Detailed report will be at pulsar-connectors/<connector dir>/target/dependency-check-report.html