zgossip(3)

zgossip(3)

CZMQ Manual - CZMQ/4.2.0

Name

zgossip - Class for decentralized configuration management

Synopsis

//  To work with zgossip, use the CZMQ zactor API:
//
//  Create new zgossip instance, passing logging prefix:
//
//      zactor_t *zgossip = zactor_new (zgossip, "myname");
//
//  Destroy zgossip instance
//
//      zactor_destroy (&zgossip);
//
//  Enable verbose logging of commands and activity:
//
//      zstr_send (zgossip, "VERBOSE");
//
//  Bind zgossip to specified endpoint. TCP endpoints may specify
//  the port number as "*" to acquire an ephemeral port:
//
//      zstr_sendx (zgossip, "BIND", endpoint, NULL);
//
//  Return assigned port number, specifically when BIND was done using an
//  an ephemeral port:
//
//      zstr_sendx (zgossip, "PORT", NULL);
//      char *command, *port_str;
//      zstr_recvx (zgossip, &command, &port_str, NULL);
//      assert (streq (command, "PORT"));
//
//  Specify configuration file to load, overwriting any previous loaded
//  configuration file or options:
//
//      zstr_sendx (zgossip, "LOAD", filename, NULL);
//
//  Set configuration path value:
//
//      zstr_sendx (zgossip, "SET", path, value, NULL);
//
//  Save configuration data to config file on disk:
//
//      zstr_sendx (zgossip, "SAVE", filename, NULL);
//
//  Send zmsg_t instance to zgossip:
//
//      zactor_send (zgossip, &msg);
//
//  Receive zmsg_t instance from zgossip:
//
//      zmsg_t *msg = zactor_recv (zgossip);
//
//  This is the zgossip constructor as a zactor_fn:
//
CZMQ_EXPORT void
    zgossip (zsock_t *pipe, void *args);

//  Self test of this class
CZMQ_EXPORT void
    zgossip_test (bool verbose);
Please add '@interface' section in './../src/zgossip.c'.

Description

Implements a gossip protocol for decentralized configuration management. Your applications nodes form a loosely connected network (which can have cycles), and publish name/value tuples. Each node re-distributes the new tuples it receives, so that the entire network eventually achieves a consistent state. The current design does not expire tuples.

Provides these commands (sent as multipart strings to the actor):

  • BIND endpoint — binds the gossip service to specified endpoint
  • PORT — returns the last TCP port, if any, used for binding
  • LOAD configfile — load configuration from specified file
  • SET configpath value — set configuration path = value
  • SAVE configfile — save configuration to specified file
  • CONNECT endpoint — connect the gossip service to the specified peer
  • PUBLISH key value — publish a key/value pair to the gossip cluster
  • STATUS — return number of key/value pairs held by gossip service
  • ZAP DOMAIN domain — set the ZAP DOMAIN domain = value

Returns these messages:

  • PORT number — reply to PORT command
  • STATUS number — reply to STATUS command
  • DELIVER key value — new tuple delivered from network

The gossip protocol distributes information around a loosely-connected network of gossip services. The information consists of name/value pairs published by applications at any point in the network. The goal of the gossip protocol is to create eventual consistency between all the using applications.

The name/value pairs (tuples) can be used for configuration data, for status updates, for presence, or for discovery. When used for discovery, the gossip protocol works as an alternative to e.g. UDP beaconing.

The gossip network consists of a set of loosely-coupled nodes that exchange tuples. Nodes can be connected across arbitrary transports, so the gossip network can have nodes that communicate over inproc, over IPC, and/or over TCP, at the same time.

Each node runs the same stack, which is a server-client hybrid using a modified Harmony pattern (from Chapter 8 of the Guide): http://zguide.zeromq.org/page:all#True-Peer-Connectivity-Harmony-Pattern

Each node provides a ROUTER socket that accepts client connections on an key defined by the application via a BIND command. The state machine for these connections is in zgossip.xml, and the generated code is in zgossip_engine.inc.

Each node additionally creates outbound connections via DEALER sockets to a set of servers ("remotes"), and under control of the calling app, which sends CONNECT commands for each configured remote.

