原创

当服务器需要实时向客户端推送数据怎么办?


什么是Server-Sent Events

在Web开发领域,SSE是一种让服务器能够实时向客户端推送更新的技术,它是HTML5规范的一部分。通过创建一个持久化的HTTP连接,服务器可以持续不断地将数据以text/event-stream格式推送给浏览器端的JavaScript应用程序。客户端打开一个到服务器的SSE连接后,服务器在有新事件发生时,会在该连接上发送事件数据,而客户端则可以通过监听这些事件来实时更新用户界面或执行其他操作。相比轮询或其他长轮询机制,SSE能更有效地利用网络资源,并且实现更简单,适用于实时更新但不要求双向通信的场景。我们通常称这种方案为“服务端消息推送轻量化方案”。

Server-Sent Events具体要怎么实现,以springboot为例

以Spring Boot为例,实现Server-Sent Events(SSE)服务器推送技术的步骤如下:

服务器端(Spring Boot)实现:

  1. 创建Controller方法: 在Spring Boot中,通过一个特殊的@RestController方法来处理SSE请求。这个方法会保持HTTP连接打开,并持续向客户端发送数据
import org.springframework.http.MediaType;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.Flux;

@RestController
public class SseController {

    @GetMapping(path = "/sse", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public Flux<String> sseStream() {
        return Flux.interval(Duration.ofSeconds(1)) // 每隔一秒生成一个新的事件
                .map(sequence -> "data: {" + System.currentTimeMillis() + "}\n\n"); // 生成包含时间戳的数据字符串
    }
}

上述代码中的Flux.interval()用于产生一个按固定间隔推送事件的流。在每个事件中,我们构造了一个简单的JSON格式的时间戳消息体。注意,SSE消息必须遵循一定的格式,即每条消息由"data:"开头,消息内容后面跟有两个换行符来表示一个完整事件的结束。

  1. 前端接收SSE事件: 在前端JavaScript中,可以使用EventSource API来订阅来自服务器的SSE流。
const source = new EventSource('/sse');

source.onmessage = function(event) {
    console.log('Received data:', event.data);
};

source.onerror = function(error) {
    console.error('Error occurred:', error);
};

以上代码会在浏览器中创建一个新的EventSource实例,指向服务器上的SSE资源地址。每当服务器有新事件到达时,onmessage回调函数会被调用,并且可以通过event.data访问到事件的内容。 这样就完成了基于Spring Boot的SSE功能的基本实现,可以根据实际需求调整推送的内容和频率。

Flux.interval()的原理

Flux.interval() 是在Reactor框架中用于生成定期连续事件的函数式API,它属于Project Reactor(基于Java 8的反应式编程库),并广泛应用于Spring WebFlux项目。当调用 Flux.interval(Duration period) 方法时,它会创建一个 Flux 流,该流按指定的时间间隔(以 Duration 表示)持续产生 Long 类型的值。 原理概述:

  1. 定时器驱动: Flux.interval() 使用了一个内部定时器来触发事件。每次时间间隔到达后,定时器会发出信号,然后执行操作来生成新的Long类型的元素,并将该元素推送到订阅者链上。
  2. 背压支持: 生成的Flux遵循 Reactive Streams 规范,这意味着它支持背压。如果下游订阅者处理速度较慢,无法及时消费产生的事件,那么interval流将会自动减缓或暂停事件的生成,以避免内存溢出等错误。
  3. 线程模型: 在Netty或其它异步I/O框架结合使用的场景下,如Spring WebFlux,Flux.interval() 的执行通常与调度器(Schedulers)相结合,以便在非阻塞IO线程之外的其他线程上进行计时和事件发布,从而不影响主I/O线程的工作效率。
  4. 生命周期管理: 订阅Flux.interval()之后,只要订阅关系未被取消,就会一直按照设定的时间间隔产生事件。若要停止产生事件,可以通过取消订阅或者关闭相关资源实现。 总结来说,Flux.interval()提供了一种便捷的方式来创建一个持续、周期性推送事件的响应式数据流,这种设计有利于构建高效、弹性的实时数据推送系统。

(下一篇:聊天室的诞生,WebSocket的前世今身)

java
  • 作者:admin(联系作者)
  • 发表时间:2024-03-07 09:08
  • 版权声明:自由转载-非商用-非衍生-保持署名(创意共享3.0许可证)
  • 公众号转载:请在文末添加作者公众号二维码
  • 评论