1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69
| var kafka = require('kafka-node'); var Mock = require('mockjs'); const Random = Mock.Random;
// Connection options passed to kafka.KafkaClient by the script below.
const conn = { 'kafkaHost': 'hadoop01:9092' };

/**
 * Holder for a kafka-node client and the producers created on top of it.
 *
 * Fields:
 *   mq_producers - producers keyed by name (AddProducer stores under 'common').
 *   client       - the most recently created kafka.KafkaClient ({} until
 *                  AddProducer has been called).
 */
function MQ() {
  this.mq_producers = {};
  this.client = {};
}

/**
 * Creates a KafkaClient from `conn`, builds a Producer on it and registers the
 * producer under the 'common' key.
 *
 * Every call replaces `this.client` and the 'common' producer.
 *
 * @param {Object} conn - options forwarded unchanged to `new kafka.KafkaClient`.
 * @param {Function} [handler] - called with the producer once it emits 'ready';
 *   anything that is not a function is ignored.
 * @returns {Object} the newly created producer.
 */
MQ.prototype.AddProducer = function (conn, handler) {
  // The log label below is Chinese for "adding producer".
  console.log('增加生产者', conn, this);

  this.client = new kafka.KafkaClient(conn);
  const producer = new kafka.Producer(this.client);

  producer.on('ready', function () {
    // Only invoke real callbacks; a truthy non-function would otherwise throw
    // a TypeError from inside the event listener.
    if (typeof handler === 'function') {
      handler(producer);
    }
  });

  producer.on('error', function (err) {
    // Emitted values are not guaranteed to be Error objects: fall back to the
    // raw value instead of throwing on `err.stack` or logging `undefined`.
    console.error('producer error ', err && err.stack ? err.stack : err);
  });

  this.mq_producers['common'] = producer;
  return producer;
};
// Print the constructor, then create the single MQ instance used by the
// rest of the script.
console.log(MQ);
const mq = new MQ();

// Topic the script creates and publishes to.
const topicName = "test01";

// Template record passed to Mock.mock() before each send. None of the keys
// visible here carry a Mock.js generation rule, so the values are plain
// literals.
const datajson = {
  "business": "sdasf",
  "database": "sqweqr",
  "es": 2314,
  "sql": "",
  "table": "t_cash_loan",
  "ts": 1576050001925,
  "type": "UPDATE"
};
// Once the producer is ready: request the topic, then publish one mocked
// record every 2 seconds for as long as the process runs.
mq.AddProducer(conn, function (producer) {
  producer.createTopics([topicName], function (createErr) {
    if (createErr) {
      // Report the failure but keep going, as the original script did;
      // each send below reports its own error.
      console.error('createTopics error ', createErr);
    }

    setInterval(function () {
      const data = Mock.mock(datajson);
      const msg = JSON.stringify(data);

      // kafka-node produce requests take the topic as a string, not an array.
      const payload = { topic: topicName, messages: msg };

      mq.mq_producers['common'].send([payload], function (err, result) {
        if (err) {
          // Do not print the "you can check" hint for a message that was
          // never delivered.
          console.error('send error ', err);
          return;
        }
        console.log(`send you can check \n kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic ${topicName} --from-beginning \n`, result);
      });
    }, 2000);
  });
});
|