Consume messages "asynchronously" by using the machinery of the later package. This function is primarily for use inside applications (particularly Shiny applications) that already make use of later to manage events.
This interface is experimental and should be used with caution, since any bugs in the implementation have the potential to cause serious memory corruption issues that will terminate the R process.
amqp_consume_later(conn, queue, fun, tag = "", no_ack = FALSE, exclusive = FALSE, prefetch_count = 50, ...)
conn | An object returned by |
---|---|
queue | The name of a queue. |
fun | A function taking a single parameter, the message received. This
function is executed by |
tag | An optional "tag" to identify the consumer. When empty, the server will generate one automatically. |
no_ack | When |
exclusive | When |
prefetch_count | The maximum number of messages to "prefetch" from the
queue. Use |
... | Additional arguments, used to declare broker-specific AMQP extensions. See Details. |
An amqp_connection
object will start a "background thread" for these
consumers if any are declared. Because underlying components of the
amqp_connection
object are not thread-safe, this background thread
creates a "clone" of the original connection using the same properties. This
may lead to some surprising results, including the fact that consumers
created with this interface will not stop running if the original connection
is closed with amqp_disconnect
or due to connection-level user errors.
This may change in future versions.
At present, consumers can only be cancelled by using
amqp_cancel_consumer
or by garbage collection when the original
connection object expires.
Messages to background consumers are always acknowledged.
amqp_consume
to consume messages in the main thread.