Reusing existing MQ infrastructure


(Mike Hearn) #1

One question that has been cropping up lately is whether Corda will support the re-use of existing institutional message queue infrastructure. This is an interesting question because MQ sits at the heart of many financial institutions and, as such, running MQ brokers is typically a core competency of existing ops teams.

Background

We often talk about how Corda is designed by the financial industry, for the financial industry. This isn’t just glib marketing; we mean it, and the topic of MQ is a great example. We designed Corda from the very start to be fundamentally based around message queues. This stands in contrast to all the other blockchain/DLT platforms out there (Bitcoin, Ethereum, Fabric, etc.), which use either simple framed network connections or, in Fabric’s case, gRPC.

At the heart of every Corda node sits an MQ broker. Today this broker is Apache Artemis, the successor to the popular ActiveMQ product. Note that Artemis is not ActiveMQ despite its often confusing branding; it is a separate codebase that used to be known as HornetQ.

The Corda MQ broker handles the following tasks:

  • Reliable communication over the P2P network using AMQP/1.0
    • Storage and (fully transactional) retry of messages that are destined for offline peers.
    • Setting up TLS connections and performing mutual client/server certificate authentication.
    • Identity-based routing. You send messages to legal identities and the Corda network map infrastructure ensures the broker is always configured to send messages to the right IP address, even in cases where the IP address for a node changes halfway through a conversation.
  • Accepting and authenticating CordaRPC connections:
    • CordaRPC is our MQ-based RPC protocol, which provides full support for reactive programming: the system can transport ReactiveX Observables and Futures, with push messages being handled by the MQ broker.
    • Transparent re-establishment of RPC connections in case of IP address changes or network outages (useful for end user apps).
    • CordaRPC makes it easy to link push data streams to JavaFX reactive collections, enabling full reactive/functional UI across the entire stack. Our Explorer app shows how to do this.
  • Routing of messages between different node components when it’s possible for those components to run out of process. For instance, we are in the middle of splitting transaction verification out into a separate process.

Being a full-blown broker, it also lets you connect and create your own queues and addresses. Artemis currently stores its messaging journals in a custom file format, but it also supports message storage in a relational database via JDBC, and we might switch to using that in future.

We don’t currently utilise all the capabilities MQ brokers provide and we plan to go further in future:

  • Allowing flows to send and receive on arbitrary queues, not just to and from P2P nodes. In this way, ledger update workflows can be naturally integrated with:
    • In-house systems that already exist and are available over MQ
    • People (one ticket queue per person, like an inbox)
    • Other financial networks that “speak async messages”.
  • Backpressure for both P2P and RPC traffic. If a remote peer or a connected RPC client is either sending traffic too fast or receiving it too slowly, pressure will propagate through the node and network to the originating source, requesting it to slow down. Although we are currently missing some small pieces of code to wire it together, Corda is carefully designed to fully support backpressure in future. Backpressure frees the developer from worrying about accidentally flooding a server with data (or being accidentally flooded), and thus lets you write code in a simple, natural way: you go as fast as possible and rely on the blocking nature of the stack to slow you down. Of course, if you don’t want that sort of backpressure, full async messaging is also possible.
  • Clustering and sharding of the MQ broker for much higher message routing performance.
  • Automatic message sharding to allow node sub-components to scale up and down.
  • High availability features like automatic client failover between clusters.
  • Large file streaming, so enormous files can be attached to transactions, streamed through RPC and then across the network automatically. For example, one potential customer we’re talking to wishes to use this feature for trader surveillance data, such as attaching audio recordings of phone calls to the resulting Corda transactions, thus ensuring they become a signed and immutable part of the ledger. Doing this requires the ability to stream files larger than available RAM over the network in chunks.
  • Integration with LDAP and other forms of corporate SSO infrastructure for client authentication.
  • Non-RPC based posting of data structures representing events in various formats to ordinary message queues, for cases where you don’t wish to use our RPC library for whatever reason.
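The backpressure item above can be illustrated with a bounded buffer: a producer that outpaces its consumer simply blocks until space frees up, instead of flooding it. Here is a minimal Python sketch of that idea, assuming a hypothetical capacity of 4 as a stand-in for a size-limited broker queue; it models the concept only and is not Corda or Artemis code.

```python
import queue
import threading
import time

CAPACITY = 4     # hypothetical buffer size standing in for a broker queue limit
SENTINEL = None  # marks the end of the stream


def producer(q: queue.Queue, count: int) -> None:
    """Send as fast as possible; put() blocks whenever the buffer is full."""
    for i in range(count):
        q.put(i)  # blocks here when the consumer falls behind: backpressure
    q.put(SENTINEL)


def consumer(q: queue.Queue, received: list, max_seen: list) -> None:
    """Drain slowly, recording the largest backlog observed."""
    while True:
        max_seen[0] = max(max_seen[0], q.qsize())
        item = q.get()
        if item is SENTINEL:
            break
        received.append(item)
        time.sleep(0.001)  # simulate a slow receiver


def run(count: int = 50):
    q = queue.Queue(maxsize=CAPACITY)
    received, max_seen = [], [0]
    t_prod = threading.Thread(target=producer, args=(q, count))
    t_cons = threading.Thread(target=consumer, args=(q, received, max_seen))
    t_prod.start()
    t_cons.start()
    t_prod.join()
    t_cons.join()
    return received, max_seen[0]


received, backlog = run(50)
print(received == list(range(50)), backlog <= CAPACITY)  # True True
```

The producer never checks whether the consumer is keeping up; the blocking `put()` does that for it, which is the "go as fast as possible and let the stack slow you down" style described above.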

Whew! That’s a lot of stuff! Fortunately Artemis supports all the necessary features and is well documented. It’s also professionally maintained by Red Hat.

But of course, Artemis isn’t the only product that supports these things, so people would naturally like to use their existing brokers.

Integration with other MQ brokers

There are several sub-problems to solve in order to make this possible. In no particular order:

  1. We customise Artemis to ensure that mutually authenticated TLS with our own PKI is used, and that the identities from the certificates are attached to every message. Institutional MQ typically runs inside the firewall and is not used for inter-organisational P2P connectivity.
  2. We use Artemis-specific APIs rather than JMS.
  3. We use Artemis-specific properties for things like knowing which authenticated RPC user sent a message.
  4. We only unit and regression test with Artemis.

Let’s tackle them in order.

P2P authentication and internet exposure

Corda is designed as a business-to-business network that runs over the internet. Whilst we are looking at providing alternative models for users who want them (I’ll cover that shortly), in its “native” mode a node needs the ability to accept inbound connections and make outbound connections. Additionally, Corda nodes need to communicate simultaneously with both the outside world and internal systems like databases.

When people say they want to use their existing MQ broker, they usually don’t mean they want to expose their mission critical MQ infrastructure outside the firewall. They mean they want to use it for other things, like maybe RPC, receiving notifications of events, gluing disparate systems together and so on.

Given the customisation of Artemis required to make it do what we use it for, and given that we will eventually want to audit it to ensure it’s safe for potentially malicious peers to connect, it makes sense to keep it around for use outside the firewall, even in a world where the bulk of the work is handled by other MQ brokers. In this design, we allow Artemis to “float” outside the firewall and the main node connects to it outbound through the firewall. Messages that are destined for a peer on the network are generated inside the node and sent through the firewall to the float, which then takes care of getting them delivered to the right endpoint. The float would continue to be an embedded Artemis broker and would handle only the following:

  • Mapping of identities to IP addresses via the network map.
  • Mutual TLS authentication using the network’s root CA.
  • Storage and retry of messages.
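A rough sketch of the routing and store-and-retry duties of the float, in Python for brevity. It assumes a hypothetical `network_map` dict from legal identity to current address and a caller-supplied `transmit` function that returns False when a peer is unreachable; TLS and persistence are left out, and none of the names are Corda APIs.

```python
from collections import deque
from typing import Callable, Deque, Dict, Tuple


class Float:
    """Illustrative model of a float: identity routing plus store-and-retry."""

    def __init__(self, network_map: Dict[str, str],
                 transmit: Callable[[str, bytes], bool]) -> None:
        self.network_map = network_map  # legal identity -> current address
        self.transmit = transmit        # returns True if the peer accepted the message
        # Held in memory here; a real float would persist this queue.
        self.pending: Deque[Tuple[str, bytes]] = deque()

    def send(self, identity: str, payload: bytes) -> None:
        """Queue the message first, then try to deliver everything pending."""
        self.pending.append((identity, payload))
        self.retry()

    def retry(self) -> None:
        """Attempt each stored message once; keep the ones that still fail."""
        still_pending: Deque[Tuple[str, bytes]] = deque()
        while self.pending:
            identity, payload = self.pending.popleft()
            # Look the address up at delivery time, so an IP change is picked up.
            address = self.network_map.get(identity)
            if address is None or not self.transmit(address, payload):
                still_pending.append((identity, payload))
        self.pending = still_pending


# Usage: the peer moves to a new address while a message is waiting.
delivered = []
reachable = {"10.0.0.2"}


def transmit(address: str, payload: bytes) -> bool:
    if address in reachable:
        delivered.append((address, payload))
        return True
    return False


network_map = {"O=Bank B": "10.0.0.1"}
flt = Float(network_map, transmit)
flt.send("O=Bank B", b"hello")        # 10.0.0.1 is unreachable, so it stays pending
network_map["O=Bank B"] = "10.0.0.2"  # network map update: the peer moved
flt.retry()                           # now delivered to the new address
print(delivered)  # [('10.0.0.2', b'hello')]
```

Resolving the address at retry time rather than when the message is first queued is what lets a message follow a peer whose IP address changes while it is waiting.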

One question is to what extent a compromised float should be a part of the threat model (R3 consortium members should have access to the current threat model; we will probably publish it externally soon). A compromised float would be able to forge non-transaction messages at will in the current code, because only ledger transactions are signed. Additional layers of encryption and signing can be added to reduce the impact of a compromised float but, of course, only at the cost of extra performance overhead and code complexity.

Internal MQ usage

Inside the firewall, the node would then connect not only to the float, but also to your existing broker via JMS. Such an “enterprise node” would allow its key functions to run in standalone processes that link back to the coordinator machine via that internal MQ. RPC connections, bridges to internal systems and routing of work between sub-components of the node (e.g. flow executors) would then flow over the internal MQ rather than Artemis. Only once a message was destined for the outside world would it head through the firewall and onto the float.

The internal node would also continuously pop messages off the float queues and push them onto the internal MQ broker, waiting for other node subcomponents to pick them up.

Your internal MQ would need some adjustments to work with Corda in this way: in particular, it would need the concept of a “node internal” user so that the node sub-components can mutually authenticate each other. It would also need to ensure that messages are tagged with authenticated user info in a compatible way.

Availability considerations

If the float goes offline then messages will start to back up in two places:

  1. Your internal MQ system
  2. The queues on remote nodes

No messages will be lost. When the float comes back the internal node will start delivering outbound messages to it through the firewall, and remote nodes will eventually notice it’s come back due to their periodic retries and start delivering messages that previously failed.

The signed network parameters structure for the Corda network you’re connecting to defines the event horizon: the amount of time you can be offline before you are automatically evicted from the network and nodes are allowed to flush their queues.
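As a worked example of the event horizon rule, the check below assumes a hypothetical horizon of 30 days; in practice the value would come from the signed network parameters, and the function name is illustrative only.

```python
from datetime import datetime, timedelta


def may_flush_queue(last_seen: datetime, now: datetime,
                    event_horizon: timedelta) -> bool:
    """True once a node has been offline longer than the event horizon,
    i.e. it is evicted and peers may discard messages queued for it."""
    return now - last_seen > event_horizon


EVENT_HORIZON = timedelta(days=30)  # hypothetical value, not a Corda default
last_seen = datetime(2017, 3, 1)
print(may_flush_queue(last_seen, datetime(2017, 3, 20), EVENT_HORIZON))  # False (19 days)
print(may_flush_queue(last_seen, datetime(2017, 4, 5), EVENT_HORIZON))   # True (35 days)
```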

If your internal node goes down then messages from remote peers will start to back up in the float queues.

If both the internal node and the float go offline, the effect is the same as if only the float goes offline.

If your internal MQ goes offline, it is the job of the internal node to immediately stop pulling messages from the float, so that they start backing up outside the firewall. The act of popping a message from the float and pushing it onto the internal MQ should be transactional and deduplicated (Corda already configures Artemis message deduplication, but this property must be preserved as messages cross onto a different broker).
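The float-to-internal-MQ hand-off can be modelled roughly as follows (a Python sketch with hypothetical names, not Corda code). Each message carries a duplicate-detection ID, the internal side remembers the IDs it has accepted, and a message is only removed from the float after the push succeeds. A crash between push and removal therefore leads to a redelivery that is dropped as a duplicate, and the pump stops pulling as soon as the internal MQ is offline. A real implementation would presumably rely on broker transactions rather than this in-memory model.

```python
from collections import deque
from dataclasses import dataclass, field
from typing import Deque, List, Set


@dataclass
class Message:
    dedup_id: str  # duplicate-detection ID; must be preserved across brokers
    body: bytes


@dataclass
class InternalMQ:
    online: bool = True
    seen_ids: Set[str] = field(default_factory=set)
    delivered: List[Message] = field(default_factory=list)

    def push(self, msg: Message) -> None:
        if not self.online:
            raise ConnectionError("internal MQ unavailable")
        if msg.dedup_id in self.seen_ids:
            return  # already accepted once: drop the duplicate
        self.seen_ids.add(msg.dedup_id)
        self.delivered.append(msg)


def pump(float_queue: Deque[Message], internal: InternalMQ) -> int:
    """Move messages from the float to the internal MQ, one at a time.

    Peek, push, then pop: a failed push leaves the message on the float.
    Returns the number of messages removed from the float.
    """
    moved = 0
    while float_queue and internal.online:  # stop pulling if the internal MQ is down
        msg = float_queue[0]   # peek without removing
        internal.push(msg)     # idempotent thanks to dedup_id
        float_queue.popleft()  # "commit": now safe to remove from the float
        moved += 1
    return moved


internal = InternalMQ()
float_queue = deque([Message("a", b"1"), Message("b", b"2")])
internal.push(Message("a", b"1"))  # pushed just before a crash, never removed from the float
print(pump(float_queue, internal))               # 2
print([m.dedup_id for m in internal.delivered])  # ['a', 'b']

internal.online = False
float_queue.append(Message("c", b"3"))
print(pump(float_queue, internal), len(float_queue))  # 0 1
```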

If the float then runs out of storage capacity it would take itself offline, thus causing messages to back up on remote peers.

API ports

We would need to schedule time into our dev roadmap for porting Corda RPC and internal node comms to JMS and away from Artemis-specific APIs. This may be tricky in a few cases where we use features like transient queues that may not have a direct JMS equivalent (I’m not a JMS expert, so I don’t really know what I’m talking about here).

Testing and qualification

Once R3 begins providing commercial support for Corda, I doubt we’d be willing to provide support for just any MQ broker used in the above fashion. We’d want to develop robustness and security tests along with bridge code for each broker, so we can be sure that the complex interplay works and that advanced features like backpressure aren’t subtly broken by it (as you will inevitably only find out about important missing MQ features when you suddenly need them and everything flames out).

Conclusion

Support for in-house MQ infrastructures is a worthy goal that we are considering. The decision to base Corda on MQ technology right from day one assists greatly with this and the amount of work involved is not intractable.

Whilst the code is of course currently immature, we believe Corda developers will enjoy working with a fully reactive, scalable, push-based architecture that frees you to do things that wouldn’t be possible on other platforms.


(Jose Coll) #2

Some JMS stuff

In JMS, the equivalent of transient queues is temporary destinations (either queues or topics). These have a lifetime tied to the connection that created them, and their messages may only be consumed within that connection (so they are non-durable and unreliable by nature). Their purpose is often to provide request/reply semantics (using the JMSReplyTo property).
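The request/reply pattern can be modelled with plain in-process queues: the requester creates a private reply queue (playing the role of a temporary destination), names it as the reply-to of its request, and the responder answers there with the request's correlation ID. This is a Python model of the pattern only; in JMS the corresponding calls are `Session.createTemporaryQueue()`, `Message.setJMSReplyTo()` and `Message.setJMSCorrelationID()`, and everything else below is hypothetical.

```python
import queue
import threading
import uuid
from dataclasses import dataclass
from typing import Optional


@dataclass
class Msg:
    body: str
    correlation_id: str
    reply_to: Optional[queue.Queue] = None  # analogue of JMSReplyTo


def responder(requests: queue.Queue) -> None:
    """Serve one request by replying on the requester's private queue."""
    req = requests.get()
    if req.reply_to is not None:
        req.reply_to.put(Msg(body=req.body.upper(), correlation_id=req.correlation_id))


def request(requests: queue.Queue, body: str, timeout: float = 1.0) -> str:
    reply_queue: queue.Queue = queue.Queue()  # exists only for this call, like a temporary queue
    correlation_id = str(uuid.uuid4())
    requests.put(Msg(body=body, correlation_id=correlation_id, reply_to=reply_queue))
    reply = reply_queue.get(timeout=timeout)  # raises queue.Empty if nobody answers
    if reply.correlation_id != correlation_id:
        raise RuntimeError("reply does not match request")
    return reply.body


requests: queue.Queue = queue.Queue()
threading.Thread(target=responder, args=(requests,), daemon=True).start()
print(request(requests, "ping"))  # PING
```

As with a JMS temporary destination, nothing outside the requester holds on to the reply queue, so it disappears with the call; a reply that arrives after the timeout is simply lost, which matches the "non-durable and unreliable" caveat above.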

An important consideration in supporting JMS is whether a messaging provider supports JMS 2.0 (some still do not). Of specific interest are the following JMS 2.0 features:

  • asynchronous send semantics (many messaging providers already supported this natively in their APIs)
  • support for shared topic subscriptions (both durable and non-durable) to aid scalability

Many messaging providers extend the JMS specification with additional features (often specified as flags, properties or configuration options when using their JMS API). For example, many define additional message acknowledgement modes, which often have subtle, implementation-specific behaviours (typically to enhance performance at the expense of reliability). JMS defines three modes:

  • DUPS_OK_ACKNOWLEDGE
  • AUTO_ACKNOWLEDGE
  • CLIENT_ACKNOWLEDGE

All of these standard JMS modes involve sending acknowledgements from the client to the server. However, in some cases you really don’t mind losing messages in the event of failure, so it makes sense to acknowledge the message on the server before delivering it to the client. Artemis supports two additional modes: PRE_ACKNOWLEDGE and INDIVIDUAL_ACKNOWLEDGE. With pre-acknowledge mode, messages are acknowledged before they are sent to the client, which reduces the amount of acknowledgement traffic on the wire.
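The difference between CLIENT_ACKNOWLEDGE and Artemis's INDIVIDUAL_ACKNOWLEDGE is one of scope: in client mode, acknowledging a message acknowledges every message the session has consumed so far, whereas individual mode acknowledges only that message. The toy Python model below (a hypothetical class, not a JMS implementation) shows which messages come back after a session failure in each case.

```python
from typing import List

CLIENT_ACKNOWLEDGE = "CLIENT_ACKNOWLEDGE"
INDIVIDUAL_ACKNOWLEDGE = "INDIVIDUAL_ACKNOWLEDGE"


class SessionModel:
    """Toy model of acknowledgement scope; not a JMS implementation."""

    def __init__(self, mode: str, messages: List[str]) -> None:
        self.mode = mode
        self.queue = list(messages)   # messages still held by the broker
        self.unacked: List[str] = []  # consumed but not yet acknowledged

    def receive(self) -> str:
        msg = self.queue.pop(0)
        self.unacked.append(msg)
        return msg

    def acknowledge(self, msg: str) -> None:
        if self.mode == CLIENT_ACKNOWLEDGE:
            self.unacked.clear()      # acks everything consumed so far in the session
        elif self.mode == INDIVIDUAL_ACKNOWLEDGE:
            self.unacked.remove(msg)  # acks only this message
        else:
            raise ValueError(f"unknown mode {self.mode}")

    def fail(self) -> None:
        """Simulate a session failure: unacknowledged messages are redelivered."""
        self.queue = self.unacked + self.queue
        self.unacked = []


for mode in (CLIENT_ACKNOWLEDGE, INDIVIDUAL_ACKNOWLEDGE):
    s = SessionModel(mode, ["m1", "m2", "m3"])
    s.receive()
    second = s.receive()
    s.acknowledge(second)  # ack m2 (and, in client mode, m1 too)
    s.fail()
    print(mode, s.queue)
# CLIENT_ACKNOWLEDGE ['m3']
# INDIVIDUAL_ACKNOWLEDGE ['m1', 'm3']
```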

Other examples of JMS enhanced or variant behaviour are:

  • usage of wildcarding in topic namespace selectors (different providers allow different types of namespace definition and search in selectors)
  • usage of custom properties in JMS messages to provide additional functionality. For example, Artemis defines a custom message property, HDR_DUPLICATE_DETECTION_ID, which the server uses together with an in-memory cache to identify duplicate messages.

Finally, it is important to emphasize that JMS does not define a standard wire protocol; it only defines a common programming API. This enables pluggability of messaging providers with minimal code changes, but does not enable interoperability between provider implementations (unless these also implement a standard wire protocol such as AMQP or STOMP).


(George P Burdell) #3

Thank you for these thoughts on internal messaging vs external messaging.

  1. Is there a sample implementation of this “float” in release-M10?

  2. Does it make sense to host a Corda node inside the DMZ, thereby allowing it to interact with an external MQ on the one hand, and connect with a database behind the firewall on the other?

  3. For P2P communication among nodes outside the firewall, does it make sense to support messaging that is not based on MQ? Especially for non-trading use cases where high transaction throughput is not as important, does it make sense to support other protocols via this “float”, such as MQTT or even something as unsophisticated as webhooks?


(Wawrzyniec (Wawrzek) Niewodniczański) #4

Does this mean that there are going to be two ways of communicating inside a Corda network?

  1. Node to node: nodes talk to each other using Artemis MQ (over the internet).
  2. Inside a node: the “Main Node” (inside the firewall) talks to the “Float” part (outside the firewall), as well as to the existing internal MQ broker and other internal services, via a JMS-based internal MQ. The same MQ is going to be used for other internal communication.

I tried to sketch MQ usage on this diagram. Is it correct?
https://www.lucidchart.com/invitations/accept/9ce8c761-cc4f-4aa3-acb3-6ad11573c9b9


(Mike Hearn) #5

Node to node is AMQP/1.0 over TLS. It doesn’t have to be Artemis doing the communication; it just so happens that’s what we picked.

I couldn’t accept the invite unfortunately. I got stuck in some signup loop. Maybe you can paste the diagram here?


(Wawrzyniec (Wawrzek) Niewodniczański) #6


(Mike Hearn) #7

I think that’s not quite right. The components of a node would talk to each other via the internal MQ engine.


(Wawrzyniec (Wawrzek) Niewodniczański) #8

@mike That was the plan; I might need to represent it better. I’m still not sure how other internal systems are going to talk to a Corda node. Can the existing MQ broker use the Corda Internal MQ (CIMQ) engine directly?


(Mike Hearn) #9

The idea is that Corda won’t use its own internal MQ anymore in a distributed node. All the parts of the node would communicate via the organisation’s own MQ engine, and apps would communicate with the node via that engine too.


(Mani Pillai) #10

While ReactiveX/Observables simplify coding, the issue we face is transaction reliability. If a downstream service is unavailable, the RPC client risks losing the data from Observables. With the JMS client acknowledge feature, JMS messages are not acknowledged until downstream services become available. We would much rather use JMS/MQ directly to interact with Corda nodes, as is the case with trading platforms.


(Mike Hearn) #11

You can certainly do that.

When you say “downstream service” are you referring to the RPC client or the RPC server here? The idea is that if the node is pushing items into an observable, they are in the MQ right up to the point that they’ve been downloaded into the client and sent through the observable.

The current RPC code does actually ACK a message from the broker before publishing it to the observable. Arguably we should change that so that, when the observable has synchronous subscribers, the message is only ACKed once they complete without throwing an exception.
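The ordering change suggested above (ACK only after synchronous subscribers succeed) could look roughly like this. It is a Python sketch, not Corda's RPC client: subscribers are plain callables rather than ReactiveX observers, and `ack`/`nack` are hypothetical stand-ins for acknowledging the message or handing it back to the broker for redelivery.

```python
from typing import Callable, Iterable, List


def deliver(message: str,
            subscribers: Iterable[Callable[[str], None]],
            ack: Callable[[str], None],
            nack: Callable[[str], None]) -> bool:
    """Run every synchronous subscriber, then ACK only if none of them raised.

    If any subscriber throws, the message is NACKed instead, so it stays with
    the broker for redelivery rather than being silently lost.
    """
    try:
        for subscriber in subscribers:
            subscriber(message)
    except Exception:
        nack(message)
        return False
    ack(message)
    return True


acked: List[str] = []
nacked: List[str] = []
audit_log: List[str] = []


def forward_to_trading_system(msg: str) -> None:
    if msg == "trade-2":
        raise ConnectionError("trading system unavailable")


for m in ["trade-1", "trade-2"]:
    deliver(m, [audit_log.append, forward_to_trading_system], acked.append, nacked.append)
print(acked, nacked, audit_log)  # ['trade-1'] ['trade-2'] ['trade-1', 'trade-2']
```

Because a failure after some subscribers have already run causes redelivery, this gives at-least-once rather than exactly-once processing, so subscribers such as the audit log here would need to be idempotent.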

Do you see a reasonable way to integrate controlled message acknowledgement and transactionality with the ReactiveX framework?


(Mani Pillai) #12

I meant a bank’s trading system services as downstream services. While synchronous subscribers on an observable might work, I am not sure about futures, though (startFlowDynamic returns a future). Besides, we are designing a solution with potentially dozens of flows, and managing observables and futures in several parts of the code is going to be an arduous task (it could be because we are just beginners when it comes to the ReactiveX framework).
A very similar issue arises with blockchains that use gRPC.


(Mike Hearn) #13

Futures are implemented internally using observables. At any rate, the RPC thread would run any callbacks on a ListenableFuture before acknowledging the message (once we fix the ordering).

You don’t have to use the observables. You can just throw them away if you don’t want them (we’re changing the API so you will have to request them explicitly).

