Start or cancel a consumer for a given queue. A consumer attaches a callback function to the queue, which is run once for each message received until the consumer is cancelled. Any number of consumers can be started on a given connection.

Because R is single-threaded, you must call amqp_listen() to actually receive and process messages. As an alternative, you can consume messages on a background thread by using amqp_consume_later.

amqp_consume(conn, queue, fun, tag = "", no_ack = FALSE,
  exclusive = FALSE, requeue_on_error = FALSE, prefetch_count = 50, ...)

amqp_cancel_consumer(consumer)

amqp_listen(conn, timeout = 10L)

amqp_nack(requeue = FALSE)

Arguments

conn

An object returned by amqp_connect.

queue

The name of a queue.

fun

A function taking a single parameter, the message received. This function is executed by amqp_listen() whenever messages are received on the queue.

tag

An optional "tag" to identify the consumer. When empty, the server will generate one automatically.

no_ack

When TRUE, tell the server not to expect that messages will be acknowledged.

exclusive

When TRUE, request that this consumer has exclusive access to the queue.

requeue_on_error

When TRUE, errors in fun will cause the message in question to be redelivered on the queue by the server. This is advisable only when you expect that another consumer will be able to handle the same message without issue.

prefetch_count

The maximum number of messages to "prefetch" from the queue. Use 1 to implement true round-robin delivery to multiple consumers.

...

Additional arguments, used to declare broker-specific AMQP extensions. See Details.

consumer

An object created by amqp_consume.

timeout

Maximum number of seconds to wait for messages. Capped at 60.

requeue

When TRUE, redeliver the message on the queue.

Value

amqp_consume returns an "amqp_consumer" object, which can later be used to cancel the consumer. If you do not assign the result of this function to a variable, you will have no way of cancelling the consumer directly; instead, you will be relying on gc to cancel it at some indeterminate point in the future.

Details

Additional arguments can be used to declare broker-specific extensions. An incomplete list is as follows:

"x-priority"

Specify a consumer priority.
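
As a minimal sketch of passing such an extension, the call below requests a consumer priority. It assumes a broker that supports consumer priorities, that the extension is supplied as a named argument through ..., and that an integer value is accepted; the priority of 10 is arbitrary, and seen and count_message are illustrative names. The broker calls are wrapped in if (FALSE) because they need a running server.

```r
# Illustrative callback: counts the messages it is handed.
seen <- 0L
count_message <- function(msg) {
  seen <<- seen + 1L
  invisible(seen)
}

if (FALSE) {
# Requires the package to be attached and a running broker.
conn <- amqp_connect()
queue <- amqp_declare_tmp_queue(conn)

# Assumption: the extension is passed by name through `...`.
consumer <- amqp_consume(conn, queue, count_message, "x-priority" = 10L)

# Clean up.
amqp_cancel_consumer(consumer)
amqp_disconnect(conn)
}
```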

Unless no_ack is TRUE, each message is acknowledged automatically after the callback executes. If the callback fails, the message is nacked instead, and the underlying error is then surfaced to the caller. amqp_nack() can be used instead to manually signal that a message should be nacked and to control the redelivery behaviour.
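
The manual route might look like the sketch below. It assumes that amqp_nack() is called from inside the callback and applies to the message currently being handled; should_reject is a hypothetical placeholder predicate, not part of the package. The broker calls are wrapped in if (FALSE) because they need a running server.

```r
# Hypothetical predicate, for illustration only: decides whether a message
# should be rejected. Replace the body with a real check.
should_reject <- function(msg) {
  is.null(msg)
}

if (FALSE) {
# Requires the package to be attached and a running broker.
conn <- amqp_connect()
queue <- amqp_declare_tmp_queue(conn)

consumer <- amqp_consume(conn, queue, function(msg) {
  if (should_reject(msg)) {
    # Assumption: this marks the current message to be nacked, without
    # requeueing, instead of being acknowledged.
    amqp_nack(requeue = FALSE)
    return(invisible(NULL))
  }
  print(msg)
})

amqp_publish(conn, "Hello, world.", routing_key = queue)
amqp_listen(conn, timeout = 1)

# Clean up.
amqp_cancel_consumer(consumer)
amqp_disconnect(conn)
}
```

Passing requeue = TRUE instead asks the server to redeliver the message, which, as with requeue_on_error, is advisable only when another consumer is likely to handle it successfully.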

See also

amqp_get to get messages individually or amqp_consume_later to consume messages in a background thread.

Examples

if (FALSE) {
# Create a consumer.
conn <- amqp_connect()
queue <- amqp_declare_tmp_queue(conn)
consumer <- amqp_consume(conn, queue, function(msg) {
  print(msg)
})

# Publish and then listen for a message.
amqp_publish(conn, "Hello, world.", routing_key = queue)
amqp_listen(conn, timeout = 1)

# Clean up.
amqp_cancel_consumer(consumer)
amqp_disconnect(conn)
}
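
Since amqp_listen() waits for a bounded time only, a longer-running consumer has to call it repeatedly. The following is a sketch of one way to do that, assuming a flag set by the callback is an acceptable stop condition; done and on_message are illustrative names, and the 5 second timeout is arbitrary.

```r
# Illustrative state: flipped by the callback once a message has arrived.
done <- FALSE
on_message <- function(msg) {
  done <<- TRUE
  invisible(NULL)
}

if (FALSE) {
# Requires the package to be attached and a running broker.
conn <- amqp_connect()
queue <- amqp_declare_tmp_queue(conn)
consumer <- amqp_consume(conn, queue, on_message)

amqp_publish(conn, "Hello, world.", routing_key = queue)

# Each call waits for at most `timeout` seconds, so loop until the
# callback reports that it has run.
while (!done) {
  amqp_listen(conn, timeout = 5)
}

# Clean up.
amqp_cancel_consumer(consumer)
amqp_disconnect(conn)
}
```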