Start or cancel a consumer for a given queue. A consumer attaches a callback function to the queue, and the callback runs once for each message received until the consumer is cancelled. Any number of consumers can be started on a given connection.
Because R is single-threaded, you must call amqp_listen() to actually
receive and process messages. As an alternative, you can consume messages on
a background thread by using amqp_consume_later.
amqp_consume(
  conn, queue, fun, tag = "", no_ack = FALSE, exclusive = FALSE,
  requeue_on_error = FALSE, prefetch_count = 50, ...
)

amqp_cancel_consumer(consumer)

amqp_listen(conn, timeout = 10L)

amqp_nack(requeue = FALSE)
| Argument | Description |
|---|---|
| conn | An object returned by |
| queue | The name of a queue. |
| fun | A function taking a single parameter, the message received. This function is executed by |
| tag | An optional "tag" to identify the consumer. When empty, the server will generate one automatically. |
| no_ack | When |
| exclusive | When |
| requeue_on_error | When |
| prefetch_count | The maximum number of messages to "prefetch" from the queue. Use |
| ... | Additional arguments, used to declare broker-specific AMQP extensions. See Details. |
| consumer | An object created by |
| timeout | Maximum number of seconds to wait for messages. Capped at 60. |
| requeue | When |

amqp_consume returns an "amqp_consumer" object, which can
later be used to cancel the consumer. Keep in mind that if you do not assign
the result of this function to a variable, you will have no way of
cancelling the consumer directly -- instead, you will be relying on
gc to take care of this at some indeterminate point in
the future.
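As a minimal sketch of why keeping the returned object matters, the example below assigns the consumer to a variable, polls with amqp_listen() until a message arrives or a deadline passes, and then cancels the consumer explicitly. The helper make_collector() is hypothetical and exists only for illustration. The broker-dependent part is wrapped in if (FALSE) because it assumes a reachable broker and that the package providing these functions is attached.

```r
# Hypothetical helper: build a callback that stores each message it receives.
make_collector <- function() {
  messages <- list()
  list(
    callback = function(msg) {
      # Append to the list held in the enclosing environment.
      messages[[length(messages) + 1L]] <<- msg
      invisible(NULL)
    },
    count = function() length(messages)
  )
}

collector <- make_collector()

if (FALSE) {
  # Assumes a reachable broker and the package providing amqp_consume().
  conn <- amqp_connect()
  queue <- amqp_declare_tmp_queue(conn)

  # Assign the result so the consumer can be cancelled deliberately.
  consumer <- amqp_consume(conn, queue, collector$callback)

  amqp_publish(conn, "Hello, world.", routing_key = queue)

  # Poll in short slices until a message arrives or about 5 seconds pass.
  deadline <- Sys.time() + 5
  while (collector$count() < 1L && Sys.time() < deadline) {
    amqp_listen(conn, timeout = 1)
  }

  # Cancel explicitly instead of waiting for garbage collection.
  amqp_cancel_consumer(consumer)
  amqp_disconnect(conn)
}
```

Polling in short slices is one way to work within the cap on timeout while still bounding the total wait.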
Additional arguments can be used to declare broker-specific extensions. An incomplete list is as follows:

"x-priority": Specify a consumer priority.
Unless no_ack is TRUE, messages are acknowledged automatically
after the callback executes. If the callback fails, the message is nacked
instead, and the underlying error is then surfaced to the caller. To nack a
message manually and control its redelivery behaviour, call amqp_nack()
instead.
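The following sketch illustrates one way the manual path might be used: a callback rejects payloads that fail a check by calling amqp_nack(requeue = FALSE), and otherwise returns normally so that the message is acknowledged automatically. Several things here are assumptions rather than documented behaviour: that amqp_nack() is called from inside the callback, that the message body is a raw vector available as msg$body, and the validation rule itself (is_valid_payload() is a hypothetical helper).

```r
# Hypothetical validation rule: accept only a single, non-empty string.
is_valid_payload <- function(payload) {
  is.character(payload) && length(payload) == 1L &&
    !is.na(payload) && nzchar(payload)
}

if (FALSE) {
  # Assumes a reachable broker and the package providing amqp_consume().
  conn <- amqp_connect()
  queue <- amqp_declare_tmp_queue(conn)

  consumer <- amqp_consume(conn, queue, function(msg) {
    # Assumption: the message body is a raw vector stored in msg$body.
    payload <- rawToChar(msg$body)
    if (!is_valid_payload(payload)) {
      # Reject without redelivery rather than raising an error.
      amqp_nack(requeue = FALSE)
    } else {
      # Returning normally lets the message be acknowledged automatically.
      print(payload)
    }
  })

  amqp_publish(conn, "Hello, world.", routing_key = queue)
  amqp_listen(conn, timeout = 1)

  amqp_cancel_consumer(consumer)
  amqp_disconnect(conn)
}
```

Passing requeue = TRUE instead would ask the broker to redeliver the message, which may suit transient failures better than permanent ones.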
See also amqp_get to get messages individually or
amqp_consume_later to consume messages in a background thread.
if (FALSE) {
  # Create a consumer.
  conn <- amqp_connect()
  queue <- amqp_declare_tmp_queue(conn)
  consumer <- amqp_consume(conn, queue, function(msg) {
    print(msg)
  })

  # Publish and then listen for a message.
  amqp_publish(conn, "Hello, world.", routing_key = queue)
  amqp_listen(conn, timeout = 1)

  # Clean up.
  amqp_cancel_consumer(consumer)
  amqp_disconnect(conn)
}