Java访问ActiveMQ 1.创建gradle项目 2.增加依赖 3.创建类 4.启动服务器 5.生产 6.消费 7.git仓库地址 ActiveMQ 发布订阅模式 1.创建gradle项目
Java访问ActiveMQ
- 1.创建gradle项目
- 2.增加依赖
- 3.创建类
- 4.启动服务器
- 5.生产
- 6.消费
- 7.git仓库地址
ActiveMQ
发布订阅模式
1.创建gradle项目
2.增加依赖
3.创建类
package com.study.config;import org.apache.activemq.ActiveMQConnection;
/**
 * Connection settings used by the sample producer and consumer to reach the
 * ActiveMQ broker.
 *
 * <p>Every constant starts out with the matching default constant taken from
 * {@link ActiveMQConnection}. The stored value is mutable and can be replaced
 * through {@link #setValue(String)}.
 *
 * @author jiayq
 */
public enum ActiveMQConfig {

    /** User name; initialised from {@code ActiveMQConnection.DEFAULT_USER}. */
    USERNAME(ActiveMQConnection.DEFAULT_USER),

    /** Password; initialised from {@code ActiveMQConnection.DEFAULT_PASSWORD}. */
    PASSWORD(ActiveMQConnection.DEFAULT_PASSWORD),

    /** Broker URL; initialised from {@code ActiveMQConnection.DEFAULT_BROKER_URL}. */
    URL(ActiveMQConnection.DEFAULT_BROKER_URL);

    /** The setting currently held by this constant. */
    private String value;

    /**
     * Binds a constant to its initial setting.
     *
     * @param initial the value the constant holds until {@link #setValue(String)} is called
     */
    ActiveMQConfig(String initial) {
        value = initial;
    }

    /**
     * Replaces the setting held by this constant.
     *
     * @param value the new setting
     */
    public void setValue(String value) {
        this.value = value;
    }

    /**
     * Returns the setting currently held by this constant.
     *
     * @return the current value
     */
    public String getValue() {
        return this.value;
    }
}
package com.study.consume;
import com.study.config.ActiveMQConfig;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
/**
 * Sample subscriber: connects to the broker described by {@link ActiveMQConfig},
 * subscribes to the {@code active-test} topic and prints the body of every
 * text message it receives.
 *
 * @author jiayq
 */
public class Consume {

    /**
     * Opens the connection, creates a non-transacted auto-acknowledge session
     * and registers the message listener. Delivery then happens on the
     * provider's own thread(s) after this method returns.
     *
     * @param args unused
     * @throws JMSException if connecting to the broker or setting up the subscription fails
     */
    public static void main(String[] args) throws JMSException {
        // Connection factory built from the configured user, password and broker URL.
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(
                ActiveMQConfig.USERNAME.getValue(),
                ActiveMQConfig.PASSWORD.getValue(),
                ActiveMQConfig.URL.getValue());
        Connection connection = activeMQConnectionFactory.createConnection();
        try {
            connection.start();
            // No transaction; messages are acknowledged automatically.
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            Topic topic = session.createTopic("active-test");
            MessageConsumer consumer = session.createConsumer(topic);
            consumer.setMessageListener(Consume::printAndPause);
        } catch (JMSException setupFailure) {
            // Setup failed part-way: release the connection instead of leaking it,
            // then report the original failure to the caller.
            try {
                connection.close();
            } catch (JMSException closeFailure) {
                setupFailure.addSuppressed(closeFailure);
            }
            throw setupFailure;
        }
    }

    /**
     * Prints the text of one received message, then sleeps four seconds to
     * simulate processing time. Messages that are not {@link TextMessage}s are
     * reported and skipped instead of triggering a {@link ClassCastException}
     * inside the listener.
     *
     * @param message the message handed over by the consumer
     */
    private static void printAndPause(Message message) {
        if (!(message instanceof TextMessage)) {
            System.err.println("Ignoring non-text message: " + message);
            return;
        }
        try {
            System.out.println(((TextMessage) message).getText());
            Thread.sleep(4 * 1000);
        } catch (JMSException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            // Restore the interrupt flag so the delivering thread can observe it.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        }
    }
}
package com.study.publish;
import com.study.config.ActiveMQConfig;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
import java.util.Random;
/**
 * Sample publisher: connects to the broker described by {@link ActiveMQConfig}
 * and publishes a random number as a text message to the {@code active-test}
 * topic every three seconds until it fails or is interrupted.
 *
 * @author jiayq
 */
public class Publish {

    /**
     * Runs the publish loop. JMS failures and interruption end the loop; the
     * connection is closed on every exit path.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        // Connection factory built from the configured user, password and broker URL.
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(
                ActiveMQConfig.USERNAME.getValue(),
                ActiveMQConfig.PASSWORD.getValue(),
                ActiveMQConfig.URL.getValue());
        Connection connection = null;
        try {
            connection = activeMQConnectionFactory.createConnection();
            connection.start();
            // No transaction; auto-acknowledge mode.
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            // Topic the subscribers listen on.
            Topic topic = session.createTopic("active-test");
            MessageProducer producer = session.createProducer(topic);
            // One generator for the whole run instead of a new one per message.
            Random random = new Random();
            while (true) {
                TextMessage message = session.createTextMessage(String.valueOf(random.nextDouble()));
                System.out.println(message.getText());
                producer.send(message);
                // Simulate the time it takes to produce a message.
                Thread.sleep(3 * 1000);
            }
        } catch (JMSException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            // Restore the interrupt flag before leaving.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        } finally {
            // Release the connection on every exit path.
            if (connection != null) {
                try {
                    connection.close();
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}
4.启动服务器
5.生产
在图形化界面查看
6.消费
7.git仓库地址
https://github.com/a18792721831/MQ.git