zproc(3)

zproc(3)

CZMQ Manual - CZMQ/4.2.0

Name

zproc - Class for process configuration and status

Synopsis

//  This is a draft class, and may change without notice. It is disabled in
//  stable builds by default. If you use this in applications, please ask
//  for it to be pushed to stable state. Use --enable-drafts to enable.
#ifdef CZMQ_BUILD_DRAFT_API
//  *** Draft method, for development use, may change without warning ***
//  Create a new zproc.
//  NOTE: On Windows and with libzmq3 and libzmq2 this function
//  returns NULL. Code needs to be ported there.
CZMQ_EXPORT zproc_t *
    zproc_new (void);

//  *** Draft method, for development use, may change without warning ***
//  Destroy zproc, wait until process ends.
CZMQ_EXPORT void
    zproc_destroy (zproc_t **self_p);

//  *** Draft method, for development use, may change without warning ***
//  Return command line arguments (the first item is the executable) or
//  NULL if not set.
//  Caller owns return value and must destroy it when done.
CZMQ_EXPORT zlist_t *
    zproc_args (zproc_t *self);

//  *** Draft method, for development use, may change without warning ***
//  Setup the command line arguments, the first item must be an (absolute) filename
//  to run.
CZMQ_EXPORT void
    zproc_set_args (zproc_t *self, zlist_t **arguments);

//  *** Draft method, for development use, may change without warning ***
//  Setup the command line arguments, the first item must be an (absolute) filename
//  to run. Variadic function, must be NULL terminated.
CZMQ_EXPORT void
    zproc_set_argsx (zproc_t *self, const char *arguments, ...);

//  *** Draft method, for development use, may change without warning ***
//  Setup the environment variables for the process.
CZMQ_EXPORT void
    zproc_set_env (zproc_t *self, zhash_t **arguments);

//  *** Draft method, for development use, may change without warning ***
//  Connects process stdin with a readable ('>', connect) zeromq socket. If
//  socket argument is NULL, zproc creates own managed pair of inproc
//  sockets.  The writable one is then accessible via zproc_stdin method.
CZMQ_EXPORT void
    zproc_set_stdin (zproc_t *self, void *socket);

//  *** Draft method, for development use, may change without warning ***
//  Connects process stdout with a writable ('@', bind) zeromq socket. If
//  socket argument is NULL, zproc creates own managed pair of inproc
//  sockets.  The readable one is then accessible via zproc_stdout method.
CZMQ_EXPORT void
    zproc_set_stdout (zproc_t *self, void *socket);

//  *** Draft method, for development use, may change without warning ***
//  Connects process stderr with a writable ('@', bind) zeromq socket. If
//  socket argument is NULL, zproc creates own managed pair of inproc
//  sockets.  The readable one is then accessible via zproc_stderr method.
CZMQ_EXPORT void
    zproc_set_stderr (zproc_t *self, void *socket);

//  *** Draft method, for development use, may change without warning ***
//  Return subprocess stdin writable socket. NULL for
//  not initialized or external sockets.
CZMQ_EXPORT void *
    zproc_stdin (zproc_t *self);

//  *** Draft method, for development use, may change without warning ***
//  Return subprocess stdout readable socket. NULL for
//  not initialized or external sockets.
CZMQ_EXPORT void *
    zproc_stdout (zproc_t *self);

//  *** Draft method, for development use, may change without warning ***
//  Return subprocess stderr readable socket. NULL for
//  not initialized or external sockets.
CZMQ_EXPORT void *
    zproc_stderr (zproc_t *self);

//  *** Draft method, for development use, may change without warning ***
//  Starts the process, return just before execve/CreateProcess.
CZMQ_EXPORT int
    zproc_run (zproc_t *self);

//  *** Draft method, for development use, may change without warning ***
//  process exit code
CZMQ_EXPORT int
    zproc_returncode (zproc_t *self);

//  *** Draft method, for development use, may change without warning ***
//  PID of the process
CZMQ_EXPORT int
    zproc_pid (zproc_t *self);

//  *** Draft method, for development use, may change without warning ***
//  return true if process is running, false if not yet started or finished
CZMQ_EXPORT bool
    zproc_running (zproc_t *self);

