# publish_subscribe **Repository Path**: Jumping99/publish_subscribe ## Basic Information - **Project Name**: publish_subscribe - **Description**: 一个通用的发布-订阅-消息总线框架,用于解耦之间有动作依赖的模块。 - **Primary Language**: C - **License**: MIT - **Default Branch**: master - **Homepage**: None - **GVP Project**: No ## Statistics - **Stars**: 0 - **Forks**: 0 - **Created**: 2026-06-10 - **Last Updated**: 2026-06-14 ## Categories & Tags **Categories**: Uncategorized **Tags**: None ## README # publish_subscribe(发布-订阅) #### 介绍 纯C语言实现的一个可裁剪的、通用的**发布-订阅-消息总线**框架,用于解耦之间有动作依赖的模块。除了全局的消息总线和发布者-订阅者的数据耦合外,不会有其他依赖关系,大大降低耦合度,**全局变量杀手**。具有同步模式和异步模式两种模式。 可选**静态内存策略API**和**动态内存策略API**,以便于适配不同的内存管理策略。对接操作系统部分使用**操作系统抽象层(OSAL)**, 以便于适配不同的操作系统。 #### 同步模式(sync/publish_subscribe_sync.c) 1. 在同步模式下,消息总线会**阻塞**在消息处理函数中,直到所有消息都被处理完毕。 2. 在同步模式下,若消息带有数据@arg,则可以使用局部变量,不需要担心数据的生命周期问题。 3. 在同步模式下,若订阅者处理函数执行时间过长,会阻塞其他消息的处理和当前程序流程。 4. 同步模式不需要操作系统的支持。但如果需要在多线程环境下使用,仍然需要适配操作系统抽象层,提供互斥锁以确保线程同步。 5. 同步模式下可以做到**完全静态内存管理**,不需要运行中malloc动态内存分配,非常适合在**裸机环境**中使用。 #### 异步模式(async/publish_subscribe_async.c) 1. 异步模式下,消息发布后会立刻返回,**不会阻塞**发布线程,除非负责存储消息的队列已满。 2. 异步模式下,消息处理会在独立的后台线程中进行,订阅者的回调函数需要注意线程安全问题。 3. 异步模式下,所有API都是线程安全的。 4. 异步模式需要**操作系统的支持**,如线程、队列等。 **说明**: 同步模式和异步模式的启用配置宏分别为`PS_USE_SYNC`和`PS_USE_ASYNC`, 且需要根据模式不同编译不同的源文件。 #### 同步模式结构图 ```mermaid graph TB MB[message_bus_t] TB0[topic: id=0] TB1[topic: id=1] TB2[topic: id=N] SUB0_0[subscriber_t] SUB0_1[subscriber_t] SUB1_0[subscriber_t] SUB1_1[subscriber_t] SUB1_2[subscriber_t] CB0_0[(callback)] CB0_1[(callback)] CB1_0[(callback)] CB1_1[(callback)] CB1_2[(callback)] MB --> TB0 MB --> TB1 MB --> TB2 TB0 --> SUB0_0 TB0 --> SUB0_1 TB1 --> SUB1_0 TB1 --> SUB1_1 TB1 --> SUB1_2 SUB0_0 --> CB0_0 SUB0_1 --> CB0_1 SUB1_0 --> CB1_0 SUB1_1 --> CB1_1 SUB1_2 --> CB1_2 ``` **说明**:同步模式下,发布消息时直接遍历主题列表,找到匹配的主题后立即调用所有订阅者的回调函数,阻塞直到所有回调执行完成。 #### 异步模式结构图 ```mermaid graph TB MB[message_bus_t] Q[osal_queue] T[osal_thread] M[osal_mutex] TB0[topic: id=0] TB1[topic: id=1] SUB0_0[subscriber_t] SUB0_1[subscriber_t] SUB1_0[subscriber_t] SUB1_1[subscriber_t] CB0_0[(callback)] CB0_1[(callback)] CB1_0[(callback)] CB1_1[(callback)] PUB[Publisher] PUB --> |ps_publish_topic| Q Q --> |recv| T T --> M M --> MB MB --> TB0 MB --> TB1 TB0 --> SUB0_0 TB0 --> SUB0_1 TB1 --> SUB1_0 TB1 --> SUB1_1 SUB0_0 --> CB0_0 SUB0_1 --> CB0_1 SUB1_0 --> CB1_0 SUB1_1 --> CB1_1 ``` **说明**:异步模式下,发布消息时将消息发送到队列后立即返回。后台线程通过 osal_queue_recv 从队列中取出消息,遍历主题列表找到匹配的主题,然后调用所有订阅者的回调函数。 #### 静态内存策略API 1. 静态分配需要调用者在调用API时,手动传递已定义好的内存的指针。 2. 静态分配内存API下,所有内存分配都在编译时完成,不会在运行时动态分配任何内存。 3. 对于订阅者和主题的添加,使用的是已确定大小的对象池,非动态分配内存。 4. 静态分配内存API下,对象池会占用固定内存空间,若内存空间不足,会报错。 #### 动态内存策略API 1. 动态分配内存API下,对于订阅者和主题的添加,使用malloc动态分配内存空间。 2. 动态分配内存API下,需要提供内存申请和释放函数,以及需要注意内存碎片问题。 #### 操作系统抽象层(OSAL) 操作系统抽象层为上层提供了统一的接口,以便于适配不同的操作系统。 1. 提供了线程、队列、互斥锁等操作系统相关的接口。 2. 允许用户自定义实现,以便于适配不同的操作系统。 3. 提供了默认的实现,有POSIX和FreeRTOS。 4. 若需要添加新的操作系统支持,需要实现[osal.h](publish_subscribe/osal.h)中定义的类型和接口。 5. 在`tests`目录下,有对应的测试用例[osal_test](test/osal_test.c),用于验证新的操作系统支持。 6. 选择具体的实现,需要在[CMakeLists.txt](CMakeLists.txt)中配置`PS_OSAL`选项, 如果不使用`cmake`构建,添加对应的文件。之后在[config.h](publish_subscribe/config.h)中启用`PS_USE_OSAL`宏即可。 #### 不同应用场景的配置推荐 | 使用场景 | 模式宏 | 内存分配宏 | OSAL要求 | 编译文件 | 适用场景描述 | |---------|--------|-----------|----------|---------|-------------| | **同步模式 + 静态API** | `PS_SYNC_MODE` | `PS_USE_STATIC_ALLOCATOR` | 可选(单线程) | `sync/publish_subscribe_sync.c` | 资源受限嵌入式系统,主题固定,无需动态内存 | | **同步模式 + 动态API** | `PS_SYNC_MODE` | `PS_USE_DYNAMIC_ALLOCATOR` | 可选 | `sync/publish_subscribe_sync.c` | 需要动态主题管理的同步场景,需注意内存泄漏 | | **异步模式 + 静态API** | `PS_ASYNC_MODE` | `PS_USE_STATIC_ALLOCATOR` | **必须** | `async/publish_subscribe_async.c` | 事件驱动架构,主题固定,后台线程处理 | | **异步模式 + 动态API** | `PS_ASYNC_MODE` | `PS_USE_DYNAMIC_ALLOCATOR` | **必须** | `async/publish_subscribe_async.c` | 复杂异步消息系统,支持运行时主题动态管理 | **配置约束说明:** - 模式宏只能二选一:`PS_SYNC_MODE` 或 `PS_ASYNC_MODE` - 内存分配宏只能二选一:`PS_USE_STATIC_ALLOCATOR` 或 `PS_USE_DYNAMIC_ALLOCATOR` - 异步模式下必须启用 `PS_USE_OSAL`(操作系统抽象层) - 动态内存分配时需配置 `ps_malloc` 和 `ps_free` 函数 #### 项目目录结构 publish-subscribe/ ├── publish_subscribe/ # 核心源码目录 │ ├── async/ # 异步模式实现 │ │ ├── publish_subscribe_async.c │ │ └── type_async.h │ ├── sync/ # 同步模式实现 │ │ ├── publish_subscribe_sync.c │ │ └── type_sync.h │ ├── osal/ # 操作系统抽象层实现 │ │ ├── os-posix.c │ │ └── os-freertos.c │ ├── utils/ # 工具模块 │ │ ├── object_pool.c/h │ │ └── doubly_linked_list.h │ ├── config.h # 配置头文件 │ ├── osal.h # OSAL 接口定义 │ └── publish_subscribe.h # 公共接口 ├── test/ # 测试目录 │ └── osal_test.c # OSAL 测试用例 ├── main.cpp # 示例主程序 ├── CMakeLists.txt # 根目录构建脚本 └── LICENSE # 许可证文件 #### 编译和构建 ```bash # 创建构建目录 mkdir build && cd build # 指定模式和 OSAL cmake -DPS_MODE=SYNC .. # 同步模式(同步模式下可以不使用OSAL) cmake -DPS_MODE=ASYNC -DPS_OSAL=POSIX .. # 异步模式 + POSIX # 编译 make ``` #### 函数说明 详见[publish_subscribe.h](publish_subscribe/publish_subscribe.h)文件。 #### 使用示例 1. 创建消息总线 ```c auto bus = ps_create_message_bus(); // 动态方式创建消息总线 ``` ```c message_bus_t static_bus; subscriber_t subscriber_array[20]; topic_t topic_array[20]; auto bus = ps_create_message_bus_static(&static_bus, subscriber_array, sizeof(subscriber_array), topic_array, sizeof(topic_array)); // 静态方式创建消息总线 ``` 2. 创建主题 主题使用int类型标识,每个主题的id必须是唯一的,由调用者负责定义。 ```c enum topic_id { TEMP_CHANGED = 0, HUMIDITY_CHANGED = 1, }; // 定义主题id枚举 ps_message_bus_create_topic(bus, TEMP_CHANGED); // 创建主题:温度变化 ps_message_bus_create_topic(bus, HUMIDITY_CHANGED); // 创建主题:湿度变化 ``` 3. 创建订阅者 在消息总线中订阅一个主题(事件),返回一个订阅者对象。 ```c // 订阅温度变化主题,回调函数为callback0 subscriber_t *sub0 = ps_message_bus_subscribe(bus, TEMP_CHANGED, callback0); // 订阅湿度变化主题,回调函数为callback1 subscriber_t *sub1 = ps_message_bus_subscribe(bus, HUMIDITY_CHANGED, callback1); ``` 若后续不需要删除该订阅者,那么检查完返回值后,可以不保存订阅者对象。 4. 在需要的时候发布消息 在消息总线中发布一个消息,消息包含主题id和主题数据(如果有)。 ```c // 发布温度变化消息(静态内存策略,同步模式下不会有问题,异步模式下需要注意变量生命周期问题) ps_publish_topic_static(bus, TEMP_CHANGED, &temp_data); // 发布湿度变化消息(动态内存策略,订阅者回调结束后自动释放) ps_publish_topic_dynamic(bus, HUMIDITY_CHANGED, &humidity_data, sizeof(humidity_data)); ``` 若使用同步模式,发布消息后,会立刻调用订阅者回调函数。 若使用异步模式,发布消息后,会将消息加入队列,由后台线程处理。 5. 销毁消息总线 ```c ps_destroy_message_bus(bus); ``` 销毁消息总线通常不是必须的,因为程序需要持续运行,消息总线需要持续存在。 消息总线被销毁时,会自动释放所有订阅者和主题的内存空间。 #### 注意事项 1. **异步模式队列满时**:当消息队列已满(默认64条),`ps_publish_topic_*` 会阻塞直到队列有空闲位置 2. **静态内存模式**:对象池大小在编译时确定,超出限制会返回错误 3. **线程安全**:异步模式下所有API线程安全,同步模式若在多线程环境下,需要启用**OSAL支持**保证线程安全。 4. **回调函数**:回调函数应尽量简短,避免阻塞(尤其在同步模式下)。