1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94
|
-- Create the `flink` database and switch to it so the statements that follow
-- operate inside it.
-- NOTE(review): presumably executed in the MySQL instance that the
-- 'mysql-cdc' table (database-name = 'flink') reads from -- confirm.
CREATE DATABASE flink;
USE flink;
-- Physical `users` table: one row per user with a numeric id, a name and a region.
-- NOTE(review): the VARCHAR(1000) types suggest this is the MySQL-side table
-- captured by the 'mysql-cdc' connector -- confirm.
CREATE TABLE users (
    user_id   BIGINT,
    user_name VARCHAR(1000),
    region    VARCHAR(1000)
);
-- Seed `users` with three sample rows.
-- The target columns are listed explicitly so the statement keeps working
-- (and stays readable) if the table's column order ever changes.
INSERT INTO users (user_id, user_name, region)
VALUES
    (1, 'Timo',  'Berlin'),
    (2, 'Tom',   'Beijing'),
    (3, 'Apple', 'Beijing');
-- `users` table backed by the 'mysql-cdc' connector: it reads the `users`
-- table of database `flink` on localhost.
-- NOTE(review): username/password are hard-coded here; presumably
-- demo-only credentials -- confirm before reuse.
CREATE TABLE users (
    user_id   BIGINT,
    user_name STRING,
    region    STRING
) WITH (
    'connector'     = 'mysql-cdc',
    'hostname'      = 'localhost',
    'database-name' = 'flink',
    'table-name'    = 'users',
    'username'      = 'root',
    'password'      = '123456'
);
-- `pageviews` table backed by the Kafka topic 'pageviews' (JSON records,
-- consumed from the earliest offset).
CREATE TABLE pageviews (
    user_id   BIGINT,
    page_id   BIGINT,
    view_time TIMESTAMP(3),
    -- Computed column: processing time at which the row is handled.
    proctime AS PROCTIME()
) WITH (
    'connector'                    = 'kafka',
    'topic'                        = 'pageviews',
    'properties.bootstrap.servers' = 'localhost:9092',
    'scan.startup.mode'            = 'earliest-offset',
    'format'                       = 'json'
);
-- Write two sample page views (users 1 and 2) into `pageviews`.
-- Only the three physical columns are supplied; `proctime` is computed.
INSERT INTO pageviews
VALUES
    (1, 101, TO_TIMESTAMP('2020-11-23 15:00:00')),
    (2, 104, TO_TIMESTAMP('2020-11-23 15:00:01.00'));
-- `enriched_pageviews`: page views together with the viewing user's region,
-- stored in the 'enriched_pageviews' topic through the 'upsert-kafka'
-- connector with JSON keys and values.
CREATE TABLE enriched_pageviews (
    user_id     BIGINT,
    user_region STRING,
    page_id     BIGINT,
    view_time   TIMESTAMP(3),
    -- Watermark trails view_time by five seconds.
    WATERMARK FOR view_time AS view_time - INTERVAL '5' SECOND,
    -- Declared key of the upsert table (not enforced).
    PRIMARY KEY (user_id, page_id) NOT ENFORCED
) WITH (
    'connector'                    = 'upsert-kafka',
    'topic'                        = 'enriched_pageviews',
    'properties.bootstrap.servers' = 'localhost:9092',
    'key.format'                   = 'json',
    'value.format'                 = 'json'
);
-- Populate `enriched_pageviews` by attaching each page view's user region.
-- LEFT JOIN keeps page views whose user_id has no match in `users`;
-- their region is written as NULL.
INSERT INTO enriched_pageviews
SELECT
    pv.user_id,
    u.region,
    pv.page_id,
    pv.view_time
FROM pageviews AS pv
LEFT JOIN users AS u
    ON pv.user_id = u.user_id;
kafka-console-consumer
-- `pageviews_per_region`: one page-view count per region, keyed by region
-- and stored in the 'pageviews_per_region' topic through the 'upsert-kafka'
-- connector with JSON keys and values.
CREATE TABLE pageviews_per_region (
    user_region STRING,
    cnt         BIGINT,
    -- Declared key of the upsert table (not enforced).
    PRIMARY KEY (user_region) NOT ENFORCED
) WITH (
    'connector'                    = 'upsert-kafka',
    'topic'                        = 'pageviews_per_region',
    'properties.bootstrap.servers' = 'localhost:9092',
    'key.format'                   = 'json',
    'value.format'                 = 'json'
);
-- The terminating semicolon above is required: without it this statement
-- runs into the next one when the file is executed as a script.
-- Maintain the number of enriched page views per region.
-- Rows without a region are excluded from the counts.
INSERT INTO pageviews_per_region
SELECT
    user_region,
    COUNT(*) AS cnt
FROM enriched_pageviews
WHERE user_region IS NOT NULL
GROUP BY user_region;
kafka-console-consumer
|