当前位置 : 主页 > 编程语言 > java >

RocketMQ消费者一种启动方式

来源:互联网 收集:自由互联 发布时间:2021-06-28
RocketMQ消费者一种启动方式 @Servicepublic class ConsumerTest { ExecutorService executorService = Executors.newSingleThreadExecutor(); @PostConstruct public void init() { executorService.submit(new Runnable() { @Override public void
RocketMQ消费者一种启动方式
@Service
public class ConsumerTest {
    ExecutorService executorService = Executors.newSingleThreadExecutor();

    @PostConstruct
    public void init() {
        executorService.submit(new Runnable() {
            @Override
            public void run() {
                DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("badafd");
                consumer.setNamesrvAddr("");
                try {
                    consumer.subscribe("", "*");
                } catch (MQClientException e) {
                    e.printStackTrace();
                }
                consumer.registerMessageListener(new MessageListenerConcurrently() {
                    @Override
                    public ConsumeConcurrentlyStatus consumeMessage(List
 
   msgs, ConsumeConcurrentlyContext context) {
                        try {
                            System.out.println("==========>"+new String(msgs.get(0).getBody(), "UTF-8"));
                        } catch (UnsupportedEncodingException e) {
                            e.printStackTrace();
                        }
                        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                    }
                });
            }
        });
    }

    @PreDestroy
    public void destroy() {
        executorService.shutdown();
    }
}
 
网友评论