进程间通信中间件---ZeroMQ

news/2025/2/23 6:28:08

ZeroMQ(也称为 ØMQ 或 0MQ)是一个高性能的异步消息传递库,专为分布式或并发应用程序设计。它提供了多种通信模式(如请求-响应、发布-订阅等),并且可以在多种传输协议(如 TCP、IPC、PGM 等)上运行。ZeroMQ 的设计理念是“消息队列”,但它并不像传统的消息队列那样需要单独的中间件服务器,而是直接嵌入到应用程序中。

为什么选择ZeroMQ ?

在我使用过程中感受到最直观的一点就是“便捷高效”,我使用过NATS,RabbitMQ,Apache Kafka等中间件都需要安装一系列服务器,还需要配置服务器等等,很是麻烦,并且也会给系统带来一些负担,而zeroMQ只需要下载一个zeroMQ的包就可以直接在程序中调用  #include <zmq.h>  就可以实现编辑了,并且拥有多语言支持,不只是支持c语言,还支持python,java,GO,C++等语言,zeroMQ还有轻量级的特点,可以直接嵌入到应用程序中,而有的中间件需要java环境,有的需要很多包才可以运行,然而这对于内存敏感的嵌入式系统是致命缺点!所以这是我选择zeroMQ中间件的原因。

ZeroMQ通信模式

无论选择什么通信模式,可供选择的通信协议有:TCP , IPC , PGM(基于IP的标准组播) , EPGM(基于UDP的组播) , In-Process(线程间通信) , TIPC(集群通信)

测试用Makefile

首先给出一个Makefile编译目录下所有的.c文件,使用方法:

make zmq

 Makefile

# 指定编译器
CC = gcc

# 默认编译选项
CFLAGS = -Wall -g

# 获取当前目录下所有的 .c 文件并生成目标文件名
SRCS := $(wildcard *.c)
OBJS := $(SRCS:.c=)

# 默认目标:编译所有程序
all: $(OBJS)

# 判断是否启用了 zmq 目标
ifeq ($(findstring zmq,$(MAKECMDGOALS)),zmq)
    # 如果启用了 zmq 目标,添加 ZeroMQ 相关的编译和链接选项
    CFLAGS += -DUSE_ZMQ
    LDFLAGS += -lzmq
endif

# 编译规则:将每个 .c 文件编译为同名的可执行文件
%: %.c
	$(CC) $(CFLAGS) $< $(LDFLAGS) -o $@

# 让 zmq 目标依赖于所有可执行文件
zmq: $(OBJS)

# 清理生成的可执行文件
clean:
	rm -f $(OBJS)

.PHONY: all clean zmq

请求-响应(Request-Reply)

双向通信:客户端发送请求,服务器返回响应

client.c

#include <stdio.h>
#include <zmq.h>
#include <string.h>
#include <unistd.h>

int main(void) {
    // 初始化 ZeroMQ 上下文
    void *context = zmq_ctx_new();

    // 创建一个 REQ 类型的套接字
    void *requester = zmq_socket(context, ZMQ_REQ);
    zmq_connect(requester, "ipc://test_ipc");

 int count = 0;
    while (1) {
        char buffer[256];
        snprintf(buffer, sizeof(buffer), "Task %d", count++);
        printf("Sending task: %s\n", buffer);

        // 发送任务
        zmq_send(requester, buffer, strlen(buffer), 0);
        // 接收回复
        //char buffer[1024] = {0};
        zmq_recv(requester, buffer, sizeof(buffer) - 1, 0);
        printf("Received reply: %s\n", buffer);
        sleep(1); // 每秒发送一次任务
    }
    // 清理资源
    zmq_close(requester);
    zmq_ctx_destroy(context);

    return 0;
}

 server.c

#include <stdio.h>
#include <zmq.h>
#include <string.h>

int main(void) {
    // 初始化 ZeroMQ 上下文
    void *context = zmq_ctx_new();

    // 创建一个 REP 类型的套接字
    void *responder = zmq_socket(context, ZMQ_REP);
    int bind_result = zmq_bind(responder, "ipc://test_ipc");
    if (bind_result != 0) {
        printf("Failed to bind socket\n");
        return -1;
    }
    
    while (1) {
        // 接收消息
        char buffer[1024] = {0};
        zmq_recv(responder, buffer, sizeof(buffer) - 1, 0);
        printf("Received: %s\n", buffer);
        // 发送回复
        const char *response = "World";
        zmq_send(responder, response, strlen(response), 0);
    }

    // 清理资源
    zmq_close(responder);
    zmq_ctx_destroy(context);

    return 0;
}

