Reactive Systems: Let’s write a TFTP server with Akka
Welcome back! The past two posts in our series on building reactive systems have dealt with the topics of performance and fault-tolerance. In this post I’m going to talk about resilience, with yet another one of my trademark contrived examples that I invented in my head and put to code.
Instant payments have an inherent high-availability requirement. A user can send a payment instruction at any time of the day, and we have to be able to honour it within the scheme’s SLAs. So how does IPF ensure maximum availability while maintaining transparency and ease of use? Enter Akka clustering.
Akka clustering: A cluster of joy
IPF is clustered by default. When we start the IPF application for testing, it starts in a cluster of one. It’s very simple to add another node to the cluster by specifying a seed node: the address of a remote actor system which the fresh new node can contact to grab the cluster state. This includes things like who’s currently the leader of the cluster, which other nodes are members, and what their addresses are. If you’re interested in more details, check out the Akka docs for clustering.
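To make that concrete, here’s a minimal sketch of what the clustering bits of an application.conf might look like for a second node. The hostname, ports and system name here are illustrative (borrowed from the logs later in this post), not a copy of any real IPF configuration:

```hocon
akka {
  actor.provider = "cluster"

  remote.netty.tcp {
    hostname = "127.0.0.1"
    port = 2552                 # this node's own remoting port
  }

  cluster {
    # An existing node to contact on startup to grab the cluster state
    seed-nodes = [
      "akka.tcp://akka-tftp-server@127.0.0.1:2551"
    ]
  }
}
```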
I can already hear the cries for help. “But Patrick! I want to start my actor system in a cluster, but:
- I have a single actor which maintains state, and I can’t have two of it running at once in a cluster!
- My actor system binds to a socket, so if I start up a new node on the same machine, I’m going to get a port conflict!
- I have centralised router logic which receives all requests and fans them out accordingly!”
Never fear! This has already been considered, and what you need in this case is a cluster singleton. Creating an actor as a cluster singleton tells Akka that only one instance of this actor can ever be active in the cluster. If the node that’s running the cluster singleton dies, a MongoDB-esque failover process happens (though it’s decided by node age rather than an election), and the oldest active node remaining in the cluster takes over the duty of running the cluster singleton.
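The “oldest node wins” rule is easy to model. Here’s a tiny conceptual sketch of it (an illustration of the idea only, not Akka’s actual code; the node names and join-order numbers are made up):

```python
# Conceptual model of cluster-singleton placement: every member carries
# its join order, and the oldest member still up hosts the singleton.
# Takeover is therefore deterministic: no votes are needed.

def singleton_host(members):
    """members: list of (address, join_order) tuples for live nodes."""
    return min(members, key=lambda m: m[1])[0]

cluster = [("node-a:2551", 1), ("node-b:2552", 2), ("node-c:2553", 3)]
assert singleton_host(cluster) == "node-a:2551"  # oldest node hosts it

cluster = [m for m in cluster if m[0] != "node-a:2551"]  # node-a dies
assert singleton_host(cluster) == "node-b:2552"  # next-oldest takes over
```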
And what about state?
Say that your really special application not only needs a cluster singleton, but also needs to maintain some sort of state for the entirety of its uptime. A good example would be HTTP sessions for the active users of your website. Just because a node in my cluster has died, I don’t want everybody’s shopping cart cleared, all my users logged out, and my website dying in a fire! That’s bad for business!
This is where Akka’s Distributed Data module comes in: a key-value store whose entries are replicated to every node in the cluster. These are propagated with eventual consistency, and as a result anything stored in there must be a conflict-free replicated data type, or CRDT.
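“Conflict-free” means that two replicas can merge their states in any order and still converge on the same answer. Here’s a toy last-writer-wins map, one of the simplest CRDTs, just to show the idea (this is a hypothetical sketch, not Akka Distributed Data’s actual LWWMap implementation):

```python
# Toy last-writer-wins (LWW) map: each entry carries a timestamp, and
# merging keeps the newer write. (A real implementation also needs a
# deterministic tie-break for equal timestamps, e.g. by node id.)

def merge(a, b):
    out = dict(a)
    for key, (ts, value) in b.items():
        if key not in out or ts > out[key][0]:
            out[key] = (ts, value)
    return out

replica1 = {"cart:alice": (1, ["socks"])}
replica2 = {"cart:alice": (2, ["socks", "shoes"]), "cart:bob": (1, ["hat"])}

# Merge is commutative: both replicas converge to the same state.
assert merge(replica1, replica2) == merge(replica2, replica1)
assert merge(replica1, replica2)["cart:alice"] == (2, ["socks", "shoes"])
```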
Right, that’s enough text; let’s have…some more text, but in the form of an example! Let’s put the idea of a cluster, cluster singleton and a CRDT store together in some sort of application.
I was sitting at someone else’s desk this past week and noticed their VoIP phone flashed once every five seconds or so. Aside from being, like, super annoying, I noticed that it was actually stuck in a reboot loop, and between each reboot the display flashed something like
This gave me an idea: I can demonstrate all of this stuff in a TFTP server! TFTP is a much simpler cousin of FTP which is easy to implement, and allows me to demonstrate:
- Cluster: for high availability
- Cluster singleton: I want to have only one node that listens to the TFTP UDP port at a time
- Distributed data: If one node dies halfway through a transfer, I want to be able to seamlessly switch between nodes to serve the same in-flight request
Here’s an “architecture diagram” of a single node:
Fig. 1: Pretending I can draw diagrams
Before we start, a brief intro to TFTP. There are five message types:
- RRQ: A read request. Sent by the client to the host when initiating a transfer
- DATA: Sent by the host, containing a block number and up to 512 bytes of data; a DATA packet shorter than 512 bytes signals the end of the transfer
- ACK: Sent by the client back to the host, acknowledging a specific block number
- WRQ: A write request. Not implemented in this example
- ERROR: When things go wrong
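The wire formats are delightfully simple. Here’s a short sketch of how the three message types we care about are encoded, straight from RFC 1350 (the helper names are mine, not from the actual server):

```python
import struct

# RFC 1350 wire formats (all integers are 16-bit big-endian):
#   RRQ  = | opcode 1 | filename | 0 | mode | 0 |
#   DATA = | opcode 3 | block #  | up to 512 bytes of data |
#   ACK  = | opcode 4 | block #  |

def rrq(filename, mode="netascii"):
    return struct.pack("!H", 1) + filename.encode() + b"\0" + mode.encode() + b"\0"

def data(block, payload):
    assert len(payload) <= 512          # a short block ends the transfer
    return struct.pack("!HH", 3, block) + payload

def ack(block):
    return struct.pack("!HH", 4, block)

pkt = rrq("/home/patrickaltaie/myfile")
assert struct.unpack("!H", pkt[:2])[0] == 1   # opcode 1 = read request
assert ack(7) == b"\x00\x04\x00\x07"
```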
Every time we receive a Read Request (RRQ), we store the client socket and the associated RRQ in the distributed data store. Then, for each block, we create a short-lived (ephemeral) SegmentSender actor which sends a single DATA segment of up to 512 bytes and then stops itself.
TFTP is a lockstep protocol, so each time I get an ACK for my DATA, that triggers a send of the next DATA block, and so on until the file is transmitted in its entirety.
Since the RRQ and the associated requester socket are stored in the distributed data store, if the host that received the request dies and doesn’t send any DATA after an ACK, the new cluster singleton node will receive a retry ACK from the client and will be able to continue sending the remainder of the file! The client will be none the wiser as to what’s happened (except for maybe a mini pause in service).
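The key point is that, given the stored request and an ACK for block N, any node can compute block N+1. Here’s a small sketch of that resume logic (the `store` dict stands in for the CRDT store, and all names are illustrative rather than the actual actor code):

```python
BLOCK = 512

def block_payload(content, block):
    start = (block - 1) * BLOCK
    return content[start:start + BLOCK]     # the last block is < 512 bytes

def on_ack(store, client, acked_block):
    """Handle an ACK: look the transfer up in the (replicated) store and
    return the next (block number, payload), or None if we're done."""
    content = store[client]                 # any node can recover this state
    if len(block_payload(content, acked_block)) < BLOCK:
        return None                         # the ACKed block ended the file
    return acked_block + 1, block_payload(content, acked_block + 1)

store = {("127.0.0.1", 35487): b"x" * 1100}    # 3 blocks: 512 + 512 + 76
client = ("127.0.0.1", 35487)
# A fresh node that never saw the RRQ can still continue the transfer:
assert on_ack(store, client, 1)[0] == 2
assert on_ack(store, client, 3) is None        # block 3 was the last one
```

(A file whose size is an exact multiple of 512 would need one final empty DATA block; I’ve left that edge case out of the sketch.)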
Let’s run a test
OK, so here’s what our test is going to look like:
- Start two cluster nodes
- Verify that we’re listening on the chosen UDP port
- Use tftp(1) to get a random file
- Switch the first node off halfway through
- Watch the file transfer complete anyway
The first two steps are easy and we can indeed see the following entries in the log:
[INFO] [03/07/2018 19:45:16.229] [main] [akka.cluster.Cluster(akka://akka-tftp-server)] Cluster Node [akka.tcp://akka-tftp-server@localhost:2551] - Started up successfully
[DEBUG] [03/07/2018 19:45:21.338] [akka-tftp-server-akka.actor.default-dispatcher-3] [akka.tcp://akka-tftp-server@localhost:2551/system/IO-UDP-FF/selectors/$a/0] Successfully bound to [/0:0:0:0:0:0:0:0:13337]
This indicates that a cluster node has started, that it has started a cluster singleton, and that it is listening on UDP port 13337.
Now let’s go to our shell and create an 11 MB file of random data:
$ head -c 11M </dev/urandom > ~/myfile
Now let’s grab it!
$ tftp localhost 13337
tftp> get /home/patrickaltaie/myfile
We can see the consumer’s request being stored in the CRDT store, and the first few bits of the transfer being logged:
[INFO] [03/07/2018 19:53:44.661] [akka-tftp-server-akka.actor.default-dispatcher-3] [akka://akka-tftp-server/user/readRequestActor] Storing requester /127.0.0.1:35487 with RRQ ReadOrWriteRequest(tftpOpcode=READ_REQUEST, filename=/home/patrickaltaie/myfile, mode=netascii) in CRDT store
[DEBUG] [03/07/2018 19:53:44.677] [akka-tftp-server-akka.actor.default-dispatcher-21] [akka://akka-tftp-server/user/readRequestActor/$a] Sending block #1 to /127.0.0.1:35487 (size: 512B)
[DEBUG] [03/07/2018 19:53:44.679] [akka-tftp-server-akka.actor.default-dispatcher-17] [akka://akka-tftp-server/user/ackActor] Received an ACK for block 1 for consumer /127.0.0.1:35487
[DEBUG] [03/07/2018 19:53:44.680] [akka-tftp-server-akka.actor.default-dispatcher-20] [akka.tcp://akka-tftp-server@localhost:2551/system/ddataReplicator] Received Update for key [key]
[DEBUG] [03/07/2018 19:53:44.680] [akka-tftp-server-akka.actor.default-dispatcher-17] [akka://akka-tftp-server/user/readRequestActor] Update success: UpdateSuccess(key,None)
[DEBUG] [03/07/2018 19:53:44.704] [akka-tftp-server-akka.actor.default-dispatcher-21] [akka://akka-tftp-server/user/ackActor/$a] Sending block #2 to /127.0.0.1:35487 (size: 512B)
[DEBUG] [03/07/2018 19:53:44.704] [akka-tftp-server-akka.actor.default-dispatcher-21] [akka://akka-tftp-server/user/ackActor] Received an ACK for block 2 for consumer /127.0.0.1:35487
(and so on)
We can see here that we’ve received a new connection and we’ve stored our new client’s socket identifier and their RRQ (line 1). We then sent them one block of data which they’ve ACKed, and in the meantime (lines 4, 5) we’ve been successful in saving the socket and RRQ pair to the CRDT store. We then send block 2, get an ACK for that, and so on.
Now, halfway through the transfer, shut down node 1 of the cluster and watch the node 1 logs:
[DEBUG] [03/07/2018 19:53:46.990] [akka-tftp-server-akka.actor.default-dispatcher-21] [akka://akka-tftp-server/user/ackActor/$Yh] Sending block #500 to /127.0.0.1:35487 (size: 512B)
[DEBUG] [03/07/2018 19:53:46.990] [akka-tftp-server-akka.actor.default-dispatcher-36] [akka://akka-tftp-server/user/ackActor] Received an ACK for block 500 for consumer /127.0.0.1:35487
[INFO] [03/07/2018 19:50:55.958] [akka-tftp-server-akka.actor.default-dispatcher-5] [akka.tcp://akka-tftp-server@localhost:2551/user/$a] ClusterSingletonManager state change [WasOldest -> HandingOver]
[INFO] [03/07/2018 19:50:55.959] [akka-tftp-server-akka.actor.default-dispatcher-4] [akka.tcp://akka-tftp-server@localhost:2551/user/$a] Singleton terminated, hand-over done [akka.tcp://akka-tftp-server@localhost:2551 -> Some(akka.tcp://akka-tftp-server@localhost:2552)]
The cluster singleton terminates while sending block 500, and the JVM starts to shut down and hands over the reins (line 4) to node 2. If we look at node 2:
[DEBUG] [03/07/2018 19:53:47.032] [akka-tftp-server-akka.actor.default-dispatcher-19] [akka.tcp://akka-tftp-server@localhost:2552/system/IO-UDP-FF/selectors/$a/0] Successfully bound to [/0:0:0:0:0:0:0:0:13337]
We can see that node 2 has started the server socket on UDP port 13337, and then a bit later still on node 2 we’ve grabbed the consumer’s client socket from the CRDT store and continued with it:
[DEBUG] [03/07/2018 19:53:52.026] [akka-tftp-server-akka.actor.default-dispatcher-15] [akka://akka-tftp-server/user/ackActor] Received an ACK for block 500 for consumer /127.0.0.1:35487
[DEBUG] [03/07/2018 19:53:52.027] [akka-tftp-server-akka.actor.default-dispatcher-15] [akka.tcp://akka-tftp-server@localhost:2552/system/ddataReplicator] Received Get for key [key]
[DEBUG] [03/07/2018 19:53:52.032] [akka-tftp-server-akka.actor.default-dispatcher-22] [akka://akka-tftp-server/user/ackActor/$a] Sending block #501 to /127.0.0.1:35487 (size: 512B)
[DEBUG] [03/07/2018 19:53:52.034] [akka-tftp-server-akka.actor.default-dispatcher-22] [akka://akka-tftp-server/user/ackActor] Received an ACK for block 501 for consumer /127.0.0.1:35487
(and so on)
And finally back on our client:
$ tftp localhost 13337
tftp> get /home/patrickaltaie/myfile
Received 11534336 bytes in 11.3 seconds
Node 2 has carried on processing the client’s request, even though the initial node that took the request has died! How resilient!
Limitations of the demo
Since this is just a demo of having data live past the lifetime of a single actor system JVM, the TFTP application has a few limitations:
- It’s not production-ready! There’s no sandbox, so any TFTP client that connects is able to grab any file on the host system if they know the path (including well-known sensitive paths)
- It doesn’t implement writes (WRQ in TFTP terminology); it returns an error to the client stating that this operation isn’t supported
- There’s an improvement that we can add in terms of what happens when the block number reaches 65535. All good TFTP server implementations — which this is not — are supposed to roll over back to block 1 when block 65535 is reached
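That rollover fix is a one-liner. Block numbers are unsigned 16-bit integers, so the counter has to wrap rather than overflow; the sketch below wraps to 1 as described above (some servers wrap to 0 instead, and clients vary in what they accept):

```python
# Wrap the 16-bit TFTP block counter instead of overflowing it.
def next_block(block):
    return 1 if block == 65535 else block + 1

assert next_block(1) == 2
assert next_block(65535) == 1     # rollover instead of overflow
```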
Bored? Want to learn some Akka? Send me a pull request on GitHub to fix some of these things! See below for details.
Akka is built with high availability and resilience in mind. There are loads of built-in tools like the ones I’ve mentioned today which make our lives easier here in the IPF team, so we as developers can focus on writing actual functionality rather than boilerplate.
If you’re interested, my TFTP application is available on GitHub: http://github.com/paltaie/akka-tftp-server. The entry point is here. To run two different nodes on the same machine, start the second node with a system property to change the Akka remoting TCP port: -Dakka.remote.netty.tcp.port=2552. Otherwise you’re ready to go!
If for some reason you want to check out how the TFTP protocol works, here’s a link to the RFC. Happy hacking!
This blog is part of a series, read the previous post here.