ZMQ测试代码样例

ZMQ测试代码样例

背景

有部分网友提到上篇文章ZeroMQ性能分析,没有提供测试代码,想要自己验证测试结果;本篇文章,主要介绍代码层面,如何使用ZeroMQ。测试样例有python版本、C/C++版本。

生产-消费者模型

根据官方介绍,ZeroMQ是支持多语言,也就是说,它支持在多种不同语言的进程之间实现通讯。我们想要验证的场景是,生产者-消费者模型,一个生产者,多个消费者,ZeroMQ是否支持这种模型、数据共享性能如何,这也是常见的编程模型。

这个图的表达有一定的误差,例如生产者是进程,将任务队列替换成ZeroMQ,并且ZeroMQ内部也没有队列,图仅可用于示意。从尽快出验证结果的角度来说,Python语言最简单。

Python验证代码

Python语言客户端代码如下

import zmq
import time

context = zmq.Context()
socket = context.socket(zmq.PULL)
socket.connect("tcp://localhost:5555")

mark = "hello"
test = str()
for i in range(1024*1024*10):
    test = test + mark
test = test.encode("utf-8")

start = time.time()

while True:
    response = socket.recv()
    #print("response: %s" % response)
    test = "I am test client No.2  recv data " +  str(response)
    #socket.send(test % socket)
    end = time.time()
    print(test)

Python语言服务端代码如下

import zmq
import time

context = zmq.Context()
socket = context.socket(zmq.PUSH)
socket.bind("tcp://*:5555")

mark = "hello"
test = str()
for i in range(1024*1024*10):
    test = test + mark
test = test.encode("utf-8")


index = 0
while True:
    test = "Hello I am index " + str(index)
    test = test.encode("utf-8")
    print(test)
    socket.send(test)
    #message = socket.recv()
    print("send test data %",index)
    index += 1
    time.sleep(0.5)
    #socket.send("I am OK!".encode("utf-8"))

这段代码,主要验证ZeroMQ官方提供的分布式用法,一个生产者、多个消费者,竞争消费生产者发送的消息,毫无疑问使用起来没啥问题。只不过,在使用时要安装pip install zmq;要使用python3执行。

在这种场景下,用ptyhon 在同机上使用ZMQ共享10MB数据,耗时达到15ms以上,耗时比较高

C/C++验证代码

根据以往的经验,C语言的性能会比python好很多,根据官方资料继续使用C语言验证ZeroMQ在同机不同进程之间的数据共享速度

C语言客户端代码如下

#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <string.h>
#include <assert.h>
#include <zmq.h>
#include <sys/time.h>

#define TEST_LEN 30 * 1024  * 1024


uint64_t GetTickCount() {

        struct timeval tv;
    gettimeofday(&amp;tv, NULL);
      return tv.tv_sec*1000000 + tv.tv_usec; // us 
     //return tv.tv_sec * 1000 + tv.tv_usec / 1000; // ms
}


int main (void)
{
    void *context = zmq_ctx_new();
    void *responder = zmq_socket (context, ZMQ_REQ);
    int rc = zmq_connect(responder, "ipc://test.ipc");
    printf("rc :%d\n", rc);
    char tmp[1024] = {0};
     char *buff = (char*)malloc(TEST_LEN);
     char *buff2 = (char*)malloc(TEST_LEN);
    memset(buff, 1, TEST_LEN);
         zmq_send(responder, buff, TEST_LEN, 0);
        uint64_t send_ts = 0;
        uint64_t send_over_ts = 0;                                                                                            
     int index = 1;
       while(1)
    {
        memset(buff, index, TEST_LEN);
         send_ts = GetTickCount();
        zmq_send(responder, buff, TEST_LEN, 0);
         send_over_ts = GetTickCount();
        printf("zmq  ts diff :%d \n", send_over_ts - send_ts);
        int len = zmq_recv(responder, tmp, 1024,0);
        printf("recv data :%s \n", tmp);
        usleep(1000);
        // sleep(1);
        index++;
    }
    free(buff);
    return 0;
}

C语言服务端代码如下

#include <stdio.h>
#include <unistd.h>
#include <string.h>
#include <assert.h>
#include <zmq.h>

#define MAX_LEN  100 * 1024 * 1024

char buffer[MAX_LEN];

int main (void)
{
    /*  Socket to talk to clients    */
    void *context = zmq_ctx_new ();
    void *responder = zmq_socket (context, ZMQ_REP);
    int rc = zmq_bind (responder, "ipc://test.ipc");
    assert (rc == 0);
    while(1)
    {
        int len = zmq_recv (responder, buffer, MAX_LEN, 0);
        printf ("Received data len:%d \n", len);
        //usleep (1);          /*Do some 'work'*/
        zmq_send (responder, "World", 5, 0);
    }
    return 0;
}

使用C语言时,需要通过yum 安装 zmq开发包;在编译的时候,要指定zmq动态库。 gcc -o test_client test_client.cpp -lzmq

从测试结果来看,使用C语言以后,共享10MB的数据,平均耗时1.38ms。

验证结论

  • 使用C/C++版本相比python而言,性能提升了10倍以上。这也间接说明,要想高性能还是需要使用C/C++ 这种编译型语言。
  • ZerMQ这种分布式用法,非常适合和生产者-消费者编程模型结合,间接帮助开发者实现了线程竞争、线程间负载平衡,是一个开发利器。

作为一名开发者,希望大家积极留言,交流开发经验心得,实现技术能力的彼此提升。

发表评论

邮箱地址不会被公开。 必填项已用*标注