深入理解 `Sinks.Empty<Void>` 和 `Mono<Void>`:如何触发完成信号并结合 WebSocket 示例

news/2025/2/26 8:40:43

在响应式编程中,Sinks 是 Project Reactor 提供的一个强大工具,用于手动控制数据流的信号发射。Sinks.Empty<Void> 是一种特殊的 Sinks,它不发射任何数据,仅用于表示完成或错误信号。结合 Mono<Void>,它可以用来表示一个异步操作的完成状态。本文将详细分析 Sinks.Empty<Void>Mono<Void> 的行为,并通过示例代码展示其用法,同时结合 WebSocket 建立连接的伪代码,展示其在实际场景中的应用。


1. Sinks.Empty<Void> 是什么?

Sinks.Empty<Void> 是 Project Reactor 中的一个 Sinks 类型,专门用于表示一个不发射任何数据的信号源。它的主要特点是:

  • 不发射任何数据(onNext 信号)。
  • 只能发射完成信号(onComplete)或错误信号(onError)。
  • 需要通过显式调用方法(如 tryEmitEmpty())来触发完成信号。

它的典型使用场景是表示一个异步操作的完成状态,而不需要传递任何数据。


2. Mono<Void> 的作用

Mono<Void> 是 Project Reactor 中的一个响应式类型,表示一个最多发射一个元素的异步序列。对于 Mono<Void> 来说:

  • 它不会发射任何数据(onNext 信号)。
  • 它只会发射完成信号(onComplete)或错误信号(onError)。
  • 它通常用于表示一个不需要返回值的异步操作。

通过将 Sinks.Empty<Void> 转换为 Mono<Void>,我们可以将手动控制的信号源与响应式流结合起来。


3. 如何触发完成信号?

Sinks.Empty<Void> 的完成信号需要通过显式调用 tryEmitEmpty() 来触发。以下是关键点:

  • 在调用 tryEmitEmpty() 之前,Sinks.Empty<Void> 处于未完成状态,订阅者会一直等待。
  • 调用 tryEmitEmpty() 后,Sinks.Empty<Void> 会立即发出完成信号,订阅者会收到 onComplete 通知。

4. 示例代码

以下是一个完整的示例,展示了如何使用 Sinks.Empty<Void>Mono<Void> 来触发完成信号:

import reactor.core.publisher.Mono;
import reactor.core.publisher.Sinks;

public class SinksEmptyExample {
    public static void main(String[] args) {
        // 1. 创建一个空的 Sinks.Empty<Void>
        Sinks.Empty<Void> completion = Sinks.empty();

        // 2. 将其转换为 Mono<Void>
        Mono<Void> mono = completion.asMono();

        // 3. 订阅 Mono<Void>
        mono.subscribe(
            null, // onNext (不会调用,因为没有数据)
            error -> System.err.println("Error: " + error), // onError
            () -> System.out.println("Completed!") // onComplete
        );

        // 4. 模拟一个异步操作
        System.out.println("Starting async operation...");
        try {
            Thread.sleep(2000); // 模拟耗时操作
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        // 5. 手动触发完成信号
        System.out.println("Triggering completion...");
        completion.tryEmitEmpty();

        // 输出:
        // Starting async operation...
        // (等待 2 秒)
        // Triggering completion...
        // Completed!
    }
}
代码解析:
  1. 创建 Sinks.Empty<Void>

    • 使用 Sinks.empty() 创建一个空的 Sinks.Empty<Void>
  2. 转换为 Mono<Void>

    • 通过 completion.asMono()Sinks.Empty<Void> 转换为 Mono<Void>
  3. 订阅 Mono<Void>

    • 订阅 Mono<Void>,并定义 onErroronComplete 的处理逻辑。
  4. 模拟异步操作

    • 使用 Thread.sleep(2000) 模拟一个耗时 2 秒的异步操作。
  5. 触发完成信号

    • 调用 completion.tryEmitEmpty() 手动触发完成信号,订阅者会收到 onComplete 通知。

5. 结合 WebSocket 建立连接的伪代码

在实际应用中,Sinks.Empty<Void>Mono<Void> 可以用于表示 WebSocket 连接的建立和关闭。以下是一个伪代码示例,展示如何在 WebSocket 连接建立后触发完成信号:

import reactor.core.publisher.Mono;
import reactor.core.publisher.Sinks;
import org.springframework.web.reactive.socket.WebSocketSession;

public class WebSocketExample {

    public Mono<Void> handleWebSocketConnection(WebSocketSession session) {
        // 1. 创建一个空的 Sinks.Empty<Void>
        Sinks.Empty<Void> completion = Sinks.empty();

        // 2. 将其转换为 Mono<Void>
        Mono<Void> mono = completion.asMono();

        // 3. 处理 WebSocket 连接
        session.receive()
            .doOnNext(message -> {
                // 处理收到的消息
                System.out.println("Received message: " + message.getPayloadAsText());
            })
            .doOnError(error -> {
                // 处理错误
                System.err.println("WebSocket error: " + error);
                completion.tryEmitError(error); // 触发错误信号
            })
            .doOnComplete(() -> {
                // 连接关闭时触发完成信号
                System.out.println("WebSocket connection closed.");
                completion.tryEmitEmpty(); // 触发完成信号
            })
            .subscribe();

        // 4. 返回 Mono<Void>,表示 WebSocket 连接的处理结果
        return mono;
    }

