Martin Sústrik
This talk was delivered at Linux Kongress, September 23rd, 2010.
Abstract
This article argues that, aside from TCP/IP (interlinked network hosts) and the World Wide Web (interlinked documents), we need a simple general-purpose decentralised infrastructure to interlink services.
We will argue that the technical difficulty of connecting service providers and service consumers in a distributed fashion can be solved, and we will explain how we propose to solve it. We will present ØMQ, which is a simple messaging library focused on this goal.
Using ØMQ, we will demonstrate how large-scale distributed services can be built with basically zero friction. We will introduce new reusable patterns for service interconnect that are intended to become the building blocks for a new "scalability layer" in the Internet stack.
Finally, we will discuss the feasibility of a kernel-space implementation of the "scalability layer".
Acknowledgements
I would like to thank Martin Lucina and Pieter Hintjens, who helped with the text, proofreading, examples and editing of the document.
Introduction
The Internet Protocol dominates 100% of the IT industry but this was not always inevitable. In 1995 Microsoft launched Windows 95, a desktop product that had no Internet but instead used a proprietary network, the Microsoft Network (MSN). The MSN and AOL walled gardens failed, as did every other attempt to capture the growing Internet in private networks. Following the Internet's move onto every computer, society, economics, and even politics have gone digital to an extent that was impossible to foresee even ten years ago.
Why did the Internet dominate? It is owned by no-one, not patented, not subsidised, and represents no dominant market interest. Unlike the GSM stack and Windows, the Internet looks like an orphaned technology that should have been easily quashed by private interests.
The answer lies precisely in this lack of centralised ownership. Aside from not being patented (which would allow a single party to decide on its fate) and being clearly documented (which prevents "ownership by obscurity"), the Internet was deliberately designed as a fully distributed architecture with no single point of control. The decision was allegedly driven by the US Department of Defense's desire to have an infrastructure that would withstand a nuclear attack. Centralised architecture simply doesn't work well when the centre is nuked.
This architectural decision had a lot of more or less unexpected consequences. It meant that the Internet was basically out of control. It could spread in all directions and there was no one to stop it. It proved to be infinitely scalable as there was no central piece that would become a bottleneck. It became free (as in beer) as there was no monopoly to own it and a flourishing free market drove the price down to zero. It turned out to be free (as in speech) as there was no single entity capable of intercepting or censoring the whole of it. It could be properly secured as its fundamental 'end-to-end principle' didn't force endpoints to trust the intermediaries.
Thanks to the above properties, a lot of private, governmental and business data was migrated from the physical world to the Internet. In short, the lack of centralised ownership on both political and technical levels resulted in the ubiquitous Internet of today.
Beginning in the late 1990s, the World Wide Web started to define the digital economy. Built on a protocol for interconnecting documents, the Web rapidly spread as the cheapest and simplest way of publishing and sharing knowledge. Again, with no patents or taxes on the technology, with a clear set of interoperable standards and a fully distributed architecture, the inevitable happened. Competition was efficient and prices of web-related products fell towards zero. The Web was out of control: it had a potential for infinite growth, it was highly scalable and it didn't mess with users' freedom.
Once again, the lack of centralised ownership resulted in a 100% adoption rate and a fertile economy flourishing on top of the technology.
As more and more of our economy goes digital, however, we are seeing the emergence of a new type of business that mimics the mobile phone industry more than it does the Internet or the World Wide Web. That is the emergence of centralised service providers: Google, Facebook, eBay, YouTube, even Twitter.
Naturally, there is concern that the emerging centralised services lack the desirable properties of distributed architectures listed above. Worried voices come from different camps. Those running the services worry about scalability, while pro-freedom activists worry about compromised consumer privacy. Centralised infrastructure that is not free (as in beer) and is under the control of a single party prevents businesses from moving in. Investors may worry that service centralisation creates deeply entrenched monopolies that leave little room for competition. Policy makers, on the other hand, should worry that the miraculous turning of platforms into fertile ecosystems supporting whole economies (the way TCP/IP and the World Wide Web did) seems to be effectively prevented by the tight grip a single entity exercises on the platform.
We believe that, aside from TCP/IP (interlinked network hosts) and the World Wide Web (interlinked documents), we need a simple general-purpose decentralised infrastructure to interlink services. The following chapters will present such an architecture and the steps required to make it a natural part of the Internet.
Building a simple application
The first thing to understand is that not all the services are the same. Still, the scope of options — at least from the messaging point of view — is rather limited. Some 90% of cases use either the "data distribution" paradigm (also known as "publish/subscribe" pattern) or the "client/server" paradigm (also known as "request/reply" pattern). We are going to present how these two most common cases can be implemented using ØMQ. Afterwards we'll show how the applications we'll write can be distributed in a decentralised manner and stretched to basically any scale.
Client/Server
Following tradition, we start with a Hello World example. We'll make a client and a server. The client sends "Hello" to the server, which replies with "World".
At this point it should be said that ØMQ uses a POSIX-like socket API rather than a proprietary domain-specific interface. Thus, we are going to work with sockets. Also, for the sake of brevity, we shall present most of our examples in Python; ØMQ as it stands today provides a C API with many language bindings developed by the community.
In this case the server opens a socket, reads requests on it, and replies with "World" to each request:
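import zmq

# REP is the reply side of the request/reply pattern.
context = zmq.Context()
socket = context.socket(zmq.REP)
socket.bind("tcp://*:5566")

while True:
    message = socket.recv()   # wait for the next request from any client
    print(message)
    socket.send(b"World")     # the reply is routed to the original requester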
The client opens a socket, sends "Hello" and waits for a reply from the server:
import zmq

# REQ is the request side of the request/reply pattern.
context = zmq.Context()
socket = context.socket(zmq.REQ)
socket.connect("tcp://localhost:5566")

socket.send(b"Hello")
message = socket.recv()       # block until the server replies
print(repr(message))
Now, what does this code actually do? It looks too simple to do any substantial work.
Actually, behind the scenes, it does a lot. It frames messages. It sends and receives them in an asynchronous non-blocking manner. It checks for connection failures and re-establishes connections as needed. It queues the messages if the peer is unavailable at the moment. It ensures that individual peers are assigned their fair share of server resources so that a single client can't hijack the server. It routes the replies to the original requester.
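To make the first item in that list concrete, here is a minimal sketch of what message framing means in general: each message is written with a length prefix so that the receiver can recover message boundaries from a plain byte stream. The 4-byte big-endian prefix and the names frame and unframe are assumptions chosen for illustration; this is not ØMQ's actual wire format.

```python
import struct


def frame(message: bytes) -> bytes:
    """Prefix a message with its length as a 4-byte big-endian integer."""
    return struct.pack("!I", len(message)) + message


def unframe(stream: bytes) -> list:
    """Split a byte stream of framed messages back into individual messages."""
    messages = []
    offset = 0
    while offset < len(stream):
        (length,) = struct.unpack_from("!I", stream, offset)
        offset += 4
        messages.append(stream[offset:offset + length])
        offset += length
    return messages


# Two messages written back to back still come out as two messages.
stream = frame(b"Hello") + frame(b"World")
print(unframe(stream))  # [b'Hello', b'World']
```

Without such framing, a stream transport like TCP would hand the receiver an undivided run of bytes ("HelloWorld") with no indication of where one message ends and the next begins.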
The crucial point, however, is that the server socket is capable of handling an arbitrary number of clients. You could literally throw thousands of clients at this server, all at once, and it would continue to work happily and quickly.
As can be seen in the examples, there's no need to handle individual connections by hand. Actually, individual connections are invisible to the user. That is deliberate and it has profound implications on the scalability and distributability of the system. In short, a ØMQ socket is an abstraction that handles a set of underlying connections intelligently and transparently to the user.
Each socket has a type and the type defines how the socket behaves with respect to the connections it handles. In this case, the server uses a REP socket. REP stands for "replier" and what it says is: "Peers will send us requests, we are going to process them and send the replies back to the original requesters."
There are only a few socket types and all are designed in such a way as to make the resulting communication patterns scalable. In addition to the REQ and REP sockets presented in this example we are going to introduce PUB and SUB sockets that allow you to distribute data to clients.
Publish/Subscribe
While REQ and REP sockets work like a pretty standard client/server application, an SQL server for example — a single query produces a single result — PUB and SUB sockets are more like broadcast radio. A client subscribes to particular data (tunes into a radio station) and the data is asynchronously pushed to it until it unsubscribes (tunes out).
The server uses a PUB (publisher) socket to publish an alternating sequence of Hellos and Worlds to the clients:
import zmq

# PUB sockets push data to every connected subscriber.
context = zmq.Context()
socket = context.socket(zmq.PUB)
socket.bind("tcp://*:5566")

while True:
    socket.send(b"HELLO?")
    socket.send(b"WORLD?")
The client uses a SUB (subscriber) socket. It subscribes to all messages from the server by using an empty string as the SUBSCRIBE socket option:
import zmq

context = zmq.Context()
socket = context.socket(zmq.SUB)
socket.connect("tcp://localhost:5566")
# An empty prefix matches every message.
socket.setsockopt(zmq.SUBSCRIBE, b"")

while True:
    message = socket.recv()
    print(repr(message))
You can run many subscribers at once. They tune into the server by subscribing to message prefixes. Messages are replicated to all subscribers, depending on what subscriptions they have set. Literally, every subscriber gets every matching message. This is a multicast model of data distribution.
Now we can start two or more subscribers, perhaps changing the subscription in one of them to "H" or "W". SUB sockets do a prefix match, so an empty subscription ("") pulls all messages. If you don't set any subscription at all, you get nothing.
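The prefix-matching rule described above can be sketched in a few lines of plain Python. The function name delivered is hypothetical and the code only models the rule; it does not show how ØMQ implements filtering internally.

```python
def delivered(subscriptions, message):
    """Return True if the message starts with at least one subscribed prefix."""
    return any(message.startswith(prefix) for prefix in subscriptions)


messages = [b"HELLO?", b"WORLD?"]

# An empty prefix matches everything.
print([m for m in messages if delivered([b""], m)])   # [b'HELLO?', b'WORLD?']

# A subscription to "H" passes only messages that start with "H".
print([m for m in messages if delivered([b"H"], m)])  # [b'HELLO?']

# With no subscription at all, nothing is delivered.
print([m for m in messages if delivered([], m)])      # []
```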
Scaling up
Publish/Subscribe
Scaling up the publish/subscribe messaging pattern is fairly easy to grasp so we'll start with it.
Imagine a stock trading company. A real-time feed of stock prices is distributed from the company's main server to the client workstations of individual traders. So long as the communication all runs over a LAN everything works fine. However, traders at geographically distant branch offices cause trouble: if there are fifty traders at a particular branch office, stock quotes are transferred via the WAN as fifty separate copies.
ØMQ's model of scaling is based on so-called devices: intermediary nodes that sit on the network and route the messages.
There can be different types of devices. The simplest ones are small pieces of software, executables delivered with ØMQ. Users are free to write their own devices (it takes just a few lines of code), optionally adding specific business logic. An Ethernet switch, the one you already own and use, is a device for IP multicast-based transports (yes, ØMQ supports delivery over IP multicast too). In the future, hardware manufacturers will be free to implement all kinds of devices in silicon.
So, to solve the stock price problem, you can take a device and place it at the edge of the branch office's network. The device would connect to the main server located at company headquarters and redistribute the prices to the local traders.
That was the corporate scenario. Now let's consider a different use case. Say a live video broadcast of the Olympic games held in Europe is subscribed to by thousands of viewers in New Zealand. Exactly the same data are transferred in thousands of copies over the inter-continental link.
A local ISP in New Zealand may decide to run a device at the edge of New Zealand's network and redirect all clients trying to access the European server to the device. Bandwidth usage on the intercontinental link would suddenly drop by several orders of magnitude.
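The effect of a device on long-distance traffic can be illustrated with a small in-process model. This is only a sketch: the Wan and Device classes and the publish function are hypothetical names, no real networking is involved, and the model simply counts how many copies of a message cross the long-distance link.

```python
class Wan:
    """Long-distance link that counts every message copy it carries."""

    def __init__(self):
        self.transfers = 0

    def carry(self, receiver):
        """Return a delivery function that crosses this link to the receiver."""
        def deliver(message):
            self.transfers += 1
            receiver(message)
        return deliver


class Device:
    """Intermediary node: takes one copy in, hands a copy to each local peer."""

    def __init__(self, local_receivers):
        self.local_receivers = local_receivers

    def __call__(self, message):
        for receiver in self.local_receivers:
            receiver(message)


def publish(message, subscribers, use_device):
    """Publish one message; return (copies sent over the WAN, inboxes)."""
    wan = Wan()
    inboxes = [[] for _ in range(subscribers)]
    local = [inbox.append for inbox in inboxes]
    if use_device:
        # One WAN connection to the device, which fans out locally.
        connections = [wan.carry(Device(local))]
    else:
        # Every subscriber holds its own connection across the WAN.
        connections = [wan.carry(receiver) for receiver in local]
    for deliver in connections:
        deliver(message)
    return wan.transfers, inboxes


print(publish(b"quote", 50, use_device=False)[0])  # 50
print(publish(b"quote", 50, use_device=True)[0])   # 1
```

In both runs every one of the fifty subscribers receives the message; only the number of copies crossing the expensive link changes.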
While the stock price and the Olympics broadcast examples may seem superficially similar, there is a crucial difference between them. In the former, the transition from direct feeds to a proxy device was done within a single company. There is nothing special about it. It is how things are done today.
In the latter case, however, the change was made independently of both the content provider and the users. It's very different from anything you can do today. To put it simply, the network topology of the service is outside the control of the content provider. No single entity can control the service in its entirety. Instead, topology is managed in a collaborative manner by all participants: the content provider, the ISP and the content subscribers.
Before moving on, think for a second about what this means. Content is distributed efficiently, but opaquely to the publisher. The content provider is not even able to tell how many subscribers there are. In other words, the RIAA is not going to like this. Distributed, collaborative topology has a profound (whether disruptive or beneficial) impact on the economics of the service.
Request/Reply
How does the above idea of interjecting devices between the endpoints apply to the request/reply pattern?
The simplest possible model of request/reply is the client/server architecture. The clients speak directly to the server. There are no intermediate nodes between them.
What would happen if we inserted an intermediate node? Think of this:
The image seems familiar. Yes, it's the buzzword from a few years back, the Enterprise Service Bus! The node in the middle (called simply "bus") provides three important features:
- There can be more than a single server, meaning that the application can scale up to multiple boxes.
- The clients don't have to be aware of servers' location. They speak to the bus which in turn dispatches the messages to the servers.
- The servers can come and go, depending on need, without disrupting the clients.
Now, this is exactly how you would like your distributed service to behave.
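The three features listed above can be modelled in plain Python. The Bus class below is a hypothetical illustration, not an ØMQ component; round-robin dispatch is an assumption made to keep the sketch short.

```python
from collections import deque


class Bus:
    """Intermediary that spreads client requests over the attached servers."""

    def __init__(self):
        self.servers = deque()  # (name, handler) pairs in dispatch order

    def attach(self, name, handler):
        """A server joins; clients do not need to know about it."""
        self.servers.append((name, handler))

    def detach(self, name):
        """A server leaves; clients keep talking to the bus as before."""
        self.servers = deque(s for s in self.servers if s[0] != name)

    def request(self, message):
        """Dispatch a request to the next server in round-robin order."""
        if not self.servers:
            raise RuntimeError("no server available")
        name, handler = self.servers[0]
        self.servers.rotate(-1)
        return handler(message)


bus = Bus()
bus.attach("a", lambda message: b"a:" + message)
bus.attach("b", lambda message: b"b:" + message)

print([bus.request(b"Hello") for _ in range(3)])
# [b'a:Hello', b'b:Hello', b'a:Hello']

bus.detach("a")
print([bus.request(b"Hello") for _ in range(2)])
# [b'b:Hello', b'b:Hello']
```

The client code, which only ever calls request on the bus, is identical before and after a server is added or removed.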
However, an Enterprise Service Bus is almost by definition centralised. As its name suggests, it is used in the enterprise. The bus forms the backbone of the enterprise's service infrastructure. It is under full control of the enterprise, with dedicated administrators making sure it is functional and highly available. In most cases it is even implemented as a single monolithic application called a "message broker".
Now recall the Olympics transmission example in the previous section. The intermediary node was inserted between the server and the clients by a third party independently of either the content provider or the content subscriber. What would happen if the same thing was done in the request/reply case? Would the service bus break free from the enterprise prison? Would the Enterprise Service Bus turn into the Global Service Bus? And what would it be good for in the first place?
To answer these questions we'll have a look at an example of a simple service scaling up from modest client/server beginnings to a global ecosystem.
Imagine the programmer Joe who writes his client/server application using ØMQ. Joe does so because it reduces the work he has to spend on the networking subsystem; with ØMQ all he needs to do is to write a few trivial lines of code. Joe has no intention of creating a distributed service, in fact, maybe he doesn't even know what a distributed service is. Nonetheless, by using ØMQ he has unknowingly subscribed to the idea. That is both good and bad news for him.
The good news is that it allows him to scale the service. With no additional trouble, Joe can run the service in an ESB-like setup, distributing the workload among multiple servers. He can build a datacenter of his own or move to a cloud provider. In either case he won't have to modify the code.
The bad news is that the distributed nature of the system makes it possible for Joe's competitors to enter the service ecosystem. They can launch competing implementations of the service. They can even extend it in unexpected ways.
So, let's say there's a rival service run by an evil corporation that took advantage of Joe's idea and decided to provide an alternative service. The new service has much higher reliability when compared to Joe's which tends to go offline once in a while. On the other hand, it is much more expensive than Joe's service.
With a traditional centralised architecture, the client would have to choose either Joe or his competitor. With a decentralised architecture he can use both services at the same time. By setting priorities he can specify that Joe's cheap service is used by default and that, when it goes offline, requests automatically switch to the expensive corporate alternative.
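The priority-based fallback can be sketched as follows. Everything here is hypothetical and purely illustrative: the ServiceUnavailable exception, the call_with_failover function and the two stand-in providers are assumptions, and the sketch says nothing about how priorities would actually be configured in ØMQ.

```python
class ServiceUnavailable(Exception):
    """Raised by a provider that is currently offline."""


def call_with_failover(providers, request):
    """Try providers in ascending priority order; return (name, reply)."""
    for _priority, name, handler in sorted(providers, key=lambda p: p[0]):
        try:
            return name, handler(request)
        except ServiceUnavailable:
            continue  # fall through to the next-best provider
    raise ServiceUnavailable("all providers are offline")


joe_state = {"online": True}


def joe(request):
    """Cheap service that occasionally goes offline."""
    if not joe_state["online"]:
        raise ServiceUnavailable("joe is offline")
    return b"cheap reply"


def corporation(request):
    """Expensive but reliable alternative."""
    return b"expensive reply"


# Lower number means higher priority.
providers = [(2, "corporation", corporation), (1, "joe", joe)]

print(call_with_failover(providers, b"Hello"))  # ('joe', b'cheap reply')
joe_state["online"] = False
print(call_with_failover(providers, b"Hello"))  # ('corporation', b'expensive reply')
```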
Let's now imagine that the ecosystem flourishes and as time passes there is a large number of service providers available. It becomes increasingly troublesome for the users to follow all the service providers and the discounts they offer and to choose the best strategy for accessing the service.
At this point someone may come up with an innovative business model: to follow the prices offered by all the service providers and route users' requests to the one that is cheapest at the moment. The goal is to become an exchange for services. You can think of all kinds of fancy functionality that can be provided by the exchange. For example, it can allow you to post requests such as "do this work once the price drops below $10!"
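The routing decision made by such an exchange can be sketched in a few lines. The function choose_provider, the provider names and the prices are all hypothetical; the optional limit models a request of the "once the price drops below $10" kind.

```python
def choose_provider(prices, limit=None):
    """Pick the cheapest provider, or None if no price is below the limit.

    prices maps provider name to its current price. When a limit is given,
    the request is held (None is returned) until some price drops below it.
    """
    provider = min(prices, key=prices.get)
    if limit is not None and prices[provider] >= limit:
        return None
    return provider


prices = {"joe": 12, "corporation": 30}

print(choose_provider(prices))            # joe
print(choose_provider(prices, limit=10))  # None

# A new provider undercuts the others and the held request can proceed.
prices["newcomer"] = 8
print(choose_provider(prices, limit=10))  # newcomer
```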
The above sounds rather familiar. Yes, it's the "service on demand" model exposing the "location independence" principle. The work is dispatched to wherever the conditions are best for executing it.
Isn't that what cloud computing is supposed to do?
Yes, but with one crucial distinction: the cloud in our case is highly distributed and outside the control of any single cloud provider. Individual service providers are completely independent of one another. Together they create a free market for the service and thus keep the price at a balanced level. The same applies to the "service exchange". Once it starts taking unfair advantage of the fact that it provides a single consolidated entry point to the service, there's nothing to prevent competing service exchanges from emerging.
The diagram shows multiple service providers and two service exchanges. Note that users are free to use both exchanges or even connect directly to the service providers. The topology is highly flexible.
The story doesn't end here. The infrastructure will evolve further, but we'll make just one more observation at this point: note how the infrastructure grew organically from the edges to the middle. At the beginning there were just clients and a service located at the edges of the network. Later on, intermediary nodes, ESBs and service exchanges were added. Consider how easy this makes adoption of the technology, and how the centralised approach of "set up the infrastructure in the middle first, then hope that clients will arrive" does exactly the opposite.
Extending the Internet stack
Given the technical and economic importance of distributed services, combined with their impact on user freedom, it seems extremely important that the related functionality gets standardised and becomes ubiquitously available.
However, ØMQ is a product, an implementation. The moment we start talking about standardised stacks we have to make a distinction between the layer as such and its implementation.
To make the distinction clear, we introduce the concept of the "scalability layer". In the context of the Internet protocol suite we are going to refer to it as SP ("Scalability Protocol") to make the name fit with existing layers such as IP ("Internet Protocol") and TCP ("Transmission Control Protocol").
In OSI model terms the scalability layer lives at layer five, the session layer. The diagram below shows the Internet stack with the SP layer fitted in. Keep in mind that the Internet stack is rather flexible and the diagram presented is just an example. The Ethernet layer can be replaced by InfiniBand, TCP can be replaced by SCTP or even reliable multicast such as PGM. Presentation layer is not necessarily embodied by XDR. You can use Google protocol buffers, JSON, ASN.1/DER or even your own custom serialisation algorithm. You can compress or encrypt the data on this layer and so on.
Treating the above as a goal, what exactly has to be done to make it happen?
First of all, SP as a layer in the stack should be clearly defined. It means that the existing ØMQ implementation should be taken apart and analysed. We should decide what fits into the scope of "scalability layer" and what is just an implementation detail specific to ØMQ.
This endeavour consists of three basic tasks:
- providing a fully formalised description of scalability semantics
- defining the standardised API
- defining the standardised wire-level protocol
Regarding formalising scalability semantics, what ØMQ implements so far is sufficient for a large percentage of use cases. There are some corner cases to cover but that's not a big deal. More important questions to answer are:
- Are there more scalable messaging patterns out there, or are those defined by ØMQ (request/reply, publish/subscribe, etc.) sufficient?
- Are there any specific features of the existing patterns that would make scaling to global proportions problematic? For example: Isn't the option of having multiple publishers in a data distribution tree inherently unscalable?
- How are cycles in the topology to be addressed?
- More generally: What are the global properties of individual patterns? Can we give formal guarantees for problems like congestion avoidance or fair distribution of workload?
These are rather difficult problems. Some of them we — the engineers — will be able to solve ourselves. For others we will need help from the academic community. Some formal research on these topics has already been done, but it is in no way sufficient. Thus, we would like to encourage academic research of large-scale distributed algorithms in the future.
Regarding the standardised API, most of the work has already been done. As already mentioned, ØMQ uses a POSIX-like socket API. It's not yet 100% conformant with POSIX but it's gradually moving that way, being hindered mostly by backward compatibility concerns.
The following example shows where we are today:
#include <string.h>
#include <zmq.h>

/* Uses the ØMQ 2.x C API. */
int main (int argc, char *argv[])
{
    void *ctx, *socket;
    zmq_msg_t msg;

    ctx = zmq_init (1);
    socket = zmq_socket (ctx, ZMQ_REQ);
    zmq_connect (socket, "tcp://127.0.0.1:5577");

    /* Allocate a 6-byte message: "Hello" plus the terminating zero. */
    zmq_msg_init_size (&msg, 6);
    strcpy (zmq_msg_data (&msg), "Hello");
    zmq_send (socket, &msg, 0);
    zmq_msg_close (&msg);

    zmq_close (socket);
    zmq_term (ctx);
    return 0;
}
We believe that we are at the point where we are able to define an API for the scalability layer that is fully integrated with the existing POSIX socket API. Even better, it looks like the changes required to the POSIX socket API would be trivial: adding a few numeric constants for the protocol family and socket types.
An example of such a hypothetical POSIX API for SP sockets:
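#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <unistd.h>
#include <netinet/sp.h>   /* hypothetical: would define PF_SP and SOCK_REQ */

int main (int argc, char *argv[])
{
    int s;
    struct sockaddr_in addr = {0};

    s = socket (PF_SP, SOCK_REQ, IPPROTO_TCP);
    addr.sin_family = AF_INET;
    addr.sin_port = htons (5577);                     /* network byte order */
    addr.sin_addr.s_addr = htonl (INADDR_LOOPBACK);   /* 127.0.0.1 */
    connect (s, (struct sockaddr *) &addr, sizeof addr);
    send (s, "Hello", 6, 0);
    close (s);
    return 0;
}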
Finally, regarding the wire-level protocol, the exact format is dependent on the exact semantics of SP. We believe that the existing simplistic wire format used by ØMQ is sufficient for the large majority of use cases. However, we expect it to be somewhat extended as the work on formalisation of scalability semantics proceeds. It is our intention to move the development of the wire protocol under the auspices of the IETF.
One of the crucial parts of standardisation of a technology is to provide multiple interoperable implementations. And for a technology that intends to become a fully integrated part of the Internet stack, the obvious choice is to be implemented directly in an OS kernel. We are going to discuss the technical aspects of such an implementation in the following section.
Toward a kernel-space implementation of SP
Traditionally, message-oriented middleware systems were implemented in user space. When moving into the kernel we have to understand the reasons for that decision and make sure that those reasons no longer apply to our SP proposal. Also, while "becoming a part of the Internet stack" sounds like a compelling reason to move the functionality into the kernel, it has little technical substance and we have to present a better grounded technical rationale to justify the move.
As for traditional message-oriented middleware systems, the main arguments against implementing them in kernel space are:
- Existing implementations tend to be resource hogs to be controlled by the OS rather than being part of it.
- Existing implementations and wire-level protocols duplicate a lot of the functionality already present in the kernel and Internet stack.
- Implementations had overgrown APIs with rather vague semantics.
- Message-oriented middleware was supposed to do "persistent messaging", i.e. to eat a substantial amount of disk space.
- Until recently most implementations of message-oriented middleware have been proprietary products.
SP, as embodied by ØMQ today, is free of those problems:
- It has no special need for resources. Some memory for transmit and receive buffers is needed but the same can be said of TCP and other kernel-space protocol implementations.
- We have been careful to remove all the duplicate functionality and leave only the very essence of messaging — scalability algorithms and messaging patterns.
- There is no API to speak of. As demonstrated in the previous section, all SP functionality should be accessible via existing POSIX socket APIs.
- SP is not supposed to do "persistent messaging". It is fully transient and no data are stored on the disk.
- Finally, the SP concepts and ØMQ implementation are Free Software.
The above means that there are no technical obstacles preventing implementation of SP in kernel space. But why would we want to implement it in kernel space in the first place? There are several compelling reasons:
- There is no way, apart from ugly linking hacks, to transparently implement the POSIX socket API in user space.
- Consequently ØMQ socket functions cannot be named identically to their POSIX counterparts, rather they must use a separate namespace, e.g. zmq_socket() vs. socket().
- ØMQ socket descriptors are not POSIX file descriptors. They can't be polled on using standard multiplexing functions such as poll() or select() and thus it is hard to plug a ØMQ socket into an existing event loop.
- A TCP socket continues to transmit written but unsent data even after it was closed and the sending application has exited. Equivalent behaviour for SP sockets cannot be implemented in user space.
- A kernel-space implementation will be accessible even on platforms that have strict policies with respect to installing additional software (mostly in the mobile and embedded industries).
- It will be accessible even in corporate environments. In such environments, a request to install software often has to be approved by several departments, including a legal department; the whole process can take several months.
- Some programming language communities (e.g. Java) prefer not to use external libraries written in a different language. However, they are happy to use functionality provided by the OS kernel.
- Given that the SP API adds no extra functions to the POSIX socket APIs, just a couple of constants, it should be trivially accessible from any programming language with support for existing POSIX socket APIs.
- Once there are bridges from SP to existing message-oriented middleware systems (AMQP, JMS, STOMP, etc.) any such system would be transparently accessible from any Internet host, using any language, with no special software to install, using just the kernel implementation of SP.
EDIT: A proof-of-concept implementation of the SP layer in the Linux kernel can be found here.
Conclusion
We have presented an architecture for distributed services and shown how it scales from a simple client/server topology to global service networks. We have argued that for distributed services to gain wide acceptance it is essential to make such an architecture a standard part of the Internet stack. We have explained where this "scalability layer" belongs within the OSI reference model and the Internet stack and what steps are needed to get it fully integrated into the Internet infrastructure. We have pointed out the importance of having a kernel implementation of the "scalability layer" on the road to universal acceptance and discussed the technical reasons why we believe that a kernel implementation of SP is feasible.
In conclusion, we believe that SP is a step forward for freedom from the emerging centralised service model; by using SP as an interconnect for existing pieces of software, applications are capable of switching to decentralised services, or even using both centralised and decentralised services in parallel. That would in turn enable a gradual move away from the centralised service model of today to the happy decentralised model of tomorrow.
We would like to invite discussion of SP concepts in the wider FOSS operating system developer communities and, based on that discussion, to progress toward actual implementation work.