Modified kafka notebook (#128)
Co-authored-by: chetan thote <chetan@chetans-MacBook-Pro.local>
chetanthote and chetan thote authored Dec 10, 2024
1 parent ab3d0bf commit 500585e
Showing 1 changed file with 55 additions and 47 deletions.
102 changes: 55 additions & 47 deletions notebooks/load-kafka-template/notebook.ipynb
@@ -17,7 +17,7 @@
]
},
{
"id": "1bee474b",
"id": "17e0a577",
"cell_type": "markdown",
"metadata": {},
"source": [
@@ -33,6 +33,7 @@
{
"attachments": {},
"cell_type": "markdown",
"id": "46a1d738",
"metadata": {},
"source": [
"<div class=\"alert alert-block alert-warning\">\n",
@@ -42,12 +43,12 @@
" <p>Define the <b>BOOTSTRAP_SERVER</b>, <b>PORT</b>, <b>TOPIC</b>,<b>SASL_USERNAME</b>,<b>SASL_MECHANISM</b>,<b>SECURITY_PROTOCOL</b>, and <b>SASL_PASSWORD</b> variables below for integration, replacing the placeholder values with your own.</p>\n",
" </div>\n",
"</div>"
],
"id": "46a1d738"
]
},
{
"cell_type": "code",
"execution_count": 1,
"id": "88b1ac9d",
"metadata": {},
"outputs": [],
"source": [
@@ -58,50 +59,50 @@
"SASL_MECHANISM = 'sasl-mechanism'\n",
"SECURITY_PROTOCOL = 'security-protocol'\n",
"SASL_PASSWORD = 'password'"
],
"id": "88b1ac9d"
]
},
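These connection variables are plain Python strings that get interpolated into the `CREATE PIPELINE` statement later in the notebook. A rough sketch of that assembly, assuming placeholder values throughout (the helper name `build_pipeline_sql` and the exact CONFIG/CREDENTIALS key names are illustrative, not part of the notebook):

```python
# Sketch: assemble the CREATE PIPELINE statement from the placeholder
# variables defined above. build_pipeline_sql is a hypothetical helper,
# not part of the notebook or any SingleStore client library.
BOOTSTRAP_SERVER = 'bootstrap-server-url'
PORT = 9092
TOPIC = 'kafka-topic'
SASL_USERNAME = 'username'
SASL_MECHANISM = 'sasl-mechanism'
SECURITY_PROTOCOL = 'security-protocol'
SASL_PASSWORD = 'password'

def build_pipeline_sql(table: str, pipeline: str) -> str:
    # Doubled braces {{ }} emit literal JSON braces inside the f-string.
    return f"""CREATE PIPELINE {pipeline} AS
LOAD DATA KAFKA '{BOOTSTRAP_SERVER}:{PORT}/{TOPIC}'
CONFIG '{{"sasl.username": "{SASL_USERNAME}", "sasl.mechanism": "{SASL_MECHANISM}", "security.protocol": "{SECURITY_PROTOCOL}"}}'
CREDENTIALS '{{"sasl.password": "{SASL_PASSWORD}"}}'
INTO TABLE {table}
FORMAT JSON;"""

sql = build_pipeline_sql("my_table", "kafka_import_pipeline")
print(sql)
```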
{
"attachments": {},
"cell_type": "markdown",
"id": "64fdd646",
"metadata": {},
"source": [
"This notebook demonstrates how to create a sample table in SingleStore, set up a pipeline to import data from an Kafka topic, and run queries on the imported data. It is designed for users who want to integrate Kafka data with SingleStore and explore the capabilities of pipelines for efficient data ingestion."
"This notebook demonstrates how to create a sample table in SingleStore, set up a pipeline to import data from a Kafka topic, and run queries on the imported data. It is designed for users who want to integrate Kafka data with SingleStore and explore the capabilities of pipelines for efficient data ingestion."
]
},
{
"attachments": {},
"cell_type": "markdown",
"id": "c35b30d7",
"metadata": {},
"source": [
"<h3>Pipeline Flow Illustration</h3>"
],
"id": "c35b30d7"
]
},
{
"attachments": {},
"cell_type": "markdown",
"id": "979e53c2",
"metadata": {},
"source": [
"<img src=\"https://singlestoreloaddata.s3.ap-south-1.amazonaws.com/images/LoadDataKafka.png\" width=\"100%\" height=\"50%\" />"
],
"id": "979e53c2"
]
},
{
"attachments": {},
"cell_type": "markdown",
"id": "9f9d6aa2",
"metadata": {},
"source": [
"## Creating Table in SingleStore\n",
"\n",
"Start by creating a table that will hold the data imported from Kafka."
],
"id": "9f9d6aa2"
]
},
{
"cell_type": "code",
"execution_count": 2,
"id": "82a48dd0",
"metadata": {},
"outputs": [],
"source": [
@@ -115,12 +116,12 @@
" address TEXT,\n",
" created_at TIMESTAMP\n",
");"
],
"id": "82a48dd0"
]
},
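Each Kafka message must be a JSON object whose fields line up with this table. A minimal sketch of such a message; only `address` and `created_at` are visible in the diff above, so the remaining fields (`id`, `name`, `email`) and all values here are hypothetical:

```python
import json
from datetime import datetime, timezone

# Sketch: a sample message shaped like the my_table schema above.
# Only address and created_at appear in the visible diff; id, name,
# and email are assumed placeholders for the collapsed columns.
def make_record(record_id: int) -> dict:
    return {
        "id": record_id,
        "name": "Jane Doe",
        "email": "jane@example.com",
        "address": "42 Example Street",
        # TIMESTAMP columns accept 'YYYY-MM-DD HH:MM:SS' strings.
        "created_at": datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S"),
    }

message = json.dumps(make_record(1))
print(message)
```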
{
"attachments": {},
"cell_type": "markdown",
"id": "90ad124f",
"metadata": {},
"source": [
"## Create a Pipeline to Import Data from Kafka\n",
@@ -131,21 +132,21 @@
"You have access to the Kafka topic.\n",
"The Kafka credentials (SASL username, password, and security protocol) are configured in SingleStore.\n",
"The JSON message has a structure that matches the table schema.</i>"
],
"id": "90ad124f"
]
},
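The last prerequisite, that the JSON message structure matches the table schema, can be sanity-checked before wiring up the pipeline. A sketch, with `TABLE_COLUMNS` partly hypothetical since the diff only shows the `address` and `created_at` columns:

```python
import json

# Sketch: check that an incoming Kafka JSON message carries exactly
# the keys the target table expects. TABLE_COLUMNS is an assumption
# beyond the two columns visible in the diff above.
TABLE_COLUMNS = {"id", "name", "email", "address", "created_at"}

def message_matches_schema(raw: bytes) -> bool:
    try:
        payload = json.loads(raw)
    except json.JSONDecodeError:
        return False
    return isinstance(payload, dict) and set(payload) == TABLE_COLUMNS

ok = message_matches_schema(
    b'{"id": 1, "name": "Jane", "email": "j@x.io", '
    b'"address": "42 Example St", "created_at": "2024-12-10 00:00:00"}'
)
print(ok)  # prints True
```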
{
"attachments": {},
"cell_type": "markdown",
"id": "6e401f87",
"metadata": {},
"source": [
"Using these identifiers and keys, execute the following statement."
],
"id": "6e401f87"
]
},
{
"cell_type": "code",
"execution_count": 3,
"id": "0b4f42d6",
"metadata": {},
"outputs": [],
"source": [
@@ -162,137 +163,144 @@
"}'\n",
"INTO TABLE my_table\n",
"FORMAT JSON;"
],
"id": "0b4f42d6"
]
},
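The CONFIG and CREDENTIALS clauses take JSON strings, so building them with `json.dumps` avoids quoting mistakes when credentials contain special characters. A sketch; the key names follow the librdkafka-style settings used in the statement above, and the values shown are made up:

```python
import json

# Sketch: build the CONFIG and CREDENTIALS JSON for the pipeline with
# json.dumps so quotes and backslashes in secrets are escaped correctly.
# Verify the key names against the SingleStore pipeline docs for your
# version; the values here are placeholders.
config = json.dumps({
    "sasl.username": "username",
    "sasl.mechanism": "SCRAM-SHA-512",
    "security.protocol": "SASL_SSL",
})
credentials = json.dumps({"sasl.password": "pa'ss\"word"})
print(config)
print(credentials)
```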
{
"attachments": {},
"cell_type": "markdown",
"id": "1d137801",
"metadata": {},
"source": [
"## Start the Pipeline\n",
"\n",
"To start the pipeline and begin importing the data from the S3 bucket:"
],
"id": "1d137801"
"To start the pipeline and begin importing the data from the Kafka topic:"
]
},
{
"cell_type": "code",
"execution_count": 4,
"id": "e94cff73",
"metadata": {},
"outputs": [],
"source": [
"%%sql\n",
"START PIPELINE kafka_import_pipeline;"
]
},
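Pipelines load data asynchronously, so the table may lag the topic briefly after `START PIPELINE`. A small polling sketch with an injected count function (stubbed here so it runs anywhere; in the notebook it would wrap `SELECT COUNT(*) FROM my_table`):

```python
import time

# Sketch: poll the row count until the pipeline has caught up or a
# timeout expires. fetch_count is injected so this runs without a
# database; in the notebook it would execute SELECT COUNT(*).
def wait_for_rows(fetch_count, expected: int,
                  timeout_s: float = 30.0, interval_s: float = 1.0) -> bool:
    deadline = time.monotonic() + timeout_s
    while time.monotonic() < deadline:
        if fetch_count() >= expected:
            return True
        time.sleep(interval_s)
    return False

# Stubbed demo: the "table" reaches 3 rows on the third poll.
counts = iter([0, 1, 3, 3])
print(wait_for_rows(lambda: next(counts), expected=3, interval_s=0.01))  # prints True
```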
{
"cell_type": "markdown",
"metadata": {},
"source": [
"You can check the status of your pipeline <a href='https://portal.singlestore.com/organizations/org-id/pipelines'>here</a>."
],
"id": "e94cff73"
"id": "03f4cbfb"
},
{
"attachments": {},
"cell_type": "markdown",
"id": "094b857c",
"metadata": {},
"source": [
"## Select Data from the Table\n",
"\n",
"Once the data has been imported, you can run a query to select it:"
],
"id": "094b857c"
]
},
{
"cell_type": "code",
"execution_count": 5,
"id": "bc0f7b0c",
"metadata": {},
"outputs": [],
"source": [
"%%sql\n",
"SELECT * FROM my_table LIMIT 10;"
],
"id": "bc0f7b0c"
]
},
{
"attachments": {},
"cell_type": "markdown",
"id": "669dac71",
"metadata": {},
"source": [
"### Check that all of the data has been loaded"
],
"id": "669dac71"
]
},
{
"cell_type": "code",
"execution_count": 6,
"id": "a47c2f0f",
"metadata": {},
"outputs": [],
"source": [
"%%sql\n",
"SELECT COUNT(*) FROM my_table;"
],
"id": "a47c2f0f"
]
},
{
"attachments": {},
"cell_type": "markdown",
"id": "91eae728",
"metadata": {},
"source": [
"## Conclusion\n",
"\n",
"We have shown how to insert data from a Amazon S3 using `Pipelines` to SingleStoreDB. These techniques should enable you to\n",
"integrate your Amazon S3 with SingleStoreDB."
],
"id": "91eae728"
"We have shown how to insert data from a Kafka topic into SingleStoreDB using `Pipelines`. These techniques should enable you to\n",
"integrate your Kafka topic with SingleStoreDB."
]
},
{
"attachments": {},
"cell_type": "markdown",
"id": "6dc86514",
"metadata": {},
"source": [
"## Clean up\n",
"\n",
"To clean up, remove the '#' to uncomment the queries below, then execute them to drop the pipeline and table that were created."
],
"id": "6dc86514"
]
},
{
"attachments": {},
"cell_type": "markdown",
"id": "706ccd4c",
"metadata": {},
"source": [
"#### Drop Pipeline"
],
"id": "706ccd4c"
]
},
{
"cell_type": "code",
"execution_count": 7,
"id": "ed7dc33a",
"metadata": {},
"outputs": [],
"source": [
"%%sql\n",
"#STOP PIPELINE kafka_import_pipeline;\n",
"\n",
"#DROP PIPELINE kafka_import_pipeline;"
],
"id": "ed7dc33a"
]
},
{
"attachments": {},
"cell_type": "markdown",
"id": "b5e15411",
"metadata": {},
"source": [
"#### Drop Data"
],
"id": "b5e15411"
]
},
{
"cell_type": "code",
"execution_count": 8,
"id": "f8f3d6ef",
"metadata": {},
"outputs": [],
"source": [
"%%sql\n",
"#DROP TABLE my_table;"
],
"id": "f8f3d6ef"
]
},
{
"id": "12d50a52",
@@ -326,7 +334,7 @@
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.11.6"
"version": "3.11.9"
}
},
"nbformat": 4,
