标签搜索

目 录CONTENT

文章目录

WebSocket拦截器

陈铭
2022-04-09 / 0 评论 / 2 点赞 / 546 阅读 / 1,241 字 / 正在检测是否收录...

WebSocket拦截器(转载自CSDN)

## 简要说明:
a,endpoint 部分:如果你的 endpoint 是 xyz,那么你连接的 url 格式就是 var sockjs_url = 'http://{ip}:{port}/xyz';
b,跨域问题:如果没有配置好允许来自哪些前端的连接,当你连接 websocket 时就会报跨域问题。最典型的例子:如果你的前端域名是 http://a.b.c:99,那么你允许的 origins 就必须包含来自 a.b.c 的域名且端口为 99 的连接
c,topic前缀:设置好 topic 的前缀后,你订阅 topic 的格式为 /myPrefixes/{yourTopicPart01}/{yourTopicPart02},或者 /myPrefixes/

package com.yq.config;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration;
import org.springframework.lang.Nullable;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.converter.MessageConverter;
import org.springframework.messaging.converter.SimpleMessageConverter;
import org.springframework.messaging.converter.SmartMessageConverter;
import org.springframework.messaging.simp.SimpMessageType;
import org.springframework.messaging.simp.SimpMessagingTemplate;
import org.springframework.messaging.simp.config.ChannelRegistration;
import org.springframework.messaging.simp.config.MessageBrokerRegistry;
import org.springframework.messaging.simp.stomp.StompHeaderAccessor;
import org.springframework.messaging.support.ChannelInterceptor;
import org.springframework.messaging.support.ChannelInterceptorAdapter;
import org.springframework.messaging.support.GenericMessage;
import org.springframework.stereotype.Service;
import org.springframework.web.socket.config.annotation.EnableWebSocketMessageBroker;
import org.springframework.web.socket.config.annotation.StompEndpointRegistry;
import org.springframework.web.socket.config.annotation.WebSocketMessageBrokerConfigurer;

import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

