It’s a “catch up on blogging” weekend. Some months ago, while learning more about ZeroMQ, I wrote a little library called ServiceMq and pushed it to GitHub and NuGet. It is a peer-to-peer “store and forward” message queue library inspired by what I learned about ZeroMQ, and it incorporates the ServiceWire library I had previously created.
ServiceMq is an experimental library at this point. I have not spent any time thoroughly testing or improving it since I created it. This is not because it’s not a cool project but only because my time has been limited by demands of the day job and family. One must have priorities. That’s what my wife says anyway.
So now with a brief moment of free time, I’m happy to share with you this little bit of work. Let me explain how it works and then I’ll share some test code here to illustrate. If you are interested in it, I urge you to get the NuGet package or clone the code and try it out and let me know if it has been useful to you.
It is also very important for me to mention that I pulled the entire ServiceStack.Text v3 code base into the library, renaming its namespaces for neatness, to serve as the serialization library used by ServiceMq. This enables fast and easy JSON serialization across the wire without burdening the user of the library with having to make any special accommodations in their message DTO classes. You need to know that after v3, the ServiceStack.Text library’s license changed, so if you plan to use it on your own, be aware of the change. The version I’ve used is 100% compatible with the Apache 2.0 license and derivative notice in the code on GitHub.
The test code below contains the only tests I’ve written for the project, and they cover only the primary use cases. The tests have both sender and receiver queues in a single process. In practice you would generally use the library in two processes to enable message passing between them.
The store and forward persistence of messages is central to this library, because guaranteed sending and receiving mattered more to me than performance. Scale and memory consumption were not addressed in this initial release.
Here’s the order of events on the sending end:
- The Send method first writes the message to a file.
- The Send method then tries to send the message to the intended recipient.
- If the send fails, the message is placed on a “failed-retry” queue.
- If the sending process fails or is shut down, all persisted messages are read back into memory when the process restarts and creates the message queue again.
- When the message is successfully sent, its content is appended to a rolling outbound log so that an audit of messages sent is possible, and then the outbound message file is deleted.
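To make the sending sequence above a little more concrete, here is a minimal sketch of the same idea using nothing but System.IO. This is not the ServiceMq source. The class name OutboundSketch, the trySend delegate standing in for the network call, the ".msg" file naming, and the "outbound.log" file name are all made up for illustration, and the sketch ignores threading entirely.

```csharp
using System;
using System.Collections.Generic;
using System.IO;

// Hypothetical illustration of the outbound steps; not ServiceMq's implementation.
public class OutboundSketch
{
    private readonly string _dir;
    private readonly Func<string, bool> _trySend;                 // stand-in for the real network send
    private readonly Queue<string> _retry = new Queue<string>();  // file paths waiting for a retry
    private long _counter;

    public OutboundSketch(string dir, Func<string, bool> trySend)
    {
        _dir = dir;
        _trySend = trySend;
        Directory.CreateDirectory(dir);

        // On (re)start, read back whatever an earlier run persisted but never delivered.
        var leftovers = Directory.GetFiles(dir, "*.msg");
        Array.Sort(leftovers, StringComparer.Ordinal);
        foreach (var path in leftovers) _retry.Enqueue(path);
    }

    public int PendingCount { get { return _retry.Count; } }

    public void Send(string body)
    {
        // Step 1: persist first, so the message survives a crash.
        var name = DateTime.UtcNow.Ticks.ToString("D19") + "-" + (_counter++).ToString("D6") + ".msg";
        var path = Path.Combine(_dir, name);
        File.WriteAllText(path, body);

        // Step 2: try to deliver. Step 3: on failure, park it on the retry queue.
        if (!TryDeliver(path)) _retry.Enqueue(path);
    }

    // Makes one pass over the retry queue; anything that still fails is queued again.
    public void RetryFailed()
    {
        for (int i = _retry.Count; i > 0; i--)
        {
            var path = _retry.Dequeue();
            if (!TryDeliver(path)) _retry.Enqueue(path);
        }
    }

    private bool TryDeliver(string path)
    {
        var body = File.ReadAllText(path);
        if (!_trySend(body)) return false;

        // Append to the audit log before deleting the per-message file.
        File.AppendAllText(Path.Combine(_dir, "outbound.log"), body + Environment.NewLine);
        File.Delete(path);
        return true;
    }
}
```

The ordering inside TryDeliver is the point of the sketch: the per-message file is only deleted after the log append succeeds, so a crash between the two leaves the message on disk to be picked up again by the constructor.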
Now here’s the order of events on the receiving end:
- The message queue receives a message and writes it to a file.
- The queue’s Receive method is called. It pulls a message off the queue when one becomes available and calls the Acknowledge method (see more on Acknowledge below).
- Or the queue’s Accept method is called. It pulls a message off the queue when one becomes available but does NOT call the Acknowledge method. This is meant for code that may fail to process the message, so the message is not actually removed from the inbound queue until it is acknowledged.
- The Acknowledge method is called, either automatically in the Receive method, or manually after the Accept method is used. The Acknowledge method logs the message by appending it to the inbound message log and then deletes the individual message file.
- If the receiving process fails before the Acknowledge method is called to delete the message file and log it, the incoming queue will read the message back into memory before new messages arrive in order to guarantee the order of delivery of the messages.
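The difference between Receive and Accept plus Acknowledge is easier to see in code. Here is a minimal sketch of the receiving side under some loud assumptions: the class InboundSketch, its OnArrived method, and the file names are hypothetical, and unlike the library's methods, which pull a message when it becomes available, these simply return null when nothing is waiting. It is an illustration of the idea, not the ServiceMq source.

```csharp
using System;
using System.Collections.Generic;
using System.IO;

// Hypothetical illustration of the inbound steps; not ServiceMq's implementation.
public class InboundSketch
{
    private readonly string _dir;
    private readonly Queue<string> _pending = new Queue<string>(); // file paths, oldest first
    private long _counter;

    public InboundSketch(string dir)
    {
        _dir = dir;
        Directory.CreateDirectory(dir);

        // Unacknowledged messages from an earlier run are queued before anything new arrives,
        // which keeps delivery order intact. Names sort by arrival time (assumes the clock
        // does not jump backwards between runs).
        var leftovers = Directory.GetFiles(dir, "*.msg");
        Array.Sort(leftovers, StringComparer.Ordinal);
        foreach (var path in leftovers) _pending.Enqueue(path);
    }

    // Called when a message comes in off the wire: persist it, then queue it.
    public void OnArrived(string body)
    {
        var name = DateTime.UtcNow.Ticks.ToString("D19") + "-" + (_counter++).ToString("D6") + ".msg";
        var path = Path.Combine(_dir, name);
        File.WriteAllText(path, body);
        _pending.Enqueue(path);
    }

    // Hands out the oldest message but leaves its file, and its place in the queue, alone.
    public string Accept()
    {
        return _pending.Count == 0 ? null : File.ReadAllText(_pending.Peek());
    }

    // Logs the oldest message and deletes its file; only now is it really gone.
    public void Acknowledge()
    {
        if (_pending.Count == 0) return;
        var path = _pending.Dequeue();
        File.AppendAllText(Path.Combine(_dir, "inbound.log"), File.ReadAllText(path) + Environment.NewLine);
        File.Delete(path);
    }

    // Receive is just Accept followed immediately by Acknowledge.
    public string Receive()
    {
        var body = Accept();
        if (body != null) Acknowledge();
        return body;
    }
}
```

In this sketch, code that might fail while handling a message would call Accept, do its work, and call Acknowledge only on success. If the process dies in between, the file is still on disk and the next InboundSketch built over the same directory hands the same message out again first.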
Now here’s the test code that shows how each end works:
    // Needs: using System.Net; (for Dns), plus the MSTest and ServiceMq namespaces.

    [TestMethod]
    public void SimpleTest()
    {
        // Two queues talking over named pipes within one process.
        var q1Address = new Address("q1pipe");
        var q2Address = new Address("q2pipe");
        using (var q2 = new MessageQueue("q2", q2Address, @"c:\temp\q2"))
        using (var q1 = new MessageQueue("q1", q1Address, @"c:\temp\q1"))
        {
            q1.Send(q2Address, "hello world");
            var msg = q2.Receive();
            Assert.IsNotNull(msg);
            Assert.AreEqual("hello world", msg.MessageString);
        }
    }

    [TestMethod]
    public void SimpleTcpTest()
    {
        // Same exchange as SimpleTest, but over TCP ports on the local host.
        var q1Address = new Address(Dns.GetHostName(), 8967);
        var q2Address = new Address(Dns.GetHostName(), 8968);
        using (var q2 = new MessageQueue("q2", q2Address, @"c:\temp\q2"))
        using (var q1 = new MessageQueue("q1", q1Address, @"c:\temp\q1"))
        {
            q1.Send(q2Address, "hello world");
            var msg = q2.Receive();
            Assert.IsNotNull(msg);
            Assert.AreEqual("hello world", msg.MessageString);
        }
    }

    [TestMethod]
    public void SimpleObjectTest()
    {
        var q1Address = new Address("q6pipe");
        var q2Address = new Address("q8pipe");
        using (var q2 = new MessageQueue("q8", q2Address, @"c:\temp\q8"))
        using (var q1 = new MessageQueue("q6", q1Address, @"c:\temp\q6"))
        {
            // Send an object and turn the received message back into the original type.
            int[] data = new int[] { 4, 8, 9, 24 };
            q1.Send(q2Address, data);
            Message msg = q2.Receive();
            Assert.IsNotNull(msg);
            var data2 = msg.To<int[]>();
            Assert.AreEqual(data[1], data2[1]);
        }
    }

    [TestMethod]
    public void SimpleBinaryTest()
    {
        var q1Address = new Address("q3pipe");
        var q2Address = new Address("q4pipe");
        using (var q2 = new MessageQueue("q4", q2Address, @"c:\temp\q4"))
        using (var q1 = new MessageQueue("q3", q1Address, @"c:\temp\q3"))
        {
            byte[] data = new byte[] { 4, 8, 9, 24 };
            q1.SendBytes(q2Address, data, "mybytestest");
            Message msg = null;
            // Keep receiving until a message with a binary payload shows up.
            while (true)
            {
                msg = q2.Receive();
                if (msg.MessageBytes != null) break;
            }
            Assert.IsNotNull(msg);
            Assert.AreEqual(4, msg.MessageBytes.Length);
            Assert.AreEqual((byte)9, msg.MessageBytes[2]);
            Assert.AreEqual("mybytestest", msg.MessageTypeName);
        }
    }
I’m certain the code base needs work and needs to be tested under load and in limited-memory conditions. Perhaps even a caching strategy needs to be implemented for scenarios where message volume is very high. I look forward to your feedback.