在 Spring Framework 5(WebFlux)中集成 WebSocket 功能

网上的教程大多过于简单,无法直接使用。本文跳过项目初始化过程,直接从依赖和具体代码讲起。

先看依赖

这里使用 Gradle 管理依赖;Maven 的依赖与之类似,只是换个写法而已。

// Build-script section: makes the Spring Boot Gradle plugin available to this build.
buildscript {
    ext {
        // Version of the Spring Boot Gradle plugin referenced in the classpath entry below.
        springBootVersion = '2.0.3.RELEASE'
    }
    repositories {
        mavenCentral()
    }
    dependencies {
        classpath("org.springframework.boot:spring-boot-gradle-plugin:${springBootVersion}")
    }
}

apply plugin: 'java'
apply plugin: 'eclipse'
apply plugin: 'org.springframework.boot'
// The dependencies below declare no versions; they are expected to be supplied
// by Spring Boot's dependency management.
apply plugin: 'io.spring.dependency-management'

group = 'cn.jgayb'
version = '0.0.1-SNAPSHOT'
sourceCompatibility = 1.8

repositories {
    mavenCentral()
}
dependencies {
    compile('org.springframework.boot:spring-boot-starter-aop')
    // WebFlux starter: provides the reactive web stack used by the WebSocket handler code.
    compile('org.springframework.boot:spring-boot-starter-webflux')
    // Lombok is only needed at compile time (e.g. for @Slf4j).
    compileOnly('org.projectlombok:lombok')
    testCompile('org.springframework.boot:spring-boot-starter-test')
    testCompile('io.projectreactor:reactor-test')
}

具体代码

启动类排除数据源自动配置

/**
 * Entry point of the WebSocket demo.
 *
 * <p>{@link DataSourceAutoConfiguration} is excluded from auto-configuration,
 * so the application starts without any data source being configured.
 */
@SpringBootApplication(exclude = {DataSourceAutoConfiguration.class})
public class WebsocketDemoApplication {

    /**
     * Builds a {@link SpringApplication} for this class and runs it.
     *
     * @param args command-line arguments, passed through to Spring Boot unchanged
     */
    public static void main(String[] args) {
        final SpringApplication application = new SpringApplication(WebsocketDemoApplication.class);
        application.run(args);
    }
}

实现WebSocketHandler接口

/**
 * WebSocket handler for the demo endpoint.
 *
 * <p>Inbound messages are printed to standard output; outbound messages are
 * produced by {@link WebPushServiceImpl}, which keys each session by the query
 * string of the handshake URI (for example {@code token=12345}).
 */
@Component
public class CustomWebsocketHandler implements WebSocketHandler {

    @Autowired
    private WebPushServiceImpl webPushService;

    /**
     * Handles one WebSocket session.
     *
     * @param session the newly established session
     * @return a {@code Mono} joining the inbound and outbound pipelines, or the
     *         result of closing the session when the handshake URI has no query string
     */
    @Override
    public Mono<Void> handle(WebSocketSession session) {
        final String query = session.getHandshakeInfo().getUri().getQuery();
        if (query == null || query.isEmpty()) {
            // The query string is the key the push service stores this session's
            // messages under, so a connection without one cannot be served.
            // 1008 (policy violation) is the generic "server refuses" code;
            // 1010 (required extension) is meant to be sent by clients only.
            return session.close(CloseStatus.POLICY_VIOLATION);
        }

        // Inbound side: print every message received from the client, then
        // complete once the receive stream ends.
        final Mono<Void> input = session.receive()
                .doOnNext(message -> System.out.println(message.getPayloadAsText()))
                .then();

        // Outbound side: the service decides what is sent to this client.
        final Mono<Void> output = webPushService.handle(session);

        return Mono.zip(input, output).then();
    }
}

配置类给websocket配置uri

/**
 * Registers the WebSocket endpoint with WebFlux.
 */
@Configuration
public class WebSocketConfiguration {

    /**
     * Maps the path {@code /websocket} to the WebSocket handler, so the endpoint
     * is reachable at e.g. {@code ws://localhost:8080/websocket}.
     *
     * <p>The handler is supplied as a {@code @Bean} method parameter; no
     * {@code @Autowired} is needed on the method for that.
     *
     * @param websocketHandler the handler bean that serves the endpoint
     * @return a URL handler mapping with the highest precedence
     */
    @Bean
    public HandlerMapping webSocketMapping(final CustomWebsocketHandler websocketHandler) {
        final Map<String, WebSocketHandler> urlMap = new HashMap<>();
        urlMap.put("/websocket", websocketHandler);

        final SimpleUrlHandlerMapping mapping = new SimpleUrlHandlerMapping();
        mapping.setOrder(Ordered.HIGHEST_PRECEDENCE);
        mapping.setUrlMap(urlMap);
        return mapping;
    }

    /**
     * Adapter that lets WebFlux invoke {@link WebSocketHandler} implementations
     * such as {@link CustomWebsocketHandler}.
     *
     * @return a new {@link WebSocketHandlerAdapter}
     */
    @Bean
    public WebSocketHandlerAdapter handlerAdapter() {
        return new WebSocketHandlerAdapter();
    }

}

service保存上下文并发送消息

/**
 * Buffers messages destined for connected WebSocket clients and streams them
 * out over each client's session.
 *
 * <p>Sessions are keyed by the full query string of the handshake URI
 * (for example {@code token=12345}), which is also the {@code token} argument
 * expected by {@link #sendMessage(String, String)}.
 */
