ICode9

精准搜索请尝试: 精确搜索
首页 > 其他分享> 文章详细

ZeroMQ使用教程

2021-03-05 15:57:56  阅读:590  来源: 互联网

标签:教程 zmq socket ZMQ context 使用 ZeroMQ include 客户端


介:

ZeroMQ简称ZMQ,它对socket编程进行了封装,通俗的说,它就像一个框架一样,对socket lib进行了很好的封装,让socket编程变得更加简单。

ZMQ采用消息队列的方式对socket的包进行管理,它支持多线程,同时ZMQ是开源的,你可以在不同的Linux内核架构上缩减ZMQ模块,这在ARM嵌入式上非常有利。

ZMQ的明确目标是“成为标准网络协议栈的一部分,之后进入Linux内核”,目前还未成功。

ZMQ不是消息队列服务器,是一个基于消息队列模式的Socket库。

 

ZMQ的特点:

socket是1:1,而ZMQ对socket封装了以后具有n:1的特点,所谓的n:1就是无数客户端可以同时连接一个服务器,你无需去做线程服务分发这样的工作,因为ZMQ都已经为你实现好了。

同时socket的建立与通讯较为复杂,用户需要选择协议,端口,通讯类型、AF_INET、IPv4或者6等等,同时用户还需要考虑发送数据与接收数据的缓冲区大小,包括需要自己管理套接字,而ZMQ屏蔽了这些细节,让用户写少量的代码就能实现一个基本的服务器与客户端程序,同时它支持大文件传输,使得用户不用去关心很多细节,同时也降低了网络开发门槛,会让许多人可能都不太了解BSD的套接字协议。

所以不建议新手采用ZMQ,如果你是学习者则建议先从socket开始学起。

 

ZMQ的三种模式:

1.应答模式

应答模式在ZMQ里是REQ和REP

服务端是REP

客户端是REQ

典型的一问一答协议,即客户端需要首先发送hello,服务器则返回word,若客户端发送hello,服务器没有应答,后续通讯将不成立。

如:

客户端首先对服务端发送了hello,那么客户端会等待服务端应答,若在此期间客户端再次向服务端发送消息,服务端是收不到的,客户端有一个消息队列,会放入消息队列,只有在 客户端收到服务端的回应之后才会去依次处理消息队列里的内容。

2.订阅模式

即PUB/SUB

PUB代表服务器,SUB代表客户端

这种服务即服务器会不停发送数据,然后客户端对其进行订阅,客户端会收到服务器发送的数据,且不需要做出应答,客户端也不需要发送打招呼消息,只需要连接上就会收到服务器的订阅消息。

同时服务器不具有收客户端发送消息的能力。

这是单向的,即服务器只能发,客户端只能收,可以同时多个客户端订阅一个服务器。

3.分布式模式

PUSH/PULL

这种协议即服务器收到消息会立马推送给连接的客户端。

如一个使用PUSH协议的服务器,然后有四个PULL客户端连接上去。

如果我想同时发送消息给其它四个客户端,我不需要一个一个发,只需要给服务器发一个消息,服务器会自动推送给其它四个。

 

安装教程

1.Linux

去官方下载合适的版本:http://download.zeromq.org/ 下载完成后解压至任意目录并进入到此目录下

执行configure,prefix参数为安装目录

若出现缺失ibsodium,加上--without-libsodium参数

./configure --prefix=/usr/local/zeromq

编译与安装

make
make install

安装完成之后,安装目录下会有include和lib,一个是头文件目录一个是库文件目录。

目前发行版仓库已经自带了ZMQ

各大发行版安装方式:

Fedora

dnf install zeromq-devel

Ubuntu/Debian/Mint

apt-get install libzmq3-dev

Arch

pacman -S zeromq

SUSE

zypper install zeromq-devel

2.Windows

这里下载安装包https://zeromq.org/download/

解压,并打开.sln文件,编译生成后会在当前目录下生成include和lib文件,把include和lib添加到你的项目工程里就可以使用了。

 

使用教程

1.应答模式REQ/REP

客户端:

#include <zmq.h>
#include <string.h>
#include <stdio.h>
#include <unistd.h>

int main(void)
{
    printf("Connecting to server...\n");

    void * context = zmq_ctx_new();
    void * socket = zmq_socket(context, ZMQ_REQ);
    zmq_connect(socket, "tcp://localhost:6666");

    while(1)
    {
        char buffer[10];
        const char * requestMsg = "Hello";
        int bytes = zmq_send(socket, requestMsg, strlen(requestMsg), 0);
        printf("[Client][%d] Sended Request Message: %d bytes, content == \"%s\"\n", i, bytes, requestMsg);

        bytes = zmq_recv(socket, buffer, 10, 0);
        buffer[bytes] = '\0';
        printf("[Client][%d] Received Reply Message: %d bytes, content == \"%s\"\n", i, bytes, buffer);

    }

    zmq_close(socket);
    zmq_ctx_destroy(context);

    return 0;
}

服务器:

#include <zmq.h>
#include <stdio.h>
#include <unistd.h>
#include <string.h>
#include <assert.h>

