Beginning CMake configuration for ZT

Only tested on Windows so far
This commit is contained in:
Grant Limberg 2019-06-20 16:13:52 -07:00
commit 0b3b5f6174
111 changed files with 19586 additions and 36 deletions

View file

@ -0,0 +1,82 @@
include_directories(${CMAKE_CURRENT_SOURCE_DIR} ${LIBRABBITMQ_INCLUDE_DIRS} ${POPT_INCLUDE_DIR})
if (WIN32)
set(PLATFORM_DIR win32)
set(PLATFORM_SRCS
win32/compat.c
)
else (WIN32)
set(PLATFORM_DIR unix)
endif (WIN32)
include_directories(${PLATFORM_DIR})
set(COMMON_SRCS
common.h
common.c
${PLATFORM_SRCS}
)
add_executable(amqp-publish publish.c ${COMMON_SRCS})
target_link_libraries(amqp-publish ${RMQ_LIBRARY_TARGET} ${POPT_LIBRARY})
add_executable(amqp-get get.c ${COMMON_SRCS})
target_link_libraries(amqp-get ${RMQ_LIBRARY_TARGET} ${POPT_LIBRARY})
add_executable(amqp-consume consume.c ${PLATFORM_DIR}/process.c ${COMMON_SRCS})
target_link_libraries(amqp-consume ${RMQ_LIBRARY_TARGET} ${POPT_LIBRARY})
add_executable(amqp-declare-queue declare_queue.c ${COMMON_SRCS})
target_link_libraries(amqp-declare-queue ${RMQ_LIBRARY_TARGET} ${POPT_LIBRARY})
add_executable(amqp-delete-queue delete_queue.c ${COMMON_SRCS})
target_link_libraries(amqp-delete-queue ${RMQ_LIBRARY_TARGET} ${POPT_LIBRARY})
if (BUILD_TOOLS_DOCS)
if (XMLTO_FOUND)
set(DOCS_SRCS
doc/amqp-consume.xml
doc/amqp-declare-queue.xml
doc/amqp-delete-queue.xml
doc/amqp-get.xml
doc/amqp-publish.xml
doc/librabbitmq-tools.xml
)
file(MAKE_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/doc)
set(XMLTO_DEPENDS ${CMAKE_CURRENT_BINARY_DIR}/doc/man-date.ent)
add_custom_command(
OUTPUT ${XMLTO_DEPENDS}
COMMAND date +'%Y-%m-%d' > ${XMLTO_DEPENDS}
VERBATIM
)
set(XMLTO_COMMAND_ARGS --skip-validation --searchpath "${CMAKE_CURRENT_BINARY_DIR}/doc")
XMLTO(${DOCS_SRCS}
MODES man
ALL)
foreach(file ${XMLTO_FILES_man})
get_filename_component(fileExt ${file} EXT)
string( REGEX REPLACE "^[.]" "" fileExt ${fileExt} )
install(
FILES ${file}
DESTINATION ${CMAKE_INSTALL_PREFIX}/share/man/man${fileExt}
)
endforeach()
else(XMLTO_FOUND)
message(WARNING "xmlto not found, will not build tools documentation")
endif(XMLTO_FOUND)
endif()
if (ENABLE_SSL_SUPPORT)
add_definitions(-DWITH_SSL=1)
endif()
install(TARGETS amqp-publish amqp-get amqp-consume amqp-declare-queue amqp-delete-queue
RUNTIME DESTINATION bin
LIBRARY DESTINATION lib
ARCHIVE DESTINATION lib
PUBLIC_HEADER DESTINATION include)

View file