发布-订阅(Publish-Subscribe)

广播通信:发布者广播消息,所有感兴趣的订阅者接收消息

publisher.c

#include <stdio.h>
#include <zmq.h>
#include <unistd.h>
#include <string.h>

int main(void) {
    void *context = zmq_ctx_new();
    void *publisher = zmq_socket(context, ZMQ_PUB);
    zmq_bind(publisher, "tcp://*:5556");

    int count = 0;
    while (1) {
        char buffer[256];
        snprintf(buffer, sizeof(buffer), "Message %d", count++);
        printf("Publishing: %s\n", buffer);

        // 发送消息
        zmq_send(publisher, buffer, strlen(buffer), 0);
        sleep(1); // 每秒发送一次消息
    }

    zmq_close(publisher);
    zmq_ctx_destroy(context);

    return 0;
}

subscriber.c

#include <stdio.h>
#include <zmq.h>

int main(void) {
    void *context = zmq_ctx_new();
    void *subscriber = zmq_socket(context, ZMQ_SUB);
    zmq_connect(subscriber, "tcp://localhost:5556");

    // 设置订阅过滤器(空字符串表示接收所有消息)
    zmq_setsockopt(subscriber, ZMQ_SUBSCRIBE, "", 0);

    while (1) {
        char buffer[256] = {0};
        zmq_recv(subscriber, buffer, sizeof(buffer) - 1, 0);
        printf("Received: %s\n", buffer);
    }

    zmq_close(subscriber);
    zmq_ctx_destroy(context);

    return 0;
}

 消息过滤 (  指定订阅 )

在PUB-SUB模式下有个特点是可以指定订阅,达到信息分类接收的目的,关键在于main函数运行一次的这句话

zmq_setsockopt (void *s_, int option_, const void *optval_, size_t optvallen_);
不指定消息类型:
// 设置订阅过滤器(空字符串表示接收所有消息)
zmq_setsockopt(subscriber, ZMQ_SUBSCRIBE, "", 0);
指定消息类型:
//指定接收话题A的信息
zmq_setsockopt(subscriber, ZMQ_SUBSCRIBE, "TopicA.", 7); // 只接收以 "TopicA." 开头的消息

那么现在订阅者只接收话题A,所以我发布端也需要修改发布的内容:

 main函数如下

#include <stdio.h>
#include <zmq.h>
#include <unistd.h>
#include <string.h>

void *publisher = NULL;
int count = 0;

void TopicA_Send(void){
  char endbuffer[256];
   // 发送话题A消息
    snprintf(endbuffer,sizeof(endbuffer), "TopicA.Message %d", count);
    zmq_send(publisher, endbuffer, strlen(endbuffer), 0);
    printf("AAAAPublishing: %s\n", endbuffer);
}

void TopicB_Send(void){
  char endbuffer[256];
   // 发送话题B消息
    snprintf(endbuffer,sizeof(endbuffer), "TopicB.Message %d", count);
    zmq_send(publisher, endbuffer, strlen(endbuffer), 0);
    printf("BBBBPublishing: %s\n", endbuffer);
}

int main(void) {
    void *context = zmq_ctx_new();
    publisher = zmq_socket(context, ZMQ_PUB);
    zmq_bind(publisher, "ipc://ipc_test");
    while (1) {
        TopicB_Send();
        sleep(1); // 每秒发送一次消息
        TopicA_Send();
        sleep(1); // 每秒发送一次消息
        count++;
    }

    zmq_close(publisher);
    zmq_ctx_destroy(context);

    return 0;
}

 关键点在于,在发送的时候在前面加入了 TopicB. / TopicA. 达到分话题发布的效果:

 snprintf(endbuffer,sizeof(endbuffer), "TopicA.Message %d", count);
 zmq_send(publisher, endbuffer, strlen(endbuffer), 0);

再次复制一个subscriber.c文件重命名后面加一个B,只修改这一句话

 zmq_setsockopt(subscriber, ZMQ_SUBSCRIBE, "TopicB.", 7); // 只接收以 "TopicB." 开头的消息

 就可以达到分开接收话题B的效果

实际运行效果

实际运行起来是:

 可以看到订阅者接收了自己对应的话题并且打印出来接收的信息,原理是根据前几个字符过滤信息。

推送-拉取(Push-Pull)

