Flink SQL Examples in Confluent Cloud for Apache Flink

The following code examples show common Flink SQL use cases with Confluent Cloud for Apache Flink®.

  • CREATE TABLE
  • Inferred tables
  • ALTER TABLE
  • SELECT
  • Schema reference

CREATE TABLE examples

The following examples show how to create Flink tables with various options.

Minimal table

CREATE TABLE t_minimal (s STRING);
Properties
  • Append changelog mode.
  • No Schema Registry key.
  • Round robin distribution.
  • 6 Kafka partitions.
  • The $rowtime column and system watermark are added implicitly.

Table with a primary key

Syntax
CREATE TABLE t_pk (k INT PRIMARY KEY NOT ENFORCED, s STRING);
Properties
  • Upsert changelog mode.
  • The primary key defines an implicit DISTRIBUTED BY(k).
  • k is the Schema Registry key.
  • Hash distribution on k.
  • The table has 6 Kafka partitions.
  • k is declared as being unique, meaning no duplicate rows.
  • k must not contain NULLs, so an implicit NOT NULL is added.
  • The $rowtime column and system watermark are added implicitly.
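
As a quick check, the following sketch (with illustrative values) shows the upsert semantics: a later row replaces an earlier row with the same key.

INSERT INTO t_pk VALUES (1, 'a');
INSERT INTO t_pk VALUES (1, 'b');

-- In upsert mode, the changelog collapses to the latest value per key,
-- so key 1 ends up with the value 'b'.
SELECT * FROM t_pk;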

Table with a primary key in append mode

Syntax
CREATE TABLE t_pk_append (k INT PRIMARY KEY NOT ENFORCED, s STRING)
  DISTRIBUTED INTO 4 BUCKETS
  WITH ('changelog.mode' = 'append');
Properties
  • Append changelog mode.
  • k is the Schema Registry key.
  • Hash distribution on k.
  • The table has 4 Kafka partitions.
  • k is declared as being unique, meaning no duplicate rows.
  • k must not contain NULLs, meaning implicit NOT NULL.
  • The $rowtime column and system watermark are added implicitly.

Table with hash distribution

Syntax
CREATE TABLE t_dist (k INT, s STRING) DISTRIBUTED BY (k) INTO 4 BUCKETS;
Properties
  • Append changelog mode.
  • k is the Schema Registry key.
  • Hash distribution on k.
  • The table has 4 Kafka partitions.
  • The $rowtime column and system watermark are added implicitly.

Complex table with all concepts combined

Syntax
CREATE TABLE t_complex (k1 INT, k2 INT, PRIMARY KEY (k1, k2) NOT ENFORCED, s STRING)
  COMMENT 'My complex table'
  DISTRIBUTED BY HASH(k1) INTO 4 BUCKETS
  WITH ('changelog.mode' = 'append');
Properties
  • Append changelog mode.
  • k1 is the Schema Registry key.
  • Hash distribution on k1.
  • k2 is treated as a value column and is stored in the value part of Schema Registry.
  • The table has 4 Kafka partitions.
  • k1 and k2 are declared as being unique, meaning no duplicates.
  • k1 and k2 must not contain NULLs, meaning implicit NOT NULL.
  • The $rowtime column and system watermark are added implicitly.
  • An additional comment is added.

Table with overlapping names in key/value of Schema Registry but disjoint data

Syntax
CREATE TABLE t_disjoint (from_key_k INT, k STRING)
  DISTRIBUTED BY (from_key_k)
  WITH ('key.fields-prefix' = 'from_key_');
Properties
  • Append changelog mode.
  • Hash distribution on from_key_k.
  • The key prefix from_key_ is defined and is stripped before storing the schema in Schema Registry.
    • Therefore, k is the Schema Registry key of type INT.
    • Also, k is the Schema Registry value of type STRING.
  • Both key and value store disjoint data, so they can have different data types.

Create with overlapping names in key/value of Schema Registry but joint data

Syntax
CREATE TABLE t_joint (k INT, v STRING)
  DISTRIBUTED BY (k)
  WITH ('value.fields-include' = 'all');
Properties
  • Append changelog mode.
  • Hash distribution on k.
  • By default, the key is never included in the value in Schema Registry.
  • By setting 'value.fields-include' = 'all', the value contains the full table schema.
    • Therefore, k is the Schema Registry key.
    • Also, k, v is the Schema Registry value.
  • The payload of k is stored twice in the Kafka message, because key and value store joint data and they have the same data type for k.

Table with metadata columns for writing a Kafka message timestamp

Syntax
CREATE TABLE t_metadata_write (name STRING, ts TIMESTAMP_LTZ(3) NOT NULL METADATA FROM 'timestamp')
  DISTRIBUTED INTO 1 BUCKETS;
Properties
  • Adds the ts metadata column, which isn’t part of Schema Registry but instead is a pure Flink concept.
  • In contrast with $rowtime, which is declared as a METADATA VIRTUAL column, ts is selected in a SELECT * statement and is writable.

The following examples show how to fill Kafka messages with an instant.

INSERT INTO t_metadata_write (ts, name) SELECT NOW(), 'Alice';
INSERT INTO t_metadata_write (ts, name) SELECT TO_TIMESTAMP_LTZ(0, 3), 'Bob';
SELECT $rowtime, * FROM t_metadata_write;

Currently, the Schema Registry subject compatibility mode must be FULL or FULL_TRANSITIVE. For more information, see Schema Evolution and Compatibility for Schema Registry on Confluent Cloud.

Table with string key and value in Schema Registry

Syntax
CREATE TABLE t_raw_string_key (key STRING, i INT)
  DISTRIBUTED BY (key)
  WITH ('key.format' = 'raw');
Properties
  • Schema Registry is filled with a value subject containing i.
  • The key columns are determined by the DISTRIBUTED BY clause.
  • By default, Avro in Schema Registry would be used for the key, but the WITH clause overrides this to the raw format.
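
As a usage sketch (values are illustrative), an insert writes the raw string key and the Avro-encoded value:

INSERT INTO t_raw_string_key VALUES ('user-1', 42);
SELECT `key`, i FROM t_raw_string_key;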

Tables with cross-region schema sharing

  1. Create two Kafka clusters in different regions, for example, eu-west-1 and us-west-2.

  2. Create two Flink compute pools in different regions, for example, eu-west-1 and us-west-2.

  3. In the first region, run the following statement.

    CREATE TABLE t_shared_schema (key STRING, s STRING) DISTRIBUTED BY (key);
    
  4. In the second region, run the same statement.

    CREATE TABLE t_shared_schema (key STRING, s STRING) DISTRIBUTED BY (key);
    
Properties
  • Schema Registry is shared across regions.
  • The SQL metastore, Flink compute pools, and Kafka clusters are regional.
  • Both tables in either region share the Schema Registry subjects t_shared_schema-key and t_shared_schema-value.

Create with different changelog modes

There are three ways of storing events in a table’s log, that is, in the underlying Kafka topic.

append
  • Every insertion event is an immutable fact.
  • Every event is insert-only.
  • Events can be distributed in a round-robin fashion across workers/shards because they are unrelated.
upsert
  • Events are related using a primary key.
  • Every event is either an upsert or delete event for a primary key.
  • Events for the same primary key should land at the same worker/shard.
retract
  • Every upsert event is a fact that can be “undone”. This means that every event is either an insertion or its retraction.
  • Two events are related by all columns. In other words, the entire row is the key. For example, +I['Bob', 42] is related to -D['Bob', 42], and +U['Alice', 13] is related to -U['Alice', 13].
  • The retract mode is intermediate between the append and upsert modes.
  • The append and upsert modes are natural to existing Kafka consumers and producers.
  • Kafka compaction is a kind of upsert.
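
For illustration, here is a minimal sketch that declares each mode explicitly in the WITH clause (the table names are hypothetical; upsert requires a primary key):

CREATE TABLE t_append_example (i INT) WITH ('changelog.mode' = 'append');
CREATE TABLE t_retract_example (i INT) WITH ('changelog.mode' = 'retract');
CREATE TABLE t_upsert_example (k INT PRIMARY KEY NOT ENFORCED, v STRING)
  WITH ('changelog.mode' = 'upsert');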

Start with a table created by the following statement.

CREATE TABLE t_changelog_modes (i BIGINT);
Properties
  • Confluent Cloud for Apache Flink always derives an appropriate changelog mode for the preceding declaration.
  • If there is no primary key, append is the safest option, because it prevents users from pushing updates into a topic accidentally, and it has the best support of downstream consumers.
-- works because the query is non-updating
INSERT INTO t_changelog_modes SELECT 1;

-- does not work because the query is updating, causing an error
INSERT INTO t_changelog_modes SELECT COUNT(*) FROM (VALUES (1), (2), (3));

If you need updates, and if downstream consumers support it, for example, when the consumer is another Flink job, you can set the changelog mode to retract.

ALTER TABLE t_changelog_modes SET ('changelog.mode' = 'retract');
Properties
  • The table starts accepting retractions during INSERT INTO.
  • Already existing records in the Kafka topic are treated as insertions.
  • Newly added records receive a changeflag (+I, +U, -U, -D) in the Kafka message header.

Going back to append mode is possible, but retractions (-U, -D) appear as insertions, and the Kafka header metadata column reveals the changeflag.

ALTER TABLE t_changelog_modes SET ('changelog.mode' = 'append');
ALTER TABLE t_changelog_modes ADD headers MAP<BYTES, BYTES> METADATA VIRTUAL;

-- Shows what is serialized internally
SELECT i, headers FROM t_changelog_modes;

Table with infinite retention time

CREATE TABLE t_infinite_retention (i INT) WITH ('kafka.retention.time' = '0');
Properties
  • By default, the retention time is 7 days, as in all other APIs.
  • Flink doesn’t support -1 for durations, so 0 means infinite retention time.
  • Durations in Flink support syntax like '2 day' or '2 d', so the value doesn't need to be in milliseconds.
  • If no unit is specified, the unit is milliseconds.
  • The following units are supported:
"d", "day", "h", "hour", "m", "min", "minute", "ms", "milli", "millisecond",
"µs", "micro", "microsecond", "ns", "nano", "nanosecond"

Inferred table examples

Inferred tables are tables that have not been created by using a CREATE TABLE statement, but instead are automatically detected from information about existing Kafka topics and Schema Registry entries.

You can use the ALTER TABLE statement to evolve schemas for inferred tables.

The following examples show output from the SHOW CREATE TABLE statement called on the resulting table.
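
For example, to inspect the table that is inferred for an existing topic, run SHOW CREATE TABLE on it (the topic name below is hypothetical):

SHOW CREATE TABLE `my_existing_topic`;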

No key or value in Schema Registry

For an inferred table with no registered key or value schemas, SHOW CREATE TABLE returns the following output:

CREATE TABLE `t_raw` (
  `key` VARBINARY(2147483647),
  `val` VARBINARY(2147483647)
) DISTRIBUTED BY HASH(`key`) INTO 2 BUCKETS
WITH (
  'changelog.mode' = 'append',
  'connector' = 'confluent',
  'key.format' = 'raw',
  'value.format' = 'raw'
  ...
)
Properties
  • Key and value formats are raw (binary format) with BYTES.

  • Following Kafka message semantics, both key and value support NULL as well, so the following code is valid:

    INSERT INTO t_raw (key, val) SELECT CAST(NULL AS BYTES), CAST(NULL AS BYTES);
    

No key, but record value in Schema Registry

For the following value schema in Schema Registry:

{
  "type": "record",
  "name": "TestRecord",
  "fields": [
    {
      "name": "i",
      "type": "int"
    },
    {
      "name": "s",
      "type": "string"
    }
  ]
}

SHOW CREATE TABLE returns the following output:

CREATE TABLE `t_raw_key` (
  `key` VARBINARY(2147483647),
  `i` INT NOT NULL,
  `s` VARCHAR(2147483647) NOT NULL
) DISTRIBUTED BY HASH(`key`) INTO 6 BUCKETS
WITH (
  'changelog.mode' = 'append',
  'connector' = 'confluent',
  'key.format' = 'raw',
  'value.format' = 'avro-registry'
  ...
)
Properties
  • The key format is raw (binary format) with BYTES.

  • Following Kafka message semantics, the key supports NULL as well, so the following code is valid:

    INSERT INTO t_raw_key SELECT CAST(NULL AS BYTES), 12, 'Bob';
    

Atomic key and record value in Schema Registry

For the following key schema in Schema Registry:

"int"

And for the following value schema in Schema Registry:

{
  "type": "record",
  "name": "TestRecord",
  "fields": [
    {
      "name": "i",
      "type": "int"
    },
    {
      "name": "s",
      "type": "string"
    }
  ]
}

SHOW CREATE TABLE returns the following output:

CREATE TABLE `t_atomic_key` (
  `key` INT NOT NULL,
  `i` INT NOT NULL,
  `s` VARCHAR(2147483647) NOT NULL
) DISTRIBUTED BY HASH(`key`) INTO 2 BUCKETS
WITH (
  'changelog.mode' = 'append',
  'connector' = 'confluent',
  'key.format' = 'avro-registry',
  'value.format' = 'avro-registry'
  ...
)
Properties
  • Schema Registry defines the column data type as INT NOT NULL.
  • The column name, key, is used as the default, because Schema Registry doesn’t provide a column name.

Overlapping names in key/value, no key in Schema Registry

For the following value schema in Schema Registry:

{
  "type": "record",
  "name": "TestRecord",
  "fields": [
    {
      "name": "i",
      "type": "int"
    },
    {
      "name": "key",
      "type": "string"
    }
  ]
}

SHOW CREATE TABLE returns the following output:

CREATE TABLE `t_raw_disjoint` (
  `key_key` VARBINARY(2147483647),
  `i` INT NOT NULL,
  `key` VARCHAR(2147483647) NOT NULL
) DISTRIBUTED BY HASH(`key_key`) INTO 1 BUCKETS
WITH (
  'changelog.mode' = 'append',
  'connector' = 'confluent',
  'key.fields-prefix' = 'key_',
  'key.format' = 'raw',
  'value.format' = 'avro-registry'
  ...
)
Properties
  • The Schema Registry value schema defines columns i INT NOT NULL and key STRING.
  • The default column key BYTES is used for the message key, because there is no key schema in Schema Registry.
  • Because key would collide with the value schema column, the key_ prefix is added, resulting in key_key.

Record key and record value in Schema Registry

For the following key schema in Schema Registry:

{
  "type": "record",
  "name": "TestRecord",
  "fields": [
    {
      "name": "uid",
      "type": "int"
    }
  ]
}

And for the following value schema in Schema Registry:

{
  "type": "record",
  "name": "TestRecord",
  "fields": [
    {
      "name": "name",
      "type": "string"
    },
    {
      "name": "zip_code",
      "type": "string"
    }
  ]
}

SHOW CREATE TABLE returns the following output:

CREATE TABLE `t_sr_disjoint` (
  `uid` INT NOT NULL,
  `name` VARCHAR(2147483647) NOT NULL,
  `zip_code` VARCHAR(2147483647) NOT NULL
) DISTRIBUTED BY HASH(`uid`) INTO 1 BUCKETS
WITH (
  'changelog.mode' = 'append',
  'connector' = 'confluent',
  'value.format' = 'avro-registry'
  ...
)
Properties
  • Schema Registry defines columns for both key and value.
  • The column names of key and value are disjoint sets and don’t overlap.

Record key and record value with overlap in Schema Registry

For the following key schema in Schema Registry:

{
  "type": "record",
  "name": "TestRecord",
  "fields": [
    {
      "name": "uid",
      "type": "int"
    }
  ]
}

And for the following value schema in Schema Registry:

{
  "type": "record",
  "name": "TestRecord",
  "fields": [
    {
      "name": "uid",
      "type": "int"
    },
    {
      "name": "name",
      "type": "string"
    },
    {
      "name": "zip_code",
      "type": "string"
    }
  ]
}

SHOW CREATE TABLE returns the following output:

CREATE TABLE `t_sr_joint` (
  `uid` INT NOT NULL,
  `name` VARCHAR(2147483647) NOT NULL,
  `zip_code` VARCHAR(2147483647) NOT NULL
) DISTRIBUTED BY HASH(`uid`) INTO 1 BUCKETS
WITH (
  'changelog.mode' = 'append',
  'connector' = 'confluent',
  'value.fields-include' = 'all',
  'value.format' = 'avro-registry'
  ...
)
Properties
  • Schema Registry defines columns for both key and value.
  • The column names of key and value overlap on uid.
  • 'value.fields-include' = 'all' is set to exclude the key, because it is fully contained in the value.
  • Detecting that key is fully contained in the value requires that both field name and data type match completely, including nullability, and all fields of the key are included in the value.

Union types in Schema Registry

For the following value schema in Schema Registry:

["int", "string"]

SHOW CREATE TABLE returns the following output:

CREATE TABLE `t_union` (
  `key` VARBINARY(2147483647),
  `int` INT,
  `string` VARCHAR(2147483647)
)
...

For the following value schema in Schema Registry:

[
  "string",
  {
    "type": "record",
    "name": "User",
    "fields": [
      {
        "name": "uid",
        "type": "int"
      },{
        "name": "name",
        "type": "string"
      }
    ]
  },
  {
    "type": "record",
    "name": "Address",
    "fields": [
      {
        "name": "zip_code",
        "type": "string"
      }
    ]
  }
]

SHOW CREATE TABLE returns the following output:

CREATE TABLE `t_union` (
  `key` VARBINARY(2147483647),
  `string` VARCHAR(2147483647),
  `User` ROW<`uid` INT NOT NULL, `name` VARCHAR(2147483647) NOT NULL>,
  `Address` ROW<`zip_code` VARCHAR(2147483647) NOT NULL>
)
...
Properties
  • NULL and NOT NULL are inferred depending on whether a union contains NULL.
  • Elements of a union are always nullable, because they must be set to NULL when a different element is set.
  • If a record defines a namespace, the field is prefixed with it, for example, org.myorg.avro.User.
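
As an illustrative sketch, assuming the t_union table above, you can check which union branch is set in each row by testing the nullable columns:

SELECT
  `string`,
  `User`.`uid` AS uid,
  `Address`.`zip_code` AS zip_code,
  CASE
    WHEN `string` IS NOT NULL THEN 'string'
    WHEN `User` IS NOT NULL THEN 'User'
    ELSE 'Address'
  END AS branch
FROM t_union;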

Multi-message Protobuf schema in Schema Registry

For the following value schema in Schema Registry:

syntax = "proto3";

message Purchase {
   string item = 1;
   double amount = 2;
   string customer_id = 3;
}

message Pageview {
   string url = 1;
   bool is_special = 2;
   string customer_id = 3;
}

SHOW CREATE TABLE returns the following output:

CREATE TABLE `t` (
  `key` VARBINARY(2147483647),
  `Purchase` ROW<
      `item` VARCHAR(2147483647) NOT NULL,
      `amount` DOUBLE NOT NULL,
      `customer_id` VARCHAR(2147483647) NOT NULL
   >,
  `Pageview` ROW<
      `url` VARCHAR(2147483647) NOT NULL,
      `is_special` BOOLEAN NOT NULL,
      `customer_id` VARCHAR(2147483647) NOT NULL
   >
)
...

For the following value schema in Schema Registry:

syntax = "proto3";

message Purchase {
   string item = 1;
   double amount = 2;
   string customer_id = 3;
   Pageview pageview = 4;
}

message Pageview {
   string url = 1;
   bool is_special = 2;
   string customer_id = 3;
}

SHOW CREATE TABLE returns the following output:

CREATE TABLE `t` (
  `key` VARBINARY(2147483647),
  `Purchase` ROW<
      `item` VARCHAR(2147483647) NOT NULL,
      `amount` DOUBLE NOT NULL,
      `customer_id` VARCHAR(2147483647) NOT NULL,
      `pageview` ROW<
         `url` VARCHAR(2147483647) NOT NULL,
         `is_special` BOOLEAN NOT NULL,
         `customer_id` VARCHAR(2147483647) NOT NULL
      >
   >,
  `Pageview` ROW<
      `url` VARCHAR(2147483647) NOT NULL,
      `is_special` BOOLEAN NOT NULL,
      `customer_id` VARCHAR(2147483647) NOT NULL
   >
)
...

For the following value schema in Schema Registry:

syntax = "proto3";

message Purchase {
   string item = 1;
   double amount = 2;
   string customer_id = 3;
   Pageview pageview = 4;
   message Pageview {
      string url = 1;
      bool is_special = 2;
      string customer_id = 3;
   }
}

SHOW CREATE TABLE returns the following output:

CREATE TABLE `t` (
  `key` VARBINARY(2147483647),
  `item` VARCHAR(2147483647) NOT NULL,
  `amount` DOUBLE NOT NULL,
  `customer_id` VARCHAR(2147483647) NOT NULL,
  `pageview` ROW<
      `url` VARCHAR(2147483647) NOT NULL,
      `is_special` BOOLEAN NOT NULL,
      `customer_id` VARCHAR(2147483647) NOT NULL
   >
)
...
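
As an illustrative sketch, assuming the first multi-message table t above, you can read only the Purchase events by filtering on the nullable message column:

SELECT `Purchase`.`item`, `Purchase`.`amount`, `Purchase`.`customer_id`
FROM `t`
WHERE `Purchase` IS NOT NULL;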

ALTER TABLE examples

The following examples show frequently used scenarios for ALTER TABLE.

Define a watermark for perfectly ordered data

Flink guarantees that rows are always emitted before the watermark is generated. The following statements show that for perfectly ordered events, meaning events without time skew, the watermark can be set to the timestamp itself, or to 1 ms less than the timestamp if multiple events can share the same timestamp.

CREATE TABLE t_perfect_watermark (i INT);

-- If multiple events can have the same timestamp.
ALTER TABLE t_perfect_watermark
  MODIFY WATERMARK FOR $rowtime AS $rowtime - INTERVAL '0.001' SECOND;

-- If at most one event can have a given timestamp.
ALTER TABLE t_perfect_watermark
  MODIFY WATERMARK FOR $rowtime AS $rowtime;

Drop your custom watermark strategy

Remove the custom watermark strategy to restore the default watermark strategy.

  1. View the current table schema and metadata.

    DESCRIBE `orders`;
    

    Your output should resemble:

    +-------------+------------------------+----------+-------------------+
    | Column Name |       Data Type        | Nullable |      Extras       |
    +-------------+------------------------+----------+-------------------+
    | user        | BIGINT                 | NOT NULL | PRIMARY KEY       |
    | product     | STRING                 | NULL     |                   |
    | amount      | INT                    | NULL     |                   |
    | ts          | TIMESTAMP(3) *ROWTIME* | NULL     | WATERMARK AS `ts` |
    +-------------+------------------------+----------+-------------------+
    
  2. Remove the watermark strategy of the table.

    ALTER TABLE `orders` DROP WATERMARK;
    

    Your output should resemble:

    Statement phase is COMPLETED.
    
  3. Check the new table schema and metadata.

    DESCRIBE `orders`;
    

    Your output should resemble:

    +-------------+--------------+----------+-------------+
    | Column Name |  Data Type   | Nullable |   Extras    |
    +-------------+--------------+----------+-------------+
    | user        | BIGINT       | NOT NULL | PRIMARY KEY |
    | product     | STRING       | NULL     |             |
    | amount      | INT          | NULL     |             |
    | ts          | TIMESTAMP(3) | NULL     |             |
    +-------------+--------------+----------+-------------+
    

Configure Debezium format for CDC data

Change regular format to Debezium format

For tables that have been inferred with regular formats but contain Debezium CDC (Change Data Capture) data:

-- The following statements convert inferred tables from a regular format to the
-- corresponding Debezium CDC format and configure the appropriate Flink changelog
-- interpretation mode:
-- * append:  Treats each record as an INSERT operation with no relationship between records.
-- * retract: Handles paired operations (INSERT/UPDATE/DELETE) where changes to the same row
--            are represented as a retraction of the old value followed by an addition of the new value.
-- * upsert:  Groups all operations for the primary key (derived from the Kafka message key),
--            with each operation effectively merging with or replacing previous state
--            (INSERT creates, UPDATE modifies, DELETE removes).

-- Convert from regular Avro format to Debezium CDC format.
ALTER TABLE customer_data SET (
  'value.format' = 'avro-debezium-registry',
  'changelog.mode' = 'retract'
);

-- Convert from regular JSON format to Debezium CDC format.
ALTER TABLE customer_data_json SET (
  'value.format' = 'json-debezium-registry',
  'changelog.mode' = 'retract'
);

-- Convert from regular Protobuf format to Debezium CDC format.
ALTER TABLE customer_data_proto SET (
  'value.format' = 'proto-debezium-registry',
  'changelog.mode' = 'retract'
);

Modify changelog processing mode

For tables with any type of data that need a different processing mode for handling changes:

-- Change to append mode (default)
-- Best for event streams where each record is independent
ALTER TABLE customer_changes SET (
  'changelog.mode' = 'append'
);

-- Change to retract mode
-- Useful when changes to the same row are represented as paired operations
ALTER TABLE customer_changes SET (
  'changelog.mode' = 'retract'
);

-- Change to upsert mode when working with primary keys
-- Best when tracking state changes using a primary key (derived from Kafka message key)
ALTER TABLE customer_changes SET (
  'changelog.mode' = 'upsert'
);

Read and/or write Kafka headers

-- Create example topic
CREATE TABLE t_headers (i INT);

-- For read-only (virtual)
ALTER TABLE t_headers ADD headers MAP<BYTES, BYTES> METADATA VIRTUAL;

-- For read and write (persisted). Column becomes mandatory in INSERT INTO.
ALTER TABLE t_headers MODIFY headers MAP<BYTES, BYTES> METADATA;

-- Use implicit casting (origin is always MAP<BYTES, BYTES>)
ALTER TABLE t_headers MODIFY headers MAP<STRING, STRING> METADATA;

-- Insert and read
INSERT INTO t_headers SELECT 42, MAP['k1', 'v1', 'k2', 'v2'];
SELECT * FROM t_headers;
Properties
  • The metadata key is headers. If you don’t want to name the column this way, use: other_name MAP<BYTES, BYTES> METADATA FROM 'headers' VIRTUAL.
  • Keys of headers must be unique. Multi-key headers are not supported.
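
The following sketch (with a hypothetical table name) shows the renamed variant from the previous bullet:

CREATE TABLE t_named_headers (i INT);
ALTER TABLE t_named_headers ADD other_name MAP<BYTES, BYTES> METADATA FROM 'headers' VIRTUAL;

-- other_name is read-only and exposes the Kafka record headers.
SELECT i, other_name FROM t_named_headers;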

Add headers as a metadata column

You can get the headers of a Kafka record as a map of raw bytes by adding a headers virtual metadata column.

  1. Run the following statement to add the Kafka record headers as a metadata column:

    ALTER TABLE `orders` ADD (
      `headers` MAP<BYTES,BYTES> METADATA VIRTUAL);
    
  2. View the new schema.

    DESCRIBE `orders`;
    

    Your output should resemble:

    +-------------+-------------------+----------+-------------------------+
    | Column Name |     Data Type     | Nullable |         Extras          |
    +-------------+-------------------+----------+-------------------------+
    | user        | BIGINT            | NOT NULL | PRIMARY KEY, BUCKET KEY |
    | product     | STRING            | NULL     |                         |
    | amount      | INT               | NULL     |                         |
    | ts          | TIMESTAMP(3)      | NULL     |                         |
    | headers     | MAP<BYTES, BYTES> | NULL     | METADATA VIRTUAL        |
    +-------------+-------------------+----------+-------------------------+
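
You can then read the headers alongside the regular columns, for example (a sketch):

SELECT `user`, `product`, `headers` FROM `orders`;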
    

Read topic from specific offsets

-- Create example topic with 1 partition filled with values
CREATE TABLE t_specific_offsets (i INT) DISTRIBUTED INTO 1 BUCKETS;
INSERT INTO t_specific_offsets VALUES (1), (2), (3), (4), (5);

-- Returns 1, 2, 3, 4, 5
SELECT * FROM t_specific_offsets;

-- Changes the scan range
ALTER TABLE t_specific_offsets SET (
  'scan.startup.mode' = 'specific-offsets',
  'scan.startup.specific-offsets' = 'partition:0,offset:3'
);

-- Returns 4, 5
SELECT * FROM t_specific_offsets;
Properties
  • scan.startup.mode and scan.bounded.mode control which range in the changelog (Kafka topic) to read.
  • scan.startup.specific-offsets and scan.bounded.specific-offsets define offsets per partition.
  • In the example, only 1 partition is used. For multiple partitions, use the following syntax:
'scan.startup.specific-offsets' = 'partition:0,offset:3; partition:1,offset:42; partition:2,offset:0'
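
Similarly, scan.bounded.mode can end the scan at specific offsets. The following sketch assumes the bounded properties accept the same values and per-partition syntax as the startup properties:

ALTER TABLE t_specific_offsets SET (
  'scan.bounded.mode' = 'specific-offsets',
  'scan.bounded.specific-offsets' = 'partition:0,offset:5'
);

-- The query stops reading once the bounded offset is reached.
SELECT * FROM t_specific_offsets;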

Debug “no output” and no watermark cases

The root cause for most “no output” cases is that a time-based operation, for example, TUMBLE, MATCH_RECOGNIZE, and FOR SYSTEM_TIME AS OF, did not receive recent enough watermarks.

The current time of an operator is calculated by the minimum watermark of all inputs, meaning across all tables/topics and their partitions.

If one partition does not emit a watermark, it can affect the entire pipeline.

The following statements may be helpful for debugging issues related to watermarks.

-- example table
CREATE TABLE t_watermark_debugging (k INT, s STRING)
  DISTRIBUTED BY (k) INTO 4 BUCKETS;

-- Each value lands in a separate Kafka partition (out of 4).
-- Leave out values to see missing watermarks.
INSERT INTO t_watermark_debugging
  VALUES (1, 'Bob'), (2, 'Alice'), (8, 'John'), (15, 'David');

-- If ROW_NUMBER doesn't show results, it's clearly a watermark issue.
SELECT ROW_NUMBER() OVER (ORDER BY $rowtime ASC) AS `number`, *
  FROM t_watermark_debugging;

-- Add partition information as metadata column
ALTER TABLE t_watermark_debugging ADD part INT METADATA FROM 'partition' VIRTUAL;

-- Use the CURRENT_WATERMARK() function to check which watermark is calculated
SELECT
  *,
  part AS `Row Partition`,
  $rowtime AS `Row Timestamp`,
  CURRENT_WATERMARK($rowtime) AS `Operator Watermark`
FROM t_watermark_debugging;

-- Visualize the highest timestamp per Kafka partition
-- Due to the table declaration (with 4 buckets), this query should show 4 rows.
-- If not, the missing partitions might be the cause for watermark issues.
SELECT part AS `Partition`, MAX($rowtime) AS `Max Timestamp in Partition`
  FROM t_watermark_debugging
  GROUP BY part;

-- A workaround could be to not use the system watermark:
ALTER TABLE t_watermark_debugging
  MODIFY WATERMARK FOR $rowtime AS $rowtime - INTERVAL '2' SECOND;
-- Or for perfect input data:
ALTER TABLE t_watermark_debugging
  MODIFY WATERMARK FOR $rowtime AS $rowtime - INTERVAL '0.001' SECOND;

-- Add "fresh" data while the above statements with
-- ROW_NUMBER() or CURRENT_WATERMARK() are running.
INSERT INTO t_watermark_debugging VALUES
  (1, 'Fresh Bob'),
  (2, 'Fresh Alice'),
  (8, 'Fresh John'),
  (15, 'Fresh David');

The debugging examples above won’t solve everything but may help in finding the root cause.

The system watermark strategy is smart and excludes idle Kafka partitions from the watermark calculation after some time, but at least one partition must produce new data to advance the “logical clock” driven by watermarks.

Typically, root causes are:

  • Idle Kafka partitions
  • No data in Kafka partitions
  • Not enough data in Kafka partitions
  • Watermark strategy is too conservative
  • No fresh data after warming up with historical data, so the logical clock doesn't progress

Handle idle partitions for missing watermarks

Idle partitions often cause missing watermarks. Also, no data in a partition or infrequent data can be a root cause.

-- Create a topic with 4 partitions.
CREATE TABLE t_watermark_idle (k INT, s STRING)
  DISTRIBUTED BY (k) INTO 4 BUCKETS;

-- Avoid the "not enough data" problem by using a custom watermark.
-- The watermark strategy is still coarse-grained enough for this example.
ALTER TABLE t_watermark_idle
  MODIFY WATERMARK FOR $rowtime AS $rowtime - INTERVAL '2' SECOND;

-- Each value lands in a separate Kafka partition, and partition 1 is empty.
INSERT INTO t_watermark_idle
  VALUES
    (1, 'Bob in partition 0'),
    (2, 'Alice in partition 3'),
    (8, 'John in partition 2');

-- Thread 1: Start a streaming job.
SELECT ROW_NUMBER() OVER (ORDER BY $rowtime ASC) AS `number`, *
  FROM t_watermark_idle;

-- Thread 2: Insert some data immediately -> Thread 1 still without results.
INSERT INTO t_watermark_idle
  VALUES (1, 'Another Bob in partition 0 shortly after');

-- Thread 2: Insert some data after 15s -> Thread 1 should show results.
INSERT INTO t_watermark_idle
  VALUES (1, 'Another Bob in partition 0 after 15s');

Within the first 15 seconds, all partitions contribute to the watermark calculation, so the first INSERT INTO has no effect because partition 1 is still empty.

After 15 seconds, all partitions are marked as idle. No partition contributes to the watermark calculation. But when the second INSERT INTO is executed, its partition becomes the main driver of the logical clock.

The global watermark jumps to “second INSERT INTO - 2 seconds”.

In the following code, the sql.tables.scan.idle-timeout configuration overrides the default idle-detection algorithm, so even an immediate INSERT INTO can be the main driving partition for the logical clock, because all other partitions are marked as idle after 1 second.

-- Thread 1: Start a streaming job.
-- Lower the idle timeout further.
SET 'sql.tables.scan.idle-timeout' = '1s';
SELECT ROW_NUMBER() OVER (ORDER BY $rowtime ASC) AS `number`, *
  FROM t_watermark_idle;

-- Thread 2: Insert some data immediately -> Thread 1 should show results.
INSERT INTO t_watermark_idle
  VALUES (1, 'Another Bob in partition 0 shortly after');

Change the schema context property¶

You can set the schema context for key and value formats to control the namespace for your schema resolution in Schema Registry.

  1. Set the schema context for the value format.

    ALTER TABLE `orders` SET ('value.format.schema-context' = '.lsrc-newcontext');
    

    Your output should resemble:

    Statement phase is COMPLETED.
    
  2. Check the new table properties.

    SHOW CREATE TABLE `orders`;
    

    Your output should resemble:

    +----------------------------------------------------------------------+
    |                          SHOW CREATE TABLE                           |
    +----------------------------------------------------------------------+
    | CREATE TABLE `catalog`.`database`.`orders` (                         |
    |   `user` BIGINT NOT NULL,                                            |
    |   `product` VARCHAR(2147483647),                                     |
    |   `amount` INT,                                                      |
    |   `ts` TIMESTAMP(3)                                                  |
    | )                                                                    |
    |   DISTRIBUTED BY HASH(`user`) INTO 6 BUCKETS                         |
    | WITH (                                                               |
    |   'changelog.mode' = 'upsert',                                       |
    |   'connector' = 'confluent',                                         |
    |   'kafka.cleanup-policy' = 'delete',                                 |
    |   'kafka.max-message-size' = '2097164 bytes',                        |
    |   'kafka.retention.size' = '0 bytes',                                |
    |   'kafka.retention.time' = '604800000 ms',                           |
    |   'key.format' = 'avro-registry',                                    |
    |   'scan.bounded.mode' = 'unbounded',                                 |
    |   'scan.startup.mode' = 'latest-offset',                             |
    |   'value.format' = 'avro-registry',                                  |
    |   'value.format.schema-context' = '.lsrc-newcontext'                 |
    | )                                                                    |
    |                                                                      |
    +----------------------------------------------------------------------+
    

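The steps above change the schema context of the value format. If the key format is also backed by Schema Registry, the key side can be configured the same way; the following is a sketch, and the 'key.format.schema-context' property name mirrors the value-format property shown above.

-- Sketch: set the schema context for the key format (property name assumed by analogy).
ALTER TABLE `orders` SET ('key.format.schema-context' = '.lsrc-newcontext');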
Inferred tables schema evolution¶

You can use the ALTER TABLE statement to evolve schemas for inferred tables.

The following examples show output from the SHOW CREATE TABLE statement called on the resulting table.

Schema Registry columns overlap with computed/metadata columns¶

For the following value schema in Schema Registry:

{
  "type": "record",
  "name": "TestRecord",
  "fields": [
    {
      "name": "uid",
      "type": "int"
    }
  ]
}

Evolve a table by adding metadata:

ALTER TABLE t_metadata_overlap ADD `timestamp` TIMESTAMP_LTZ(3) NOT NULL METADATA;

SHOW CREATE TABLE returns the following output:

CREATE TABLE `t_metadata_overlap` (
  `key` VARBINARY(2147483647),
  `uid` INT NOT NULL,
  `timestamp` TIMESTAMP(3) WITH LOCAL TIME ZONE NOT NULL METADATA
) DISTRIBUTED BY HASH(`key`) INTO 6 BUCKETS
WITH (
  ...
)
Properties
  • Schema Registry says there is a timestamp physical column, but Flink says there is a timestamp metadata column.

  • In this case, metadata columns and computed columns have precedence, and Confluent Cloud for Apache Flink removes the physical column from the schema.

  • Because Confluent Cloud for Apache Flink advertises FULL_TRANSITIVE mode, queries still work, and the physical column is set to NULL in the payload:

    INSERT INTO t_metadata_overlap
      SELECT CAST(NULL AS BYTES), 42, TO_TIMESTAMP_LTZ(0, 3);
    

Evolve the table by renaming metadata:

ALTER TABLE t_metadata_overlap DROP `timestamp`;

ALTER TABLE t_metadata_overlap
  ADD message_timestamp TIMESTAMP_LTZ(3) METADATA FROM 'timestamp';

SELECT * FROM t_metadata_overlap;

SHOW CREATE TABLE returns the following output:

CREATE TABLE `t_metadata_overlap` (
  `key` VARBINARY(2147483647),
  `uid` INT NOT NULL,
  `timestamp` VARCHAR(2147483647),
  `message_timestamp` TIMESTAMP(3) WITH LOCAL TIME ZONE METADATA FROM 'timestamp'
) DISTRIBUTED BY HASH(`key`) INTO 6 BUCKETS
WITH (
  ...
)
Properties
  • Now, both physical and metadata columns appear and can be accessed for reading and writing, as shown in the sketch below.
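The following is a minimal sketch of reading both columns side by side; the column names come from the SHOW CREATE TABLE output above.

-- Sketch: read the physical `timestamp` column and the `message_timestamp` metadata column together.
SELECT `uid`, `timestamp` AS payload_ts, `message_timestamp`
  FROM t_metadata_overlap;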

Enrich a column that has no Schema Registry information¶

For the following value schema in Schema Registry:

{
  "type": "record",
  "name": "TestRecord",
  "fields": [
    {
      "name": "uid",
      "type": "int"
    }
  ]
}

SHOW CREATE TABLE returns the following output:

CREATE TABLE `t_enrich_raw_key` (
  `key` VARBINARY(2147483647),
  `uid` INT NOT NULL
) DISTRIBUTED BY HASH(`key`) INTO 6 BUCKETS
WITH (
  'changelog.mode' = 'append',
  'connector' = 'confluent',
  'key.format' = 'raw',
  'value.format' = 'avro-registry'
  ...
)
Properties
  • Schema Registry provides only information for the value part.
  • Because the key part is not backed by Schema Registry, the key.format is raw.
  • The default data type of raw is BYTES, but you can change this by using the ALTER TABLE statement.

Evolve the table by giving a raw format column a specific type:

ALTER TABLE t_enrich_raw_key MODIFY `key` STRING;

SHOW CREATE TABLE returns the following output:

CREATE TABLE `t_enrich_raw_key` (
  `key` STRING,
  `uid` INT NOT NULL
) DISTRIBUTED BY HASH(`key`) INTO 6 BUCKETS
WITH (
  'changelog.mode' = 'append',
  'connector' = 'confluent',
  'key.format' = 'raw',
  'value.format' = 'avro-registry'
  ...
)
Properties
  • Only changes to simple, atomic types, like INT, BYTES, and STRING, are supported, because their binary representation is unambiguous.
  • For more complex modifications, use Schema Registry.
  • In multi-cluster scenarios, the ALTER TABLE statement must be executed for every cluster, because the data type for key is stored in the Flink regional metastore, as sketched below.
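The following sketch shows what repeating the statement per cluster can look like; the catalog and database names are hypothetical placeholders for your environment and Kafka clusters.

-- Sketch with hypothetical names: apply the same change in every cluster (database).
USE CATALOG `my-environment`;

USE `cluster-a`;
ALTER TABLE t_enrich_raw_key MODIFY `key` STRING;

USE `cluster-b`;
ALTER TABLE t_enrich_raw_key MODIFY `key` STRING;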

Configure Schema Registry subject names¶

When working with topics that use RecordNameStrategy or TopicRecordNameStrategy, you can configure the subject names for the schema resolution in Schema Registry. This is particularly useful when handling multiple event types in a single topic.

For topics using these strategies, Flink initially infers a raw binary table:

SHOW CREATE TABLE events;

Your output will show a raw binary structure:

CREATE TABLE `events` (
  `key` VARBINARY(2147483647),
  `value` VARBINARY(2147483647)
) DISTRIBUTED BY HASH(`key`) INTO 6 BUCKETS
WITH (
  'changelog.mode' = 'append',
  'connector' = 'confluent',
  'key.format' = 'raw',
  'value.format' = 'raw'
)

Configure value schema subject names for each format:

ALTER TABLE events SET (
  'value.format' = 'avro-registry',
  'value.avro-registry.subject-names' = 'com.example.Order;com.example.Shipment'
);
ALTER TABLE events SET (
  'value.format' = 'json-registry',
  'value.json-registry.subject-names' = 'com.example.Order;com.example.Shipment'
);
ALTER TABLE events SET (
  'value.format' = 'proto-registry',
  'value.proto-registry.subject-names' = 'com.example.Order;com.example.Shipment'
);

If your topic uses keyed messages, you can also configure the key format:

ALTER TABLE events SET (
  'key.format' = 'avro-registry',
  'key.avro-registry.subject-names' = 'com.example.OrderKey'
);

You can configure both key and value schema subject names in a single statement:

ALTER TABLE events SET (
  'key.format' = 'avro-registry',
  'key.avro-registry.subject-names' = 'com.example.OrderKey',
  'value.format' = 'avro-registry',
  'value.avro-registry.subject-names' = 'com.example.Order;com.example.Shipment'
);
Properties:
  • Use semicolons (;) to separate multiple subject names
  • Subject names must match exactly with the names registered in Schema Registry
  • The format prefix (avro-registry, json-registry, or proto-registry) must match the schema format in Schema Registry
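After setting the subject names, you can inspect the table again to confirm that the format and subject-name properties were applied; the exact output depends on which format you configured.

-- Verify the configured format and subject names.
SHOW CREATE TABLE events;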

Related content¶

  • Video: How to Set Idle Timeouts

SELECT examples¶

The following examples show frequently used scenarios for SELECT.

Most minimal statement¶

Syntax
SELECT 1;
Properties
  • Statement is bounded

Check local time zone is configured correctly¶

Syntax
SELECT NOW();
Properties
  • Statement is bounded
  • NOW() returns a TIMESTAMP_LTZ(3), so if the client is configured correctly, it should show a timestamp in your local time zone; see the sketch after this list.
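If the timestamp is not in your local time zone, the session time zone can be adjusted. The following is a sketch that assumes the sql.local-time-zone property and uses Europe/Berlin as an example zone.

-- Sketch: set the session time zone, then check NOW() again.
SET 'sql.local-time-zone' = 'Europe/Berlin';
SELECT NOW();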

Combine multiple tables into one¶

Syntax
CREATE TABLE t_union_1 (i INT);
CREATE TABLE t_union_2 (i INT);
TABLE t_union_1 UNION ALL TABLE t_union_2;

-- alternate syntax
SELECT * FROM t_union_1
UNION ALL
SELECT * FROM t_union_2;

Get insights into the current watermark¶

Syntax
CREATE TABLE t_watermarked_insight (s STRING) DISTRIBUTED INTO 1 BUCKETS;

INSERT INTO t_watermarked_insight VALUES ('Bob'), ('Alice'), ('Charly');

SELECT $rowtime, CURRENT_WATERMARK($rowtime) FROM t_watermarked_insight;

The output resembles:

$rowtime                EXPR$1
2024-04-29 11:59:01.080 NULL
2024-04-29 11:59:01.093 2024-04-04 15:27:37.433
2024-04-29 11:59:01.094 2024-04-04 15:27:37.433
Properties
  • The CURRENT_WATERMARK function returns the watermark that arrived at the operator evaluating the SELECT statement.
  • The returned watermark is the minimum of all inputs, across all tables/topics and their partitions.
  • If a common watermark was not received from all inputs, the function returns NULL.
  • The CURRENT_WATERMARK function takes a time attribute, which is a column that has WATERMARK FOR defined.

A watermark is always emitted after the row has been processed, so the first row always has a NULL watermark.

Because the default watermark algorithm requires at least 250 records, it initially assumes a maximum lag of 7 days plus a safety margin of 7 days.

The watermark lag decreases quickly (exponentially) as more data arrives.

Sources emit watermarks every 200 ms, but within the first 200 ms they emit a watermark per row, which powers examples like this one.
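One practical use of CURRENT_WATERMARK is flagging late rows, that is, rows whose event time is already behind the watermark. The following is a sketch over the same t_watermarked_insight table.

-- Sketch: flag rows that arrive with an event time older than the current watermark.
SELECT
  $rowtime,
  CURRENT_WATERMARK($rowtime) AS wm,
  $rowtime < CURRENT_WATERMARK($rowtime) AS is_late
FROM t_watermarked_insight;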

Flatten fields into columns¶

Syntax
CREATE TABLE t_flattening (i INT, r1 ROW<i INT, s STRING>, r2 ROW<other INT>);

SELECT r1.*, r2.* FROM t_flattening;
Properties
You can apply the * operator on nested data, which enables flattening fields into columns of the table.
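If you only need some of the nested fields, you can also address them individually instead of flattening everything; a small sketch using the same table:

-- Sketch: select individual nested fields and alias them to avoid name clashes.
SELECT i, r1.i AS r1_i, r1.s AS r1_s, r2.other AS r2_other
  FROM t_flattening;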

Schema reference examples¶

The following examples show how to use schema references in Flink SQL.

For the following schemas in Schema Registry, where the Purchase, Pageview, and CustomerEvent types are each shown in Avro, Protobuf, and JSON Schema form:

{
   "type":"record",
   "namespace": "io.confluent.developer.avro",
   "name":"Purchase",
   "fields": [
      {"name": "item", "type":"string"},
      {"name": "amount", "type": "double"},
      {"name": "customer_id", "type": "string"}
   ]
}
syntax = "proto3";

package io.confluent.developer.proto;

message Purchase {
   string item = 1;
   double amount = 2;
   string customer_id = 3;
}
{
   "$schema": "http://json-schema.org/draft-07/schema#",
   "title": "Purchase",
   "type": "object",
   "properties": {
      "item": {
         "type": "string"
      },
      "amount": {
         "type": "number"
      },
      "customer_id": {
         "type": "string"
      }
   },
   "required": ["item", "amount", "customer_id"]
}

{
   "type":"record",
   "namespace": "io.confluent.developer.avro",
   "name":"Pageview",
   "fields": [
      {"name": "url", "type":"string"},
      {"name": "is_special", "type": "boolean"},
      {"name": "customer_id", "type":  "string"}
   ]
}
syntax = "proto3";

package io.confluent.developer.proto;

message Pageview {
   string url = 1;
   bool is_special = 2;
   string customer_id = 3;
}
{
   "$schema": "http://json-schema.org/draft-07/schema#",
   "title": "Pageview",
   "type": "object",
   "properties": {
      "url": {
         "type": "string"
      },
      "is_special": {
         "type": "boolean"
      },
      "customer_id": {
         "type": "string"
      }
   },
   "required": ["url", "is_special", "customer_id"]
}

[
   "io.confluent.developer.avro.Purchase",
   "io.confluent.developer.avro.Pageview"
]
syntax = "proto3";

package io.confluent.developer.proto;

import "purchase.proto";
import "pageview.proto";

message CustomerEvent {
   oneof action {
      Purchase purchase = 1;
      Pageview pageview = 2;
   }
}
{
   "$schema": "http://json-schema.org/draft-07/schema#",
   "title": "CustomerEvent",
   "type": "object",
   "oneOf": [
      { "$ref": "io.confluent.developer.json.Purchase" },
      { "$ref": "io.confluent.developer.json.Pageview" }
   ]
}

and the following schema references, again given in Avro, Protobuf, and JSON Schema form:

[
   {
      "name": "io.confluent.developer.avro.Purchase",
      "subject": "purchase",
      "version": 1
   },
   {
      "name": "io.confluent.developer.avro.Pageview",
      "subject": "pageview",
      "version": 1
   }
]
[
   {
      "name": "purchase.proto",
      "subject": "purchase",
      "version": 1
   },
   {
      "name": "pageview.proto",
      "subject": "pageview",
      "version": 1
   }
]
[
   {
      "name": "io.confluent.developer.json.Purchase",
      "subject": "purchase",
      "version": 1
   },
   {
      "name": "io.confluent.developer.json.Pageview",
      "subject": "pageview",
      "version": 1
   }
]

SHOW CREATE TABLE `customer-events`; returns the following output:

CREATE TABLE `customer-events` (
  `key` VARBINARY(2147483647),
  `Purchase` ROW<`item` VARCHAR(2147483647) NOT NULL, `amount` DOUBLE NOT NULL, `customer_id` VARCHAR(2147483647) NOT NULL>,
  `Pageview` ROW<`url` VARCHAR(2147483647) NOT NULL, `is_special` BOOLEAN NOT NULL, `customer_id` VARCHAR(2147483647) NOT NULL>
)
DISTRIBUTED BY HASH(`key`) INTO 2 BUCKETS
WITH (
  'changelog.mode' = 'append',
  'connector' = 'confluent',
  'kafka.cleanup-policy' = 'delete',
  'kafka.max-message-size' = '2097164 bytes',
  'kafka.retention.size' = '0 bytes',
  'kafka.retention.time' = '7 d',
  'key.format' = 'raw',
  'scan.bounded.mode' = 'unbounded',
  'scan.startup.mode' = 'earliest-offset',
  'value.format' = '[VALUE_FORMAT]'
)

Split into tables for each type¶

Syntax

CREATE TABLE purchase AS
   SELECT Purchase.* FROM `customer-events`
   WHERE Purchase IS NOT NULL;

SELECT * FROM purchase;
CREATE TABLE pageview AS
   SELECT Pageview.* FROM `customer-events`
   WHERE Pageview IS NOT NULL;

SELECT * FROM pageview;

Output for purchase:

item   amount  customer_id
apple    9.99  u-21
jam      4.29  u-67
mango   13.99  u-67
socks    7.99  u-123

Output for pageview:

url                       is_special  customer_id
https://www.confluent.io  TRUE        u-67
http://www.cflt.io        FALSE       u-12

Related content¶

  • Flink SQL Queries
  • Flink SQL Functions
  • DDL Statements in Confluent Cloud for Apache Flink

Note

This website includes content developed at the Apache Software Foundation under the terms of the Apache License v2.