//  *** Draft method, for development use, may change without warning ***
//  The timeout should be zero or greater, or -1 to wait indefinitely.
//  wait or poll process status, return return code
CZMQ_EXPORT int
    zproc_wait (zproc_t *self, int timeout);

//  *** Draft method, for development use, may change without warning ***
//  send SIGTERM signal to the subprocess, wait for grace period and
//  eventually send SIGKILL
CZMQ_EXPORT void
    zproc_shutdown (zproc_t *self, int timeout);

//  *** Draft method, for development use, may change without warning ***
//  return internal actor, useful for the polling if process died
CZMQ_EXPORT void *
    zproc_actor (zproc_t *self);

//  *** Draft method, for development use, may change without warning ***
//  send a signal to the subprocess
CZMQ_EXPORT void
    zproc_kill (zproc_t *self, int signal);

//  *** Draft method, for development use, may change without warning ***
//  set verbose mode
CZMQ_EXPORT void
    zproc_set_verbose (zproc_t *self, bool verbose);

//  *** Draft method, for development use, may change without warning ***
//  Self test of this class.
CZMQ_EXPORT void
    zproc_test (bool verbose);

#endif // CZMQ_BUILD_DRAFT_API
Please add '@interface' section in './../src/zproc.c'.

Description

zproc - process configuration and status, plus unix pipes on steroids

The zproc class currently has several limitations:

* it is tested on zmq4 on Linux and OSX;
* it does not work on Windows, where you get empty stubs for most of the methods;
* it does not work on libzmq3 and libzmq2. We have experienced stalls and timeouts when running tests against such old versions.

Note: zproc is not yet stable, so there are no guarantees regarding API stability. Some methods can have weird semantics or strange API.

Class zproc runs an external process and uses ZeroMQ sockets to communicate with it. In other words, standard input and outputs MAY be connected with appropriate zeromq sockets, and the data flow is managed by zproc itself. This makes zproc the best-in-class way to run and manage subprocesses.

Data are sent and received as zframes (zframe_t), so zproc does not try to interpret content of the messages in any way. See test example on how to use it.

+----------------------------------------+
|    /bin/cat cat /etc/passwd            |
|    stdin   | stdout      |    stderr   |
|------||--------||---------------||-----|
|      fd1       fd2              fd3    |
|       ^         v                v     |
|zmq://stdin |zmq://stdout |zmq://stderr |
|         [zproc supervisor]          |
+----------------------------------------+
----------> zeromq magic here <-----------

Please add @discuss section in ./../src/zproc.c.

Example

From zproc_test method

// variable file contains path to zsp executable:
// char *file = "path/to/zsp";

#if defined (__WINDOWS__)
printf ("Very limited (on Windows) ");
{
 zsys_init ();
 zproc_t *self = zproc_new ();
 assert (self);

 zproc_set_verbose (self, verbose);
 zproc_set_argsx (self, file, "-v", NULL);
 zproc_run (self);
 zclock_sleep (100); // to let actor start the process
 assert (zproc_pid (self));

 zproc_kill (self, SIGTERM);
 assert (zproc_returncode (self) == 255);
 zproc_destroy (&self);
}
printf ("OK\n");
return;
#endif
{
// Test case #1: run command, wait until it ends and get the (standard) output
zproc_t *self = zproc_new ();
assert (self);
zproc_set_verbose (self, verbose);

// join stdout of the process to zeromq socket
// all data will be readable from zproc_stdout socket
assert (!zproc_stdout (self));
zproc_set_stdout (self, NULL);
assert (zproc_stdout (self));

zproc_set_argsx (self, file, "--help", NULL);

if (verbose)
 zsys_debug("zproc_test() : launching helper '%s' --help", file );

int r = zproc_run (self);
assert (r == 0);
zframe_t *frame;
zsock_brecv (zproc_stdout (self), "f", &frame);
assert (frame);
assert (zframe_data (frame));
// TODO: real test
if (verbose)
 zframe_print (frame, "1:");
zframe_destroy (&frame);
r = zproc_wait (self, -1);
assert (r == 0);
zproc_destroy (&self);
}

