Consume messages asynchronously using the event loop of the later package. This function is intended primarily for applications (particularly Shiny applications) that already use later to manage events.

This interface is experimental and should be used with caution: a bug in the implementation could corrupt memory and terminate the R process.

Usage

amqp_consume_later(conn, queue, fun, tag = "", no_ack = FALSE,
  exclusive = FALSE, prefetch_count = 50, ...)

Arguments

conn

An object returned by amqp_connect. See Details for how background consumers use this connection.

queue

The name of a queue.

fun

A function taking a single parameter, the message received. This function is executed by later whenever messages are received on the queue.

tag

An optional "tag" to identify the consumer. When empty, the server will generate one automatically.

no_ack

When TRUE, tell the server not to expect that messages will be acknowledged.

exclusive

When TRUE, request that this consumer has exclusive access to the queue.

prefetch_count

The maximum number of messages to "prefetch" from the queue. Use 1 to implement true round-robin delivery to multiple consumers.

...

Additional arguments, used to declare broker-specific AMQP extensions. See Details.
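As a rough illustration of how these arguments fit together, the sketch below registers a background consumer that prefetches one message at a time. It assumes a broker is reachable with the defaults of amqp_connect. amqp_declare_tmp_queue and amqp_publish are other longears functions not described on this page, while make_collector and demo_consume_later are hypothetical helpers introduced only for this example.

```r
# Hypothetical helper: builds a callback that stores every message it is given.
make_collector <- function() {
  received <- list()
  list(
    fun = function(msg) {
      received[[length(received) + 1L]] <<- msg
      invisible(NULL)
    },
    messages = function() received
  )
}

collector <- make_collector()

# The broker-facing steps live in a function, so nothing connects until it is
# called. Calling it requires the longears and later packages and a running
# broker.
demo_consume_later <- function(collector) {
  conn <- longears::amqp_connect()
  queue <- longears::amqp_declare_tmp_queue(conn)

  # prefetch_count = 1 asks the server to hand over one message at a time.
  consumer <- longears::amqp_consume_later(
    conn, queue, collector$fun,
    prefetch_count = 1
  )

  longears::amqp_publish(conn, "Hello, world.", routing_key = queue)

  # In a plain script nothing services later's event loop, so run it by hand.
  # The one-second pause is an arbitrary allowance for delivery.
  Sys.sleep(1)
  later::run_now()

  longears::amqp_cancel_consumer(consumer)
  longears::amqp_disconnect(conn)
  collector$messages()
}
```

Calling demo_consume_later(collector) against a live broker should return the list of messages the callback has seen so far. Inside an application that already runs later's event loop, the manual later::run_now() call would presumably be unnecessary.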

Details

An amqp_connection object starts a background thread for these consumers once any are declared. Because the underlying components of the amqp_connection object are not thread-safe, the background thread works on a clone of the original connection, created with the same properties. This can have surprising consequences: in particular, consumers created with this interface keep running after the original connection is closed, whether by amqp_disconnect or by connection-level user errors. This may change in future versions.

At present, consumers can be cancelled only by calling amqp_cancel_consumer, or implicitly when the original connection object is garbage collected.
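Since closing the original connection does not stop a background consumer, one cautious pattern is to cancel the consumer explicitly before disconnecting. The sketch below wraps that ordering in a hypothetical helper, stop_background_consumer. It is not part of longears, and it assumes that consumer is the value returned by amqp_consume_later.

```r
# Hypothetical helper: shut down a background consumer and then its connection.
stop_background_consumer <- function(conn, consumer) {
  # Cancel first. Disconnecting alone would leave the consumer running on the
  # cloned connection owned by the background thread.
  longears::amqp_cancel_consumer(consumer)
  longears::amqp_disconnect(conn)
  invisible(NULL)
}
```

A typical call would be stop_background_consumer(conn, consumer) when the application shuts down, rather than relying on garbage collection of the connection object.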

Messages to background consumers are always acknowledged.

See also

amqp_consume to consume messages in the main thread.