RocketMQ消费者一种启动方式 @Service public class ConsumerTest { ExecutorService executorService = Executors.newSingleThreadExecutor(); @PostConstruct public void init() { executorService.submit(new Runnable() { @Override public void
/**
 * Spring-managed bean that starts a RocketMQ push consumer on a dedicated
 * background thread once the bean has been constructed, and stops it again
 * when the bean is destroyed.
 *
 * <p>Each received message body is decoded as UTF-8 and printed to standard
 * output.
 */
@Service
public class ConsumerTest {
    /** Single worker thread used to run consumer start-up off the Spring initialisation thread. */
    ExecutorService executorService = Executors.newSingleThreadExecutor();

    /**
     * The running consumer. Written by the background start-up task and read by
     * {@link #destroy()} on another thread, hence {@code volatile}. Stays
     * {@code null} until start-up has succeeded.
     */
    private volatile DefaultMQPushConsumer consumer;

    /**
     * Submits the consumer start-up task to the background executor.
     */
    @PostConstruct
    public void init() {
        executorService.submit(new Runnable() {
            @Override
            public void run() {
                startConsumer();
            }
        });
    }

    /**
     * Creates, configures and starts the push consumer. If subscribing or
     * starting fails, the error is reported and the consumer is not published
     * to {@link #consumer}.
     */
    private void startConsumer() {
        DefaultMQPushConsumer pushConsumer = new DefaultMQPushConsumer("badafd");
        // NOTE(review): the empty name-server address and topic look like
        // placeholders -- fill in real values before use.
        pushConsumer.setNamesrvAddr("");
        try {
            pushConsumer.subscribe("", "*");
            pushConsumer.registerMessageListener(new MessageListenerConcurrently() {
                @Override
                public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                        ConsumeConcurrentlyContext context) {
                    // Handle every message in the batch: the whole batch is
                    // acknowledged by the single status returned below.
                    for (MessageExt msg : msgs) {
                        System.out.println("==========>"
                                + new String(msg.getBody(), java.nio.charset.StandardCharsets.UTF_8));
                    }
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                }
            });
            // Without start() the consumer never connects and no message is delivered.
            pushConsumer.start();
            consumer = pushConsumer;
        } catch (MQClientException e) {
            // Do not continue with a half-configured consumer.
            e.printStackTrace();
        }
    }

    /**
     * Stops the background executor and, if the consumer was started, shuts it
     * down as well.
     */
    @PreDestroy
    public void destroy() {
        executorService.shutdown();
        DefaultMQPushConsumer started = consumer;
        if (started != null) {
            started.shutdown();
        }
    }
}