{
// Test case#2: run zsp helper with a content written on stdin, check if it was passed to stdout
zproc_t *self = zproc_new ();
assert (self);
zproc_set_verbose (self, verbose);
// forward input from stdin to stderr
zproc_set_argsx (self, file, "--stdin", "--stderr", NULL);
// FIXME: there is a BUG in zproc somewhere, you can't get an output from both stdout/stderr
//zproc_set_argsx (self, file, "--stdin", "--stdout", "--stderr", NULL);
zproc_set_stdin (self, NULL);
// FIXME: the bug
//zproc_set_stdout (self, NULL);
zproc_set_stderr (self, NULL);

// send data to stdin
int r = zproc_run (self);
assert (r == 0);
zframe_t *frame = zframe_new ("Lorem ipsum\0\0", strlen ("Lorem ipsum")+2);
assert (frame);
zsock_bsend (zproc_stdin (self), "f", frame);
zframe_destroy (&frame);

// FIXME: the bug
//zproc_set_stdout (self, NULL);
// read data from stdout
/*
zsys_debug ("BAF1");
zsock_brecv (zproc_stdout (self), "f", &frame);
zsys_debug ("BAF2");
assert (frame);
assert (zframe_data (frame));
if (verbose)
 zframe_print (frame, "2.stdout:");
assert (!strncmp ((char*) zframe_data (frame), "Lorem ipsum", 11));
*/

// read data from stderr
zsock_brecv (zproc_stderr (self), "f", &frame);
assert (frame);
assert (zframe_data (frame));
if (verbose)
 zframe_print (frame, "2.stderr:");
assert (!strncmp ((char*) zframe_data (frame), "Lorem ipsum", 11));
zproc_kill (self, SIGTERM);
zproc_wait (self, -1);
zframe_destroy (&frame);
zproc_destroy (&self);
}

{
// Test case#3: run non existing binary
zproc_t *self = zproc_new ();
assert (self);
zproc_set_verbose (self, verbose);
// the binary does not exist, so zproc_run is expected to fail
zproc_set_argsx (self, "/not/existing/file", NULL);

int r = zproc_run (self);
assert (r == -1);
zproc_destroy (&self);
}

{
// Test case #4: child abort itself
zproc_t *self = zproc_new ();
assert (self);
zproc_set_verbose (self, verbose);
zproc_set_argsx (self, file, "--verbose", "--abrt", NULL);
zproc_set_stdout (self, NULL);
zproc_set_stderr (self, NULL);
zproc_set_stdin (self, NULL);

int r = zproc_run (self);
zclock_sleep (100); // to let actor start the process
assert (r != -1);
zclock_sleep (100);
zframe_t *frame;
zsock_brecv (zproc_stdout (self), "f", &frame);
assert (zframe_is (frame));
assert (zframe_size (frame) > 0);
zframe_destroy (&frame);
zproc_wait (self, -1);
assert (zproc_returncode (self) == -SIGABRT);
zproc_destroy (&self);
}

