flink-quarkus-demo

Flink Quarkus Demo

This is a basic demo showing a Quarkus app with an Apache Kafka backend and Apache Flink, all running on Confluent Cloud.

Overview of the Components

(Diagram: Quarkus, Kafka, and Flink integration, component overview)

Quarkus

Quarkus is a Kubernetes-native Java framework tailored for GraalVM and OpenJDK HotSpot. It optimizes Java specifically for containers, enabling developers to create applications that are lightweight and fast.

Apache Kafka

Apache Kafka is a distributed event streaming platform capable of handling trillions of events a day. It is used for building real-time data pipelines and streaming applications. In this demo, Kafka serves as the backbone for data transmission.

Apache Flink

Apache Flink is a stream processing framework that allows for the processing of data in real-time. It is designed for high-throughput and low-latency data processing, making it ideal for analytics and event-driven applications.

Confluent Cloud

Confluent Cloud is a fully managed service for Apache Kafka, Flink, and related services that simplifies the deployment and management of Kafka clusters. It provides a scalable and reliable environment for streaming data.


(Diagram: Quarkus, Kafka, and Flink integration, data flow)

Data Flow

Data Loading

The Netflix user information is initially loaded from a CSV file. This data includes various metrics related to user engagement.
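The loading step boils down to parsing each CSV row into a typed record before it is sent to Kafka. A minimal sketch of that parsing, assuming columns like userId, title, duration, dateTime (the real header of the file in the documents directory may differ):

```java
public class CsvLoader {

    // A typed view of one CSV row; field names are assumptions.
    public record NetflixView(String userId, String title, long duration, long dateTime) {}

    // Parse a single CSV line into a record. A real loader would use a
    // CSV library to handle quoting and skip the header row.
    public static NetflixView parseLine(String line) {
        String[] f = line.split(",", -1);
        return new NetflixView(f[0], f[1], Long.parseLong(f[2]), Long.parseLong(f[3]));
    }

    public static void main(String[] args) {
        NetflixView v = parseLine("u1,Stranger Things,3600,1714000000000");
        System.out.println(v.title() + " watched for " + v.duration() + "s");
    }
}
```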

Kafka Topic

The loaded data is sent to a Kafka topic. This topic acts as a conduit for the data, allowing it to be streamed to the Confluent Cloud broker.

Data Format

The data is serialized in Apache Avro format, which provides a compact binary representation and schema evolution capabilities.
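For reference, an Avro schema for such a record might look like the following. This is a sketch, not the exact schema registered by the app: the title, duration, and dateTime field names are inferred from the Flink queries further down, while userId and the namespace are assumptions.

```json
{
  "type": "record",
  "name": "NetflixView",
  "namespace": "org.acme.demo",
  "fields": [
    {"name": "userId",   "type": "string"},
    {"name": "title",    "type": "string"},
    {"name": "duration", "type": "long"},
    {"name": "dateTime", "type": "long"}
  ]
}
```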

Flink Queries

Once the data is available in Kafka, two Flink queries are executed:

  • Average View Duration: This query calculates the average time users spend watching content.
  • Daily Engagement Titles: This query identifies the titles that have the highest user engagement on a daily basis.
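Conceptually, the average-view-duration query is just a group-by average. A plain-Java sketch of the same aggregation, assuming a simple (title, duration) record, which mirrors the AVG(duration) ... GROUP BY title Flink SQL shown later:

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class AvgDuration {

    public record View(String title, long duration) {}

    // Group views by title and average the duration per title,
    // the in-memory equivalent of the Flink GROUP BY query.
    public static Map<String, Double> averageByTitle(List<View> views) {
        return views.stream().collect(
            Collectors.groupingBy(View::title,
                Collectors.averagingLong(View::duration)));
    }
}
```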

Setting up the demo

Confluent Cloud

This demo uses the Confluent Cloud ccloud-stack CLI, documented here.

It's a very useful CLI that provisions the entire environment, the cluster, and, most importantly, the Schema Registry as well. It also has the option to destroy the entire stack once you are done, which is actually very helpful!

Here is output similar to what you should see at the end, after running the command as per the instructions in the link shared above.

To destroy this Confluent Cloud stack, run:
./ccloud_stack_destroy.sh stack-configs/java-service-account-sa-234cdf.config

View the generated config file and you should find most of the information required for the application.properties file, except for a couple of values that you will find under Confluent Cloud >> Environment >> Cluster >> Topics. At the top you will see the Clients option; click it, and under Java clients you should find the client ID and more.

Local project config

The local configuration lives in application.properties, shown below. All you need to do before starting Quarkus is set the environment variables. You will find most of the values under Confluent Cloud >> Environment >> Cluster >> Topics (via the Clients option at the top). You can also set CSV_FILE_PATH, which points to the documents directory in this project folder.

# messaging connector settings.
mp.messaging.connector.smallrye-kafka.bootstrap.servers={BOOTSTRAP_SERVER}

# Kafka Security Settings
mp.messaging.connector.smallrye-kafka.security.protocol=SASL_SSL
mp.messaging.connector.smallrye-kafka.sasl.mechanism=PLAIN
mp.messaging.connector.smallrye-kafka.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username={CONNECTOR_USER} password={CONNECTOR_PASSWORD}

# Schema Registry Configuration
mp.messaging.connector.smallrye-kafka.schema.registry.url={REGISTRY_URL}
mp.messaging.connector.smallrye-kafka.schema.registry.basic.auth.credentials.source=USER_INFO
mp.messaging.connector.smallrye-kafka.schema.registry.basic.auth.user.info={REGISTRY_AUTH_KEY}

# Kafka Topic Replication Factor (for Confluent Cloud)
mp.messaging.connector.smallrye-kafka.replication.factor=3

# DNS lookup setting
mp.messaging.connector.smallrye-kafka.client.dns.lookup=use_all_dns_ips

# Consumer-specific settings
mp.messaging.connector.smallrye-kafka.consumer.session.timeout.ms=45000

# Producer-specific settings
mp.messaging.connector.smallrye-kafka.producer.acks=all
mp.messaging.connector.smallrye-kafka.producer.client.id={CLIENT_ID}

# set the connector for the outgoing channel to `smallrye-kafka`
mp.messaging.outgoing.users-uk-netflix-out.connector=smallrye-kafka
mp.messaging.outgoing.users-uk-netflix-out.value.serializer=io.confluent.kafka.serializers.KafkaAvroSerializer
mp.messaging.outgoing.users-uk-netflix-out.key.serializer=org.apache.kafka.common.serialization.StringSerializer
mp.messaging.outgoing.users-uk-netflix-out.topic=users-uk-netflix-out
mp.messaging.outgoing.users-uk-netflix-out.schema.registry.url=${mp.messaging.connector.smallrye-kafka.schema.registry.url}
mp.messaging.outgoing.users-uk-netflix-out.auto.register.schemas=true
#mp.messaging.outgoing.users-uk-netflix-out.use.schema.id=100002

# Single CSV File, can also be found in documents dir
csv-file-path={CSV_FILE_PATH}


Running the application in dev mode

You can run your application in dev mode, which enables live coding, using the command below. It will start sending messages out. They are not printed to the console, but they should be visible in the Cloud Console under Topics >> Messages.

./mvnw quarkus:dev

Daily engagement titles (top 10)

SELECT 
    DATE_FORMAT(TO_TIMESTAMP_LTZ(dateTime, 3), 'yyyy-MM-dd') AS watch_date,
    title,
    COUNT(*) AS daily_view_count,
    SUM(duration) AS daily_total_watch_time
FROM `ccloud-stack-sa-917qyv-ccloud-stack-script`.`demo-kafka-cluster-sa-917qyv`.`users-uk-netflix-out`
GROUP BY DATE_FORMAT(TO_TIMESTAMP_LTZ(dateTime, 3), 'yyyy-MM-dd'), title
ORDER BY watch_date DESC, daily_total_watch_time DESC
LIMIT 10;

Average duration watched (top 10)

SELECT 
    title, 
    AVG(duration) AS avg_watch_duration
FROM `ccloud-stack-sa-917qyv-ccloud-stack-script`.`demo-kafka-cluster-sa-917qyv`.`users-uk-netflix-out`
GROUP BY title
ORDER BY avg_watch_duration DESC
LIMIT 10;
Original repository: https://github.com/sshaaf/flink-quarkus-demo