@Service
@Slf4j
public class WebPushServiceImpl {

    /** Pending outbound messages per session key, delivered in insertion (FIFO) order. */
    private final Map<String, java.util.Queue<String>> msgStorage = new ConcurrentHashMap<>();

    /** Counter appended to the payload of the pong frame sent when nothing is pending. */
    private final AtomicInteger count = new AtomicInteger(0);

    /**
     * Queues a message for the session registered under {@code token}.
     * If no session is registered under that key, the message is dropped.
     *
     * @param token   session key, i.e. the handshake query string
     * @param message text to push to the client; must not be {@code null}
     * @throws NullPointerException if {@code message} is {@code null} and a session is registered
     */
    public void sendMessage(String token, String message) {
        final java.util.Queue<String> pending = msgStorage.get(token);
        if (pending != null) {
            pending.add(message);
        }
    }

    /**
     * Registers the session under its handshake query string and starts the
     * outbound stream: every 20 ms one frame is sent, either the oldest pending
     * message or a pong frame when nothing is pending.
     *
     * @param session the WebSocket session to serve
     * @return a {@code Mono} representing the send operation on the session
     */
    public Mono<Void> handle(WebSocketSession session) {

        final String query = session.getHandshakeInfo().getUri().getQuery();
        final java.util.Queue<String> pending = new java.util.concurrent.ConcurrentLinkedQueue<>();
        msgStorage.put(query, pending);

        final Flux<WebSocketMessage> generate = Flux.generate(sink -> this.process(session, sink));

        // Flux.interval signals an overflow error when ticks are not requested in
        // time; dropping surplus ticks keeps a slow client from killing the stream.
        final Flux<WebSocketMessage> outbound = Flux.interval(Duration.ofMillis(20))
                .onBackpressureDrop()
                .zipWith(generate,
                        (time, event) -> event);

        // Remove the entry once the send terminates so finished sessions do not
        // accumulate. The two-argument remove only deletes the mapping if it still
        // points at this session's queue (the queue compares by identity), so a
        // newer session registered under the same key is left untouched.
        return session.send(outbound)
                .doFinally(signal -> msgStorage.remove(query, pending));

    }

    /**
     * Emits exactly one frame into {@code sink}: the oldest pending message for
     * the session's key, or a pong frame when there is none.
     *
     * @param session the session whose key is looked up and whose factory creates the frame
     * @param sink    the generator sink receiving the frame
     */
    private void process(WebSocketSession session, SynchronousSink<WebSocketMessage> sink) {
        final String query = session.getHandshakeInfo().getUri().getQuery();
        final java.util.Queue<String> pending = msgStorage.get(query);
        // poll() takes the head atomically, so concurrent consumers of the same
        // queue cannot race between an emptiness check and the removal.
        final String msg = (pending == null) ? null : pending.poll();
        if (msg == null) {
            final byte[] payload = ("hello" + count.incrementAndGet())
                    .getBytes(java.nio.charset.StandardCharsets.UTF_8);
            sink.next(session.pongMessage(dataBufferFactory -> dataBufferFactory.wrap(payload)));
            return;
        }
        sink.next(session.textMessage(msg));
        // Do not call sink.complete() here: completing the generator ends the
        // outbound stream and the connection is closed.
    }

}

测试一下

浏览器测试websocket接口

// Browser-console smoke test for the WebSocket endpoint.
// Use wss:// instead of ws:// when the server has an SSL certificate.
var wsServer = 'ws://localhost:8080/websocket?token=12345';
var websocket = new WebSocket(wsServer);
websocket.onopen = function (evt) { onOpen(evt) };
websocket.onclose = function (evt) { onClose(evt) };
websocket.onmessage = function (evt) { onMessage(evt) };
websocket.onerror = function (evt) { onError(evt) };

// Called once the connection is established.
function onOpen(evt) {
     console.log("Connected to WebSocket server.");
}

// Called when the connection is closed by either side.
function onClose(evt) {
     console.log("Disconnected");
}

// Called for every message pushed by the server.
function onMessage(evt) {
     console.log('Retrieved data from server: ' + evt.data);
}

// Called on a connection error. The error event carries no `data` field,
// so the event object itself is logged instead of `evt.data`.
function onError(evt) {
     console.error('Error occured: ', evt);
}

直接在浏览器里面运行测试,连接成功了,哈哈

从前端发送一条消息(例如在浏览器控制台执行 websocket.send('test')),再看看后台日志,前端发送的 test 已经接收到了

再做一个后端测试接口给浏览器发消息

/**
 * Test-only REST endpoint that pushes a fixed message to the browser client
 * through {@link WebPushServiceImpl}.
 */
@RestController
@RequestMapping("/test")
public class MessageTest {

    @Autowired
    private WebPushServiceImpl webPushService;

    /**
     * Handles {@code GET /test/send}: queues a fixed message for the fixed
     * session key {@code token=12345} and answers with a fixed body.
     *
     * @return a {@code Mono} emitting {@code "success"}
     */
    @GetMapping("/send")
    public Mono<String> sendMessage() {
        final String sessionKey = "token=12345";
        final String payload = "push test";
        webPushService.sendMessage(sessionKey, payload);

        final Mono<String> response = Mono.just("success");
        return response;
    }
}

用 Postman 或者其他工具发个测试请求试试,我直接用 curl 命令行,例如:curl http://localhost:8080/test/send

再看看浏览器,成功收到“push test”

demo的github地址 https://github.com/jgaybjone/websocket-demo

发表评论

电子邮件地址不会被公开。 必填项已用*标注