@Configuration
@EnableWebSocketMessageBroker
@Service
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {
    private static final Logger log = LoggerFactory.getLogger(WebSocketConfig.class);

    @Autowired
    private SimpMessagingTemplate messagingTemplate;

    /**
     * Registers the STOMP endpoint that clients connect to.
     *
     * <p>The endpoint is exposed at {@code /myEndPoint}, accepts connections from
     * every origin ({@code "*"}) and has the SockJS fallback enabled.
     *
     * <p>NOTE(review): a wildcard origin leaves cross-origin access fully open;
     * confirm this is intended outside of a demo setup.
     *
     * @param registry registry the endpoint is added to
     */
    @Override
    public void registerStompEndpoints(StompEndpointRegistry registry) {
        // Path clients use when opening the connection.
        final String endpointPath = "/myEndPoint";
        // "*" allows cross-origin access from any origin.
        final String allowedOrigin = "*";

        registry.addEndpoint(endpointPath).setAllowedOrigins(allowedOrigin).withSockJS();
    }

    /**
     * Enables the simple message broker for destinations under {@code /myPrefixes}.
     *
     * <p>Subscribed topics must start with that prefix, for example
     * {@code /myPrefixes/topic1/xxx}.
     *
     * @param registry broker registry to configure
     */
    @Override
    public void configureMessageBroker(MessageBrokerRegistry registry) {
        final String brokerPrefix = "/myPrefixes";
        registry.enableSimpleBroker(brokerPrefix);
    }


    /**
     * Installs an interceptor on the client inbound channel (messages arriving
     * from WebSocket clients).
     *
     * <p>The interceptor logs every callback and additionally:
     * <ul>
     *   <li>drops a CONNECT frame (by returning {@code null} from {@code preSend})
     *       when it carries no non-blank {@code AuthToken} native header;</li>
     *   <li>rewrites the payload of MESSAGE-typed frames via {@code UpdateMessage};</li>
     *   <li>after a SUBSCRIBE frame has been handed to the channel, publishes a fixed
     *       JSON payload to the subscribed destination through the messaging template.</li>
     * </ul>
     *
     * <p>NOTE(review): the token is only checked for presence here, it is not validated.
     * NOTE(review): the full-message log lines print the message's string form, which
     * presumably includes native headers (and therefore the token on CONNECT) —
     * consider lowering their level or redacting before production use.
     *
     * @param registration registration of the inbound channel the interceptor is added to
     */
    @Override
    public void configureClientInboundChannel(ChannelRegistration registration) {
        ChannelInterceptor interceptor = new ChannelInterceptorAdapter() {

            /** Returns the message type of the accessor's STOMP command, or null when there is no command. */
            private SimpMessageType commandType(StompHeaderAccessor accessor) {
                return accessor.getCommand() == null ? null : accessor.getCommand().getMessageType();
            }

            @Override
            public Message<?> preSend(Message<?> message, MessageChannel channel) {
                log.info("Inbound preSend. message={}", message);
                StompHeaderAccessor accessor = StompHeaderAccessor.wrap(message);
                SimpMessageType type = commandType(accessor);
                if (type == null) {
                    // Not a STOMP command frame: pass through untouched.
                    return message;
                }

                MessageHeaders header = message.getHeaders();
                String sessionId = (String) header.get("simpSessionId");

                if (type == SimpMessageType.CONNECT) {
                    String jwtToken = accessor.getFirstNativeHeader("AuthToken");
                    if (StringUtils.isNotBlank(jwtToken)) {
                        // Never write the credential itself to the log; record only that it was supplied.
                        log.info("Inbound preSend: sessionId={}, AuthToken present", sessionId);
                    } else {
                        log.error("no token, will be disallowed to connect.");
                        // Returning null stops the CONNECT frame from being sent on.
                        return null;
                    }
                } else if (type == SimpMessageType.DISCONNECT) {
                    log.info("Inbound sessionId={} is disconnected", sessionId);
                } else if (type == SimpMessageType.SUBSCRIBE) {
                    String topicDest = (String) header.get("simpDestination");
                    log.info("subscribe topicDest={}, message={} SUBSCRIBE", topicDest, message);
                } else if (type == SimpMessageType.MESSAGE) {
                    String topicDest = (String) header.get("simpDestination");
                    // Log the message before and after the payload rewrite.
                    log.info("之前的消息 topicDest={}, message={} MESSAGE", topicDest, message);
                    message = UpdateMessage(message, "Inbound");
                    log.info("之后的消息e topicDest={}, message={} MESSAGE", topicDest, message);
                }

                return message;
            }

            @Override
            public boolean preReceive(MessageChannel channel) {
                log.info("Inbound preReceive. channel={}", channel);
                return true;
            }

            @Override
            public Message<?> postReceive(Message<?> message, MessageChannel channel) {
                log.info("Inbound postReceive. message={}", message);
                return message;
            }

            @Override
            public void postSend(Message<?> message, MessageChannel channel, boolean sent) {
                log.info("Inbound postSend. message={}", message);
            }

            @Override
            public void afterSendCompletion(Message<?> message, MessageChannel channel, boolean sent, @Nullable Exception ex) {
                log.info("Inbound afterSendCompletion. message={}", message);
                SimpMessageType type = commandType(StompHeaderAccessor.wrap(message));
                if (type != SimpMessageType.SUBSCRIBE) {
                    return;
                }

                String topicDest = (String) message.getHeaders().get("simpDestination");
                log.info("afterSenfCompletion. topicDest={}, message={} SUBSCRIBE", topicDest, message);
                if (topicDest == null) {
                    // convertAndSend rejects a null destination; nothing to publish to.
                    log.warn("SUBSCRIBE frame has no destination, initial message skipped.");
                    return;
                }

                // NOTE(review): this publishes to the whole destination, so presumably every
                // current subscriber receives it, not only the new one — confirm intent.
                String payload = "{\"myfield1\":\"afterSendCompletion初始化消息\"}";
                messagingTemplate.convertAndSend(topicDest, payload);
                log.info("send complete. topic={}", topicDest);
            }

            @Override
            public void afterReceiveCompletion(@Nullable Message<?> message, MessageChannel channel, @Nullable Exception ex) {
                log.info("Inbound afterReceiveCompletion. message={}", message);
            }
        };

        registration.interceptors(interceptor);
    }

    /**
     * Installs a logging-only interceptor on the client outbound channel.
     *
     * <p>Each callback writes one INFO line and otherwise leaves the message flow
     * untouched: {@code preSend} and {@code postReceive} return the message they were
     * given, and {@code preReceive} always returns {@code true}.
     *
     * @param registration registration of the outbound channel the interceptor is added to
     */
    @Override
    public void configureClientOutboundChannel(ChannelRegistration registration) {
        registration.interceptors(new ChannelInterceptorAdapter() {

            // ---- send-side callbacks ----

            @Override
            public Message<?> preSend(Message<?> message, MessageChannel channel) {
                log.info("Outbound preSend: message={}", message);
                return message;
            }

            @Override
            public void postSend(Message<?> message, MessageChannel channel, boolean sent) {
                log.info("Outbound postSend. message={}", message);
            }

            @Override
            public void afterSendCompletion(Message<?> message, MessageChannel channel, boolean sent, @Nullable Exception ex) {
                log.info("Outbound afterSendCompletion. message={}", message);
            }

            // ---- receive-side callbacks ----

            @Override
            public boolean preReceive(MessageChannel channel) {
                log.info("Outbound preReceive: channel={}", channel);
                return true;
            }

            @Override
            public Message<?> postReceive(Message<?> message, MessageChannel channel) {
                log.info("Outbound postReceive. message={}", message);
                return message;
            }

            @Override
            public void afterReceiveCompletion(@Nullable Message<?> message, MessageChannel channel, @Nullable Exception ex) {
                log.info("Outbound afterReceiveCompletion. message={}", message);
            }
        });
    }

    /**
     * Returns a copy of {@code message} whose JSON payload has been tagged with
     * {@code logFlag}, or the original message when the payload cannot be rewritten.
     *
     * <p>When the payload is a {@code byte[]} holding a UTF-8 JSON object, the copy's
     * payload has a new key {@code logFlag + "ChannelContent2"} set to {@code "add to"}
     * and its {@code myfield1} value replaced by {@code logFlag + " add to " + oldValue}
     * (a missing {@code myfield1} is rendered as the text {@code null}). The copy is
     * built with the original message's headers.
     *
     * <p>The original message is returned unchanged when the payload is not a
     * {@code byte[]}, is not parseable as a JSON object, or parses to nothing
     * (for example an empty body).
     *
     * @param message message whose payload should be rewritten
     * @param logFlag tag used in log lines and injected into the payload (e.g. "Inbound")
     * @return the rewritten message, or {@code message} itself if no rewrite was possible
     */
    private Message<?> UpdateMessage(Message<?> message, String logFlag) {
        log.info(logFlag + " preSend: message={}", message);
        Object payload = message.getPayload();

        String strUTF8 = null;
        String strJsonUTF8 = null;
        Message<?> msg = null;

        if (payload instanceof byte[]) {
            strUTF8 = new String((byte[]) payload, StandardCharsets.UTF_8);
            try {
                JSONObject jsonObj = JSON.parseObject(strUTF8);
                // parseObject yields null for empty input; there is nothing to modify then.
                if (jsonObj != null) {
                    jsonObj.put(logFlag + "ChannelContent2", "add to");
                    String value = jsonObj.getString("myfield1");
                    jsonObj.put("myfield1", logFlag + " add to " + value);
                    strJsonUTF8 = jsonObj.toJSONString();
                    msg = new GenericMessage<>(strJsonUTF8.getBytes(StandardCharsets.UTF_8), message.getHeaders());
                }
            } catch (RuntimeException ex) {
                // Best effort: a payload that is not a JSON object is forwarded unchanged,
                // but the failure is logged with its stack trace instead of being hidden.
                log.warn(logFlag + " payload is not a JSON object, message left unchanged. strUTF8={}", strUTF8, ex);
            }
        } else {
            log.warn(logFlag + " payload is not byte[], message left unchanged. payloadType={}",
                    payload == null ? null : payload.getClass().getName());
        }

        if (msg != null) {
            log.info(logFlag + " preSend Modified: message={}, strUTF8={}, strJsonUTF8={}", msg, strUTF8, strJsonUTF8);
            return msg;
        }
        log.info(logFlag + " preSend Original: message={}, strUTF8={}", message, strUTF8);
        return message;
    }
//"simpMessageType" -> "MESSAGE"
    Message sendInitMsg(Message<?> oldMessage, String dest, Map<String, Object> headers, String payload) {
        MessageHeaders messageHeaders = null;
        Object conversionHint = headers != null?headers.get("conversionHint"):null;
        Map<String, Object> headersToUse = new HashMap<>();
        headersToUse.put("simpMessageType", SimpMessageType.MESSAGE);
        headersToUse.put("destination", dest);
        headersToUse.put("contentType", "text/plain;charset=UTF-8");
        headersToUse.put("stompCommand", "SEND");

        Map<String, Object> nativeHeaders = new LinkedHashMap<>();
        nativeHeaders.put("id", "sub-0");
        nativeHeaders.put("destination", dest);
        headersToUse.put("nativeHeaders", nativeHeaders);
        messageHeaders = new MessageHeaders(headersToUse);
        MessageHeaders oldHeaders = oldMessage.getHeaders()

        MessageConverter converter = new SimpleMessageConverter();
        Message<?> message = converter instanceof SmartMessageConverter ?((SmartMessageConverter)converter).toMessage(payload, messageHeaders, conversionHint):converter.toMessage(payload, messageHeaders);
        return message;
    }
}
2

评论区