@ -0,0 +1,444 @@
/*
* ***** BEGIN LICENSE BLOCK *****
* Version: MIT
*
* Portions created by Alan Antonuk are Copyright (c) 2012-2013
* Alan Antonuk. All Rights Reserved.
*
* Portions created by VMware are Copyright (c) 2007-2012 VMware, Inc.
* All Rights Reserved.
*
* Portions created by Tony Garnock-Jones are Copyright (c) 2009-2010
* VMware, Inc. and Tony Garnock-Jones. All Rights Reserved.
*
* Permission is hereby granted, free of charge, to any person
* obtaining a copy of this software and associated documentation
* files (the "Software"), to deal in the Software without
* restriction, including without limitation the rights to use, copy,
* modify, merge, publish, distribute, sublicense, and/or sell copies
* of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be
* included in all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
* EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
* MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
* NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
* BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
* ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
* CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
* ***** END LICENSE BLOCK *****
*/
#ifdef HAVE_CONFIG_H
#include "config.h"
#endif
#include "common.h"
#ifdef WITH_SSL
#include <amqp_ssl_socket.h>
#endif
#include <amqp_tcp_socket.h>
#include <errno.h>
#include <fcntl.h>
#include <stdarg.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#ifdef WINDOWS
#include "compat.h"
#endif
void die(const char *fmt, ...) {
va_list ap;
va_start(ap, fmt);
vfprintf(stderr, fmt, ap);
va_end(ap);
fprintf(stderr, "\n");
exit(1);
}
void die_errno(int err, const char *fmt, ...) {
va_list ap;
if (err == 0) {
return;
}
va_start(ap, fmt);
vfprintf(stderr, fmt, ap);
va_end(ap);
fprintf(stderr, ": %s\n", strerror(err));
exit(1);
}
void die_amqp_error(int err, const char *fmt, ...) {
va_list ap;
if (err >= 0) {
return;
}
va_start(ap, fmt);
vfprintf(stderr, fmt, ap);
va_end(ap);
fprintf(stderr, ": %s\n", amqp_error_string2(err));
exit(1);
}
const char *amqp_server_exception_string(amqp_rpc_reply_t r) {
int res;
static char s[512];
switch (r.reply.id) {
case AMQP_CONNECTION_CLOSE_METHOD: {
amqp_connection_close_t *m = (amqp_connection_close_t *)r.reply.decoded;
res = snprintf(s, sizeof(s), "server connection error %d, message: %.*s",
m->reply_code, (int)m->reply_text.len,
(char *)m->reply_text.bytes);
break;
}
case AMQP_CHANNEL_CLOSE_METHOD: {
amqp_channel_close_t *m = (amqp_channel_close_t *)r.reply.decoded;
res = snprintf(s, sizeof(s), "server channel error %d, message: %.*s",
m->reply_code, (int)m->reply_text.len,
(char *)m->reply_text.bytes);
break;
}
default:
res = snprintf(s, sizeof(s), "unknown server error, method id 0x%08X",
r.reply.id);
break;
}
return res >= 0 ? s : NULL;
}
const char *amqp_rpc_reply_string(amqp_rpc_reply_t r) {
switch (r.reply_type) {
case AMQP_RESPONSE_NORMAL:
return "normal response";
case AMQP_RESPONSE_NONE:
return "missing RPC reply type";
case AMQP_RESPONSE_LIBRARY_EXCEPTION:
return amqp_error_string2(r.library_error);
case AMQP_RESPONSE_SERVER_EXCEPTION:
return amqp_server_exception_string(r);
default:
abort();
}
}
void die_rpc(amqp_rpc_reply_t r, const char *fmt, ...) {
va_list ap;
if (r.reply_type == AMQP_RESPONSE_NORMAL) {
return;
}
va_start(ap, fmt);
vfprintf(stderr, fmt, ap);
va_end(ap);
fprintf(stderr, ": %s\n", amqp_rpc_reply_string(r));
exit(1);
}
static char *amqp_url;
static char *amqp_server;
static int amqp_port = -1;
static char *amqp_vhost;
static char *amqp_username;
static char *amqp_password;
static int amqp_heartbeat = 0;
#ifdef WITH_SSL
static int amqp_ssl = 0;
static char *amqp_cacert = "/etc/ssl/certs/cacert.pem";
static char *amqp_key = NULL;
static char *amqp_cert = NULL;
#endif /* WITH_SSL */
const char *connect_options_title = "Connection options";
struct poptOption connect_options[] = {
{"url", 'u', POPT_ARG_STRING, &amqp_url, 0, "the AMQP URL to connect to",
"amqp://..."},
{"server", 's', POPT_ARG_STRING, &amqp_server, 0,
"the AMQP server to connect to", "hostname"},
{"port", 0, POPT_ARG_INT, &amqp_port, 0, "the port to connect on", "port"},
{"vhost", 0, POPT_ARG_STRING, &amqp_vhost, 0,
"the vhost to use when connecting", "vhost"},
{"username", 0, POPT_ARG_STRING, &amqp_username, 0,
"the username to login with", "username"},
{"password", 0, POPT_ARG_STRING, &amqp_password, 0,
"the password to login with", "password"},
{"heartbeat", 0, POPT_ARG_INT, &amqp_heartbeat, 0,
"heartbeat interval, set to 0 to disable", "heartbeat"},
#ifdef WITH_SSL
{"ssl", 0, POPT_ARG_NONE, &amqp_ssl, 0, "connect over SSL/TLS", NULL},
{"cacert", 0, POPT_ARG_STRING, &amqp_cacert, 0,
"path to the CA certificate file", "cacert.pem"},
{"key", 0, POPT_ARG_STRING, &amqp_key, 0,
"path to the client private key file", "key.pem"},
{"cert", 0, POPT_ARG_STRING, &amqp_cert, 0,
"path to the client certificate file", "cert.pem"},
#endif /* WITH_SSL */
{NULL, '\0', 0, NULL, 0, NULL, NULL}};
static void init_connection_info(struct amqp_connection_info *ci) {
ci->user = NULL;
ci->password = NULL;
ci->host = NULL;
ci->port = -1;
ci->vhost = NULL;
ci->user = NULL;
amqp_default_connection_info(ci);
if (amqp_url)
die_amqp_error(amqp_parse_url(strdup(amqp_url), ci), "Parsing URL '%s'",
amqp_url);
if (amqp_server) {
char *colon;
if (amqp_url) {
die("--server and --url options cannot be used at the same time");
}
/* parse the server string into a hostname and a port */
colon = strchr(amqp_server, ':');
if (colon) {
char *port_end;
size_t host_len;
/* Deprecate specifying the port number with the
--server option, because it is not ipv6 friendly.
--url now allows connection options to be
specified concisely. */
fprintf(stderr,
"Specifying the port number with --server is deprecated\n");
host_len = colon - amqp_server;
ci->host = malloc(host_len + 1);
memcpy(ci->host, amqp_server, host_len);
ci->host[host_len] = 0;
if (amqp_port >= 0) {
die("both --server and --port options specify server port");
}
ci->port = strtol(colon + 1, &port_end, 10);
if (ci->port < 0 || ci->port > 65535 || port_end == colon + 1 ||
*port_end != 0)
die("bad server port number in '%s'", amqp_server);
}
#if WITH_SSL
if (amqp_ssl && !ci->ssl) {
die("the --ssl option specifies an SSL connection"
" but the --url option does not");
}
#endif
}
if (amqp_port >= 0) {
if (amqp_url) {
die("--port and --url options cannot be used at the same time");
}
ci->port = amqp_port;
}
if (amqp_username) {
if (amqp_url) {
die("--username and --url options cannot be used at the same time");
}
ci->user = amqp_username;
}
if (amqp_password) {
if (amqp_url) {
die("--password and --url options cannot be used at the same time");
}
ci->password = amqp_password;
}
if (amqp_vhost) {
if (amqp_url) {
die("--vhost and --url options cannot be used at the same time");
}
ci->vhost = amqp_vhost;
}
if (amqp_heartbeat < 0) {
die("--heartbeat must be a positive value");
}
}
amqp_connection_state_t make_connection(void) {
int status;
amqp_socket_t *socket = NULL;
struct amqp_connection_info ci;
amqp_connection_state_t conn;
init_connection_info(&ci);
conn = amqp_new_connection();
if (ci.ssl) {
#ifdef WITH_SSL
socket = amqp_ssl_socket_new(conn);
if (!socket) {
die("creating SSL/TLS socket");
}
if (amqp_cacert) {
amqp_ssl_socket_set_cacert(socket, amqp_cacert);
}
if (amqp_key) {
amqp_ssl_socket_set_key(socket, amqp_cert, amqp_key);
}
#else
die("librabbitmq was not built with SSL/TLS support");
#endif
} else {
socket = amqp_tcp_socket_new(conn);
if (!socket) {
die("creating TCP socket (out of memory)");
}
}
status = amqp_socket_open(socket, ci.host, ci.port);
if (status) {
die("opening socket to %s:%d", ci.host, ci.port);
}
die_rpc(amqp_login(conn, ci.vhost, 0, 131072, amqp_heartbeat,
AMQP_SASL_METHOD_PLAIN, ci.user, ci.password),
"logging in to AMQP server");
if (!amqp_channel_open(conn, 1)) {
die_rpc(amqp_get_rpc_reply(conn), "opening channel");
}
return conn;
}
void close_connection(amqp_connection_state_t conn) {
int res;
die_rpc(amqp_channel_close(conn, 1, AMQP_REPLY_SUCCESS), "closing channel");
die_rpc(amqp_connection_close(conn, AMQP_REPLY_SUCCESS),
"closing connection");
res = amqp_destroy_connection(conn);
die_amqp_error(res, "closing connection");
}
amqp_bytes_t read_all(int fd) {
size_t space = 4096;
amqp_bytes_t bytes;
bytes.bytes = malloc(space);
bytes.len = 0;
for (;;) {
ssize_t res = read(fd, (char *)bytes.bytes + bytes.len, space - bytes.len);
if (res == 0) {
break;
}
if (res < 0) {
if (errno == EINTR) {
continue;
}
die_errno(errno, "reading");
}
bytes.len += res;
if (bytes.len == space) {
space *= 2;
bytes.bytes = realloc(bytes.bytes, space);
}
}
return bytes;
}
void write_all(int fd, amqp_bytes_t data) {
while (data.len > 0) {
ssize_t res = write(fd, data.bytes, data.len);
if (res < 0) {
die_errno(errno, "write");
}
data.len -= res;
data.bytes = (char *)data.bytes + res;
}
}
void copy_body(amqp_connection_state_t conn, int fd) {
size_t body_remaining;
amqp_frame_t frame;
int res = amqp_simple_wait_frame(conn, &frame);
die_amqp_error(res, "waiting for header frame");
if (frame.frame_type != AMQP_FRAME_HEADER) {
die("expected header, got frame type 0x%X", frame.frame_type);
}
body_remaining = frame.payload.properties.body_size;
while (body_remaining) {
res = amqp_simple_wait_frame(conn, &frame);
die_amqp_error(res, "waiting for body frame");
if (frame.frame_type != AMQP_FRAME_BODY) {
die("expected body, got frame type 0x%X", frame.frame_type);
}
write_all(fd, frame.payload.body_fragment);
body_remaining -= frame.payload.body_fragment.len;
}
}
poptContext process_options(int argc, const char **argv,
struct poptOption *options, const char *help) {
int c;
poptContext opts = poptGetContext(NULL, argc, argv, options, 0);
poptSetOtherOptionHelp(opts, help);
while ((c = poptGetNextOpt(opts)) >= 0) {
/* no options require explicit handling */
}
if (c < -1) {
fprintf(stderr, "%s: %s\n", poptBadOption(opts, POPT_BADOPTION_NOALIAS),
poptStrerror(c));
poptPrintUsage(opts, stderr, 0);
exit(1);
}
return opts;
}
void process_all_options(int argc, const char **argv,
struct poptOption *options) {
poptContext opts = process_options(argc, argv, options, "[OPTIONS]...");
const char *opt = poptPeekArg(opts);
if (opt) {
fprintf(stderr, "unexpected operand: %s\n", opt);
poptPrintUsage(opts, stderr, 0);
exit(1);
}
poptFreeContext(opts);
}
amqp_bytes_t cstring_bytes(const char *str) {
return str ? amqp_cstring_bytes(str) : amqp_empty_bytes;
}

View file