单向通信:pusher推送任务,puller拉取任务

puller.c

#include <stdio.h>
#include <zmq.h>

int main(void) {
    void *context = zmq_ctx_new();
    void *puller = zmq_socket(context, ZMQ_PULL);
    zmq_bind(puller, "tcp://*:5557");

    while (1) {
        char buffer[256] = {0};
        zmq_recv(puller, buffer, sizeof(buffer) - 1, 0);
        printf("Received task: %s\n", buffer);
    }

    zmq_close(puller);
    zmq_ctx_destroy(context);

    return 0;
}

pusher.c

#include <stdio.h>
#include <zmq.h>
#include <unistd.h>
#include <string.h>

int main(void) {
    
    void *context = zmq_ctx_new();

    void *pusher = zmq_socket(context, ZMQ_PUSH);
    zmq_connect(pusher, "tcp://localhost:5557");

    int count = 0;
    while (1) {
        char buffer[256];
        snprintf(buffer, sizeof(buffer), "Task %d", count++);
        printf("Sending task: %s\n", buffer);

        // 发送任务
        zmq_send(pusher, buffer, strlen(buffer), 0);
        sleep(1); // 每秒发送一次任务
    }

    zmq_close(pusher);
    zmq_ctx_destroy(context);

    return 0;
}

思考

Publish-Subscribe 还是 Push-Pull ?
  • 在上面的 发布-订阅 推送-拉取 看起来比较相似,都是一收一发,单向通信,只是发布-订阅模式可以过滤话题,但是我pull的时候也可以通过软件过滤前几行字,那为什么还要这个发布-订阅模式呢?

他们有以下区别:

假设我们有两个消费者(Consumer),一个生产者(Producer)。

(1) Push-Pull 模式
  • 生产者生成 6 条消息:Task 1, Task 2, Task 3, Task 4, Task 5, Task 6
  • 如果有两个 Pull 端,ZeroMQ 会按以下顺序分发消息:
    • Pull 端 1 接收:Task 1, Task 3, Task 5
    • Pull 端 2 接收:Task 2, Task 4, Task 6
  • 所以在一个push的时候多个pull的情况下信息会逐条均衡的发给各个接收端达到均衡负载的目的!,相当于一对一的发送!
(2) Publish-Subscribe 模式
  • 生产者生成 6 条消息:Topic1 Task 1, Topic1 Task 2, Topic2 Task 3, Topic1 Task 4, Topic2 Task 5, Topic1 Task 6
  • 如果有两个订阅者,并且订阅者 1 订阅了 Topic1,订阅者 2 订阅了 Topic2,那么:
    • 订阅者 1 接收:Topic1 Task 1, Topic1 Task 2, Topic1 Task 4, Topic1 Task 6
    • 订阅者 2 接收:Topic2 Task 3, Topic2 Task 5

Push-Pull 模式 是一种逐条分发消息的模式,适合任务分发和负载均衡。

Publish-Subscribe 模式 是一种广播分发消息的模式,适合实时数据广播和事件通知。

例子

一个pusher端 两个puller 端

// pusher.c
#include <zmq.h>
#include <stdio.h>
#include <unistd.h>
#include <string.h>

int main(void) {
    void *context = zmq_ctx_new(); // 创建 ZeroMQ 上下文
    void *pusher = zmq_socket(context, ZMQ_PUSH); // 创建 PUSH 套接字
    zmq_bind(pusher, "tcp://*:5557"); // 绑定地址

    printf("Pusher started. Sending tasks...\n");

    for (int i = 1; i <= 10; i++) {
        char buffer[256];
        snprintf(buffer, sizeof(buffer), "Task %d", i); // 构造任务消息
        printf("Sending task: %s\n", buffer);
        zmq_send(pusher, buffer, strlen(buffer), 0); // 发送任务
        sleep(1); // 模拟任务生成间隔
    }

    zmq_close(pusher); // 关闭套接字
    zmq_ctx_destroy(context); // 销毁上下文
    return 0;
}
// puller.c
#include <zmq.h>
#include <stdio.h>
#include <string.h>
#include <unistd.h>
int main(int argc, char *argv[]) {
    if (argc != 2) {
        printf("Usage: puller <identity>\n");
        return 1;
    }

    const char *identity = argv[1]; // Pull 端的身份标识(用于区分不同的 Pull 端)
    void *context = zmq_ctx_new(); // 创建 ZeroMQ 上下文
    void *puller = zmq_socket(context, ZMQ_PULL); // 创建 PULL 套接字
    zmq_connect(puller, "tcp://localhost:5557"); // 连接到 Push 端

    printf("Puller %s started. Waiting for tasks...\n", identity);

    while (1) {
        char buffer[256];
        zmq_recv(puller, buffer, sizeof(buffer), 0); // 接收任务
        printf("Puller %s received task: %s\n", identity, buffer);

        // 模拟处理时间
        sleep(1);
    }

    zmq_close(puller);
}