The messages between client and server are defined in zgossip_msg.xml. We built this stack using the zeromq/zproto toolkit.

To join the gossip network, a node connects to one or more peers. Each peer acts as a forwarder. This loosely-coupled network can scale to thousands of nodes. However the gossip protocol is NOT designed to be efficient, and should not be used for application data, as the same tuples may be sent many times across the network.

The basic logic of the gossip service is to accept PUBLISH messages from its owning application, and to forward these to every remote, and every client it talks to. When a node gets a duplicate tuple, it throws it away. When a node gets a new tuple, it stores it, and forwards it as just described.

At present there is no way to expire tuples from the network.

The assumptions in this design are:

  • The data set is slow-changing. Thus, the cost of the gossip protocol is irrelevant with respect to other traffic.

Example

From zgossip_test method

// Test basic client-to-server operation of the protocol
zactor_t *server = zactor_new (zgossip, "server");
assert (server);
if (verbose)
 zstr_send (server, "VERBOSE");
zstr_sendx (server, "BIND", "inproc://zgossip", NULL);

zsock_t *client = zsock_new (ZMQ_DEALER);
assert (client);
zsock_set_rcvtimeo (client, 2000);
int rc = zsock_connect (client, "inproc://zgossip");
assert (rc == 0);

// Send HELLO, which gets no message
zgossip_msg_t *message = zgossip_msg_new ();
zgossip_msg_set_id (message, ZGOSSIP_MSG_HELLO);
zgossip_msg_send (message, client);

// Send PING, expect PONG back
zgossip_msg_set_id (message, ZGOSSIP_MSG_PING);
zgossip_msg_send (message, client);
zgossip_msg_recv (message, client);
assert (zgossip_msg_id (message) == ZGOSSIP_MSG_PONG);
zgossip_msg_destroy (&message);

zactor_destroy (&server);
zsock_destroy (&client);

// Test peer-to-peer operations
zactor_t *base = zactor_new (zgossip, "base");
assert (base);
if (verbose)
 zstr_send (base, "VERBOSE");
// Set a 100msec timeout on clients so we can test expiry
zstr_sendx (base, "SET", "server/timeout", "100", NULL);
zstr_sendx (base, "BIND", "inproc://base", NULL);

zactor_t *alpha = zactor_new (zgossip, "alpha");
assert (alpha);

if (verbose)
 zstr_send (alpha, "VERBOSE");

zstr_sendx (alpha, "CONNECT", "inproc://base", NULL);

zstr_sendx (alpha, "PUBLISH", "inproc://alpha-1", "service1", NULL);
zstr_sendx (alpha, "PUBLISH", "inproc://alpha-2", "service2", NULL);

zactor_t *beta = zactor_new (zgossip, "beta");
assert (beta);

if (verbose)
 zstr_send (beta, "VERBOSE");

zstr_sendx (beta, "CONNECT", "inproc://base", NULL);

zstr_sendx (beta, "PUBLISH", "inproc://beta-1", "service1", NULL);
zstr_sendx (beta, "PUBLISH", "inproc://beta-2", "service2", NULL);

// got nothing
zclock_sleep (200);

zstr_send (alpha, "STATUS");
char *command, *status, *key, *value;

zstr_recvx (alpha, &command, &key, &value, NULL);

assert (streq (command, "DELIVER"));
assert (streq (key, "inproc://alpha-1"));
assert (streq (value, "service1"));

zstr_free (&command);
zstr_free (&key);
zstr_free (&value);

zstr_recvx (alpha, &command, &key, &value, NULL);
assert (streq (command, "DELIVER"));
assert (streq (key, "inproc://alpha-2"));
assert (streq (value, "service2"));
zstr_free (&command);
zstr_free (&key);
zstr_free (&value);

zstr_recvx (alpha, &command, &key, &value, NULL);
assert (streq (command, "DELIVER"));
assert (streq (key, "inproc://beta-1"));
assert (streq (value, "service1"));
zstr_free (&command);
zstr_free (&key);
zstr_free (&value);

