项目后端架构采用SpringCloudSpringBoot前段使用Vue SockJS。
三方框架
版本
SpringCloud
Dalston.SR4
SpringBoot
1.5.10
spring-cloud-starter-zuul
1.3.5
现在需要将异步任务的结果通过websocket通知前端希望可以websocket连接穿透zuul网关访问内部的websocket server微服务。找了一圈目前zuul 1.x还不支持websocket说是2.x会支持。后来找到github上有针对1.x的解决方法https://github.com/mthizo247/spring-cloud-netflix-zuul-websocket作者提供了demo可以跑通hello world。不过demo中是订阅topic广播的例子。
下面围绕demo提供的例子来实现点对点发送消息的功能。
思路是每个客户端连接后创建一个clientId并且将clientid存储到数据库后续可以将用户信息或者公司信息绑定到clientId上服务器端发送消息到指定client。
对于zuul和websocket微服务上的websocket连接必须使用同一个clientId这样方可实现点对点。具体步骤在zuul网关部分通过websocket handshakeinteceptor获取一个clientId然后clientId作为principal并且将clientId传递到websocket微服务在websocket微服务通过websocket handshakeinteceptor获取到clientId然后clientId作为principal。
连接创建后后端通过messagingTemplate的convertAndSendToUser发送消息给用户。
MessageMapping("/greeting")
public void greeting(HelloMessage message, MessageHeaders messageHeaders) throws Exception {
String sessionId this.getSessionId(messageHeaders);
Map a new HashMap();
a.put("body", message.getName());
messagingTemplate.convertAndSendToUser(sessionId, "/queue/notifications", payload)
}
private String getSessionId(MessageHeaders messageHeaders){
if(messageHeaders.get("simpSessionAttributes")!null ((Map)messageHeaders.get("simpSessionAttributes")).get("session_id");
if (sessionId!null) {
return sessionId.toString();
}
}
throw new RuntimeException("session id 丢失");
}
hello项目中websocketConfig的配置比较简单
package com.github.mthizo247.hello;
import java.security.Principal;
import java.util.Map;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.http.server.ServerHttpRequest;
import org.springframework.http.server.ServerHttpResponse;
import org.springframework.http.server.ServletServerHttpRequest;
import org.springframework.messaging.simp.config.MessageBrokerRegistry;
import org.springframework.web.socket.WebSocketHandler;
import org.springframework.web.socket.config.annotation.AbstractWebSocketMessageBrokerConfigurer;
import org.springframework.web.socket.config.annotation.EnableWebSocketMessageBroker;
import org.springframework.web.socket.config.annotation.StompEndpointRegistry;
import org.springframework.web.socket.server.HandshakeInterceptor;
import org.springframework.web.socket.server.support.DefaultHandshakeHandler;
Configuration
EnableWebSocketMessageBroker
public class WebSocketConfig extends AbstractWebSocketMessageBrokerConfigurer {
Override
public void registerStompEndpoints(StompEndpointRegistry registry) {
registry.addEndpoint("/gs-guide-websocket")
.setHandshakeHandler(myDefaultHandshakeHandler())
.addInterceptors(handshakeInterceptor())
// bypasses spring web security
.setAllowedOrigins("*").withSockJS();
}
Override
public void configureMessageBroker(MessageBrokerRegistry config) {
// prefix for subscribe
config.enableSimpleBroker("/queue");
// prefix for send
config.setApplicationDestinationPrefixes("/app");
}
Bean
public HandshakeInterceptor handshakeInterceptor() {
return new HandshakeInterceptor() {
Override
public boolean beforeHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler,
Map attributes) throws Exception {
if (request instanceof ServletServerHttpRequest) {
//此处是关键header中的session_id是通过zuul端创建websocket conenction中传递过来
ServletServerHttpRequest servletRequest (ServletServerHttpRequest) request;
attributes.put("session_id", servletRequest.getServletRequest().getHeader("session_id"));
return true;
}
return true;
}
Override
public void afterHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler,
Exception exception) {
}
};
}
//WebSocket 握手处理器
private DefaultHandshakeHandler myDefaultHandshakeHandler(){
return new DefaultHandshakeHandler(){
Override
protected Principal determineUser(ServerHttpRequest request, WebSocketHandler wsHandler, Map attributes) {
//设置认证通过的用户到当前会话中
final String sessionid (String) attributes.get("session_id");
Principal principal new Principal() {
Override
public String getName() {
return sessionid;
}
};
return principal;
}
};
}
}
zuul网关部分需要使用spring-cloud-netflix-zuul-websocket的源代码直接引用jar包不能解决问题proxy项目pom.xml中添加源码项目的依赖接着修改spring-cloud-netflix-zuul-websocket中的ZuulWebSocketConfiguration的addStompEndpoint方法。
private SockJsServiceRegistration addStompEndpoint(StompEndpointRegistry registry, String... endpoint) {
return registry.addEndpoint(endpoint)
// 手动添加handshakehandler和interceptor用于设置principal clientId
.setHandshakeHandler(myDefaultHandshakeHandler())
.addInterceptors(handshakeInterceptor())
// bypasses spring web security
.setAllowedOrigins("*").withSockJS();
}
并且添加两个方法在beforehandshake中获取request中的请求链接这里有sockjssession的id我们的clientid其实是这个sessionid
Bean
public HandshakeInterceptor handshakeInterceptor() {
return new HandshakeInterceptor() {
Override
public boolean beforeHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler,
Map attributes) throws Exception {
if (request instanceof ServletServerHttpRequest) {
// 从websocket的请求链接requestURI中获取到sockjssession的id并用于user
ServletServerHttpRequest servletRequest (ServletServerHttpRequest) request;
String uri servletRequest.getServletRequest().getRequestURI();
int lastLashIndex uri.lastIndexOf("/");
uri uri.substring(0, lastLashIndex);
uri uri.substring(uri.lastIndexOf("/")1);
attributes.put("session_id", uri);
return true;
}
return true;
}
Override
public void afterHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler,
Exception exception) {
}
};
}
//WebSocket 握手处理器
private DefaultHandshakeHandler myDefaultHandshakeHandler(){
return new DefaultHandshakeHandler(){
Override
protected Principal determineUser(ServerHttpRequest request, WebSocketHandler wsHandler, Map attributes) {
// 利用client_id用于点对点发送
final String client_id (String) attributes.get("session_id");
Principal principal new Principal() {
Override
public String getName() {
return client_id;
}
};
return principal;
}
};
}
目前为止zuul及websocket server服务各自的配置完成接着需要将这个sessionid传递需要修改ProxyWebSocketConnectionManager的buildWebSocketHttpHeaders方法将sessionid添加到socket connection的header中
private WebSocketHttpHeaders buildWebSocketHttpHeaders() {
WebSocketHttpHeaders wsHeaders new WebSocketHttpHeaders();
if (httpHeadersCallback ! null) {
httpHeadersCallback.applyHeaders(userAgentSession, wsHeaders);
// 将session_id添加到header中
wsHeaders.put("session_id", Lists.newArrayList(userAgentSession.getId()));
}
return wsHeaders;
}
服务器端修改完毕接着在web客户端app.js中添加订阅用户队列通知
function connect() {
var socket
new SockJS(http://localhost:7078/gs-guide-websocket, null, {
transports: [websocket]
});
stompClient Stomp.over(socket);
stompClient.connect({}, function (frame) {
setConnected(true);
console.log(Connected: frame);
// 订阅用户消息通知
stompClient.subscribe("/user/queue/notifications",handleNotification);
});
function handleNotification(message) {
showGreeting(message);
}
}
见证一下效果