Start or cancel a consumer for a given queue. A consumer attaches a callback function to the queue, and that function is run once for each message received until the consumer is cancelled. Any number of consumers can be started on a given connection.
Because R is single-threaded, you must call amqp_listen() to actually receive and process messages. As an alternative, you can consume messages on a background thread by using amqp_consume_later.
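Since nothing is processed outside of amqp_listen(), a long-running consumer usually ends up calling it in a loop. The following is a minimal sketch of that pattern, not part of the package: `listen_for()` and its `total` and `slice` parameters are hypothetical names, and it assumes the package providing amqp_listen() is already attached and that `conn` is an open connection.

```r
# Hypothetical helper: keep listening in short slices until roughly `total`
# seconds have passed. Each amqp_listen() call blocks for at most `slice`
# seconds, so the loop may overshoot `total` by up to one slice.
listen_for <- function(conn, total = 30, slice = 5L) {
  slice <- min(slice, 60L)  # the documented timeout is capped at 60 seconds
  deadline <- Sys.time() + total
  while (Sys.time() < deadline) {
    amqp_listen(conn, timeout = slice)
  }
  invisible(NULL)
}
```

Calling `listen_for(conn, total = 120)` would then process messages for about two minutes with any consumers already started on `conn`.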
amqp_consume(conn, queue, fun, tag = "", no_ack = FALSE, exclusive = FALSE,
  requeue_on_error = FALSE, prefetch_count = 50, ...)
amqp_cancel_consumer(consumer)
amqp_listen(conn, timeout = 10L)
amqp_nack(requeue = FALSE)
conn | An object returned by |
---|---|
queue | The name of a queue. |
fun | A function taking a single parameter, the message received. This function is executed by |
tag | An optional "tag" to identify the consumer. When empty, the server will generate one automatically. |
no_ack | When |
exclusive | When |
requeue_on_error | When |
prefetch_count | The maximum number of messages to "prefetch" from the queue. Use |
... | Additional arguments, used to declare broker-specific AMQP extensions. See Details. |
consumer | An object created by |
timeout | Maximum number of seconds to wait for messages. Capped at 60. |
requeue | When |
amqp_consume returns an "amqp_consumer" object, which can later be used to cancel the consumer. Keep in mind that if you do not assign the result of this function to a variable, you will have no way of cancelling the consumer directly; instead, you will be relying on gc to take care of this at some indeterminate point in the future.
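One way to avoid depending on garbage collection is to keep the consumer object in a local variable and cancel it when the enclosing function exits. This is only a sketch: `with_consumer()` and its `seconds` parameter are hypothetical, and the package providing the amqp_* functions is assumed to be attached.

```r
# Hypothetical wrapper: start a consumer, listen once, and always cancel.
with_consumer <- function(conn, queue, fun, seconds = 5L) {
  # Assign the result so the consumer can be cancelled explicitly.
  consumer <- amqp_consume(conn, queue, fun)
  # Runs when the function exits, including when an error is raised.
  on.exit(amqp_cancel_consumer(consumer), add = TRUE)
  amqp_listen(conn, timeout = seconds)
  invisible(NULL)
}
```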
Additional arguments can be used to declare broker-specific extensions. An incomplete list is as follows:
"x-priority": Specify a consumer priority.
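As an illustration of passing such an extension, the sketch below forwards "x-priority" as an extra named argument. It assumes that extensions are supplied through `...` as name/value pairs and that an integer value is accepted; neither detail is confirmed here, and `consume_with_priority()` is a hypothetical wrapper.

```r
# Hypothetical wrapper: start a consumer with a broker-specific priority.
# Assumption: extension arguments are passed through `...` by name.
consume_with_priority <- function(conn, queue, fun, priority = 10L) {
  amqp_consume(conn, queue, fun, "x-priority" = priority)
}
```

Whether a priority has any effect depends on the broker, since this is an extension rather than part of core AMQP.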
Unless no_ack is TRUE, messages are acknowledged automatically after the callback executes. If the callback fails, the message is nacked instead before the underlying error is surfaced to the caller. Alternatively, amqp_nack() can be used to manually signal that a message should be nacked and to control the redelivery behaviour.
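A callback that rejects some messages manually might look like the sketch below. The `is_processable()` check and the `handle_message()` name are hypothetical placeholders, and the package providing amqp_nack() is assumed to be attached.

```r
# Hypothetical validity check; replace with whatever your messages require.
is_processable <- function(msg) !is.null(msg)

handle_message <- function(msg) {
  if (!is_processable(msg)) {
    # Nack without requeueing, so the message is not put back on the queue.
    amqp_nack(requeue = FALSE)
    # Defensive: stop here in case amqp_nack() returns normally.
    return(invisible(FALSE))
  }
  print(msg)
  invisible(TRUE)
}

# Usage, given an open connection `conn` and a queue name `queue`:
# consumer <- amqp_consume(conn, queue, handle_message)
```

Passing `requeue = TRUE` instead would ask the broker to put the message back on the queue for redelivery.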
amqp_get to get messages individually or amqp_consume_later to consume messages in a background thread.
if (FALSE) {
# Create a consumer.
conn <- amqp_connect()
queue <- amqp_declare_tmp_queue(conn)
consumer <- amqp_consume(conn, queue, function(msg) {
  print(msg)
})

# Publish and then listen for a message.
amqp_publish(conn, "Hello, world.", routing_key = queue)
amqp_listen(conn, timeout = 1)

# Clean up.
amqp_cancel_consumer(consumer)
amqp_disconnect(conn)
}