1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132
| # 加载Kafka CSV数据 (load CSV data from Kafka)
# Sample message: "123","09","456.78"
vi firstload_cfg.yaml
# Load CSV messages from topic topic_for_gpkafka into table data_from_kafka.
# Greenplum connection settings:
DATABASE: testdb
USER: gpadmin
HOST: gpmaster
PORT: 5432
KAFKA:
  INPUT:
    SOURCE: {BROKERS: "localhost:9092", TOPIC: topic_for_gpkafka}
    # CSV fields in message order. The second field is declared as
    # __IGNORED__ and is not referenced by any MAPPING expression below.
    COLUMNS:
      - {NAME: cust_id, TYPE: int}
      - {NAME: __IGNORED__, TYPE: int}
      - {NAME: expenses, TYPE: "decimal(9,2)"}
    FORMAT: csv
    ERROR_LIMIT: 125
  OUTPUT:
    TABLE: data_from_kafka
    # Each entry: target column NAME <- SQL EXPRESSION over the input COLUMNS.
    MAPPING:
      - {NAME: customer_id, EXPRESSION: cust_id}
      - {NAME: expenses, EXPRESSION: expenses}
      - {NAME: tax_due, EXPRESSION: "expenses * .0725"}   # 7.25% of expenses
  COMMIT:
    MINIMAL_INTERVAL: 2000
# 加载Kafka JSON数据 (load JSON data from Kafka)
# Sample message: { "cust_id": 123, "month": 9, "amount_paid": 456.78 }
vi simple_jsonload_cfg.yaml
# Load JSON messages from topic topic_json into table single_json_column.
# This job declares no COLUMNS list and no MAPPING.
# NOTE(review): presumably each whole JSON message lands in a single json
# column of single_json_column — confirm against the table definition.
DATABASE: testdb
USER: gpadmin
HOST: gpmaster
PORT: 5432
KAFKA:
  INPUT:
    SOURCE: {BROKERS: "localhost:9092", TOPIC: topic_json}
    FORMAT: json
    ERROR_LIMIT: 10
  OUTPUT: {TABLE: single_json_column}
  COMMIT: {MINIMAL_INTERVAL: 1000}
# 加载Kafka JSON数据(带映射) (load JSON data from Kafka, with column mapping)
# Sample message: { "cust_id": 123, "month": 9, "amount_paid": 456.78 }
vi jsonload_cfg.yaml
# Load JSON messages from topic topic_json_gpkafka into table json_from_kafka.
# Each message is read into the single json input column jdata, which the
# MAPPING expressions below dereference with ->> and cast.
DATABASE: testdb
USER: gpadmin
HOST: gpmaster
PORT: 5432
KAFKA:
  INPUT:
    SOURCE:
      BROKERS: localhost:9092
      TOPIC: topic_json_gpkafka
    COLUMNS:
      - NAME: jdata
        TYPE: json
    FORMAT: json
    ERROR_LIMIT: 10
  OUTPUT:
    TABLE: json_from_kafka
    MAPPING:
      - NAME: customer_id
        EXPRESSION: (jdata->>'cust_id')::int
      - NAME: month
        EXPRESSION: (jdata->>'month')::int
      # The sample message for this job carries the amount under "amount_paid".
      # Reading a key that is absent (e.g. 'expenses') makes ->> return NULL,
      # which would load NULL into every row.
      - NAME: amount_paid
        EXPRESSION: (jdata->>'amount_paid')::decimal
  COMMIT:
    MINIMAL_INTERVAL: 2000

# Load Kafka Avro data.
# Sample producer input: key 1, then the value record. The console producer
# splits key and value on its key.separator, which is a tab by default.
# 1 { "cust_id": 123, "year": 1997, "expenses": [456.78, 67.89] }
# Avro data producer:
kafka-avro-console-producer \
  --broker-list localhost:9092 \
  --topic topic_avrokv \
  --property parse.key=true \
  --property key.schema='{"type" : "int", "name" : "id"}' \
  --property value.schema='{
    "type" : "record",
    "name" : "example_schema",
    "namespace" : "com.example",
    "fields" : [
      { "name" : "cust_id", "type" : "int", "doc" : "Id of the customer account" },
      { "name" : "year", "type" : "int", "doc" : "year of expense" },
      { "name" : "expenses", "type" : {"type": "array", "items": "float"}, "doc" : "Expenses for the year" }
    ],
    "doc" : "A basic schema for storing messages"
  }'
vi avrokvload_cfg.yaml
# Load Avro key/value messages from topic topic_avrokv into table avrokv_from_kafka.
# This job uses the VERSION 2 layout: the message KEY and VALUE are described
# separately, and each is decoded as Avro via the schema registry into one json column.
DATABASE: testdb
USER: gpadmin
HOST: gpmaster
PORT: 5432
VERSION: 2
KAFKA:
  INPUT:
    SOURCE:
      BROKERS: "localhost:9092"
      TOPIC: topic_avrokv
    # Message key -> json column id
    KEY:
      COLUMNS:
        - {NAME: id, TYPE: json}
      FORMAT: avro
      AVRO_OPTION:
        SCHEMA_REGISTRY_ADDR: "http://localhost:8081"
    # Message value -> json column c1
    VALUE:
      COLUMNS:
        - {NAME: c1, TYPE: json}
      FORMAT: avro
      AVRO_OPTION:
        SCHEMA_REGISTRY_ADDR: "http://localhost:8081"
    ERROR_LIMIT: 0
  OUTPUT:
    TABLE: avrokv_from_kafka
    MAPPING:
      - NAME: id
        EXPRESSION: id
      - NAME: customer_id
        EXPRESSION: "(c1->>'cust_id')::int"
      - NAME: year
        EXPRESSION: "(c1->>'year')::int"
      # Unpack the JSON array c1->'expenses' element by element into a SQL float array.
      - NAME: expenses
        EXPRESSION: "array(select json_array_elements(c1->'expenses')::text::float)"
  COMMIT:
    MINIMAL_INTERVAL: 2000
|