@ -0,0 +1,73 @@
/*
* ***** BEGIN LICENSE BLOCK *****
* Version: MIT
*
* Portions created by Alan Antonuk are Copyright (c) 2012-2013
* Alan Antonuk. All Rights Reserved.
*
* Portions created by VMware are Copyright (c) 2007-2012 VMware, Inc.
* All Rights Reserved.
*
* Portions created by Tony Garnock-Jones are Copyright (c) 2009-2010
* VMware, Inc. and Tony Garnock-Jones. All Rights Reserved.
*
* Permission is hereby granted, free of charge, to any person
* obtaining a copy of this software and associated documentation
* files (the "Software"), to deal in the Software without
* restriction, including without limitation the rights to use, copy,
* modify, merge, publish, distribute, sublicense, and/or sell copies
* of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be
* included in all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
* EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
* MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
* NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
* BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
* ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
* CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
* ***** END LICENSE BLOCK *****
*/
#include <stdint.h>
#include <popt.h>
#include <amqp.h>
#include <amqp_framing.h>
extern const char *amqp_server_exception_string(amqp_rpc_reply_t r);
extern const char *amqp_rpc_reply_string(amqp_rpc_reply_t r);
extern void die(const char *fmt, ...) __attribute__((format(printf, 1, 2)));
extern void die_errno(int err, const char *fmt, ...)
__attribute__((format(printf, 2, 3)));
extern void die_amqp_error(int err, const char *fmt, ...)
__attribute__((format(printf, 2, 3)));
extern void die_rpc(amqp_rpc_reply_t r, const char *fmt, ...)
__attribute__((format(printf, 2, 3)));
extern const char *connect_options_title;
extern struct poptOption connect_options[];
extern amqp_connection_state_t make_connection(void);
extern void close_connection(amqp_connection_state_t conn);
extern amqp_bytes_t read_all(int fd);
extern void write_all(int fd, amqp_bytes_t data);
extern void copy_body(amqp_connection_state_t conn, int fd);
#define INCLUDE_OPTIONS(options) \
{ NULL, 0, POPT_ARG_INCLUDE_TABLE, options, 0, options##_title, NULL }
extern poptContext process_options(int argc, const char **argv,
struct poptOption *options,
const char *help);
extern void process_all_options(int argc, const char **argv,
struct poptOption *options);
extern amqp_bytes_t cstring_bytes(const char *str);

View file

@ -0,0 +1,250 @@
/*
* ***** BEGIN LICENSE BLOCK *****
* Version: MIT
*
* Portions created by Alan Antonuk are Copyright (c) 2012-2013
* Alan Antonuk. All Rights Reserved.
*
* Portions created by VMware are Copyright (c) 2007-2012 VMware, Inc.
* All Rights Reserved.
*
* Portions created by Tony Garnock-Jones are Copyright (c) 2009-2010
* VMware, Inc. and Tony Garnock-Jones. All Rights Reserved.
*
* Permission is hereby granted, free of charge, to any person
* obtaining a copy of this software and associated documentation
* files (the "Software"), to deal in the Software without
* restriction, including without limitation the rights to use, copy,
* modify, merge, publish, distribute, sublicense, and/or sell copies
* of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be
* included in all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
* EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
* MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
* NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
* BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
* ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
* CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
* ***** END LICENSE BLOCK *****
*/
#ifdef HAVE_CONFIG_H
#include "config.h"
#endif
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include "common.h"
#include "process.h"
#define MAX_LISTEN_KEYS 1024
#define LISTEN_KEYS_DELIMITER ","
/* Convert a amqp_bytes_t to an escaped string form for printing. We
use the same escaping conventions as rabbitmqctl. */
static char *stringify_bytes(amqp_bytes_t bytes) {
/* We will need up to 4 chars per byte, plus the terminating 0 */
char *res = malloc(bytes.len * 4 + 1);
uint8_t *data = bytes.bytes;
char *p = res;
size_t i;
for (i = 0; i < bytes.len; i++) {
if (data[i] >= 32 && data[i] != 127) {
*p++ = data[i];
} else {
*p++ = '\\';
*p++ = '0' + (data[i] >> 6);
*p++ = '0' + (data[i] >> 3 & 0x7);
*p++ = '0' + (data[i] & 0x7);
}
}
*p = 0;
return res;
}
static amqp_bytes_t setup_queue(amqp_connection_state_t conn, char *queue,
char *exchange, char *routing_key, int declare,
int exclusive) {
amqp_bytes_t queue_bytes = cstring_bytes(queue);
char *routing_key_rest;
char *routing_key_token;
char *routing_tmp;
int routing_key_count = 0;
/* if an exchange name wasn't provided, check that we don't have options that
* require it. */
if (!exchange && routing_key) {
fprintf(stderr,
"--routing-key option requires an exchange name to be provided "
"with --exchange\n");
exit(1);
}
if (!queue || exchange || declare || exclusive) {
/* Declare the queue as auto-delete. */
amqp_queue_declare_ok_t *res = amqp_queue_declare(
conn, 1, queue_bytes, 0, 0, exclusive, 1, amqp_empty_table);
if (!res) {
die_rpc(amqp_get_rpc_reply(conn), "queue.declare");
}
if (!queue) {
/* the server should have provided a queue name */
char *sq;
queue_bytes = amqp_bytes_malloc_dup(res->queue);
sq = stringify_bytes(queue_bytes);
fprintf(stderr, "Server provided queue name: %s\n", sq);
free(sq);
}
/* Bind to an exchange if requested */
if (exchange) {
amqp_bytes_t eb = amqp_cstring_bytes(exchange);
routing_tmp = strdup(routing_key);
if (NULL == routing_tmp) {
fprintf(stderr, "could not allocate memory to parse routing key\n");
exit(1);
}
for (routing_key_token =
strtok_r(routing_tmp, LISTEN_KEYS_DELIMITER, &routing_key_rest);
NULL != routing_key_token && routing_key_count < MAX_LISTEN_KEYS - 1;
routing_key_token =
strtok_r(NULL, LISTEN_KEYS_DELIMITER, &routing_key_rest)) {
if (!amqp_queue_bind(conn, 1, queue_bytes, eb,
cstring_bytes(routing_key_token),
amqp_empty_table)) {
die_rpc(amqp_get_rpc_reply(conn), "queue.bind");
}
}
free(routing_tmp);
}
}
return queue_bytes;
}
#define AMQP_CONSUME_MAX_PREFETCH_COUNT 65535
static void do_consume(amqp_connection_state_t conn, amqp_bytes_t queue,
int no_ack, int count, int prefetch_count,
const char *const *argv) {
int i;
/* If there is a limit, set the qos to match */
if (count > 0 && count <= AMQP_CONSUME_MAX_PREFETCH_COUNT &&
!amqp_basic_qos(conn, 1, 0, count, 0)) {
die_rpc(amqp_get_rpc_reply(conn), "basic.qos");
}
/* if there is a maximum number of messages to be received at a time, set the
* qos to match */
if (prefetch_count > 0 && prefetch_count <= AMQP_CONSUME_MAX_PREFETCH_COUNT) {
/* the maximum number of messages to be received at a time must be less
* than the global maximum number of messages. */
if (!(count > 0 && count <= AMQP_CONSUME_MAX_PREFETCH_COUNT &&
prefetch_count >= count)) {
if (!amqp_basic_qos(conn, 1, 0, prefetch_count, 0)) {
die_rpc(amqp_get_rpc_reply(conn), "basic.qos");
}
}
}
if (!amqp_basic_consume(conn, 1, queue, amqp_empty_bytes, 0, no_ack, 0,
amqp_empty_table)) {
die_rpc(amqp_get_rpc_reply(conn), "basic.consume");
}
for (i = 0; count < 0 || i < count; i++) {
amqp_frame_t frame;
struct pipeline pl;
uint64_t delivery_tag;
amqp_basic_deliver_t *deliver;
int res = amqp_simple_wait_frame(conn, &frame);
die_amqp_error(res, "waiting for header frame");
if (frame.frame_type != AMQP_FRAME_METHOD ||
frame.payload.method.id != AMQP_BASIC_DELIVER_METHOD) {
continue;
}
deliver = (amqp_basic_deliver_t *)frame.payload.method.decoded;
delivery_tag = deliver->delivery_tag;
pipeline(argv, &pl);
copy_body(conn, pl.infd);
if (finish_pipeline(&pl) && !no_ack)
die_amqp_error(amqp_basic_ack(conn, 1, delivery_tag, 0), "basic.ack");
amqp_maybe_release_buffers(conn);
}
}
int main(int argc, const char **argv) {
poptContext opts;
amqp_connection_state_t conn;
const char *const *cmd_argv;
static char *queue = NULL;
static char *exchange = NULL;
static char *routing_key = NULL;
static int declare = 0;
static int exclusive = 0;
static int no_ack = 0;
static int count = -1;
static int prefetch_count = -1;
amqp_bytes_t queue_bytes;
struct poptOption options[] = {
INCLUDE_OPTIONS(connect_options),
{"queue", 'q', POPT_ARG_STRING, &queue, 0, "the queue to consume from",
"queue"},
{"exchange", 'e', POPT_ARG_STRING, &exchange, 0,
"bind the queue to this exchange", "exchange"},
{"routing-key", 'r', POPT_ARG_STRING, &routing_key, 0,
"the routing key to bind with", "routing key"},
{"declare", 'd', POPT_ARG_NONE, &declare, 0,
"declare an exclusive queue (deprecated, use --exclusive instead)",
NULL},
{"exclusive", 'x', POPT_ARG_NONE, &exclusive, 0,
"declare the queue as exclusive", NULL},
{"no-ack", 'A', POPT_ARG_NONE, &no_ack, 0, "consume in no-ack mode",
NULL},
{"count", 'c', POPT_ARG_INT, &count, 0,
"stop consuming after this many messages are consumed", "limit"},
{"prefetch-count", 'p', POPT_ARG_INT, &prefetch_count, 0,
"receive only this many message at a time from the server", "limit"},
POPT_AUTOHELP{NULL, '\0', 0, NULL, 0, NULL, NULL}};
opts = process_options(argc, argv, options, "[OPTIONS]... <command> <args>");
cmd_argv = poptGetArgs(opts);
if (!cmd_argv || !cmd_argv[0]) {
fprintf(stderr, "consuming command not specified\n");
poptPrintUsage(opts, stderr, 0);
goto error;
}
conn = make_connection();
queue_bytes =
setup_queue(conn, queue, exchange, routing_key, declare, exclusive);
do_consume(conn, queue_bytes, no_ack, count, prefetch_count, cmd_argv);
close_connection(conn);
return 0;
error:
poptFreeContext(opts);
return 1;
}

View file

@ -0,0 +1,79 @@
/*
* ***** BEGIN LICENSE BLOCK *****
* Version: MIT
*
* Portions created by Alan Antonuk are Copyright (c) 2012-2013
* Alan Antonuk. All Rights Reserved.
*
* Portions created by VMware are Copyright (c) 2007-2012 VMware, Inc.
* All Rights Reserved.
*
* Portions created by Tony Garnock-Jones are Copyright (c) 2009-2010
* VMware, Inc. and Tony Garnock-Jones. All Rights Reserved.
*
* Permission is hereby granted, free of charge, to any person
* obtaining a copy of this software and associated documentation
* files (the "Software"), to deal in the Software without
* restriction, including without limitation the rights to use, copy,
* modify, merge, publish, distribute, sublicense, and/or sell copies
* of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be
* included in all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
* EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
* MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
* NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
* BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
* ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
* CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
* ***** END LICENSE BLOCK *****
*/
#ifdef HAVE_CONFIG_H
#include "config.h"
#endif
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include "common.h"
int main(int argc, const char **argv) {
amqp_connection_state_t conn;
static char *queue = NULL;
static int durable = 0;
struct poptOption options[] = {
INCLUDE_OPTIONS(connect_options),
{"queue", 'q', POPT_ARG_STRING, &queue, 0,
"the queue name to declare, or the empty string", "queue"},
{"durable", 'd', POPT_ARG_VAL, &durable, 1, "declare a durable queue",
NULL},
POPT_AUTOHELP{NULL, '\0', 0, NULL, 0, NULL, NULL}};
process_all_options(argc, argv, options);
if (queue == NULL) {
fprintf(stderr, "queue name not specified\n");
return 1;
}
conn = make_connection();
{
amqp_queue_declare_ok_t *reply = amqp_queue_declare(
conn, 1, cstring_bytes(queue), 0, durable, 0, 0, amqp_empty_table);
if (reply == NULL) {
die_rpc(amqp_get_rpc_reply(conn), "queue.declare");
}
printf("%.*s\n", (int)reply->queue.len, (char *)reply->queue.bytes);
}
close_connection(conn);
return 0;
}

View file

@ -0,0 +1,81 @@
/*
* ***** BEGIN LICENSE BLOCK *****
* Version: MIT
*
* Portions created by Alan Antonuk are Copyright (c) 2012-2013
* Alan Antonuk. All Rights Reserved.
*
* Portions created by VMware are Copyright (c) 2007-2012 VMware, Inc.
* All Rights Reserved.
*
* Portions created by Tony Garnock-Jones are Copyright (c) 2009-2010
* VMware, Inc. and Tony Garnock-Jones. All Rights Reserved.
*
* Permission is hereby granted, free of charge, to any person
* obtaining a copy of this software and associated documentation
* files (the "Software"), to deal in the Software without
* restriction, including without limitation the rights to use, copy,
* modify, merge, publish, distribute, sublicense, and/or sell copies
* of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be
* included in all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
* EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
* MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
* NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
* BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
* ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
* CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
* ***** END LICENSE BLOCK *****
*/
#ifdef HAVE_CONFIG_H
#include "config.h"
#endif
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include "common.h"
int main(int argc, const char **argv) {
amqp_connection_state_t conn;
static char *queue = NULL;
static int if_unused = 0;
static int if_empty = 0;
struct poptOption options[] = {
INCLUDE_OPTIONS(connect_options),
{"queue", 'q', POPT_ARG_STRING, &queue, 0, "the queue name to delete",
"queue"},
{"if-unused", 'u', POPT_ARG_VAL, &if_unused, 1,
"do not delete unless queue is unused", NULL},
{"if-empty", 'e', POPT_ARG_VAL, &if_empty, 1,
"do not delete unless queue is empty", NULL},
POPT_AUTOHELP{NULL, '\0', 0, NULL, 0, NULL, NULL}};
process_all_options(argc, argv, options);
if (queue == NULL || *queue == '\0') {
fprintf(stderr, "queue name not specified\n");
return 1;
}
conn = make_connection();
{
amqp_queue_delete_ok_t *reply =
amqp_queue_delete(conn, 1, cstring_bytes(queue), if_unused, if_empty);
if (reply == NULL) {
die_rpc(amqp_get_rpc_reply(conn), "queue.delete");
}
printf("%u\n", reply->message_count);
}
close_connection(conn);
return 0;
}

View file

@ -0,0 +1,223 @@
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE refentry PUBLIC "-//OASIS//DTD DocBook XML V4.5//EN" "http://www.docbook.org/xml/4.5/docbookx.dtd"
[
<!ENTITY date SYSTEM "man-date.ent" >
]
>
<refentry lang="en">
<refentryinfo>
<productname>RabbitMQ C Client</productname>
<authorgroup>
<corpauthor>The RabbitMQ Team &lt;<ulink url="mailto:info@rabbitmq.com"><email>info@rabbitmq.com</email></ulink>&gt;</corpauthor>
</authorgroup>
<date>&date;</date>
</refentryinfo>
<refmeta>
<refentrytitle>amqp-consume</refentrytitle>
<manvolnum>1</manvolnum>
<refmiscinfo class="manual">RabbitMQ C Client</refmiscinfo>
</refmeta>
<refnamediv>
<refname>amqp-consume</refname>
<refpurpose>Consume messages from a queue on an AMQP server</refpurpose>
</refnamediv>
<refsynopsisdiv>
<cmdsynopsis>
<command>amqp-consume</command>
<arg choice="opt" rep="repeat">
<replaceable>OPTION</replaceable>
</arg>
<arg choice="req">
<replaceable>command</replaceable>
</arg>
<arg choice="opt" rep="repeat">
<replaceable>args</replaceable>
</arg>
</cmdsynopsis>
</refsynopsisdiv>
<refsect1>
<title>Description</title>
<para>
<command>amqp-consume</command> consumes messages from a
queue on an AMQP server. For each message that arrives, a
receiving command is run, with the message body supplied
to it on standard input.
</para>
<para>
<command>amqp-consume</command> can consume from an
existing queue, or it can create a new queue. It can
optionally bind the queue to an existing exchange.
</para>
<para>
By default, messages will be consumed with explicit
acknowledgements. A message will only be acknowledged if
the receiving command exits successfully (i.e. with an
exit code of zero). The AMQP <quote>no ack</quote> mode
(a.k.a. auto-ack mode) can be enable with the
<option>-A</option> option.
</para>
</refsect1>
<refsect1>
<title>Options</title>
<variablelist>
<varlistentry>
<term><option>-q</option></term>
<term><option>--queue</option>=<replaceable class="parameter">queue name</replaceable></term>
<listitem>
<para>
The name of the queue to consume messages
from.
</para>
<para>
If the <option>--queue</option> option is
omitted, the AMQP server will assign a unique
name to the queue, and that server-assigned
name will be dixsplayed on stderr; this case
implies that an exclusive queue should be
declared.
</para>
</listitem>
</varlistentry>
<varlistentry>
<term><option>-e</option></term>
<term><option>--exchange</option>=<replaceable class="parameter">exchange name</replaceable></term>
<listitem>
<para>
Specifies that an exclusive queue should
be declared, and bound to the given exchange.
The specified exchange should already exist
unless the <option>--exchange-type</option>
option is used to request the creation of an
exchange.
</para>
</listitem>
</varlistentry>
<varlistentry>
<term><option>-r</option></term>
<term><option>--routing-key</option>=<replaceable class="parameter">routing key</replaceable></term>
<listitem>
<para>
The routing key for binding. If omitted, an
empty routing key is assumed.
</para>
</listitem>
</varlistentry>
<varlistentry>
<term><option>-d</option></term>
<term><option>--declare</option></term>
<listitem>
<para>
Forces an exclusive queue to be declared,
even when it otherwise would not be. That is,
when a queue name is specified with the
<option>--queue</option> option, but no
binding to an exchange is requested with the
<option>--exchange</option> option.
Note: this option is deprecated and may be
removed in a future version, use the
<option>--exclusive</option> option to
explicitly declare an exclusive queue.
</para>
</listitem>
</varlistentry>
<varlistentry>
<term><option>-x</option></term>
<term><option>--exclusive</option></term>
<listitem>
<para>
Declared queues are non-exclusive by default,
this option forces declaration of exclusive
queues.
</para>
</listitem>
</varlistentry>
<varlistentry>
<term><option>-A</option></term>
<term><option>--no-ack</option>=<replaceable class="parameter">routing key</replaceable></term>
<listitem>
<para>
Enable <quote>no ack</quote> mode: The AMQP
server will unconditionally acknowledge each
message that is delivered, regardless of
whether the target command exits successfully
or not.
</para>
</listitem>
</varlistentry>
<varlistentry>
<term><option>-c</option></term>
<term><option>--count</option>=<replaceable class="parameter">limit</replaceable></term>
<listitem>
<para>
Stop consuming after the given number of
messages have been received.
</para>
</listitem>
</varlistentry>
<varlistentry>
<term><option>-p</option></term>
<term><option>--prefetch-count</option>=<replaceable class="parameter">limit</replaceable></term>
<listitem>
<para>
Request the server to only send
<replaceable class="parameter">limit</replaceable>
messages at a time.
</para>
<para>
If any value was passed to <option>--count</option>,
the value passed to <option>--prefetch-count</option>
should be smaller than that, or otherwise it will be
ignored.
</para>
<para>
If <option>-A</option>/<option>--no-ack</option> is
passed, this option has no effect.
</para>
</listitem>
</varlistentry>
</variablelist>
</refsect1>
<refsect1>
<title>Examples</title>
<variablelist>
<varlistentry>
<term>Consume messages from an existing queue
<quote><systemitem
class="resource">myqueue</systemitem></quote>, and
output the message bodies on standard output via
<command>cat</command>:</term>
<listitem>
<screen><prompt>$ </prompt><userinput>amqp-publish -q myqueue cat</userinput></screen>
</listitem>
</varlistentry>
<varlistentry>
<term>Bind a new exclusive queue to an
exchange <quote><systemitem
class="resource">myexch</systemitem></quote>, and send
each message body to the script
<filename>myscript</filename>, automatically
acknowledging them on the server:</term>
<listitem>
<screen><prompt>$ </prompt><userinput>amqp-consume -A -e myexch ./myscript</userinput></screen>
</listitem>
</varlistentry>
</variablelist>
</refsect1>
<refsect1>
<title>See also</title>
<para>
<citerefentry><refentrytitle>librabbitmq-tools</refentrytitle><manvolnum>7</manvolnum></citerefentry>
describes connection-related options common to all the
RabbitMQ C Client tools.
</para>
</refsect1>
</refentry>

View file

@ -0,0 +1,122 @@
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE refentry PUBLIC "-//OASIS//DTD DocBook XML V4.5//EN" "http://www.docbook.org/xml/4.5/docbookx.dtd"
[
<!ENTITY date SYSTEM "man-date.ent" >
]
>
<refentry lang="en">
<refentryinfo>
<productname>RabbitMQ C Client</productname>
<authorgroup>
<corpauthor>The RabbitMQ Team &lt;<ulink url="mailto:info@rabbitmq.com"><email>info@rabbitmq.com</email></ulink>&gt;</corpauthor>
</authorgroup>
<date>&date;</date>
</refentryinfo>
<refmeta>
<refentrytitle>amqp-declare-queue</refentrytitle>
<manvolnum>1</manvolnum>
<refmiscinfo class="manual">RabbitMQ C Client</refmiscinfo>
</refmeta>
<refnamediv>
<refname>amqp-declare-queue</refname>
<refpurpose>Declare (create or assert the existence of) a queue on an AMQP server</refpurpose>
</refnamediv>
<refsynopsisdiv>
<cmdsynopsis>
<command>amqp-declare-queue</command>
<arg choice="opt" rep="repeat">
<replaceable>OPTION</replaceable>
</arg>
<arg choice="opt">-d</arg>
<arg choice="req">-q <replaceable>queue name</replaceable></arg>
</cmdsynopsis>
</refsynopsisdiv>
<refsect1>
<title>Description</title>
<para>
<command>amqp-declare-queue</command> attempts to create a
queue on an AMQP server, and exits. If the empty-string is
supplied as the queue name, a fresh queue name is
generated by the server and returned. In all cases, if a
queue was successfully declared, the (raw binary) name of
the queue is printed to standard output, followed by a
newline.
</para>
</refsect1>
<refsect1>
<title>Options</title>
<variablelist>
<varlistentry>
<term><option>-q</option></term>
<term><option>--queue</option>=<replaceable class="parameter">queue name</replaceable></term>
<listitem>
<para>
The name of the queue to declare. If the
empty string is supplied, a fresh queue name
is generated by the server.
</para>
</listitem>
</varlistentry>
<varlistentry>
<term><option>-d</option></term>
<term><option>--durable</option></term>
<listitem>
<para>
Causes the queue to be declared with the
"durable" flag set. Durable queues survive
server restarts. By default, queues are declared
in "transient" mode.
</para>
</listitem>
</varlistentry>
</variablelist>
</refsect1>
<refsect1>
<title>Exit Status</title>
<para>
If the queue was successfully declared, the exit status is
0. If an error occurs, the exit status is 1.
</para>
</refsect1>
<refsect1>
<title>Examples</title>
<variablelist>
<varlistentry>
<term>Declare the durable queue <quote><systemitem
class="resource">myqueue</systemitem></quote>, and
display the name of the queue on standard output:</term>
<listitem>
<screen><prompt>$ </prompt><userinput>amqp-declare-queue -d -q myqueue</userinput>
myqueue</screen>
</listitem>
</varlistentry>
<varlistentry>
<term>Declare a fresh, server-named transient queue,
and display the name of the queue on standard output
(use <citerefentry><refentrytitle>amqp-delete-queue</refentrytitle>
<manvolnum>1</manvolnum></citerefentry> to delete
it from the server once you're done):</term>
<listitem>
<screen><prompt>$ </prompt><userinput>amqp-declare-queue -q ""</userinput>
amq.gen-BW/wvociA8g6LFpb1PlqOA==</screen>
</listitem>
</varlistentry>
</variablelist>
</refsect1>
<refsect1>
<title>See also</title>
<para>
<citerefentry><refentrytitle>librabbitmq-tools</refentrytitle><manvolnum>7</manvolnum></citerefentry>
describes connection-related options common to all the
RabbitMQ C Client tools.
</para>
</refsect1>
</refentry>

View file

@ -0,0 +1,94 @@
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE refentry PUBLIC "-//OASIS//DTD DocBook XML V4.5//EN" "http://www.docbook.org/xml/4.5/docbookx.dtd"
[
<!ENTITY date SYSTEM "man-date.ent" >
]
>
<refentry lang="en">
<refentryinfo>
<productname>RabbitMQ C Client</productname>
<authorgroup>
<corpauthor>The RabbitMQ Team &lt;<ulink url="mailto:info@rabbitmq.com"><email>info@rabbitmq.com</email></ulink>&gt;</corpauthor>
</authorgroup>
<date>&date;</date>
</refentryinfo>
<refmeta>
<refentrytitle>amqp-delete-queue</refentrytitle>
<manvolnum>1</manvolnum>
<refmiscinfo class="manual">RabbitMQ C Client</refmiscinfo>
</refmeta>
<refnamediv>
<refname>amqp-delete-queue</refname>
<refpurpose>Delete a queue from an AMQP server</refpurpose>
</refnamediv>
<refsynopsisdiv>
<cmdsynopsis>
<command>amqp-delete-queue</command>
<arg choice="opt" rep="repeat">
<replaceable>OPTION</replaceable>
</arg>
<arg choice="req">-q <replaceable>queue name</replaceable></arg>
</cmdsynopsis>
</refsynopsisdiv>
<refsect1>
<title>Description</title>
<para>
<command>amqp-delete-queue</command> deletes a queue from
an AMQP server, and exits after printing to standard
output the number of messages that were in the queue at
the time of its deletion.
</para>
</refsect1>
<refsect1>
<title>Options</title>
<variablelist>
<varlistentry>
<term><option>-q</option></term>
<term><option>--queue</option>=<replaceable class="parameter">queue name</replaceable></term>
<listitem>
<para>
The name of the queue to delete.
</para>
</listitem>
</varlistentry>
</variablelist>
</refsect1>
<refsect1>
<title>Exit Status</title>
<para>
If the queue was successfully deleted, the exit status is
0. If an error occurs, the exit status is 1.
</para>
</refsect1>
<refsect1>
<title>Examples</title>
<variablelist>
<varlistentry>
<term>Delete the
queue <quote><systemitem class="resource">myqueue</systemitem></quote>
at a moment when it has 123 messages waiting on
it:</term>
<listitem>
<screen><prompt>$ </prompt><userinput>amqp-delete-queue -q myqueue</userinput>
123</screen>
</listitem>
</varlistentry>
</variablelist>
</refsect1>
<refsect1>
<title>See also</title>
<para>
<citerefentry><refentrytitle>librabbitmq-tools</refentrytitle><manvolnum>7</manvolnum></citerefentry>
describes connection-related options common to all the
RabbitMQ C Client tools.
</para>
</refsect1>
</refentry>

View file

@ -0,0 +1,95 @@
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE refentry PUBLIC "-//OASIS//DTD DocBook XML V4.5//EN" "http://www.docbook.org/xml/4.5/docbookx.dtd"
[
<!ENTITY date SYSTEM "man-date.ent" >
]
>
<refentry lang="en">
<refentryinfo>
<productname>RabbitMQ C Client</productname>
<authorgroup>
<corpauthor>The RabbitMQ Team &lt;<ulink url="mailto:info@rabbitmq.com"><email>info@rabbitmq.com</email></ulink>&gt;</corpauthor>
</authorgroup>
<date>&date;</date>
</refentryinfo>
<refmeta>
<refentrytitle>amqp-get</refentrytitle>
<manvolnum>1</manvolnum>
<refmiscinfo class="manual">RabbitMQ C Client</refmiscinfo>
</refmeta>
<refnamediv>
<refname>amqp-get</refname>
<refpurpose>Get a message from a queue on an AMQP server</refpurpose>
</refnamediv>
<refsynopsisdiv>
<cmdsynopsis>
<command>amqp-get</command>
<arg choice="opt" rep="repeat">
<replaceable>OPTION</replaceable>
</arg>
<arg choice="req">-q <replaceable>queue name</replaceable></arg>
</cmdsynopsis>
</refsynopsisdiv>
<refsect1>
<title>Description</title>
<para>
<command>amqp-get</command> attempts to consume a single
message from a queue on an AMQP server, and exits. Unless
the queue was empty, the body of the resulting message is
sent to standard output.
</para>
</refsect1>
<refsect1>
<title>Options</title>
<variablelist>
<varlistentry>
<term><option>-q</option></term>
<term><option>--queue</option>=<replaceable class="parameter">queue name</replaceable></term>
<listitem>
<para>
The name of the queue to consume messages
from.
</para>
</listitem>
</varlistentry>
</variablelist>
</refsect1>
<refsect1>
<title>Exit Status</title>
<para>
If the queue is not empty, and a message is successfully
retrieved, the exit status is 0. If an error occurs, the
exit status is 1. If the queue is found to be empty, the
exit status is 2.
</para>
</refsect1>
<refsect1>
<title>Examples</title>
<variablelist>
<varlistentry>
<term>Get a message from the queue <quote><systemitem
class="resource">myqueue</systemitem></quote>, and
display its body on standard output:</term>
<listitem>
<screen><prompt>$ </prompt><userinput>amqp-get -q myqueue</userinput></screen>
</listitem>
</varlistentry>
</variablelist>
</refsect1>
<refsect1>
<title>See also</title>
<para>
<citerefentry><refentrytitle>librabbitmq-tools</refentrytitle><manvolnum>7</manvolnum></citerefentry>
describes connection-related options common to all the
RabbitMQ C Client tools.
</para>
</refsect1>
</refentry>

View file

@ -0,0 +1,169 @@
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE refentry PUBLIC "-//OASIS//DTD DocBook XML V4.5//EN" "http://www.docbook.org/xml/4.5/docbookx.dtd"
[
<!ENTITY date SYSTEM "man-date.ent" >
]
>
<refentry lang="en">
<refentryinfo>
<productname>RabbitMQ C Client</productname>
<authorgroup>
<corpauthor>The RabbitMQ Team &lt;<ulink url="mailto:info@rabbitmq.com"><email>info@rabbitmq.com</email></ulink>&gt;</corpauthor>
</authorgroup>
<date>&date;</date>
</refentryinfo>
<refmeta>
<refentrytitle>amqp-publish</refentrytitle>
<manvolnum>1</manvolnum>
<refmiscinfo class="manual">RabbitMQ C Client</refmiscinfo>
</refmeta>
<refnamediv>
<refname>amqp-publish</refname>
<refpurpose>Publish a message on an AMQP server</refpurpose>
</refnamediv>
<refsynopsisdiv>
<cmdsynopsis>
<command>amqp-publish</command>
<arg choice="opt" rep="repeat">
<replaceable>OPTION</replaceable>
</arg>
</cmdsynopsis>
</refsynopsisdiv>
<refsect1>
<title>Description</title>
<para>
Publishes a message to an exchange on an AMQP server.
Options allow the various properties of the message and
parameters of the AMQP <function>basic.publish</function>
method to be specified.
</para>
<para>
By default, the message body is read from standard input.
Alternatively, the <option>-b</option> option allows the message
body to be provided as part of the command.
</para>
</refsect1>
<refsect1>
<title>Options</title>
<variablelist>
<varlistentry>
<term><option>-e</option></term>
<term><option>--exchange</option>=<replaceable class="parameter">exchange name</replaceable></term>
<listitem>
<para>
The name of the exchange to publish to. If
omitted, the default exchange (also known as
the nameless exchange) is used.
</para>
</listitem>
</varlistentry>
<varlistentry>
<term><option>-r</option></term>
<term><option>--routing-key</option>=<replaceable class="parameter">routing key</replaceable></term>
<listitem>
<para>
The routing key to publish with. If omitted,
an empty routing key is assumed. A routing
key must be specified when publishing to the
default exchange; in that case, accoding to
the AMQP specification, the routing key
corresponds to a queue name.
</para>
</listitem>
</varlistentry>
<varlistentry>
<term><option>-p</option></term>
<term><option>--persistent</option></term>
<listitem>
<para>
Use the persistent delivery mode. Without
this option, non-persistent delivery is used.
</para>
</listitem>
</varlistentry>
<varlistentry>
<term><option>-C</option></term>
<term><option>--content-type</option>=<replaceable class="parameter">MIME type</replaceable></term>
<listitem>
<para>
Specifies the content-type property for the
message. If omitted, the content-type
property is not set on the message.
</para>
</listitem>
</varlistentry>
<varlistentry>
<term><option>-E</option></term>
<term><option>--content-encoding</option>=<replaceable class="parameter">content coding</replaceable></term>
<listitem>
<para>
Specifies the content-encoding property for
the message. If omitted, the content-encoding
property is not set on the message.
</para>
</listitem>
</varlistentry>
<varlistentry>
<term><option>-b</option></term>
<term><option>--body</option>=<replaceable class="parameter">message body</replaceable></term>
<listitem>
<para>
Specifies the message body. If omitted, the
message body is read from standard input.
</para>
</listitem>
</varlistentry>
<varlistentry>
<term><option>-H</option></term>
<term><option>--header</option>=<replaceable class="parameter">header</replaceable></term>
<listitem>
<para>
Specifies an optional header in the form "key: value".
</para>
</listitem>
</varlistentry>
</variablelist>
</refsect1>
<refsect1>
<title>Examples</title>
<variablelist>
<varlistentry>
<term>Send a short message, consisting of the word
<quote><literal>Hello</literal></quote> to the queue
<quote><systemitem
class="resource">myqueue</systemitem></quote> via the
default exchange:</term>
<listitem>
<screen><prompt>$ </prompt><userinput>amqp-publish -r myqueue -b Hello</userinput></screen>
</listitem>
</varlistentry>
<varlistentry>
<term>Send some XML data from a file to the exchange
<quote><systemitem
class="resource">events</systemitem></quote>, with
persistent delivery mode, setting the content-type
property on the message to make the data format
explicit:</term>
<listitem>
<screen><prompt>$ </prompt><userinput>amqp-publish -e events -p -C text/xml &lt;event.xml</userinput></screen>
</listitem>
</varlistentry>
</variablelist>
</refsect1>
<refsect1>
<title>See also</title>
<para>
<citerefentry><refentrytitle>librabbitmq-tools</refentrytitle><manvolnum>7</manvolnum></citerefentry>
describes connection-related options common to all the
RabbitMQ C Client tools.
</para>
</refsect1>
</refentry>

View file

@ -0,0 +1,90 @@
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE refentry PUBLIC "-//OASIS//DTD DocBook XML V4.5//EN" "http://www.docbook.org/xml/4.5/docbookx.dtd"
[
<!ENTITY date SYSTEM "man-date.ent" >
]
>
<refentry lang="en">
<refentryinfo>
<productname>RabbitMQ C Client</productname>
<authorgroup>
<corpauthor>The RabbitMQ Team &lt;<ulink url="mailto:info@rabbitmq.com"><email>info@rabbitmq.com</email></ulink>&gt;</corpauthor>
</authorgroup>
<date>&date;</date>
</refentryinfo>
<refmeta>
<refentrytitle>librabbitmq-tools</refentrytitle>
<manvolnum>7</manvolnum>
<refmiscinfo class="manual">RabbitMQ C Client</refmiscinfo>
</refmeta>
<refnamediv>
<refname>librabbitmq-tools</refname>
<refpurpose>Command line AMQP tools</refpurpose>
</refnamediv>
<refsect1>
<title>Description</title>
<para>
A set of command line AMQP tools based on <systemitem
class="library">librabbitmq</systemitem>. This page
describes common options and conventions used by all of
the tools.
</para>
</refsect1>
<refsect1>
<title>Common Options</title>
<variablelist>
<varlistentry>
<term><option>-s</option></term>
<term><option>--server</option>=<replaceable class="parameter">hostname:port</replaceable></term>
<listitem>
<para>
The host name (or address) to connect to.
Defaults to localhost. The port number may
also be specified; if omitted, it defaults to
the standard AMQP port number (5672).
</para>
</listitem>
</varlistentry>
<varlistentry>
<term><option>--vhost</option>=<replaceable class="parameter">vhost</replaceable></term>
<listitem>
<para>
The AMQP vhost to specify when connecting.
Defaults to <literal>/</literal>.
</para>
</listitem>
</varlistentry>
<varlistentry>
<term><option>--username</option>=<replaceable class="parameter">username</replaceable></term>
<listitem>
<para>
The username to authenticate to the AMQP server with. Defaults to <literal>guest</literal>.
</para>
</listitem>
</varlistentry>
<varlistentry>
<term><option>--password</option>=<replaceable class="parameter">password</replaceable></term>
<listitem>
<para>
The password to authenticate to the AMQP server with. Defaults to <literal>guest</literal>.
</para>
</listitem>
</varlistentry>
</variablelist>
</refsect1>
<refsect1>
<title>See also</title>
<para>
<simplelist type='inline'>
<member><citerefentry><refentrytitle>amqp-publish</refentrytitle><manvolnum>1</manvolnum></citerefentry></member>
<member><citerefentry><refentrytitle>amqp-consume</refentrytitle><manvolnum>1</manvolnum></citerefentry></member>
<member><citerefentry><refentrytitle>amqp-get</refentrytitle><manvolnum>1</manvolnum></citerefentry></member>
</simplelist>
</para>
</refsect1>
</refentry>

View file

@ -0,0 +1,78 @@
/*
* ***** BEGIN LICENSE BLOCK *****
* Version: MIT
*
* Portions created by Alan Antonuk are Copyright (c) 2012-2013
* Alan Antonuk. All Rights Reserved.
*
* Portions created by VMware are Copyright (c) 2007-2012 VMware, Inc.
* All Rights Reserved.
*
* Portions created by Tony Garnock-Jones are Copyright (c) 2009-2010
* VMware, Inc. and Tony Garnock-Jones. All Rights Reserved.
*
* Permission is hereby granted, free of charge, to any person
* obtaining a copy of this software and associated documentation
* files (the "Software"), to deal in the Software without
* restriction, including without limitation the rights to use, copy,
* modify, merge, publish, distribute, sublicense, and/or sell copies
* of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be
* included in all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
* EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
* MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
* NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
* BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
* ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
* CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
* ***** END LICENSE BLOCK *****
*/
#ifdef HAVE_CONFIG_H
#include "config.h"
#endif
#include <stdio.h>
#include "common.h"
static int do_get(amqp_connection_state_t conn, char *queue) {
amqp_rpc_reply_t r = amqp_basic_get(conn, 1, cstring_bytes(queue), 1);
die_rpc(r, "basic.get");
if (r.reply.id == AMQP_BASIC_GET_EMPTY_METHOD) {
return 0;
}
copy_body(conn, 1);
return 1;
}
int main(int argc, const char **argv) {
amqp_connection_state_t conn;
static char *queue = NULL;
int got_something;
struct poptOption options[] = {
INCLUDE_OPTIONS(connect_options),
{"queue", 'q', POPT_ARG_STRING, &queue, 0, "the queue to consume from",
"queue"},
POPT_AUTOHELP{NULL, '\0', 0, NULL, 0, NULL, NULL}};
process_all_options(argc, argv, options);
if (!queue) {
fprintf(stderr, "queue not specified\n");
return 1;
}
conn = make_connection();
got_something = do_get(conn, queue);
close_connection(conn);
return got_something ? 0 : 2;
}

View file

@ -0,0 +1,180 @@
/*
* ***** BEGIN LICENSE BLOCK *****
* Version: MIT
*
* Portions created by Alan Antonuk are Copyright (c) 2012-2013
* Alan Antonuk. All Rights Reserved.
*
* Portions created by VMware are Copyright (c) 2007-2012 VMware, Inc.
* All Rights Reserved.
*
* Portions created by Tony Garnock-Jones are Copyright (c) 2009-2010
* VMware, Inc. and Tony Garnock-Jones. All Rights Reserved.
*
* Permission is hereby granted, free of charge, to any person
* obtaining a copy of this software and associated documentation
* files (the "Software"), to deal in the Software without
* restriction, including without limitation the rights to use, copy,
* modify, merge, publish, distribute, sublicense, and/or sell copies
* of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be
* included in all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
* EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
* MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
* NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
* BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
* ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
* CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
* ***** END LICENSE BLOCK *****
*/
#ifdef HAVE_CONFIG_H
#include "config.h"
#endif
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include "common.h"
#define MAX_LINE_LENGTH 1024 * 32
static void do_publish(amqp_connection_state_t conn, char *exchange,
char *routing_key, amqp_basic_properties_t *props,
amqp_bytes_t body) {
int res = amqp_basic_publish(conn, 1, cstring_bytes(exchange),
cstring_bytes(routing_key), 0, 0, props, body);
die_amqp_error(res, "basic.publish");
}
int main(int argc, const char **argv) {
amqp_connection_state_t conn;
static char *exchange = NULL;
static char *routing_key = NULL;
static char *content_type = NULL;
static char *content_encoding = NULL;
static char **headers = NULL;
static char *reply_to = NULL;
static char *body = NULL;
amqp_basic_properties_t props;
amqp_bytes_t body_bytes;
static int delivery = 1; /* non-persistent by default */
static int line_buffered = 0;
static char **pos;
struct poptOption options[] = {
INCLUDE_OPTIONS(connect_options),
{"exchange", 'e', POPT_ARG_STRING, &exchange, 0,
"the exchange to publish to", "exchange"},
{"routing-key", 'r', POPT_ARG_STRING, &routing_key, 0,
"the routing key to publish with", "routing key"},
{"persistent", 'p', POPT_ARG_VAL, &delivery, 2,
"use the persistent delivery mode", NULL},
{"content-type", 'C', POPT_ARG_STRING, &content_type, 0,
"the content-type for the message", "content type"},
{"reply-to", 't', POPT_ARG_STRING, &reply_to, 0,
"the replyTo to use for the message", "reply to"},
{"line-buffered", 'l', POPT_ARG_VAL, &line_buffered, 2,
"treat each line from standard in as a separate message", NULL},
{"content-encoding", 'E', POPT_ARG_STRING, &content_encoding, 0,
"the content-encoding for the message", "content encoding"},
{"header", 'H', POPT_ARG_ARGV, &headers, 0,
"set a message header (may be specified multiple times)",
"\"key: value\""},
{"body", 'b', POPT_ARG_STRING, &body, 0, "specify the message body",
"body"},
POPT_AUTOHELP{NULL, '\0', 0, NULL, 0, NULL, NULL}};
process_all_options(argc, argv, options);
if (!exchange && !routing_key) {
fprintf(stderr, "neither exchange nor routing key specified\n");
return 1;
}
memset(&props, 0, sizeof props);
props._flags = AMQP_BASIC_DELIVERY_MODE_FLAG;
props.delivery_mode = delivery;
if (content_type) {
props._flags |= AMQP_BASIC_CONTENT_TYPE_FLAG;
props.content_type = amqp_cstring_bytes(content_type);
}
if (content_encoding) {
props._flags |= AMQP_BASIC_CONTENT_ENCODING_FLAG;
props.content_encoding = amqp_cstring_bytes(content_encoding);
}
if (reply_to) {
props._flags |= AMQP_BASIC_REPLY_TO_FLAG;
props.reply_to = amqp_cstring_bytes(reply_to);
}
if (headers) {
int num = 0;
for (pos = headers; *pos; pos++) {
num++;
}
if (num > 0) {
amqp_table_t *table = &props.headers;
table->num_entries = num;
table->entries = calloc(num, sizeof(amqp_table_entry_t));
int i = 0;
for (pos = headers; *pos; pos++) {
char *colon = strchr(*pos, ':');
if (colon) {
*colon++ = '\0';
while (*colon == ' ') colon++;
table->entries[i].key = amqp_cstring_bytes(*pos);
table->entries[i].value.kind = AMQP_FIELD_KIND_UTF8;
table->entries[i].value.value.bytes = amqp_cstring_bytes(colon);
i++;
} else {
fprintf(stderr,
"Ignored header definition missing ':' delimiter in \"%s\"\n",
*pos);
}
}
props._flags |= AMQP_BASIC_HEADERS_FLAG;
}
}
conn = make_connection();
if (body) {
body_bytes = amqp_cstring_bytes(body);
} else {
if (line_buffered) {
body_bytes.bytes = (char *)malloc(MAX_LINE_LENGTH);
while (fgets(body_bytes.bytes, MAX_LINE_LENGTH, stdin)) {
body_bytes.len = strlen(body_bytes.bytes);
do_publish(conn, exchange, routing_key, &props, body_bytes);
}
} else {
body_bytes = read_all(0);
}
}
if (!line_buffered) {
do_publish(conn, exchange, routing_key, &props, body_bytes);
}
if (props.headers.num_entries > 0) {
free(props.headers.entries);
}
if (!body) {
free(body_bytes.bytes);
}
close_connection(conn);
return 0;
}

View file

@ -0,0 +1,91 @@
/*
* ***** BEGIN LICENSE BLOCK *****
* Version: MIT
*
* Portions created by Alan Antonuk are Copyright (c) 2012-2013
* Alan Antonuk. All Rights Reserved.
*
* Portions created by VMware are Copyright (c) 2007-2012 VMware, Inc.
* All Rights Reserved.
*
* Portions created by Tony Garnock-Jones are Copyright (c) 2009-2010
* VMware, Inc. and Tony Garnock-Jones. All Rights Reserved.
*
* Permission is hereby granted, free of charge, to any person
* obtaining a copy of this software and associated documentation
* files (the "Software"), to deal in the Software without
* restriction, including without limitation the rights to use, copy,
* modify, merge, publish, distribute, sublicense, and/or sell copies
* of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be
* included in all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
* EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
* MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
* NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
* BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
* ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
* CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
* ***** END LICENSE BLOCK *****
*/
#ifdef HAVE_CONFIG_H
#include "config.h"
#endif
#include <errno.h>
#include <spawn.h>
#include <sys/wait.h>
#include <unistd.h>
#include "common.h"
#include "process.h"
extern char **environ;
void pipeline(const char *const *argv, struct pipeline *pl) {
posix_spawn_file_actions_t file_acts;
int pipefds[2];
if (pipe(pipefds)) {
die_errno(errno, "pipe");
}
die_errno(posix_spawn_file_actions_init(&file_acts),
"posix_spawn_file_actions_init");
die_errno(posix_spawn_file_actions_adddup2(&file_acts, pipefds[0], 0),
"posix_spawn_file_actions_adddup2");
die_errno(posix_spawn_file_actions_addclose(&file_acts, pipefds[0]),
"posix_spawn_file_actions_addclose");
die_errno(posix_spawn_file_actions_addclose(&file_acts, pipefds[1]),
"posix_spawn_file_actions_addclose");
die_errno(posix_spawnp(&pl->pid, argv[0], &file_acts, NULL,
(char *const *)argv, environ),
"posix_spawnp: %s", argv[0]);
die_errno(posix_spawn_file_actions_destroy(&file_acts),
"posix_spawn_file_actions_destroy");
if (close(pipefds[0])) {
die_errno(errno, "close");
}
pl->infd = pipefds[1];
}
int finish_pipeline(struct pipeline *pl) {
int status;
if (close(pl->infd)) {
die_errno(errno, "close");
}
if (waitpid(pl->pid, &status, 0) < 0) {
die_errno(errno, "waitpid");
}
return WIFEXITED(status) && WEXITSTATUS(status) == 0;
}

View file

@ -0,0 +1,42 @@
/*
* ***** BEGIN LICENSE BLOCK *****
* Version: MIT
*
* Portions created by Alan Antonuk are Copyright (c) 2012-2013
* Alan Antonuk. All Rights Reserved.
*
* Portions created by VMware are Copyright (c) 2007-2012 VMware, Inc.
* All Rights Reserved.
*
* Portions created by Tony Garnock-Jones are Copyright (c) 2009-2010
* VMware, Inc. and Tony Garnock-Jones. All Rights Reserved.
*
* Permission is hereby granted, free of charge, to any person
* obtaining a copy of this software and associated documentation
* files (the "Software"), to deal in the Software without
* restriction, including without limitation the rights to use, copy,
* modify, merge, publish, distribute, sublicense, and/or sell copies
* of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be
* included in all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
* EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
* MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
* NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
* BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
* ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
* CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
* ***** END LICENSE BLOCK *****
*/
struct pipeline {
int pid;
int infd;
};
extern void pipeline(const char *const *argv, struct pipeline *pl);
extern int finish_pipeline(struct pipeline *pl);

View file

@ -0,0 +1,65 @@
/*
* ***** BEGIN LICENSE BLOCK *****
* Version: MIT
*
* Portions created by Alan Antonuk are Copyright (c) 2012-2013
* Alan Antonuk. All Rights Reserved.
*
* Portions created by VMware are Copyright (c) 2007-2012 VMware, Inc.
* All Rights Reserved.
*
* Portions created by Tony Garnock-Jones are Copyright (c) 2009-2010
* VMware, Inc. and Tony Garnock-Jones. All Rights Reserved.
*
* Permission is hereby granted, free of charge, to any person
* obtaining a copy of this software and associated documentation
* files (the "Software"), to deal in the Software without
* restriction, including without limitation the rights to use, copy,
* modify, merge, publish, distribute, sublicense, and/or sell copies
* of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be
* included in all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
* EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
* MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
* NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
* BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
* ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
* CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
* ***** END LICENSE BLOCK *****
*/
#ifdef HAVE_CONFIG_H
#include "config.h"
#endif
#include <stdarg.h>
#include <stdio.h>
#include <stdlib.h>
#include "compat.h"
int asprintf(char **strp, const char *fmt, ...) {
va_list ap;
int len;
va_start(ap, fmt);
len = _vscprintf(fmt, ap);
va_end(ap);
*strp = malloc(len + 1);
if (!*strp) {
return -1;
}
va_start(ap, fmt);
_vsnprintf(*strp, len + 1, fmt, ap);
va_end(ap);
(*strp)[len] = 0;
return len;
}

View file

@ -0,0 +1,36 @@
/*
* ***** BEGIN LICENSE BLOCK *****
* Version: MIT
*
* Portions created by Alan Antonuk are Copyright (c) 2012-2013
* Alan Antonuk. All Rights Reserved.
*
* Portions created by VMware are Copyright (c) 2007-2012 VMware, Inc.
* All Rights Reserved.
*
* Portions created by Tony Garnock-Jones are Copyright (c) 2009-2010
* VMware, Inc. and Tony Garnock-Jones. All Rights Reserved.
*
* Permission is hereby granted, free of charge, to any person
* obtaining a copy of this software and associated documentation
* files (the "Software"), to deal in the Software without
* restriction, including without limitation the rights to use, copy,
* modify, merge, publish, distribute, sublicense, and/or sell copies
* of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be
* included in all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
* EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
* MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
* NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
* BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
* ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
* CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
* ***** END LICENSE BLOCK *****
*/
extern int asprintf(char **strp, const char *fmt, ...);

View file

@ -0,0 +1,227 @@
/*
* ***** BEGIN LICENSE BLOCK *****
* Version: MIT
*
* Portions created by Alan Antonuk are Copyright (c) 2012-2013
* Alan Antonuk. All Rights Reserved.
*
* Portions created by VMware are Copyright (c) 2007-2012 VMware, Inc.
* All Rights Reserved.
*
* Portions created by Tony Garnock-Jones are Copyright (c) 2009-2010
* VMware, Inc. and Tony Garnock-Jones. All Rights Reserved.
*
* Permission is hereby granted, free of charge, to any person
* obtaining a copy of this software and associated documentation
* files (the "Software"), to deal in the Software without
* restriction, including without limitation the rights to use, copy,
* modify, merge, publish, distribute, sublicense, and/or sell copies
* of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be
* included in all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
* EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
* MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
* NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
* BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
* ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
* CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
* ***** END LICENSE BLOCK *****
*/
#ifdef HAVE_CONFIG_H
#include "config.h"
#endif
#include <io.h>
#include <stdio.h>
#include <windows.h>
#include "common.h"
#include "process.h"
void die_windows_error(const char *fmt, ...) {
char *msg;
va_list ap;
va_start(ap, fmt);
vfprintf(stderr, fmt, ap);
va_end(ap);
if (!FormatMessage(
FORMAT_MESSAGE_FROM_SYSTEM | FORMAT_MESSAGE_ALLOCATE_BUFFER, NULL,
GetLastError(), MAKELANGID(LANG_NEUTRAL, SUBLANG_DEFAULT),
(LPSTR)&msg, 0, NULL)) {
msg = "(failed to retrieve Windows error message)";
}
fprintf(stderr, ": %s\n", msg);
exit(1);
}
static char *make_command_line(const char *const *argv) {
int i;
size_t len = 1; /* initial quotes */
char *buf;
char *dest;
/* calculate the length of the required buffer, making worst
case assumptions for simplicity */
for (i = 0;;) {
/* each character could need escaping */
len += strlen(argv[i]) * 2;
if (!argv[++i]) {
break;
}
len += 3; /* quotes, space, quotes */
}
len += 2; /* final quotes and the terminating zero */
dest = buf = malloc(len);
if (!buf) {
die("allocating memory for subprocess command line");
}
/* Here we perform the inverse of the CommandLineToArgvW
function. Note that its rules are slightly crazy: A
sequence of backslashes only act to escape if followed by
double quotes. A sequence of backslashes not followed by
double quotes is untouched. */
for (i = 0;;) {
const char *src = argv[i];
int backslashes = 0;
*dest++ = '\"';
for (;;) {
switch (*src) {
case 0:
goto done;
case '\"':
for (; backslashes; backslashes--) {
*dest++ = '\\';
}
*dest++ = '\\';
*dest++ = '\"';
break;
case '\\':
backslashes++;
*dest++ = '\\';
break;
default:
backslashes = 0;
*dest++ = *src;
break;
}
src++;
}
done:
for (; backslashes; backslashes--) {
*dest++ = '\\';
}
*dest++ = '\"';
if (!argv[++i]) {
break;
}
*dest++ = ' ';
}
*dest++ = 0;
return buf;
}
void pipeline(const char *const *argv, struct pipeline *pl) {
HANDLE in_read_handle, in_write_handle;
SECURITY_ATTRIBUTES sec_attr;
PROCESS_INFORMATION proc_info;
STARTUPINFO start_info;
char *cmdline = make_command_line(argv);
sec_attr.nLength = sizeof sec_attr;
sec_attr.bInheritHandle = TRUE;
sec_attr.lpSecurityDescriptor = NULL;
if (!CreatePipe(&in_read_handle, &in_write_handle, &sec_attr, 0)) {
die_windows_error("CreatePipe");
}
if (!SetHandleInformation(in_write_handle, HANDLE_FLAG_INHERIT, 0)) {
die_windows_error("SetHandleInformation");
}
/* when in Rome... */
ZeroMemory(&proc_info, sizeof proc_info);
ZeroMemory(&start_info, sizeof start_info);
start_info.cb = sizeof start_info;
start_info.dwFlags |= STARTF_USESTDHANDLES;
if ((start_info.hStdError = GetStdHandle(STD_ERROR_HANDLE)) ==
INVALID_HANDLE_VALUE ||
(start_info.hStdOutput = GetStdHandle(STD_OUTPUT_HANDLE)) ==
INVALID_HANDLE_VALUE) {
die_windows_error("GetStdHandle");
}
start_info.hStdInput = in_read_handle;
if (!CreateProcess(NULL, cmdline, NULL, NULL, TRUE, 0, NULL, NULL,
&start_info, &proc_info)) {
die_windows_error("CreateProcess");
}
free(cmdline);
if (!CloseHandle(proc_info.hThread)) {
die_windows_error("CloseHandle for thread");
}
if (!CloseHandle(in_read_handle)) {
die_windows_error("CloseHandle");
}
pl->proc_handle = proc_info.hProcess;
pl->infd = _open_osfhandle((intptr_t)in_write_handle, 0);
}
int finish_pipeline(struct pipeline *pl) {
DWORD code;
if (close(pl->infd)) {
die_errno(errno, "close");
}
for (;;) {
if (!GetExitCodeProcess(pl->proc_handle, &code)) {
die_windows_error("GetExitCodeProcess");
}
if (code != STILL_ACTIVE) {
break;
}
if (WaitForSingleObject(pl->proc_handle, INFINITE) == WAIT_FAILED) {
die_windows_error("WaitForSingleObject");
}
}
if (!CloseHandle(pl->proc_handle)) {
die_windows_error("CloseHandle for process");
}
return code;
}

View file

@ -0,0 +1,44 @@
/*
* ***** BEGIN LICENSE BLOCK *****
* Version: MIT
*
* Portions created by Alan Antonuk are Copyright (c) 2012-2013
* Alan Antonuk. All Rights Reserved.
*
* Portions created by VMware are Copyright (c) 2007-2012 VMware, Inc.
* All Rights Reserved.
*
* Portions created by Tony Garnock-Jones are Copyright (c) 2009-2010
* VMware, Inc. and Tony Garnock-Jones. All Rights Reserved.
*
* Permission is hereby granted, free of charge, to any person
* obtaining a copy of this software and associated documentation
* files (the "Software"), to deal in the Software without
* restriction, including without limitation the rights to use, copy,
* modify, merge, publish, distribute, sublicense, and/or sell copies
* of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be
* included in all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
* EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
* MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
* NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
* BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
* ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
* CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
* ***** END LICENSE BLOCK *****
*/
#include <windef.h>
struct pipeline {
HANDLE proc_handle;
int infd;
};
extern void pipeline(const char *const *argv, struct pipeline *pl);
extern int finish_pipeline(struct pipeline *pl);