
Configuring WebSocket through Zuul: point-to-point WebSocket messaging with Spring Cloud and SockJS in a separated front end and back end

Source: Internet  Collected by: 自由互联  Published: 2023-07-02

The project's back end is built on Spring Cloud and Spring Boot; the front end uses Vue and SockJS.

Third-party framework        Version
SpringCloud                  Dalston.SR4
SpringBoot                   1.5.10
spring-cloud-starter-zuul    1.3.5

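We now need to push the results of asynchronous tasks to the front end over WebSocket, and we want the WebSocket connection to pass through the Zuul gateway and reach the internal WebSocket server microservice. After some searching, it turns out that Zuul 1.x does not support WebSocket yet; support is said to be coming in 2.x. There is, however, a workaround for 1.x on GitHub: https://github.com/mthizo247/spring-cloud-netflix-zuul-websocket. The author provides a demo whose hello world example runs fine, but the demo only shows broadcasting to subscribers of a topic.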

The rest of this post builds on that demo to implement point-to-point messaging.

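The idea is to create a clientId for each client once it connects and to store that clientId in the database. User or company information can later be bound to the clientId, so that the server can send a message to one specific client.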
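The bookkeeping described above can be sketched roughly as follows. This is only an illustration: `ClientRegistry` and its methods are hypothetical names, and an in-memory map stands in for the database mentioned in the text.

```java
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical in-memory stand-in for the database table that stores clientIds.
final class ClientRegistry {
    // clientId -> bound owner (user or company id); "" means nothing is bound yet.
    private final Map<String, String> ownerByClientId = new ConcurrentHashMap<>();

    // Called when a client connects: remember its clientId.
    void register(String clientId) {
        ownerByClientId.putIfAbsent(clientId, "");
    }

    // Later on, bind user or company information to a known clientId.
    // Returns false if the clientId was never registered.
    boolean bind(String clientId, String owner) {
        return ownerByClientId.replace(clientId, owner) != null;
    }

    // Find a clientId to address when a message is meant for a given owner.
    Optional<String> findClientIdByOwner(String owner) {
        return ownerByClientId.entrySet().stream()
                .filter(entry -> entry.getValue().equals(owner))
                .map(Map.Entry::getKey)
                .findFirst();
    }
}
```

The clientId returned by such a lookup would then be the user name passed to the messaging template when sending.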

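The WebSocket connections on Zuul and on the WebSocket microservice must use the same clientId; only then does point-to-point delivery work. Concretely: on the Zuul gateway, a WebSocket HandshakeInterceptor obtains a clientId, that clientId becomes the principal, and it is passed on to the WebSocket microservice. On the WebSocket microservice, another HandshakeInterceptor reads the clientId and again uses it as the principal.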

Once the connection is established, the back end sends messages to the user through convertAndSendToUser on messagingTemplate.

@MessageMapping("/greeting")
public void greeting(HelloMessage message, MessageHeaders messageHeaders) throws Exception {
    String sessionId = this.getSessionId(messageHeaders);
    Map<String, Object> payload = new HashMap<>();
    payload.put("body", message.getName());
    // Deliver only to the user whose principal name equals this session id.
    messagingTemplate.convertAndSendToUser(sessionId, "/queue/notifications", payload);
}

// Reads the session_id that the handshake interceptor stored in the session attributes.
private String getSessionId(MessageHeaders messageHeaders) {
    Object sessionAttributes = messageHeaders.get("simpSessionAttributes");
    if (sessionAttributes != null) {
        Object sessionId = ((Map<?, ?>) sessionAttributes).get("session_id");
        if (sessionId != null) {
            return sessionId.toString();
        }
    }
    throw new RuntimeException("session id is missing");
}
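To make the addressing concrete: as far as I understand Spring's default behaviour, convertAndSendToUser prepends a user prefix ("/user/" by default) and the user name to the destination, while the client subscribes to "/user/queue/notifications" without naming itself. The sketch below only mimics that string composition in plain Java. `UserDestinations` is a hypothetical helper, not a Spring class, and it leaves out details such as escaping of special characters in the user name.

```java
// Hypothetical helper that mimics how a per-user destination is composed.
final class UserDestinations {
    // Assumed default user destination prefix.
    private static final String USER_PREFIX = "/user/";

    static String forUser(String user, String destination) {
        if (user == null || user.isEmpty()) {
            throw new IllegalArgumentException("user must not be empty");
        }
        if (destination == null || !destination.startsWith("/")) {
            throw new IllegalArgumentException("destination must start with '/'");
        }
        return USER_PREFIX + user + destination;
    }

    public static void main(String[] args) {
        // prints /user/abc123/queue/notifications
        System.out.println(forUser("abc123", "/queue/notifications"));
    }
}
```

This is why the principal name matters: the message only reaches a client whose principal name matches the user string given to convertAndSendToUser.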

The WebSocketConfig in the hello project is fairly simple:

package com.github.mthizo247.hello;

import java.security.Principal;
import java.util.Map;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.http.server.ServerHttpRequest;
import org.springframework.http.server.ServerHttpResponse;
import org.springframework.http.server.ServletServerHttpRequest;
import org.springframework.messaging.simp.config.MessageBrokerRegistry;
import org.springframework.web.socket.WebSocketHandler;
import org.springframework.web.socket.config.annotation.AbstractWebSocketMessageBrokerConfigurer;
import org.springframework.web.socket.config.annotation.EnableWebSocketMessageBroker;
import org.springframework.web.socket.config.annotation.StompEndpointRegistry;
import org.springframework.web.socket.server.HandshakeInterceptor;
import org.springframework.web.socket.server.support.DefaultHandshakeHandler;

@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig extends AbstractWebSocketMessageBrokerConfigurer {

    @Override
    public void registerStompEndpoints(StompEndpointRegistry registry) {
        registry.addEndpoint("/gs-guide-websocket")
                .setHandshakeHandler(myDefaultHandshakeHandler())
                .addInterceptors(handshakeInterceptor())
                // bypasses spring web security
                .setAllowedOrigins("*").withSockJS();
    }

    @Override
    public void configureMessageBroker(MessageBrokerRegistry config) {
        // prefix for subscribe
        config.enableSimpleBroker("/queue");
        // prefix for send
        config.setApplicationDestinationPrefixes("/app");
    }

    @Bean
    public HandshakeInterceptor handshakeInterceptor() {
        return new HandshakeInterceptor() {
            @Override
            public boolean beforeHandshake(ServerHttpRequest request, ServerHttpResponse response,
                    WebSocketHandler wsHandler, Map<String, Object> attributes) throws Exception {
                if (request instanceof ServletServerHttpRequest) {
                    // Key step: the session_id header is set by Zuul when it opens
                    // its WebSocket connection to this service.
                    ServletServerHttpRequest servletRequest = (ServletServerHttpRequest) request;
                    attributes.put("session_id", servletRequest.getServletRequest().getHeader("session_id"));
                    return true;
                }
                return true;
            }

            @Override
            public void afterHandshake(ServerHttpRequest request, ServerHttpResponse response,
                    WebSocketHandler wsHandler, Exception exception) {
            }
        };
    }

    // WebSocket handshake handler
    private DefaultHandshakeHandler myDefaultHandshakeHandler() {
        return new DefaultHandshakeHandler() {
            @Override
            protected Principal determineUser(ServerHttpRequest request, WebSocketHandler wsHandler,
                    Map<String, Object> attributes) {
                // Use the session id as the principal of the current session.
                final String sessionId = (String) attributes.get("session_id");
                Principal principal = new Principal() {
                    @Override
                    public String getName() {
                        return sessionId;
                    }
                };
                return principal;
            }
        };
    }
}

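On the Zuul gateway side you need to work with the source code of spring-cloud-netflix-zuul-websocket; referencing the jar directly does not solve the problem. Add the source project as a dependency in the proxy project's pom.xml, then modify the addStompEndpoint method of ZuulWebSocketConfiguration in spring-cloud-netflix-zuul-websocket.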

private SockJsServiceRegistration addStompEndpoint(StompEndpointRegistry registry, String... endpoint) {
    return registry.addEndpoint(endpoint)
            // Manually add the handshake handler and interceptor that set the clientId principal.
            .setHandshakeHandler(myDefaultHandshakeHandler())
            .addInterceptors(handshakeInterceptor())
            // bypasses spring web security
            .setAllowedOrigins("*").withSockJS();
}

Also add the following two methods. In beforeHandshake, read the request URI: it contains the SockJS session id, and our clientId is in fact this session id.

@Bean
public HandshakeInterceptor handshakeInterceptor() {
    return new HandshakeInterceptor() {
        @Override
        public boolean beforeHandshake(ServerHttpRequest request, ServerHttpResponse response,
                WebSocketHandler wsHandler, Map<String, Object> attributes) throws Exception {
            if (request instanceof ServletServerHttpRequest) {
                // Take the SockJS session id from the request URI and use it as the user.
                ServletServerHttpRequest servletRequest = (ServletServerHttpRequest) request;
                String uri = servletRequest.getServletRequest().getRequestURI();
                // Drop the last path segment (the transport) ...
                int lastSlashIndex = uri.lastIndexOf("/");
                uri = uri.substring(0, lastSlashIndex);
                // ... then keep what is now the last segment, the session id.
                uri = uri.substring(uri.lastIndexOf("/") + 1);
                attributes.put("session_id", uri);
                return true;
            }
            return true;
        }

        @Override
        public void afterHandshake(ServerHttpRequest request, ServerHttpResponse response,
                WebSocketHandler wsHandler, Exception exception) {
        }
    };
}

// WebSocket handshake handler
private DefaultHandshakeHandler myDefaultHandshakeHandler() {
    return new DefaultHandshakeHandler() {
        @Override
        protected Principal determineUser(ServerHttpRequest request, WebSocketHandler wsHandler,
                Map<String, Object> attributes) {
            // The client id is what point-to-point delivery is addressed to.
            final String clientId = (String) attributes.get("session_id");
            Principal principal = new Principal() {
                @Override
                public String getName() {
                    return clientId;
                }
            };
            return principal;
        }
    };
}
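The URI handling in beforeHandshake relies on the SockJS URL layout, in which a transport request looks like /{endpoint}/{server-id}/{session-id}/{transport}, so the session id is the second-to-last path segment. Below is a small standalone sketch of that extraction with basic validation added. `SockJsUris` and the sample URI are made up for illustration and are not part of the library.

```java
// Hypothetical helper: extracts the SockJS session id from a request URI.
final class SockJsUris {
    // Expected shape: /{endpoint}/{server-id}/{session-id}/{transport}
    static String sessionIdOf(String requestUri) {
        int lastSlash = requestUri.lastIndexOf('/');
        if (lastSlash <= 0) {
            throw new IllegalArgumentException("not a SockJS transport URI: " + requestUri);
        }
        // Search backwards for the slash that precedes the session id segment.
        int previousSlash = requestUri.lastIndexOf('/', lastSlash - 1);
        if (previousSlash < 0) {
            throw new IllegalArgumentException("not a SockJS transport URI: " + requestUri);
        }
        return requestUri.substring(previousSlash + 1, lastSlash);
    }

    public static void main(String[] args) {
        // prints abcdefgh
        System.out.println(sessionIdOf("/gs-guide-websocket/123/abcdefgh/websocket"));
    }
}
```

The interceptor above does the same thing with two substring calls but assumes the URI always has this shape; the explicit checks here merely turn a malformed URI into a clear error.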

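At this point the Zuul gateway and the WebSocket server service are each configured. Next, the session id has to be passed from one to the other: modify the buildWebSocketHttpHeaders method of ProxyWebSocketConnectionManager so that the session id is added to the headers of the socket connection.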

private WebSocketHttpHeaders buildWebSocketHttpHeaders() {
    WebSocketHttpHeaders wsHeaders = new WebSocketHttpHeaders();
    if (httpHeadersCallback != null) {
        httpHeadersCallback.applyHeaders(userAgentSession, wsHeaders);
        // Add the session_id to the headers.
        wsHeaders.put("session_id", Lists.newArrayList(userAgentSession.getId()));
    }
    return wsHeaders;
}

With the server side done, add a subscription to the user queue notifications in the web client's app.js:

function connect() {
    var socket = new SockJS('http://localhost:7078/gs-guide-websocket', null, {
        transports: ['websocket']
    });
    stompClient = Stomp.over(socket);
    stompClient.connect({}, function (frame) {
        setConnected(true);
        console.log('Connected: ' + frame);
        // Subscribe to this user's notifications.
        stompClient.subscribe("/user/queue/notifications", handleNotification);
    });

    function handleNotification(message) {
        showGreeting(message);
    }
}

Now let's see the result.
