Aiven Kafka Quickstart with Flink and Python
pip install -r requirements.txt
-
Create Aiven Account (Use SSO)
-
Create a token for CLI access at https://console.aiven.io/profile/tokens (see "How To Create Authentication Tokens"), then save the token.
-
avn user login YOUR_AIVEN_EMAIL --token  # replace YOUR_AIVEN_EMAIL with your account email; supply the token created above
-
Set environment variables
# Service name, cloud region, and plan passed to `avn service create`.
KAFKA_INSTANCE_NAME="demokafka"
CLOUD_PROVIDER="google-us-central1"
AIVEN_PLAN_NAME="startup-2"
# Folder the credentials are downloaded into (tilde left unquoted so the shell expands it).
DESTINATION_FOLDER_NAME=~/kafkacerts
- Use the CLI to create the Kafka cluster, or use the UI by following the Kafka getting started guide
# Create the Kafka service using the name, cloud, and plan set in the
# environment variables, with the kafka_rest option set to true.
# The final line deliberately has no trailing backslash so the command ends here.
avn service create \
  -t kafka "$KAFKA_INSTANCE_NAME" \
  --cloud "$CLOUD_PROVIDER" \
  -p "$AIVEN_PLAN_NAME" \
  -c kafka_rest=true
- Download credentials for Python script
# Download the avnadmin user's credentials for the Kafka service into
# $DESTINATION_FOLDER_NAME.
avn service user-creds-download \
  --username avnadmin \
  -d "$DESTINATION_FOLDER_NAME" \
  "$KAFKA_INSTANCE_NAME"
- return the URI
# Print only the service_uri field of the Kafka service.
avn service get "$KAFKA_INSTANCE_NAME" --format '{service_uri}'
- Wait Until Service is running
avn service wait $KAFKA_INSTANCE_NAME
# Run iot_faker.py against the Kafka service, using the credentials folder set
# in DESTINATION_FOLDER_NAME and the topic "iot".
# NOTE(review): the host and port below look like placeholders; presumably they
# must be replaced with the values from your own service URI. Confirm.
python iot_faker.py \
  --cert-folder "$DESTINATION_FOLDER_NAME/" \
  --host demokafka-username-x11x.aivencloud.com \
  --port 27721 \
  --topic-name iot
5. Create Demo Flink Environment with UI or CLI. If UI, follow Flink getting started guide
avn service create demoflink -t flink --plan business-4
-- Exposes the Kafka topic 'iot' as a table; record keys and values are JSON,
-- and the record key carries the device column.
-- Text columns use STRING: in Flink a bare VARCHAR (no length) means VARCHAR(1).
-- NOTE(review): 'properties.bootstrap.servers' is empty here; presumably it is
-- supplied by the platform integration or must be filled in before use. Confirm.
CREATE TABLE iot (
    device          STRING,
    deviceParameter STRING,
    deviceValue     INT,
    dateTime        TIMESTAMP
) WITH (
    'connector' = 'kafka',
    'properties.bootstrap.servers' = '',
    'scan.startup.mode' = 'earliest-offset',
    'topic' = 'iot',
    'value.format' = 'json',
    'key.format' = 'json',
    'key.fields' = 'device'
)
-- Exposes the Kafka topic 'temperatures' as a table; record keys and values are
-- JSON, and the record key carries the device column.
-- The device column uses STRING: in Flink a bare VARCHAR (no length) means VARCHAR(1).
-- NOTE(review): 'properties.bootstrap.servers' is empty here; presumably it is
-- supplied by the platform integration or must be filled in before use. Confirm.
CREATE TABLE temperatures (
    device      STRING,
    deviceValue INT,
    timed       TIMESTAMP,
    jitter      DOUBLE
) WITH (
    'connector' = 'kafka',
    'properties.bootstrap.servers' = '',
    'scan.startup.mode' = 'earliest-offset',
    'topic' = 'temperatures',
    'value.format' = 'json',
    'key.format' = 'json',
    'key.fields' = 'device'
)
-- Copy rows from iot whose deviceParameter is 'Temperature' into temperatures,
-- adding a RAND() value as jitter.
-- The target columns are listed explicitly so the dateTime -> timed mapping does
-- not depend on column order in the temperatures definition.
INSERT INTO temperatures (device, deviceValue, timed, jitter)
SELECT
    device,
    deviceValue,
    dateTime AS timed,
    RAND()   AS jitter
FROM iot
WHERE deviceParameter = 'Temperature'