int main(void)
{
    void * context = zmq_ctx_new();
    void * socket = zmq_socket(context, ZMQ_REP);
    zmq_bind(socket, "tcp://*:6666");

    while(1)
    {
        char buffer[10];
        int bytes = zmq_recv(socket, buffer, 10, 0);
        buffer[bytes] = '\0';
        printf("[Server] Recevied Request Message: %d bytes, content == \"%s\"\n", bytes, buffer);

        sleep(1);

        const char * replyMsg = "World";
        bytes = zmq_send(socket, replyMsg, strlen(replyMsg), 0);
        printf("[Server] Sended Reply Message: %d bytes, content == \"%s\"\n", bytes, replyMsg);
    }

    zmq_close(socket);
    zmq_ctx_destroy(context);

    return 0;
}

2.订阅模式PUB/SUB

服务器

#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <assert.h>
#include <string.h>
#include <zmq.h>


int main()
{
    printf("Hello world!\n");

    void* context = zmq_ctx_new();
    assert(context != NULL);

    int ret = zmq_ctx_set(context, ZMQ_MAX_SOCKETS, 1);

    assert(ret == 0);

    void* publisher = zmq_socket(context, ZMQ_PUB);
    assert(publisher != NULL);

    ret = zmq_bind(publisher, "tcp://192.168.1.5:8888");

    assert(ret == 0);

    while(1)
    {
        ret = zmq_send(publisher, "Hi,I'm server", 16, 0);

        assert(ret == 7);

        printf("%d\n", ret);

        sleep(1);

    }

    printf("1\n");

    return 0;
}

客户端

#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <assert.h>
#include <zmq.h>

int main()
{
    printf("Hello world!\n");

    void* context = zmq_ctx_new();
    assert(context != NULL);

    int ret = zmq_ctx_set(context, ZMQ_MAX_SOCKETS, 1);
    assert(ret == 0);

    void* subscriber = zmq_socket(context, ZMQ_SUB);
    assert(subscriber != NULL);

    ret = zmq_connect(subscriber, "tcp://192.168.1.5:8888");
    assert(ret == 0);

    ret = zmq_setsockopt(subscriber, ZMQ_SUBSCRIBE, "", 0);

    assert(ret == 0);

    char buf[16];
    while(1)
    {
        ret = zmq_recv(subscriber, buf, 16, ZMQ_DONTWAIT);
        if (ret != -1)
        {
            buf[ret] = '\0';
            printf("%s\n", buf);
        }
        sleep(1);
    }



    return 0;
}

3.分布式推送PUSH/PULL

分发者 ventilator
执行者 worker
收集结果的接收者 sink

ventilator:

#include <zmq.h>
#include <stdio.h>
#include <unistd.h>
#include <string.h>
#include <assert.h>
#include <stdlib.h>
#include <stdio.h>
#include <string.h>
 
int main(void)
{
    void * context = zmq_ctx_new();
    void * sender = zmq_socket(context, ZMQ_PUSH);
    zmq_bind(sender, "tcp://*:6666");
	printf ("Press Enter when the workers are ready: ");
    getchar ();
	printf ("Sending tasks to workers...\n");
    while(1)
    { 
        const char * replyMsg = "World";
        zmq_send(sender, replyMsg, strlen(replyMsg), 0);
        printf("[Server] Sended Reply Message content == \"%s\"\n", replyMsg);
    }
 
    zmq_close(sender);
    zmq_ctx_destroy(context);
 
    return 0;
}

worker:

#include <zmq.h>
#include <stdio.h>
#include <unistd.h>
#include <string.h>
#include <assert.h>

int main(void)
{
void * context = zmq_ctx_new();
void * recviver = zmq_socket(context, ZMQ_PULL);
zmq_connect(recviver, "tcp://localhost:6666");

void * sender = zmq_socket(context, ZMQ_PUSH);
zmq_connect(sender, "tcp://localhost:5555");

while(1)
{
char buffer [256];
int size = zmq_recv (recviver, buffer, 255, 0);
if(size < 0)
{
return -1;
}
printf("buffer:%s\n",buffer);
const char * replyMsg = "World";
zmq_send(sender, replyMsg, strlen(replyMsg), 0);
printf("[Server] Sended Reply Message content == \"%s\"\n", replyMsg);
}

zmq_close(recviver);
zmq_close(sender);
zmq_ctx_destroy(context);

return 0;
}

sink:

#include <zmq.h>
#include <stdio.h>
#include <unistd.h>
#include <string.h>
#include <assert.h>
 
int main(void)
{
    void * context = zmq_ctx_new();
    void * socket = zmq_socket(context, ZMQ_PULL);
    zmq_bind(socket, "tcp://*:5555");
 
    while(1)
    { 
       	char buffer [256];
		int size = zmq_recv (socket, buffer, 255, 0);
		if(size < 0)
		{
			return -1;
		}
        printf("buffer:%s\n",buffer);
    }
 
    zmq_close(socket);
    zmq_ctx_destroy(context);
 
    return 0;
}

流程图:

由ventilator通知work分发。

results连接到work,work连接至ventilator,当有需要推送的消息时,由ventilator推送给work,work在推送给其它客户端。

标签:教程,zmq,socket,ZMQ,context,使用,ZeroMQ,include,客户端
来源: https://blog.csdn.net/bjbz_cxy/article/details/114384483

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有