Get Started with Flink SQL in Confluent Manager for Apache Flink¶
This guide provides a quick introduction to using Flink SQL with Confluent Manager for Apache Flink® (CMF). It covers the prerequisites, how to set up an environment and a compute pool, and how to run SQL statements.
The steps use the Confluent CLI, but you can also perform the same tasks with the REST APIs for Confluent Manager for Apache Flink.
Prerequisites¶
- CMF installed. Follow Get Started with Confluent Platform for Apache Flink up to Step 3 (installation of the Flink Kubernetes operator) to install Confluent Manager for Apache Flink.
- The latest version of the Confluent CLI installed. For more information, see Install the Confluent CLI.
- Example tables configured. Run the following command to upgrade the CMF installation with the examples catalog enabled, which provides example tables that are immediately usable.
helm upgrade --install cmf --version "~2.0.0" \
confluentinc/confluent-manager-for-apache-flink \
--namespace default \
--set cmf.sql.examples-catalog.enabled=true \
--set cmf.sql.production=false
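Before continuing, you can optionally confirm the setup. The following is a minimal sketch: confluent version is a standard CLI command, and the kubectl check assumes CMF was installed into the default namespace as shown above.
confluent version
kubectl get pods --namespace default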
Set Up an Environment and a Compute Pool¶
Create a Flink environment that points to the default Kubernetes namespace.
confluent flink environment create test --kubernetes-namespace default
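To confirm that the environment was created, you can list the environments that CMF knows about. The list subcommand is an assumption modeled on the create command above; check confluent flink environment --help if your CLI version differs.
confluent flink environment list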
Create a compute pool to run SQL statements on. Save the following ComputePool specification to a file, for example compute-pool.json, and then create the pool from it.
{ "apiVersion": "cmf.confluent.io/v1", "kind": "ComputePool", "metadata": { "name": "pool" }, "spec": { "type": "DEDICATED", "clusterSpec": { "flinkVersion": "v1_19", "image": "confluentinc/cp-flink-sql:1.19-cp1", "flinkConfiguration": { "pipeline.operator-chaining.enabled": "false", "execution.checkpointing.interval": "10s" }, "taskManager": { "resource": { "cpu": 1.0, "memory": "1024m" } }, "jobManager": { "resource": { "cpu": 0.5, "memory": "1024m" } } } } }
confluent --environment test flink compute-pool create /path/to/compute-pool.json
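You can verify the pool in the same way. As with environments, the describe subcommand below is an assumption that mirrors the create command shown above; verify it with confluent flink compute-pool --help.
confluent --environment test flink compute-pool describe pool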
Run Statements¶
Use the following command to list the tables of the marketplace database in the examples catalog.
confluent --environment test flink statement create stmt-ddl \
  --catalog examples --database marketplace --compute-pool pool --output json \
  --sql "SHOW TABLES;"
The response includes the result of the statement: a list of three tables, blackhole, clicks, and orders. The clicks and orders tables provide randomly generated data. The blackhole table consumes data and does not persist it.
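As a further example of the same pattern, you can inspect a table's schema with a DESCRIBE statement, which is standard Flink SQL. The statement name stmt-describe below is arbitrary and not part of the example setup.
confluent --environment test flink statement create stmt-describe \
  --catalog examples --database marketplace --compute-pool pool --output json \
  --sql "DESCRIBE clicks;"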
Run the following statement to process some of the example data.
confluent --environment test flink statement create stmt-insert \
  --catalog examples --database marketplace --compute-pool pool --output json \
  --sql "INSERT INTO blackhole SELECT url FROM clicks WHERE (user_id % 4) = 1;"
CMF executes this query by starting a Flink cluster with the specification of the pool ComputePool and deploying the query on that cluster. This takes a few moments. Check the progress with the following command:
confluent --environment test flink statement describe stmt-insert
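The statement typically moves from a pending phase to RUNNING. To poll for the transition, you can wrap the describe command shown above in watch, a standard Linux utility; the 5-second interval is an arbitrary choice.
watch -n 5 "confluent --environment test flink statement describe stmt-insert"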
After the statement reaches RUNNING status, you can inspect it in the Flink web UI.
confluent --environment test flink statement web-ui-forward stmt-insert --port 9090
The Confluent CLI forwards the Flink web UI through CMF to your local machine. Open http://localhost:9090 in your browser to see the INSERT INTO statement running in the Flink web UI.
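When you are done, you can stop the statement and remove the resources you created. The delete subcommands below are assumptions modeled on the create commands in this guide; verify them with the --help output of your CLI version before running them.
confluent --environment test flink statement delete stmt-insert
confluent --environment test flink compute-pool delete pool
confluent flink environment delete test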