{
// Test case #5: use never ending subprocess and poller to read data from it
// Create new zproc instance
zproc_t *self = zproc_new ();
assert (self);
zproc_set_verbose (self, verbose);
// join stdout of the process to zeromq socket
// all data will be readable from zproc_stdout socket
zproc_set_stdout (self, NULL);

zlist_t *args = zlist_new ();
zlist_autofree (args);
zlist_append (args, file);
zlist_append (args, "--stdout");
zproc_set_args (self, &args);

zhash_t *env = zhash_new ();
zhash_autofree (env);
zhash_insert (env, "ZSP_MESSAGE", "czmq is great\n");
zproc_set_env (self, &env);

// execute the binary. It runs in its own actor, which monitors the process and
// passes data across pipes and zeromq sockets
if (verbose)
 zsys_debug("zproc_test() : launching helper '%s'", file );
zproc_run (self);
zpoller_t *poller = zpoller_new (zproc_stdout (self), NULL);

// kill the binary, it never ends, but the test must:
// termination also flushes the output streams so we can
// read them entirely; note that other process runs in
// parallel to this thread
if (verbose)
 zsys_debug("zproc_test() : sleeping 4000 msec to gather some output from helper");
zclock_sleep (4000);
zproc_kill (self, SIGTERM);
zproc_wait (self, -1);

// read the content from zproc_stdout - use zpoller and a loop
bool stdout_read = false;
int64_t zproc_timeout_msec = 10000;
int64_t zproc_test_start_msec = zclock_mono();
int64_t zproc_test_elapsed_msec = 0;

while (!zsys_interrupted) {
 void *which = zpoller_wait (poller, 800);
 zproc_test_elapsed_msec = zclock_mono() - zproc_test_start_msec;

 if (!which) {
 if (stdout_read) {
 if (verbose)
 zsys_debug("zproc_test() : did not get stdout from helper, but we already have some (%" PRIi64 " msec remaining to retry)", (zproc_timeout_msec - zproc_test_elapsed_msec) );
 break;
 }
 if (zproc_timeout_msec > zproc_test_elapsed_msec) {
 if (verbose)
 zsys_debug("zproc_test() : did not get stdout from helper, %" PRIi64 " msec remaining to retry", (zproc_timeout_msec - zproc_test_elapsed_msec) );
 continue;
 }
 // ...else : we've slept a lot and got no response; kill the helper
 if (verbose)
 zsys_debug("zproc_test() : did not get stdout from helper, patience expired (%" PRIi64 " msec remaining to retry)", (zproc_timeout_msec - zproc_test_elapsed_msec) );
 break;
 }

 if (which == zproc_stdout (self)) {
 // it suffices for us to have read something
 // we only check the first frame, others may start with the
 // expected key string broken mid-way due to alignment etc.,
 // but we drain the whole incoming queue of stdout frames.
 zframe_t *frame;
 zsock_brecv (zproc_stdout (self), "f", &frame);
 assert (frame);
 assert (zframe_data (frame));
 if (!stdout_read) {
 if (verbose)
 zsys_debug("zproc_test() : got stdout from helper, %" PRIi64 " msec was remaining to retry", (zproc_timeout_msec - zproc_test_elapsed_msec));
 assert (!strncmp(
 "czmq is great\n",
 (char*) zframe_data (frame),
 14));
 stdout_read = true;
 }

 if (verbose)
 zframe_print (frame, "zproc_test");

 zframe_destroy (&frame);
 continue;
 }

 // should not get there
 if (verbose)
 zsys_debug("zproc_test() : reached the unreachable point (unexpected zpoller result), %" PRIi64 " msec was remaining to retry", (zproc_timeout_msec - zproc_test_elapsed_msec) );
 assert (false);
}

assert (stdout_read);
zpoller_destroy (&poller);
zproc_destroy (&self);
}
{
// testcase #6 wait for process that hangs, kill it
zproc_t *self = zproc_new ();
assert (self);
zproc_set_verbose (self, verbose);

zproc_set_argsx (self, file, NULL);

if (verbose)
 zsys_debug("zproc_test() : launching helper '%s'", file);

int r = zproc_run (self);
assert (r == 0);
r = zproc_wait (self, 1000);
assert (r == ZPROC_RUNNING);
assert (zproc_running (self));
zproc_shutdown (self, 1000);
assert (!zproc_running (self));
zproc_destroy (&self);
}
{
// testcase #7 wait for process that exits earlier
zproc_t *self = zproc_new ();
assert (self);
zproc_set_verbose (self, verbose);

zproc_set_argsx (self, file, "--quit", "1", NULL);

if (verbose)
 zsys_debug("zproc_test() : launching helper '%s' --quit 1", file);

int r = zproc_run (self);
assert (r == 0);
int t = zclock_mono ();
r = zproc_wait (self, 8000);
assert (r == 0);
t = zclock_mono () - t;
assert (t < 2000);
zproc_destroy (&self); }

Authors

The czmq manual was written by the authors in the AUTHORS file.

Resources

Main web site: http://czmq.zeromq.org/

Report bugs to the email <zeromq-dev@lists.zeromq.org>

Copyright

Copyright (c) the Contributors as noted in the AUTHORS file. This file is part of CZMQ, the high-level C binding for ØMQ: http://czmq.zeromq.org. This Source Code Form is subject to the terms of the Mozilla Public License, v. 2.0. If a copy of the MPL was not distributed with this file, You can obtain one at http://mozilla.org/MPL/2.0/. LICENSE included with the czmq distribution.