当前位置 : 主页 > 编程语言 > 其它开发 >

【mq】从零开始实现 mq

来源:互联网 收集:自由互联 发布时间:2022-05-21
前景回顾 【mq】从零开始实现 mq-01-生产者、消费者启动 【mq】从零开始实现 mq-02-如何实现生产者调用消费者? 【mq】从零开始实现 mq-03-引入 broker 中间人 【mq】从零开始实现 mq-04-启动
前景回顾

【mq】从零开始实现 mq-01-生产者、消费者启动

【mq】从零开始实现 mq-02-如何实现生产者调用消费者?

【mq】从零开始实现 mq-03-引入 broker 中间人

【mq】从零开始实现 mq-04-启动检测与实现优化

【mq】从零开始实现 mq-05-实现优雅停机

【mq】从零开始实现 mq-06-消费者心跳检测 heartbeat

【mq】从零开始实现 mq-07-负载均衡 load balance

【mq】从零开始实现 mq-08-配置优化 fluent

【mq】从零开始实现 mq-09-消费者拉取消息 pull message

【mq】从零开始实现 mq-10-消费者拉取消息回执 pull message ack

【mq】从零开始实现 mq-11-消费者消息回执添加分组信息 pull message ack groupName

【mq】从零开始实现 mq-12-消息的批量发送与回执

【mq】从零开始实现 mq-13-注册鉴权 auth

注册鉴权

我们前面实现了 mq 的基本功能,不过还是存在一个问题,那就是 mq 没有进行鉴权。

这就会导致如果部署在公网,任何一个机器都可以连接我们的服务,这显然是不够安全的。

13

生产者实现 属性

生产者启动时新增 2 个属性:

/**
 * 账户标识
 * @since 0.1.4
 */
private String appKey;

/**
 * 账户密码
 * @since 0.1.4
 */
private String appSecret;
注册逻辑调整

注册时,添加这两个属性到服务端。

public void registerToBroker() {
    int successCount = 0;
    for(RpcChannelFuture channelFuture : this.channelFutureList) {
        ServiceEntry serviceEntry = new ServiceEntry();
        serviceEntry.setGroupName(groupName);
        serviceEntry.setAddress(channelFuture.getAddress());
        serviceEntry.setPort(channelFuture.getPort());
        serviceEntry.setWeight(channelFuture.getWeight());

        BrokerRegisterReq brokerRegisterReq = new BrokerRegisterReq();
        brokerRegisterReq.setServiceEntry(serviceEntry);
        brokerRegisterReq.setMethodType(MethodType.P_REGISTER);
        brokerRegisterReq.setTraceId(IdHelper.uuid32());
        brokerRegisterReq.setAppKey(appKey);
        brokerRegisterReq.setAppSecret(appSecret);
        log.info("[Register] 开始注册到 broker:{}", JSON.toJSON(brokerRegisterReq));
        final Channel channel = channelFuture.getChannelFuture().channel();
        MqCommonResp resp = callServer(channel, brokerRegisterReq, MqCommonResp.class);
        log.info("[Register] 完成注册到 broker:{}", JSON.toJSON(resp));
        if(MqCommonRespCode.SUCCESS.getCode().equals(resp.getRespCode())) {
            successCount++;
        }
    }
    if(successCount <= 0 && check) {
        log.error("校验 broker 可用性,可连接成功数为 0");
        throw new MqException(MqCommonRespCode.P_REGISTER_TO_BROKER_FAILED);
    }
}
消费者

消费者连接到 broker 也是类似的,此处不做赘述。

Broker 的处理 注册逻辑

以前注册是直接成功,此处加一个业务判断。

// 生产者注册
if(MethodType.P_REGISTER.equals(methodType)) {
    BrokerRegisterReq registerReq = JSON.parseObject(json, BrokerRegisterReq.class);
    if(!brokerRegisterValidService.producerValid(registerReq)) {
        log.error("{} 生产者注册验证失败", JSON.toJSON(registerReq));
        throw new MqException(MqBrokerRespCode.P_REGISTER_VALID_FAILED);
    }

    return registerProducerService.register(registerReq.getServiceEntry(), channel);
}

首先会校验有效性,这个是一个接口,可自行灵活替换。

其他业务逻辑

其他业务处理时,都需要 registerProducerService.checkValid(channelId); 进行有效性判断。

// 生产者注销
if(MethodType.P_UN_REGISTER.equals(methodType)) {
    registerProducerService.checkValid(channelId);

    BrokerRegisterReq registerReq = JSON.parseObject(json, BrokerRegisterReq.class);
    return registerProducerService.unRegister(registerReq.getServiceEntry(), channel);
}
小结

注册鉴权实现的原理非常简单,不过可以为安全性提供最基础的保障。

希望本文对你有所帮助,如果喜欢,欢迎点赞收藏转发一波。

我是老马,期待与你的下次重逢。

开源地址

The message queue in java.(java 简易版本 mq 实现) https://github.com/houbb/mq

拓展阅读

rpc-从零开始实现 rpc https://github.com/houbb/rpc

网友评论