longears is a fast and fully-featured RabbitMQ client for R, built on top of the reference C library, librabbitmq.
RabbitMQ itself is a highly popular, performant, and robust open-source message broker used to build distributed systems.
longears implements a large portion of the Advanced Message Queuing Protocol (AMQP) used by RabbitMQ, and the API largely reflects the protocol itself. However, this package is not a true low-level interface: it abstracts away many details of AMQP for end users. See Limitations for details.
This package may be of interest to you if you wish to have R speak to your organization’s existing RabbitMQ servers. If you simply need a message queue library, you may be better off with txtq, liteq, or the ZeroMQ package rzmq.
To install longears as a source package you will need its system dependency, librabbitmq. Normally if this library is missing during installation, the configure script will attempt to download and build it directly (provided you have cmake installed).
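Before installing from source, it can help to check whether cmake is visible to R, since the fallback build described above depends on it. The following is a minimal sketch using only base R; the variable name has_cmake is illustrative and not part of longears.

```r
# Look for cmake on the PATH as seen by the current R session.
# Sys.which() returns an empty string when the program is not found.
has_cmake <- unname(nzchar(Sys.which("cmake")))

if (has_cmake) {
  message("cmake found; the configure script can fall back to building librabbitmq.")
} else {
  message("cmake not found; install cmake or use your platform's librabbitmq package.")
}
```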
You may still wish to use your platform’s provided librabbitmq instead, especially if this process fails. On Debian-based systems (including Ubuntu) you can get this library by running the following from the command line:
$ apt install librabbitmq-dev
For other platforms:
macOS (via Homebrew): brew install rabbitmq-c
Fedora-based systems: yum install librabbitmq-devel
Arch-based systems: pacman -S librabbitmq-c
Windows (MSYS2, e.g. via Rtools): pacman -Sy mingw-w64-{i686,x86_64}-rabbitmq-c
longears is not yet available on CRAN. You can install it from GitHub with:
# install.packages("remotes")
remotes::install_github("atheriel/longears")
If you are not already familiar with RabbitMQ and the message/queue/binding/exchange terminology, I suggest reading RabbitMQ's excellent conceptual overview first.
You will also need to have a local RabbitMQ server running with the default settings to follow this guide.
$ # apt install rabbitmq-server
$ systemctl start rabbitmq-server
$ rabbitmqctl status
First, connect to the server (with the default settings):
conn <- amqp_connect()
conn
#> AMQP Connection:
#> status: connected
#> address: localhost:5672
#> vhost: '/'
Create an exchange to route messages and a couple of queues to store them:
amqp_declare_exchange(conn, "my.exchange", type = "fanout")
amqp_declare_queue(conn, "my.queue1")
#> AMQP queue 'my.queue1'
#> messages: 0
#> consumers: 0
amqp_declare_queue(conn, "my.queue2")
#> AMQP queue 'my.queue2'
#> messages: 0
#> consumers: 0
amqp_bind_queue(conn, "my.queue1", "my.exchange", routing_key = "#")
amqp_bind_queue(conn, "my.queue2", "my.exchange", routing_key = "#")
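Because my.exchange is a fanout exchange, the broker copies each message to every bound queue and ignores the routing key, which is why both queues see both messages later in this guide. The toy model below illustrates that behaviour in plain R. It does not talk to RabbitMQ, and publish_fanout() is a hypothetical helper, not a longears function.

```r
# In-memory stand-ins for the two queues bound to the exchange.
queues <- list(my.queue1 = character(), my.queue2 = character())

# Hypothetical helper: a fanout exchange appends a copy of the message
# to every bound queue, regardless of any routing key.
publish_fanout <- function(queues, body) {
  lapply(queues, function(q) c(q, body))
}

queues <- publish_fanout(queues, "first")
queues <- publish_fanout(queues, "second")

# Each queue now holds its own copy of both messages.
lengths(queues)
```

With a direct or topic exchange, by contrast, the routing key would decide which queues receive a message.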
You can also set up a consumer for one of these queues with a callback:
received <- 0
consumer <- amqp_consume(conn, "my.queue2", function(msg) {
  received <<- received + 1
})
Now, send a few messages to this exchange:
amqp_publish(conn, "first", exchange = "my.exchange", routing_key = "#")
amqp_publish(conn, "second", exchange = "my.exchange", routing_key = "#")
Check if your messages are going into the queues:
$ rabbitmqctl list_queues
You can use amqp_get() to pull individual messages back into R:
amqp_get(conn, "my.queue1")
#> Delivery Tag: 1
#> Redelivered: FALSE
#> Exchange: my.exchange
#> Routing Key: #
#> Message Count: 1
#> 66 69 72 73 74
amqp_get(conn, "my.queue1")
#> Delivery Tag: 2
#> Redelivered: FALSE
#> Exchange: my.exchange
#> Routing Key: #
#> Message Count: 0
#> 73 65 63 6f 6e 64
amqp_get(conn, "my.queue1")
#> character(0)
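The last line of each printed message is the body as raw bytes. If the payload is text, base R's rawToChar() turns it back into a string. The sketch below uses bytes copied by hand from the output above rather than a live message object, since how the body is accessed on the object returned by amqp_get() is not shown here.

```r
# Bytes copied from the two amqp_get() results above.
first_body  <- as.raw(c(0x66, 0x69, 0x72, 0x73, 0x74))
second_body <- as.raw(c(0x73, 0x65, 0x63, 0x6f, 0x6e, 0x64))

# Convert each raw vector back into a character string.
decoded <- c(rawToChar(first_body), rawToChar(second_body))
decoded
```

The decoded values match the two strings passed to amqp_publish() earlier.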
Alternatively, you can use amqp_listen() to run consumer callbacks:
amqp_listen(conn, timeout = 1)
received
#> [1] 2
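If you need to wait for a known number of messages rather than a single fixed timeout, one option is to call amqp_listen() repeatedly until a condition holds. This is a rough sketch: listen_until() and its arguments are hypothetical, and the listen argument exists only so the loop can be tried without a broker.

```r
# Hypothetical helper: keep listening until done() returns TRUE
# or max_rounds calls have been made.
listen_until <- function(conn, done, max_rounds = 10,
                         listen = longears::amqp_listen) {
  for (i in seq_len(max_rounds)) {
    if (done()) return(TRUE)
    # Same call as above: run consumer callbacks, then return.
    listen(conn, timeout = 1)
  }
  done()
}

# With a live connection and the consumer defined earlier:
# listen_until(conn, function() received >= 2)
```

The function returns TRUE as soon as the condition is met and FALSE if it is still unmet after the last round.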
To clean things up, delete the queues, the exchange, and disconnect from the server:
amqp_delete_queue(conn, "my.queue1")
amqp_delete_queue(conn, "my.queue2")
amqp_delete_exchange(conn, "my.exchange")
amqp_disconnect(conn)
conn
#> AMQP Connection:
#> status: disconnected
#> address: localhost:5672
#> vhost: '/'
And check that the connection is closed:
$ rabbitmqctl list_connections
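In scripts, it is easy to leave a connection open when an error interrupts the steps above. One possible pattern is to wrap the work in a function that registers the disconnect with on.exit(). This is a sketch, not part of longears: with_amqp() and its connect and disconnect arguments are hypothetical names, with defaults pointing at the functions used in this guide.

```r
# Hypothetical wrapper: open a connection, run fun(conn), and always
# disconnect afterwards, even if fun() signals an error.
with_amqp <- function(fun,
                      connect = longears::amqp_connect,
                      disconnect = longears::amqp_disconnect) {
  conn <- connect()
  on.exit(disconnect(conn), add = TRUE)
  fun(conn)
}

# With a running broker and the exchange from this guide:
# with_amqp(function(conn) {
#   longears::amqp_publish(conn, "hello", exchange = "my.exchange", routing_key = "#")
# })
```

The defaults are only evaluated when used, so the wrapper can be exercised with stand-in functions when no broker is available.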
Some AMQP features are not present, notably transaction support (which there are no plans to implement). Others are handled internally according to best practices and are not exposed through the API; channels, message acknowledgements, and prefetch counts fall into this category. A design goal of the package is to shield users from such details, especially where departing from best practice is rarely necessary.
If you need an AMQP feature (or RabbitMQ extension) that is not currently available or accessible, please consider filing an issue explaining the use case you have in mind.