zstr_recvx (alpha, &command, &key, &value, NULL);
assert (streq (command, "DELIVER"));
assert (streq (key, "inproc://beta-2"));
assert (streq (value, "service2"));
zstr_free (&command);
zstr_free (&key);
zstr_free (&value);

zstr_recvx (alpha, &command, &status, NULL);
assert (streq (command, "STATUS"));
assert (atoi (status) == 4);
zstr_free (&command);
zstr_free (&status);

zactor_destroy (&base);
zactor_destroy (&alpha);
zactor_destroy (&beta);

#ifdef CZMQ_BUILD_DRAFT_API
// DRAFT-API: Security
// curve
if (zsys_has_curve()) {
 if (verbose)
 printf("testing CURVE support");
 zclock_sleep (2000);
 zactor_t *auth = zactor_new(zauth, NULL);
 assert (auth);
 if (verbose) {
 zstr_sendx (auth, "VERBOSE", NULL);
 zsock_wait (auth);
 }
 zstr_sendx(auth,"ALLOW","127.0.0.1",NULL);
 zsock_wait(auth);
 zstr_sendx (auth, "CURVE", CURVE_ALLOW_ANY, NULL);
 zsock_wait (auth);

 server = zactor_new (zgossip, "server");
 if (verbose)
 zstr_send (server, "VERBOSE");
 assert (server);

 zcert_t *client1_cert = zcert_new ();
 zcert_t *server_cert = zcert_new ();

 zstr_sendx (server, "SET PUBLICKEY", zcert_public_txt (server_cert), NULL);
 zstr_sendx (server, "SET SECRETKEY", zcert_secret_txt (server_cert), NULL);
 zstr_sendx (server, "ZAP DOMAIN", "TEST", NULL);

 zstr_sendx (server, "BIND", "tcp://127.0.0.1:*", NULL);
 zstr_sendx (server, "PORT", NULL);
 zstr_recvx (server, &command, &value, NULL);
 assert (streq (command, "PORT"));
 int port = atoi (value);
 zstr_free (&command);
 zstr_free (&value);
 char endpoint [32];
 sprintf (endpoint, "tcp://127.0.0.1:%d", port);

 zactor_t *client1 = zactor_new (zgossip, "client");
 if (verbose)
 zstr_send (client1, "VERBOSE");
 assert (client1);

 zstr_sendx (client1, "SET PUBLICKEY", zcert_public_txt (client1_cert), NULL);
 zstr_sendx (client1, "SET SECRETKEY", zcert_secret_txt (client1_cert), NULL);
 zstr_sendx (client1, "ZAP DOMAIN", "TEST", NULL);

 const char *public_txt = zcert_public_txt (server_cert);
 zstr_sendx (client1, "CONNECT", endpoint, public_txt, NULL);
 zstr_sendx (client1, "PUBLISH", "tcp://127.0.0.1:9001", "service1", NULL);

 zclock_sleep (500);

 zstr_send (server, "STATUS");
 zclock_sleep (500);

 zstr_recvx (server, &command, &key, &value, NULL);
 assert (streq (command, "DELIVER"));
 assert (streq (value, "service1"));

 zstr_free (&command);
 zstr_free (&key);
 zstr_free (&value);

 zstr_sendx (client1, "$TERM", NULL);
 zstr_sendx (server, "$TERM", NULL);

 zclock_sleep(500);

 zcert_destroy (&client1_cert);
 zcert_destroy (&server_cert);

 zactor_destroy (&client1);
 zactor_destroy (&server);
 zactor_destroy (&auth);
}
#endif

#if defined (__WINDOWS__)
zsys_shutdown(); #endif

Authors

The czmq manual was written by the authors in the AUTHORS file.

Resources

Main web site:

Report bugs to the email <gro.qmorez.stsil|ved-qmorez#gro.qmorez.stsil|ved-qmorez>

Copyright

Copyright (c) the Contributors as noted in the AUTHORS file. This file is part of CZMQ, the high-level C binding for ØMQ: http://czmq.zeromq.org. This Source Code Form is subject to the terms of the Mozilla Public License, v. 2.0. If a copy of the MPL was not distributed with this file, You can obtain one at http://mozilla.org/MPL/2.0/. LICENSE included with the czmq distribution.