我使用的是kafka-node npm模块,0.2.27版本.我发现producer.on(‘ready’,fn(){})永远不会被调用. 当我检查生产者对象时,观察它如下 { ready: true, client: { connectionString: '10.196.160.100.:2181,10.196.160.150:2
当我检查生产者对象时,观察它如下
{ ready: true, client: { connectionString: '10.196.160.100.:2181,10.196.160.150:2181,10.196.160.151:2181', clientId: 'dev', zkOptions: { sessionTimeOut: 10000, spinDelay: 1000, retries: 10 }, brokers: { 'custom-kafka.mine.com:9092': [Object], 'custom-storm.mine.com:9092': [Object] }, longpollingBrokers: {}, topicMetadata: {}, topicPartitions: {}, correlationId: 0, cbqueue: {}, brokerMetadata: { '0': [Object], '1': [Object] }, ready: true, zk: { client: [Object], _events: [Object], inited: true }, _events: { ready: [Object], error: [Object], close: [Object] } }, requireAcks: 1, ackTimeoutMs: 100, _events: {} }
我没有等待(‘ready’)事件,而是检查了(producer.ready)是否能够稍微超时发布到kafka.理想情况下,应触发事件.我不确定我是否采取了正确的方法.非常感谢在这个方向的任何指针.提前致谢
请尝试以下代码:var kafka = require('kafka-node'), Producer = kafka.Producer, client = new kafka.Client('192.168.50.252:2181'), producer = new Producer(client), payloads = [ { topic: 'test topic', messages: ['test message'] } ]; client.on('ready', function (){ console.log('client ready'); }) client.on('error', function (err){ console.log('client error: ' + err); }) producer.on('ready', function () { producer.send(payloads, function (err, data) { console.log('send: ' + data); process.exit(); }); }); producer.on('error', function (err) { console.log('error: ' + err); process.exit(); });