longears is a fast and fully-featured RabbitMQ client for R, built on top of the reference C library, librabbitmq.

RabbitMQ itself is a highly popular, performant, and robust open-source message broker used to build distributed systems.

longears implements a large portion of the Advanced Message Queuing Protocol (AMQP) used by RabbitMQ , and the API largely reflects the protocol itself. However, this package is not a true low-level interface: it abstracts away many details of AMQP for end users. See Limitations for details.

This package may be of interest to you if you wish to have R speak to your organization’s existing RabbitMQ servers; if you simply need a message queue library, you may be better off with txtq, litq, or the ZeroMQ package rzmq.

Installation

To install longears as a source package you will need its system dependency, librabbitmq. Normally if this library is missing during installation, the configure script will attempt to download and build it directly (provided you have cmake installed).

You may still wish to use your platform’s provided librabbitmq instead, especially if this process fails. On Debian-based systems (including Ubuntu) you can get this library by running the following from the command line:

$ apt install librabbitmq-dev

For other platforms:

  • macOS (via Homebrew): brew install rabbitmq-c
  • Fedora-based: yum install librabbitmq-devel
  • Arch-based: pacman -S librabbitmq-c
  • Windows (via Rtools): pacman -Sy mingw-w64-{i686,x86_64}-rabbitmq-c

longears is not yet available on CRAN. You can install it from GitHub with

# install.packages("remotes")
remotes::install_github("atheriel/longears")

Basic Usage

If you are not already familiar with RabbitMQ and the message/queue/binding/exchange terminology, I suggest their excellent conceptual overview.

You will also need to have a local RabbitMQ server running with the default settings to follow this guide.

$ # apt install rabbitmq-server
$ systemctl start rabbitmq-server
$ rabbitmqctl status

First, connect to the server (with the default settings):

conn <- amqp_connect()
conn
#> AMQP Connection:
#>   status:  connected
#>   address: localhost:5672
#>   vhost:   '/'

Create an exchange to route messages and a couple of queues to store them:

amqp_declare_exchange(conn, "my.exchange", type = "fanout")
amqp_declare_queue(conn, "my.queue1")
#> AMQP queue 'my.queue1'
#>   messages:  0
#>   consumers: 0
amqp_declare_queue(conn, "my.queue2")
#> AMQP queue 'my.queue2'
#>   messages:  0
#>   consumers: 0
amqp_bind_queue(conn, "my.queue1", "my.exchange", routing_key = "#")
amqp_bind_queue(conn, "my.queue2", "my.exchange", routing_key = "#")

You can also set up a consumer for one of these queues with a callback:

received <- 0
consumer <- amqp_consume(conn, "my.queue2", function(msg) {
  received <<- received + 1
})

Now, send a few messages to this exchange:

amqp_publish(conn, "first", exchange = "my.exchange", routing_key = "#")
amqp_publish(conn, "second", exchange = "my.exchange", routing_key = "#")

Check if your messages are going into the queues:

$ rabbitmqctl list_queues

You can use amqp_get() to pull individual messages back into R:

amqp_get(conn, "my.queue1")
#> Delivery Tag:    1
#> Redelivered: FALSE
#> Exchange:    my.exchange
#> Routing Key: #
#> Message Count:   1
#> 66 69 72 73 74
amqp_get(conn, "my.queue1")
#> Delivery Tag:    2
#> Redelivered: FALSE
#> Exchange:    my.exchange
#> Routing Key: #
#> Message Count:   0
#> 73 65 63 6f 6e 64
amqp_get(conn, "my.queue1")
#> character(0)

Or you can use amqp_listen() to run consumer callbacks:

amqp_listen(conn, timeout = 1)
received
#> [1] 2

To clean things up, delete the queues, the exchange, and disconnect from the server.

amqp_delete_queue(conn, "my.queue1")
amqp_delete_queue(conn, "my.queue2")
amqp_delete_exchange(conn, "my.exchange")
amqp_disconnect(conn)
conn
#> AMQP Connection:
#>   status:  disconnected
#>   address: localhost:5672
#>   vhost:   '/'

And check that the connection is closed:

$ rabbitmqctl list_connections

Limitations

Some AMQP features are not present, notably transaction support (which there are no plans to implement). Others are handled internally according to best practices, even when they are not exposed through the API – channels, message acknowledgements, and prefetch counts are in this category. A design goal of the package is to shield users from some details, especially if departures from best practice are rare.

If you have need of an AMQP feature (or RabbitMQ extension) that is not currently available or accessible, please consider filing an issue explaining the use case you have in mind.

License

The package is licensed under the GPL, version 2 or later.