To test both the Delta Lake source and sink connectors, we design the demo around the following data flow.
- Use Spark to write rows into a Delta Lake table.
- Deploy the Delta source connector to monitor the change log of the Delta Lake table and write the changes into a Pulsar topic.
- Deploy the Delta sink connector to consume messages from the Pulsar topic and write them into a new Delta Lake table.
- Use Spark to read the new Delta Lake table and get all the rows out.
- Run Pulsar 2.9.1 in standalone mode with Pulsar Functions enabled.
bin/pulsar-daemon start standalone
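Before continuing, you can optionally confirm that the standalone broker is up with a quick sanity check (any pulsar-admin call that succeeds will do):
bin/pulsar-admin brokers healthcheck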
- Get the pulsar-io-lakehouse release package, and select version v2.9.2.22, named pulsar-io-lakehouse-2.9.2.22.nar.
- Configure the Delta Lake sink connector and submit it to the Pulsar Functions system. Delta Lake sink configuration, config.json:
{
    "tenant": "public",
    "namespace": "default",
    "name": "delta_sink",
    "parallelism": 1,
    "inputs": [
        "test_delta_pulsar"
    ],
    "archive": "/tmp/lakehouse_connector_test/lake_house_connector/pulsar-io-lakehouse-2.9.2.22.nar",
    "sourceSubscriptionName": "sandbox_sink_v1",
    "processingGuarantees": "EFFECTIVELY_ONCE",
    "configs": {
        "type": "delta",
        "tablePath": "file:///tmp/lakehouse_connector_test/lake_house_connector/data/test_sink_v1",
        "maxCommitInterval": 60,
        "maxRecordsPerCommit": 1000000
    }
}
Use the following command to submit the sink connector to the Pulsar Functions system.
/apache-pulsar-2.9.1/bin/pulsar-admin sinks create \
    --sink-config-file /tmp/lakehouse_connector_test/lake_house_connector/delta_sink/config.json
Use the following command to list the sink connectors:
bin/pulsar-admin sinks list
Use the following command to check the status of the sink connector:
bin/pulsar-admin sinks status --name delta_sink
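Once the sink is running, you can also confirm that it has created the sandbox_sink_v1 subscription on the input topic, and later watch its backlog drain (the topic and subscription names come from the config above):
bin/pulsar-admin topics stats persistent://public/default/test_delta_pulsar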
- Configure the Delta Lake source connector and submit it to the Pulsar Functions system. Delta Lake source configuration, config.json:
{
    "tenant": "public",
    "namespace": "default",
    "name": "delta_source",
    "topicName": "test_delta_pulsar",
    "parallelism": 1,
    "processingGuarantees": "EFFECTIVELY_ONCE",
    "archive": "/tmp/lakehouse_connector_test/lake_house_connector/pulsar-io-lakehouse-2.9.2.22.nar",
    "configs": {
        "type": "delta",
        "fetchHistoryData": "true",
        "checkpointInterval": 30,
        "tablePath": "/tmp/lakehouse_connector_test/lake_house_connector/data/test_source_v1",
        "maxReadBytesSizeOneRound": 4194304,
        "maxReadRowCountOneRound": 10000
    }
}
Use the following command to submit the source connector to the Pulsar Functions system.
/apache-pulsar-2.9.1/bin/pulsar-admin sources create \
--source-config-file /tmp/lakehouse_connector_test/lake_house_connector/delta_source/config.json
Use the following command to list the source connectors:
bin/pulsar-admin sources list
Use the following command to check the status of the source connector:
bin/pulsar-admin sources status --name delta_source
Because the Delta Lake table has not been created yet, the Delta source connector will keep restarting until the table is created.
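While waiting, one simple way to keep an eye on it is to poll the status periodically, for example with the standard watch utility if it is available on your system:
watch -n 10 'bin/pulsar-admin sources status --name delta_source'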
- Get the Spark binary package (we use spark-3.2.1-bin-hadoop3.2), untar it, and use the following command to start it.
bin/spark-shell --packages io.delta:delta-core_2.12:1.1.0 --conf "spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension" --conf "spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog"
- Use the following command to write data into the Delta Lake table.
> val sourcePath = "/tmp/lakehouse_connector_test/lake_house_connector/data/test_source_v1"
> for (i <- 0 to 1) {
spark.range(i * 100, (i + 1) * 100).map(x => (x, x % 5, s"foo-${x % 2}")).toDF("c1", "c2", "c3").write.mode("append").format("delta").save(sourcePath)
}
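Optionally, before querying the sink table, you can peek at a few of the messages that the source connector publishes to the test_delta_pulsar topic; the subscription name below (peek-sub) is just a throwaway example:
bin/pulsar-client consume -s peek-sub -n 5 test_delta_pulsar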
- Wait 1 minute, and use Spark to query the sink target table. Refer to: https://docs.delta.io/latest/quick-start.html#read-data
> val sinkPath = "/tmp/lakehouse_connector_test/lake_house_connector/data/test_sink_v1"
> val df = spark.read.format("delta").load(sinkPath)
> df.sort("c1").show(1000)
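You can also verify on the local file system that the sink connector has produced Delta commits; Delta Lake records them in a _delta_log directory under the sink's tablePath:
ls /tmp/lakehouse_connector_test/lake_house_connector/data/test_sink_v1/_delta_log/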
You can go through this video for a step-by-step walkthrough.
- Run Pulsar 2.9.1 in standalone mode with Pulsar Functions enabled.
bin/pulsar-daemon start standalone
- Get the pulsar-io-lakehouse release package, and select version v2.9.2.22, named pulsar-io-lakehouse-2.9.2.22-cloud.nar.
- Configure the Delta Lake sink connector and submit it to the Pulsar Functions system. Delta Lake sink configuration, config.json:
{
    "tenant": "public",
    "namespace": "default",
    "name": "delta_sink",
    "parallelism": 1,
    "inputs": [
        "test_delta_pulsar"
    ],
    "archive": "/tmp/lakehouse_connector_test/lake_house_connector/pulsar-io-lakehouse-2.9.2.22-cloud.nar",
    "sourceSubscriptionName": "sandbox_sink_v1",
    "processingGuarantees": "EFFECTIVELY_ONCE",
    "configs": {
        "type": "delta",
        "maxCommitInterval": 60,
        "maxRecordsPerCommit": 1000000,
        "tablePath": "s3a://test-dev-us-west-2/lakehouse/delta_sink_v1",
        "hadoop.fs.s3a.aws.credentials.provider": "com.amazonaws.auth.DefaultAWSCredentialsProviderChain"
    }
}
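Because this configuration relies on DefaultAWSCredentialsProviderChain, the Pulsar process must be able to find AWS credentials. One option (a sketch; substitute your own keys, and the region here simply matches the demo bucket) is to export the standard AWS environment variables in the shell before starting Pulsar standalone:
export AWS_ACCESS_KEY_ID=<your-access-key-id>
export AWS_SECRET_ACCESS_KEY=<your-secret-access-key>
export AWS_REGION=us-west-2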
Use the following command to submit the sink connector to the Pulsar Functions system.
/tmp/lakehouse_connector_test/apache-pulsar-2.9.1/bin/pulsar-admin sinks create \
    --sink-config-file /tmp/lakehouse_connector_test/lake_house_connector/delta_sink/config.json
Use the following command to list the sink connectors:
bin/pulsar-admin sinks list
Use the following command to check the status of the sink connector:
bin/pulsar-admin sinks status --name delta_sink
- Configure the Delta Lake source connector and submit it to the Pulsar Functions system. Delta Lake source configuration, config.json:
{
    "tenant": "public",
    "namespace": "default",
    "name": "delta_source",
    "topicName": "test_delta_pulsar",
    "parallelism": 1,
    "processingGuarantees": "EFFECTIVELY_ONCE",
    "archive": "/tmp/lakehouse_connector_test/lake_house_connector/pulsar-io-lakehouse-2.9.2.22-cloud.nar",
    "configs": {
        "type": "delta",
        "fetchHistoryData": "true",
        "checkpointInterval": 30,
        "maxReadBytesSizeOneRound": 4194304,
        "maxReadRowCountOneRound": 10000,
        "tablePath": "s3a://test-dev-us-west-2/lakehouse/delta_source_v1",
        "hadoop.fs.s3a.aws.credentials.provider": "com.amazonaws.auth.DefaultAWSCredentialsProviderChain"
    }
}
Use the following command to submit the source connector to the Pulsar Functions system.
/tmp/lakehouse_connector_test/apache-pulsar-2.9.1/bin/pulsar-admin sources create \
--source-config-file /tmp/lakehouse_connector_test/lake_house_connector/delta_source/config.json
Use the following command to list the source connectors:
bin/pulsar-admin sources list
Use the following command to check the status of the source connector:
bin/pulsar-admin sources status --name delta_source
Because the Delta Lake table has not been created yet, the Delta source connector will keep restarting until the table is created.
- Get the Spark binary package (we use spark-3.2.1-bin-hadoop3.2), untar it, and use the following command to start it.
bin/spark-shell \
--packages io.delta:delta-core_2.12:1.2.1,org.apache.hadoop:hadoop-aws:3.3.1 \
--conf spark.hadoop.fs.s3a.access.key=$AWS_ACCESS_KEY_ID \
--conf spark.hadoop.fs.s3a.secret.key=$AWS_SECRET_KEY
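If Spark complains about the Delta session extension or catalog when writing, you may also need the same Delta SQL settings used in the local example; whether they are required depends on your Spark and Delta versions, so treat this as an optional variant of the command above:
bin/spark-shell \
  --packages io.delta:delta-core_2.12:1.2.1,org.apache.hadoop:hadoop-aws:3.3.1 \
  --conf "spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension" \
  --conf "spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog" \
  --conf spark.hadoop.fs.s3a.access.key=$AWS_ACCESS_KEY_ID \
  --conf spark.hadoop.fs.s3a.secret.key=$AWS_SECRET_KEY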
- Use the following command to write data into the Delta Lake table.
> val sourcePath = "s3a://test-dev-us-west-2/lakehouse/delta_source_v1"
> for (i <- 0 to 1) {
spark.range(i * 100, (i + 1) * 100).map(x => (x, x % 5, s"foo-${x % 2}")).toDF("c1", "c2", "c3").write.mode("append").format("delta").save(sourcePath)
}
- Wait 1 minute, and use Spark to query the sink target table. Refer to: https://docs.delta.io/latest/quick-start.html#read-data
> val sinkPath = "s3a://test-dev-us-west-2/lakehouse/delta_sink_v1"
> val df = spark.read.format("delta").load(sinkPath)
> df.sort("c1").show(1000)
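To double-check that the sink connector committed data to S3, you can also list the Delta log objects directly with the AWS CLI (assuming it is installed and configured; the bucket and prefix match the sink's tablePath):
aws s3 ls s3://test-dev-us-west-2/lakehouse/delta_sink_v1/_delta_log/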
You can go through this video for a step-by-step walkthrough.