    public static void main(String[] args) {
        WebSocketExample example = new WebSocketExample();

        // 模拟 WebSocket 连接
        WebSocketSession session = // 获取 WebSocketSession 的伪代码
        example.handleWebSocketConnection(session).subscribe(
            null, // onNext (不会调用,因为没有数据)
            error -> System.err.println("Error: " + error), // onError
            () -> System.out.println("WebSocket handling completed!") // onComplete
        );
    }
}
伪代码解析:
  1. 创建 Sinks.Empty<Void>

    • 使用 Sinks.empty() 创建一个空的 Sinks.Empty<Void>,用于表示 WebSocket 连接的完成状态。
  2. 处理 WebSocket 连接

    • 使用 session.receive() 处理 WebSocket 消息。
    • doOnNext 中处理收到的消息。
    • doOnError 中处理错误,并调用 completion.tryEmitError(error) 触发错误信号。
    • doOnComplete 中处理连接关闭,并调用 completion.tryEmitEmpty() 触发完成信号。
  3. 返回 Mono<Void>

    • 返回 Mono<Void>,表示 WebSocket 连接的处理结果。
  4. 订阅 Mono<Void>

    • 订阅 Mono<Void>,并定义 onErroronComplete 的处理逻辑。

6. 关键点总结

  • Sinks.Empty<Void> 是一个手动控制的信号源,它不会自动发出完成信号,必须通过显式调用 tryEmitEmpty() 来触发。
  • Mono<Void> 表示一个不发射数据的异步序列,它只会发出完成或错误信号。
  • WebSocket 连接示例
    • 通过 Sinks.Empty<Void>Mono<Void>,可以灵活地表示 WebSocket 连接的完成状态。
    • 在连接关闭或发生错误时,手动触发完成或错误信号。

7. 总结

通过 Sinks.Empty<Void>Mono<Void>,我们可以灵活地控制异步操作的完成信号。结合 WebSocket 示例,展示了如何在实际场景中使用这些工具。关键在于理解 Sinks.Empty<Void> 的初始状态是未完成的,必须通过显式调用 tryEmitEmpty()tryEmitError() 来触发信号。


http://www.niftyadmin.cn/n/5868404.html

相关文章

网络基础知识-2

N个节点完全互联的网型网即N个节点的无向完全图&#xff0c;无向完全图的边数计算如下&#xff1a;每个节点都要指向其他N-1个节点&#xff0c;但是因为无向两个节点之间的边会重复&#xff0c;因此有N(N-1)/2条边HDLC&#xff08;高级数据链路控制协议&#xff09;是一种面向比…

二叉树-左叶子之和

代码随想录-刷题笔记 404. 左叶子之和 - 力扣&#xff08;LeetCode&#xff09; 内容&#xff1a; 该题仅作为搜索&#xff0c;但是其中的规则让人摸不着头脑&#xff0c;看起来似乎很头疼 但是仔细一思考&#xff0c;能发现左叶子无非是这样的定义 当发现一个节点的 左孩…

单片机裸机编程:状态机与其他高效编程框架

在单片机裸机编程中&#xff0c;状态机是一种非常强大的工具&#xff0c;能够有效管理复杂的逻辑和任务切换。除了状态机&#xff0c;还有其他几种编程模式可以在不使用 RTOS 的情况下实现高效的程序设计。以下是一些常见的方法&#xff1a; 1. 状态机编程 状态机通过定义系统…

C语言 —— 此去经年 应是良辰好景虚设 - 函数

目录 1. 函数的概念 1.1 库函数 1.2 自定义函数 2. 形参和实参 3. return 语句 4. 数组做函数参数 5. 嵌套调用和链式访问 5.1 嵌套调用 5.2 链式访问 6. 函数的声明和定义 6.1 单个文件 6.2 多个文件 7. static 和 extern 7.1 static 修饰局部变量 7.2 static 修…

Java GC 基础知识快速回顾

目录 一、Java 垃圾回收&#xff08;GC&#xff09;基本概念和重要性分析 &#xff08;一&#xff09; Java 垃圾回收&#xff08;GC&#xff09;基本概念回顾 1.GC 三种常见语义 2.Mutator&#xff1a;应用程序的内存管理角色 3.TLAB&#xff08;线程本地分配缓存&#x…

数据结构与算法面试专题——桶排序

引入 桶排序&#xff0c;顾名思义&#xff0c;会用到“桶”&#xff0c;核心思想是将要排序的数据分到几个有序的桶里&#xff0c;每个桶里的数据再单独进行排序。桶内排完序之后&#xff0c;再把每个桶里的数据按照顺序依次取出&#xff0c;组成的序列就是有序的了。 桶排序…

关于网关和ip地址怎么理解?

互联网各领域资料分享专区(不定期更新): Sheet 正文 网关和IP地址是计算机网络中的两个核心概念,它们共同协作实现设备之间的通信。以下是通俗易懂的解释: 1. IP地址(Internet Protocol Address) 作用: IP地址是网络中设备的“唯一标识符”,类似于现实中的门牌号。它…

[高等数学] 有理函数的积分

一、知识点 两个多项式的商 P ( x ) Q ( x ) \frac{P(x)}{Q(x)} Q(x)P(x)​ 称为有理函数&#xff0c;又称有理分式。 当分子多项式 P ( x ) P(x) P(x) 的次数小于分母多项式 Q ( x ) Q(x) Q(x) 的次数时&#xff0c;称这有理函数为真分式&#xff0c;否则称为假分式。 对…