# message_bus **Repository Path**: liudegui/message_bus ## Basic Information - **Project Name**: message_bus - **Description**: C++ 消息总线模块,支持消息等待回调和超时响应 - **Primary Language**: C++ - **License**: MIT - **Default Branch**: optimze - **Homepage**: None - **GVP Project**: No ## Statistics - **Stars**: 49 - **Forks**: 24 - **Created**: 2015-06-11 - **Last Updated**: 2026-02-21 ## Categories & Tags **Categories**: utils **Tags**: Cpp, 消息总线, 事件驱动, 订阅发布 ## README # MessageBus A lightweight, header-only C++14 message bus with publish-subscribe pattern and timeout support. ## Features - **Publish-Subscribe**: Subscribe to message IDs and receive callbacks when messages are published - **Timeout Management**: Optional timeout callbacks for subscriptions that don't receive messages within a deadline - **Once Subscribe**: Subscriptions that automatically unsubscribe after receiving the first message - **Thread-Safe**: All public APIs are safe to call from multiple threads concurrently - **Re-entrant Safe**: Callbacks can safely publish new messages without deadlock - **Header-Only**: Single header file, no external dependencies ## Architecture ```mermaid classDiagram class MessageBus { +instance() MessageBus& +publishMessage(messageId, content, additionalData) +subscribeToMessage(item) bool +clearAllSubscriptions() +start() +stop() } class PeriodicTaskScheduler { +startTask(intervalMs, task) +stop() +isStopped() bool } class SubscriptionItem { +messageCallback: function +timeoutCallback: function +timeoutIntervalMilliseconds: int32 +subscribedMessageIds: vector +subscriptionType: SubscriptionType } class SubscriptionType { <> ALWAYS_SUBSCRIBE ONCE_SUBSCRIBE } MessageBus "1" --> "1" PeriodicTaskScheduler : owns MessageBus "1" --> "*" SubscriptionItem : manages SubscriptionItem --> SubscriptionType : uses ``` ## Quick Start ### Build ```bash cmake -B build -DCMAKE_BUILD_TYPE=Release cmake --build build ``` ### Run Tests ```bash ./build/test_app ``` ### Usage ```cpp #include "message_bus.hpp" using namespace MessageBusSystem; int main() { MessageBus& bus = MessageBus::instance(); // Subscribe SubscriptionItem item; item.messageCallback = [](const std::vector& content, std::int32_t data) { // handle message }; item.timeoutCallback = []() { // handle timeout }; item.timeoutIntervalMilliseconds = 1000; item.subscribedMessageIds.push_back(1); bus.subscribeToMessage(item); // Start timeout checker bus.start(); // Publish std::vector payload = {0x01, 0x02, 0x03}; bus.publishMessage(1, payload); bus.stop(); return 0; } ``` ## Thread Safety | Method | Thread Safety | |--------|---------------| | `publishMessage()` | Safe from multiple threads | | `subscribeToMessage()` | Safe from multiple threads | | `clearAllSubscriptions()` | Safe from multiple threads | | `start()` / `stop()` | Call from a single thread | | Callbacks | Invoked outside the lock, safe to call `publishMessage()` from within | ### Design Decisions - **Single mutex**: Both the callback map and timeout list are protected by a single mutex, eliminating all deadlock risks from inconsistent lock ordering. - **Callbacks outside lock**: Message and timeout callbacks are collected under the lock but invoked after the lock is released, preventing re-entrant deadlock when callbacks publish new messages. - **Joinable thread**: The periodic task scheduler uses a joinable thread (not detached), ensuring clean shutdown. ## Sequence Diagram ```mermaid sequenceDiagram participant User participant MessageBus participant Callbacks User->>MessageBus: publishMessage(id, content) activate MessageBus Note over MessageBus: Lock mutex MessageBus->>MessageBus: Remove timeout entries for id MessageBus->>MessageBus: Collect matching callbacks MessageBus->>MessageBus: Unsubscribe ONCE items Note over MessageBus: Unlock mutex MessageBus->>Callbacks: Invoke callbacks (outside lock) deactivate MessageBus ``` ## Requirements - C++14 compiler (GCC 5+, Clang 3.5+, MSVC 2015+) - CMake 3.10+ (for building tests) - POSIX threads (Linux/macOS) or Windows threads ## License MIT License