当前位置:编程学习 > C/C++ >>

ZeroMQ指南-第1章-基础-放出消息

放出消息
第二个经典模式是单向数据分发,服务器推送更新到一组客户端。让我们看一个推送天气情况变化的例子,包含地区编码、温度、和相对湿度。我们会生成随机值来模拟真实气象站。
这是服务器代码,这个程序我们使用5556端口。
wuserver: Weather update server in C
[cpp]  
//  
// Weather update server  
// Binds PUB socket to tcp://*:5556  
// Publishes random weather updates  
//  
#include "zhelpers.h"  
  
int main (void)  
{  
    // Prepare our context and publisher  
    void *context = zmq_ctx_new ();  
    void *publisher = zmq_socket (context, ZMQ_PUB);  
    int rc = zmq_bind (publisher, "tcp://*:5556");  
    assert (rc == 0);  
    rc = zmq_bind (publisher, "ipc://weather.ipc");  
    assert (rc == 0);  
  
    // Initialize random number generator  
    srandom ((unsigned) time (NULL));  
    while (1) {  
        // Get values that will fool the boss  
        int zipcode, temperature, relhumidity;  
        zipcode = randof (100000);  
        temperature = randof (215) - 80;  
        relhumidity = randof (50) + 10;  
  
        // Send message to all subscribers  
        char update [20];  
        sprintf (update, "%05d %d %d", zipcode, temperature, relhumidity);  
        s_send (publisher, update);  
    }  
    zmq_close (publisher);  
    zmq_ctx_destroy (context);  
    return 0;  
}  
更新流既无开始也无结束,像一个永不结束的天气预报。
图 4 – 发布-订阅
这是客户端程序,监听更新流并捕获符合特定地区编码的所有消息,默认为纽约市因为那是个冒险的好地方:
wuclient: Weather update client in C
[cpp]  
//  
// Weather update client  
// Connects SUB socket to tcp://localhost:5556  
// Collects weather updates and finds avg temp in zipcode  
//  
#include "zhelpers.h"  
  
int main (int argc, char *argv [])  
{  
    void *context = zmq_ctx_new ();  
  
    // Socket to talk to server  
    printf ("Collecting updates from weather server…\n");  
    void *subscriber = zmq_socket (context, ZMQ_SUB);  
    int rc = zmq_connect (subscriber, "tcp://localhost:5556");  
    assert (rc == 0);  
  
    // Subscribe to zipcode, default is NYC, 10001  
    char *filter = (argc > 1) ? argv [1] : "10001 ";  
    rc = zmq_setsockopt (subscriber, ZMQ_SUBSCRIBE, filter, strlen (filter));  
    assert (rc == 0);  
  
    // Process 100 updates  
    int update_nbr;  
    long total_temp = 0;  
    for (update_nbr = 0; update_nbr < 100; update_nbr++) {  
        char *string = s_recv (subscriber);  
  
        int zipcode, temperature, relhumidity;  
        sscanf (string, "%d %d %d",  
                &zipcode, &temperature, &relhumidity);  
        total_temp += temperature;  
        free (string);  
    }  
    printf ("Average temperature for zipcode '%s' was %dF\n",  
            filter, (int) (total_temp / update_nbr));  
  
    zmq_close (subscriber);  
    zmq_ctx_destroy (context);  
    return 0;  
}  
注意当你使用一个订阅套接字时你必须使用zmq_setsockopt()和SUBSCRIBE设置一个订阅,就像这段代码中那样。如果你不设置任何订阅,就得不到任何消息。这是初学者的常见错误。订阅者可以设置很多订阅,会合并到一起。就是说,如果一个更新匹配任意一个订阅,订阅者都会接收。订阅者也可以取消特定的订阅。一个订阅通常是一个可打印字符串,但也不是必须的。参考zmq_setsockopt()看这是怎么工作的。
发布订阅套接字对是异步的。客户端在循环中(或单次如果有必要)做zmq_msg_recv()。尝试发送消息到订阅套接字将导致错误。同样的,服务按所需频率做zmq_msg_send(),但绝不能对发布套接字做zmq_msg_recv()。
理论上,ØMQ套接字不在乎哪一端来连接哪一端来绑定。但是在实践中会有未公开的差异,待会我会提及。现在,绑定发布并连接订阅,除非你的网络设计导致这无法实现。
还有一个关于发布订阅套接字的重要事项:你无法精确知道订阅者什么时候开始获取消息。即使你先启动一个订阅者,过一会再启动发布者,订阅者总是会错过发布者发送的第一条消息。这是因为订阅者连接到发布者时(占用了短暂但非零的时间),发布者可能已经将消息发送出去了。
这种“迟钝加入者”症状击中了很多人、很多次,我们需要详细解释一下。记住ØMQ是异步I/O的,也就是在后台。比如说你有两个节点按这个顺序这么做:
订阅者连接到一个端点并接收和计数消息
发布者绑定到一个端点并立刻发送1000条消息
那么订阅者很可能不会接收到任何东西。你可以眨眨眼,检查一下是否设置了正确的过滤器,再试试,然而订阅者还是不会接收任何东西。
建立TCP连接牵涉到大概几毫秒的来回握手,这取决于网络状况和节点之间跳跃的次数。这个时间里,ØMQ已经可以发送好多消息了。为了证明,假设花费5毫秒来建立连接,而相同的链路可以搞定1M每秒的消息。在订阅者连接到发布者的5毫秒里,发布者仅耗费1毫秒就将这1K的消息发送出去了。
在第2章 - 套接字与模式中我们会解释如何让发布者和订阅者同步,以便无需等待订阅者(们)都已连接并就绪时才开始发布数据。有一个简单的笨办法来延迟发布者,通过睡眠sleep。但是在真实程序中可不要这么做,因为这极为脆弱、不雅而缓慢。先用sleep向自己证明到底发生了什么,然后等到第2章再看正确做法。
同步的另一种替代方案是简单的假设发布的数据流是无限的,没有开始也没有结束。还假设订阅者不在乎它启动之前发生过什么。这就是我们建造的例子天气客户端的方式。
客户端订阅到选定的地区编码并收集1000个更新。也就是大概1000万服务器更新,如果地区编码是随机分发的。你可以先启动客户端,再启动服务器,而客户端会持续工作。你可以随时停止并重启服务器,而客户端会持续工作。当客户端收集到了1000个更新,它计算出平均值,打印输出,然后退出。
关于发布订阅模式的一些要点:
订阅者可以每次调用“connect”来连接到一个以上的发布者。数据会交错到达(“公平队列”)以避免发布者相互淹没。
如果一个发布者没有已连接的订阅者,那么它直接丢弃所有消息。
如果你使用TCP时一个订阅者很慢,消息会在发布者那里排上队。我们待会看看这种情况下如何用“高水位线”来保护发布者。
从ØMQ 3.x开始,使用已连接协议(tcp: 或 ipc:)将在发布者方面进行过滤,使用e
补充:软件开发 , C++ ,
CopyRight © 2022 站长资源库 编程知识问答 zzzyk.com All Rights Reserved
部分文章来自网络,