The Streaming Jupyter Integrations project includes a set of magics for interactively running Flink SQL jobs in Jupyter Notebooks.
To use these magics, install our pip package along with jupyterlab-lsp:
python3 -m pip install jupyterlab-lsp streaming-jupyter-integrations
Register the magics in the first cell of a notebook running an IPython kernel:
%load_ext streaming_jupyter_integrations.magics
Next, choose an execution mode and an execution target:
%flink_connect --execution-mode [mode] --execution-target [target]
By default, the streaming execution mode and the local execution target are used:
%flink_connect
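When the connection settings come from code (for example, from a config file), the line magic can also be invoked through IPython's run_line_magic API. The sketch below builds the argument string from keyword options. flink_connect_line is a hypothetical helper, not part of the package, and it assumes the magic splits its arguments shell-style.

```python
import shlex


def flink_connect_line(**options):
    """Build the argument string for %flink_connect from keyword options.

    Hypothetical helper: execution_target="remote" becomes
    "--execution-target remote". Options set to None are skipped, so the
    magic falls back to its own defaults for them.
    """
    parts = []
    for name, value in options.items():
        if value is None:
            continue
        parts.append("--" + name.replace("_", "-"))
        # Quote values so spaces or shell metacharacters survive splitting.
        parts.append(shlex.quote(str(value)))
    return " ".join(parts)


# In a notebook cell, after %load_ext streaming_jupyter_integrations.magics:
# get_ipython().run_line_magic(
#     "flink_connect",
#     flink_connect_line(execution_target="remote",
#                        remote_hostname="example.com", remote_port=8888),
# )
```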
Currently, Flink supports two execution modes: batch and streaming. See the Flink documentation for more details.
To specify the execution mode, add the --execution-mode parameter, for instance:
%flink_connect --execution-mode batch
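The difference between the two modes shows up clearly in aggregations. In streaming mode, a query such as SELECT user_id, COUNT(*) FROM orders GROUP BY user_id emits a continuously updated result: a changelog in which +I inserts a row, -U retracts an old value and +U emits its replacement. In batch mode, the same query returns one final row per user. The pure-Python simulation below illustrates the two behaviors; it is a simplified model, not Flink code, and ignores parallelism, ordering and optimizations such as mini-batching.

```python
from collections import Counter


def streaming_count(user_ids):
    """Streaming-style COUNT(*) per key: emit changelog rows for every input row."""
    counts = Counter()
    changelog = []
    for user_id in user_ids:
        previous = counts[user_id]  # 0 for a key seen for the first time
        counts[user_id] += 1
        if previous == 0:
            changelog.append(("+I", user_id, 1))  # first result for this key
        else:
            # Retract the old count, then emit the updated one.
            changelog.append(("-U", user_id, previous))
            changelog.append(("+U", user_id, counts[user_id]))
    return changelog


def batch_count(user_ids):
    """Batch-style COUNT(*) per key: one final row per key after all input is read."""
    return sorted(Counter(user_ids).items())


orders = ["alice", "bob", "alice"]
print(streaming_count(orders))
# [('+I', 'alice', 1), ('+I', 'bob', 1), ('-U', 'alice', 1), ('+U', 'alice', 2)]
print(batch_count(orders))
# [('alice', 2), ('bob', 1)]
```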
Streaming Jupyter Integrations supports 3 execution targets:
- Local
- Remote
- YARN Session
Running Flink in local mode starts a MiniCluster in a local JVM with parallelism 1.
In order to run Flink locally, use:
%flink_connect --execution-target local
Alternatively, since the execution target is local by default, use:
%flink_connect
You can specify the port of the local JobManager with --local-port (8099 by default). This is especially useful if you run multiple notebooks in a single JupyterLab, so that their local JobManagers do not try to use the same port.
%flink_connect --execution-target local --local-port 8123
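To avoid picking a port by hand for each notebook, you can ask the operating system for one that is currently unused. A minimal sketch using only the standard library; find_free_port is a hypothetical helper name, not part of the package.

```python
import socket


def find_free_port(host="127.0.0.1"):
    """Return a TCP port on host that is free at the moment of the call."""
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
        sock.bind((host, 0))  # port 0 asks the OS to choose an unused port
        return sock.getsockname()[1]


# In a notebook cell:
# port = find_free_port()
# get_ipython().run_line_magic(
#     "flink_connect", f"--execution-target local --local-port {port}"
# )
```

The port is released as soon as the helper returns, so another process could still grab it before Flink starts; in practice this race is rare, but it is not ruled out.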
Running Flink in remote mode will connect to an existing Flink session cluster. Besides setting --execution-target to remote, you also need to specify --remote-hostname and --remote-port, pointing to the Flink JobManager's REST API address.
%flink_connect \
--execution-target remote \
--remote-hostname example.com \
--remote-port 8888
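Before connecting, you may want to check that --remote-hostname and --remote-port really point at the JobManager's REST API. Flink's REST API exposes a GET /overview endpoint that returns a JSON summary of the cluster. The sketch below queries it with the standard library; it assumes plain HTTP without authentication or TLS, and flink_overview is a hypothetical helper name.

```python
import json
import urllib.request


def flink_overview(hostname, port, timeout=5.0):
    """Return the JSON cluster summary from a Flink JobManager's REST API."""
    url = f"http://{hostname}:{port}/overview"
    with urllib.request.urlopen(url, timeout=timeout) as response:
        return json.load(response)


# Example (requires a reachable cluster):
# overview = flink_overview("example.com", 8888)
# print(overview["flink-version"], overview["slots-available"])
```

If the call raises a connection error, the same address will most likely not work for %flink_connect either.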
Running Flink in yarn-session mode will connect to an existing Flink session cluster running on YARN. You may specify the hostname and port of the YARN Resource Manager (--resource-manager-hostname and --resource-manager-port).
If the Resource Manager address is not provided, the notebook is assumed to run on the same node as the Resource Manager.
You can also specify the YARN application ID (--yarn-application-id) that the notebook should connect to.
If --yarn-application-id is not specified and exactly one YARN application is running on the cluster, the notebook will try to connect to it. Otherwise, it will fail.
Connecting to a remote Flink session cluster running on a remote YARN cluster:
%flink_connect \
--execution-target yarn-session \
--resource-manager-hostname example.com \
--resource-manager-port 8888 \
--yarn-application-id application_1666172784500_0001
Connecting to a Flink session cluster running on a YARN cluster whose Resource Manager runs on the same node as the notebook:
%flink_connect \
--execution-target yarn-session \
--yarn-application-id application_1666172784500_0001
Connecting to a Flink session cluster running on a dedicated YARN cluster (the only application running there), with the Resource Manager on the same node as the notebook:
%flink_connect --execution-target yarn-session
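The application-selection rule described above can be checked by hand against the YARN Resource Manager REST API, which lists running applications at GET /ws/v1/cluster/apps?states=RUNNING. The sketch below mirrors the documented rule; it is an illustration, not the package's implementation. The helper names are hypothetical, and the default port 8088 (YARN's usual web port) is an assumption to adjust for your cluster.

```python
import json
import urllib.request


def running_apps_url(rm_hostname="localhost", rm_port=8088):
    # Resource Manager REST endpoint listing applications in the RUNNING state.
    # 8088 is an assumed default; use your cluster's web port.
    return f"http://{rm_hostname}:{rm_port}/ws/v1/cluster/apps?states=RUNNING"


def fetch_running_apps(rm_hostname="localhost", rm_port=8088, timeout=5.0):
    """Return the parsed JSON response from the Resource Manager."""
    url = running_apps_url(rm_hostname, rm_port)
    with urllib.request.urlopen(url, timeout=timeout) as response:
        return json.load(response)


def pick_application_id(apps_response, application_id=None):
    """Apply the documented selection rule to a /ws/v1/cluster/apps response."""
    if application_id is not None:
        return application_id  # an explicit --yarn-application-id wins
    # YARN returns {"apps": null} when no application matches the filter.
    apps = (apps_response.get("apps") or {}).get("app", [])
    if len(apps) != 1:
        raise RuntimeError(
            f"expected exactly one running YARN application, found {len(apps)}"
        )
    return apps[0]["id"]


# Example (requires a reachable Resource Manager):
# print(pick_application_id(fetch_running_apps("example.com", 8888)))
```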
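The magics allow for dynamic variable substitution in Flink SQL cells: a variable defined in a Python cell can be referenced in a Flink SQL cell by wrapping its name in curly braces.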
my_variable = 1
SELECT * FROM some_table WHERE product_id = {my_variable}
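Conceptually, the substitution replaces each {name} with the value of the Python variable name. The sketch below shows one way to do this with a regular expression; it is an illustration, not the package's implementation. It deliberately skips ${name}, which marks sensitive values. Inside a notebook, the namespace would typically be get_ipython().user_ns.

```python
import re

# {name} placeholders, but not ${name}, which marks sensitive values.
PLACEHOLDER = re.compile(r"(?<!\$)\{(\w+)\}")


def substitute_variables(sql, namespace):
    """Replace each {name} in sql with str(namespace[name])."""
    return PLACEHOLDER.sub(lambda match: str(namespace[match.group(1)]), sql)


query = "SELECT * FROM some_table WHERE product_id = {my_variable}"
print(substitute_variables(query, {"my_variable": 1}))
# SELECT * FROM some_table WHERE product_id = 1
```

Values are inserted verbatim, so string values need their own SQL quotes, and this kind of substitution should not be used with untrusted input.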
Moreover, you can mark sensitive variables, such as passwords, with the ${name} syntax so that they are read from environment variables or user input every time the cell is run:
CREATE TABLE MyUserTable (
    id BIGINT,
    name STRING,
    age INT,
    status BOOLEAN,
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://localhost:3306/mydatabase',
    'table-name' = 'users',
    'username' = '${my_username}',
    'password' = '${my_password}'
);
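The lookup for ${name} placeholders can be sketched as: take the value from an environment variable, otherwise prompt for it without echoing. This is an illustration, not the package's implementation; in particular, it assumes the environment variable has exactly the placeholder's name (case-sensitive on Linux) and that the placeholder sits inside a single-quoted SQL string.

```python
import getpass
import os
import re

# ${name} placeholders for sensitive values.
SECRET = re.compile(r"\$\{(\w+)\}")


def resolve_secrets(sql, environ=os.environ, prompt=getpass.getpass):
    """Replace each ${name} with an environment variable or hidden user input."""

    def lookup(match):
        name = match.group(1)
        value = environ.get(name)
        if value is None:
            # Not set in the environment: ask the user without echoing input.
            value = prompt(f"{name}: ")
        # Assumes the placeholder sits inside a single-quoted SQL string,
        # where a literal quote is written as two quotes.
        return value.replace("'", "''")

    return SECRET.sub(lookup, sql)


# Example: with my_username exported, only my_password is prompted for.
# print(resolve_secrets("'username' = '${my_username}', 'password' = '${my_password}'"))
```

Because the values are resolved each time the function runs, nothing sensitive needs to be stored in the notebook itself.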
The %%flink_execute command allows you to use the Python DataStream API and Table API. One handle is exposed for each API: stream_env and table_env, respectively.
Table API example:
%%flink_execute
query = """
SELECT user_id, COUNT(*)
FROM orders
GROUP BY user_id
"""
execution_output = table_env.execute_sql(query)
When the Table API is used, the final result has to be assigned to the execution_output variable.
DataStream API example:
%%flink_execute
from pyflink.common.typeinfo import Types
execution_output = stream_env.from_collection(
    collection=[(1, 'aaa'), (2, 'bb'), (3, 'cccc')],
    type_info=Types.ROW([Types.INT(), Types.STRING()])
)
When the DataStream API is used, the final result has to be assigned to the execution_output variable. Note that the pipeline does not end with .execute(); the execution is triggered by the Jupyter magics under the hood.
There are currently two options for running streaming_jupyter_integrations for development: we can either use a Docker image or install it on our machine.
You can build a Docker image of Jupyter Notebooks that contains the functionality developed in this project by running the command below.
docker build --tag streaming_jupyter_integrations_image .
After the image is built, we can run it using this command.
docker run --name streaming_jupyter_integrations -p 8888:8888 streaming_jupyter_integrations_image
After that, we should be able to reach Jupyter running in Docker at http://127.0.0.1:8888/
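When scripting this setup (for example, in CI), it can help to wait until the server answers before opening the notebook. A minimal sketch using only the standard library; wait_for_http is a hypothetical helper, and the default URL and timeouts are assumptions to adjust.

```python
import http.client
import time
import urllib.error
import urllib.request


def wait_for_http(url="http://127.0.0.1:8888/", timeout=60.0, interval=1.0):
    """Poll url until the server answers, or return False after timeout seconds."""
    deadline = time.monotonic() + timeout
    while True:
        try:
            with urllib.request.urlopen(url, timeout=interval):
                return True
        except urllib.error.HTTPError:
            # Any HTTP status (e.g. 403 before logging in) means the server is up.
            return True
        except (OSError, http.client.HTTPException):
            # Refused, reset or timed out: the container is still starting.
            if time.monotonic() >= deadline:
                return False
            time.sleep(interval)


# After docker run, in another terminal or script:
# if wait_for_http():
#     print("Jupyter is up at http://127.0.0.1:8888/")
```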
Note: You will need Node.js to build the extension package.
The jlpm command is JupyterLab's pinned version of yarn that is installed with JupyterLab. You may use yarn or npm in lieu of jlpm below. In order to use jlpm, you have to have jupyterlab installed (e.g., with brew install jupyterlab, if you use Homebrew as your package manager).
# Clone the repo to your local environment
# Change directory to the flink_sql_lsp_extension directory
# Install package in development mode
pip install -e .
# Link your development version of the extension with JupyterLab
jupyter labextension develop . --overwrite
# Rebuild extension Typescript source after making changes
jlpm build
The project uses pre-commit hooks to ensure code quality, mostly by linting. To use them, install pre-commit and then run:
pre-commit install --install-hooks
From then on, the hooks will lint the files you have modified on every commit attempt.