结果:(注意在 ./puller X)后面更上puller的端点号

1.打开puller 等待接收

2.开始push,看到pull轮流接收信息处理

注意这里的puller分配机制不靠输入的arge[]标识,靠自己的机制

 

总的来说,

push-pull 关键在于分发任务均衡处理,同一文件不同进程运行,减少压力

Publish-Subscribe 关键在于订阅特定消息,实现特定消息特定进程处理


http://www.niftyadmin.cn/n/5863118.html

相关文章

体育数据网站推荐系统开发:赛事数据、前瞻分析与智能推荐

体育数据网站作为集赛事数据、前瞻分析、专家解读于一体的综合平台&#xff0c;其推荐系统的开发需要充分考虑多维度数据的整合与应用。本文将深入探讨如何构建一个智能化的体育数据推荐系统。 一、系统架构设计 数据采集层&#xff1a; 实时赛事数据API接入 专家分析内容抓…

怎麼利用靜態ISP住宅代理在指紋流覽器中管理社媒帳號?

靜態ISP住宅代理是一種基於真實住宅IP的代理服務。這類代理IP通常由互聯網服務提供商&#xff08;ISP&#xff09;分配&#xff0c;具有非常高的真實性&#xff0c;與普通數據中心代理相比&#xff0c;更不容易被平臺檢測到為“虛假IP”或“代理IP”&#xff0c;靜態ISP住宅代理…

【愚公系列】《鸿蒙原生应用开发从零基础到多实战》002-TypeScript 类型系统详解

标题详情作者简介愚公搬代码头衔华为云特约编辑&#xff0c;华为云云享专家&#xff0c;华为开发者专家&#xff0c;华为产品云测专家&#xff0c;CSDN博客专家&#xff0c;CSDN商业化专家&#xff0c;阿里云专家博主&#xff0c;阿里云签约作者&#xff0c;腾讯云优秀博主&…

LeetCode51

LeetCode51 目录 题目描述示例思路分析代码段代码逐行讲解复杂度分析总结的知识点整合总结 题目描述 N 皇后问题&#xff1a;将 n 个皇后放置在 n x n 的棋盘上&#xff0c;使得皇后彼此之间不能相互攻击&#xff08;即任何两个皇后不能在同一行、同一列或同一斜线上&#x…

Docker教程(喂饭级!)

如果你有跨平台开发的需求&#xff0c;或者对每次在新机器上部署项目感到头疼&#xff0c;那么 Docker 是你的理想选择&#xff01;Docker 通过容器化技术将应用程序与其运行环境隔离&#xff0c;实现快速部署和跨平台支持&#xff0c;极大地简化了开发和部署流程。本文详细介绍…

线代[8]|北大丘维声教授《怎样学习线性代数?》(红色字体为博主注释)

文章目录 说明一、线性代数的内容简介二、学习线性代数的用处三、线性代数的特点四、学习线性代数的方法五、更新时间记录 说明 文章中红色字体为博主敲录完丘教授这篇文章后所加&#xff0c;刷到这篇文章的读者在首次阅读应当跳过红色字体&#xff0c;先通读一读文章全文&…

深度学习之特征提取

前言 深度学习就是把输入转换成一个高维的向量&#xff0c;之后利用这个向量去完成分类、回归等任务。 深度学习特征工程知识图谱 1. 特征提取的本质 核心目标&#xff1a;将原始数据→高维语义特征向量 监督驱动&#xff1a;标签决定特征提取方向 典型架构&#xff1a; …

Javascript排序算法(冒泡排序、快速排序、选择排序、堆排序、插入排序、希尔排序)详解

JS 排序算法详解 排序算法是计算机科学中的基础&#xff0c;用于将一组数据按照某种顺序重新排列。JavaScript中常见的排序算法包括冒泡排序、选择排序、插入排序、快速排序、归并排序等。以下是这些算法的详细介绍和代码示例。 冒泡排序&#xff08;Bubble Sort&#xff09;…