tsJensen

A quest for software excellence...

ServiceWire 1.5.0 Released

ServiceWire is a very fast, lightweight service host and dynamic client library that simplifies the development and use of high performance remote procedure call (RPC) communication between .NET processes over Named Pipes or TCP/IP.

The DuoVia.Net library has progressed significantly. But everyone I work with who uses it balks at the name. So I’ve renamed it. And I like the name very much. I hope you do too. It is ServiceWire. This name more aptly and intuitively describes what the library does. Hopefully this will help with adoption and participation.

I’ve laid out the documentation wiki on that site and will spend the next few days or weeks getting it completely fleshed out. The code is in a new repository and there is a new NuGet package which, with the exception of namespaces, is at perfect parity with DuoVia.Net version 1.5.0.

I’m very interested in getting your feedback on ServiceWire, the name and the library.

BufferedStream Improves .NET Sockets Performance

.NET’s BufferedStream saved the day, instantly reducing 200-400ms operations across the wire to 1ms. Why? Simple answer: you read and write a Socket through a NetworkStream. Wire up that stream to a BinaryWriter and a BinaryReader, as the code below does, and you’re good to go, right? Wrong. It turns out that BinaryWriter and BinaryReader work with a default internal buffer of just 16 bytes. And NetworkStream has none.

So if you have a TCP socket wired up to a NetworkStream, you’re trying to send or receive just 16 bytes at a time, utterly killing performance over TCP. Now wrap that NetworkStream in a BufferedStream, pass that BufferedStream into your BinaryReader and BinaryWriter, and you get instant performance gains that will knock your sockets off.
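To see the effect without a network, here is a minimal sketch that counts how many writes actually reach the underlying stream. CountingStream and BufferingDemo are hypothetical names invented for this illustration; the counting stream merely stands in for a NetworkStream, and the BinaryWriter mirrors what the client code in this post uses.

```csharp
using System;
using System.IO;

// Hypothetical stand-in for a NetworkStream: it discards the data and
// only counts how many Write calls reach it.
public sealed class CountingStream : Stream
{
    public int WriteCalls { get; private set; }

    public override void Write(byte[] buffer, int offset, int count)
    {
        WriteCalls++;
    }

    public override bool CanRead { get { return false; } }
    public override bool CanSeek { get { return false; } }
    public override bool CanWrite { get { return true; } }
    public override void Flush() { }

    public override long Length
    {
        get { throw new NotSupportedException(); }
    }
    public override long Position
    {
        get { throw new NotSupportedException(); }
        set { throw new NotSupportedException(); }
    }
    public override int Read(byte[] buffer, int offset, int count)
    {
        throw new NotSupportedException();
    }
    public override long Seek(long offset, SeekOrigin origin)
    {
        throw new NotSupportedException();
    }
    public override void SetLength(long value)
    {
        throw new NotSupportedException();
    }
}

public static class BufferingDemo
{
    // Writes `values` 4-byte integers through a BinaryWriter and returns
    // the number of Write calls that reached the underlying stream.
    public static int CountUnderlyingWrites(int values, bool buffered)
    {
        var sink = new CountingStream();
        Stream target = buffered ? (Stream)new BufferedStream(sink, 8192) : sink;
        var writer = new BinaryWriter(target);
        for (int i = 0; i < values; i++) writer.Write(i);
        writer.Flush(); // pushes any buffered bytes down to the sink
        return sink.WriteCalls;
    }
}
```

Calling BufferingDemo.CountUnderlyingWrites(100, false) should report one underlying write per integer, while passing true should collapse the same 400 bytes into a single write at flush time. On a real socket, each of those small writes is a candidate for its own trip across the wire.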

Backstory: For months I’ve been writing and improving my DuoVia.Net fast services library. Recently my day job’s team began using it in some very clever ways (sorry, NDA and all), but we ran into a major performance problem. While performance on the same machine across processes using Named Pipes was excellent, the same was not true of machine-to-machine communications over TCP/IP. Sub-millisecond calls between services were taking 200-400ms across the wire. Something was terribly wrong. And when we tried Named Pipes from server to server, the performance problem went away. Of course, this was not the final answer because a .NET Named Pipes host can only handle 254 concurrent connections and we need to be able to scale beyond that.

Solving the problem required several sleepless nights and a weekend searching for the answer. My performance tests for TCP/IP have always run locally over the localhost loopback interface. The trouble with that, I have since learned (and should have known), is that when running locally, Windows bypasses the TCP stack altogether, or nearly so, at least enough to mask the underlying problem of reading and writing only 16 bytes at a time directly on the NetworkStream.

After examining a number of open source implementations of Sockets on a server host, I ran into one or two smart enough to be using the BufferedStream to wrap that NetworkStream that a raw Socket object gives you. While doing all of this research, I also ran into the MSDN explanation (see Remarks and Example section) of how to improve server side asynchronous Socket handling. So I threw that into the solution as well. Once wired up and tested across machines on my home network, I breathed a huge sigh of relief. And here is what the code looks like now. First server and then client.

using System;
using System.Net.Sockets;
using System.Net;
using System.IO;
using System.Threading;
using System.Threading.Tasks;

namespace DuoVia.Net.TcpIp
{
  public class TcpHost : Host
  {
    private Socket _listener;
    private IPEndPoint _endPoint;
    private ManualResetEvent _listenResetEvent = new ManualResetEvent(false);

    /// <summary>
    /// Constructs an instance of the host and starts listening for 
    /// incoming connections on any ip address.
    /// All listener threads are regular background threads.
    /// </summary>
    /// <param name="port">The port number for incoming requests</param>
    /// <param name="log">Optional logger; may be null</param>
    /// <param name="stats">Optional stats collector; may be null</param>
    public TcpHost(int port, ILog log = null, IStats stats = null)
    {
      Initialize(new IPEndPoint(IPAddress.Any, port), log, stats);
    }

    /// <summary>
    /// Constructs an instance of the host and starts listening for incoming 
    /// connections on the designated endpoint.
    /// All listener threads are regular background threads.
    /// 
    /// NOTE: the instance created from the specified type 
    /// is not automatically thread safe!
    /// </summary>
    /// <param name="endpoint">The endpoint to listen on</param>
    /// <param name="log">Optional logger; may be null</param>
    /// <param name="stats">Optional stats collector; may be null</param>
    public TcpHost(IPEndPoint endpoint, ILog log = null, IStats stats = null)
    {
      Initialize(endpoint, log, stats);
    }

    private void Initialize(IPEndPoint endpoint, ILog log, IStats stats)
    {
      base.Log = log;
      base.Stats = stats;
      _endPoint = endpoint;
      _listener = new Socket(AddressFamily.InterNetwork, 
        SocketType.Stream, ProtocolType.Tcp);
      _listener.SetSocketOption(SocketOptionLevel.Socket, 
        SocketOptionName.KeepAlive, true);
      _listener.SetSocketOption(SocketOptionLevel.Socket, 
        SocketOptionName.DontLinger, true);
    }

    /// <summary>
    /// Gets the end point this host is listening on
    /// </summary>
    public IPEndPoint EndPoint
    {
      get { return _endPoint; }
    }

    protected override void StartListener()
    {
      Task.Factory.StartNew(Listen, TaskCreationOptions.LongRunning);
    }

    private SocketAsyncEventArgs _acceptEventArg;

    /// <summary>
    /// Listens for incoming tcp requests.
    /// </summary>
    private void Listen()
    {
      try
      {
        _listener.Bind(_endPoint);
        _listener.Listen(8192);

        _acceptEventArg = new SocketAsyncEventArgs();
        _acceptEventArg.Completed += acceptEventArg_Completed;

        while (!_disposed)
        {
          // Set the event to nonsignaled state.
          _listenResetEvent.Reset();
          _acceptEventArg.AcceptSocket = null;
          try
          {
            if (!_listener.AcceptAsync(_acceptEventArg))
            {
              AcceptNewClient(_acceptEventArg);
            }
          }
          catch (Exception ex)
          {
            _log.Error("Listen error: {0}", 
              ex.ToString().Flatten());
            break; //break loop on unhandled
          }

          // Wait until a connection is made before continuing.
          _listenResetEvent.WaitOne();
        }
      }
      catch (Exception e)
      {
        _log.Fatal("Listen fatal error: {0}", e.ToString().Flatten());
      }
    }

    private void acceptEventArg_Completed(object sender, 
      SocketAsyncEventArgs e)
    {
      AcceptNewClient(e);
    }

    private void AcceptNewClient(SocketAsyncEventArgs e)
    {
      try
      {
        if (e.SocketError != SocketError.Success)
        {
          if (!_disposed) _listenResetEvent.Set();
          return;
        }

        Socket activeSocket = null;
        BufferedStream stream = null;
        try
        {
          activeSocket = e.AcceptSocket;

          // Signal the listening thread to continue.
          _listenResetEvent.Set();

          // Buffer reads and writes in 8 KB chunks instead of
          // hitting the NetworkStream directly.
          stream = new BufferedStream(
            new NetworkStream(activeSocket), 8192);
          base.ProcessRequest(stream);
        }
        catch (Exception ex)
        {
          _log.Error("AcceptNewClient_ProcessRequest error: {0}", 
            ex.ToString().Flatten());
        }
        finally
        {
          if (null != stream)
          {
            stream.Close();
          }
          if (null != activeSocket && activeSocket.Connected)
          {
            try
            {
              activeSocket.Shutdown(SocketShutdown.Both);
            }
            catch (Exception shutdownException)
            {
              _log.Error("AcceptNewClient_ActiveSocketShutdown error: {0}", 
                shutdownException.ToString().Flatten());
            }

            try
            {
              activeSocket.Close();
            }
            catch (Exception closeException)
            {
              _log.Error("AcceptNewClient_ActiveSocketClose error: {0}", 
                closeException.ToString().Flatten());
            }
          }
        }
      }
      catch (Exception fatalException)
      {
        _log.Fatal("AcceptNewClient fatal error: {0}", 
          fatalException.ToString().Flatten());
      }
    }

    #region IDisposable Members

    private bool _disposed = false;

    protected override void Dispose(bool disposing)
    {
      if (!_disposed)
      {
        _disposed = true; //prevent second call to Dispose
        if (disposing)
        {
          _listenResetEvent.Set();
          // _acceptEventArg is only created once Listen has run
          if (null != _acceptEventArg) _acceptEventArg.Dispose();
          _listener.Close();
          _listenResetEvent.Close();
        }
      }
      base.Dispose(disposing);
    }

    #endregion
  }
}

Client code:

using System;
using System.Net;
using System.Net.Sockets;
using System.Runtime.Serialization.Formatters.Binary;
using System.IO;

namespace DuoVia.Net.TcpIp
{
  public class TcpChannel : StreamingChannel
  {
    private Socket _client;

    /// <summary>
    /// Creates a connection to the concrete object handling 
    /// method calls on the server side
    /// </summary>
    /// <param name="serviceType"></param>
    /// <param name="endpoint"></param>
    public TcpChannel(Type serviceType, IPEndPoint endpoint)
    {
      _serviceType = serviceType;
      _client = new Socket(AddressFamily.InterNetwork, 
        SocketType.Stream, ProtocolType.Tcp);
      _client.LingerState.Enabled = false;
      _client.Connect(endpoint);
      if (!_client.Connected) throw new SocketException(); 
      _stream = new BufferedStream(new NetworkStream(_client), 8192);
      _binReader = new BinaryReader(_stream);
      _binWriter = new BinaryWriter(_stream);
      _formatter = new BinaryFormatter();
      SyncInterface(_serviceType);
    }

    public override bool IsConnected 
    { 
      get 
      { 
        return (null != _client) && _client.Connected; 
      } 
    }

    #region IDisposable override

    protected override void Dispose(bool disposing)
    {
      base.Dispose(disposing);
      if (disposing)
      {
        _binReader.Close();
        _binWriter.Close();
        _client.Close();
      }
    }

    #endregion
  }
}

You can find all of the ServiceWire code on GitHub or install the package from NuGet.

My Technical 2013

Technically speaking, I had a fun and productive 2013. Here are some highlights worth mentioning.

StorageClient: Client Side Load Balancing

A technology-specific problem solved: we bypassed a server-based solution in favor of client-side load balancing and fast-fail retry algorithms, which took us from horrible to nearly five nines in reliability while improving overall performance. (This was at the day job, so that's about as much as I can share about that.)

LocalCache: In Memory Cache with Async Persistence

A library that takes advantage of Concurrent Collections in .NET and SQLite to provide fast in-memory caching that persists asynchronously on local disk for rapid rehydration of in-memory cache when an application pool is recycled. This solved a big problem with service level compliance recovery on a critical service, taking complete recovery time from hours to a few minutes. (Also for the day job.)

DuoVia.Net: TCP and NamedPipes Services Library

An extension and revival of RemotingLite that makes inter-process communication easy and fast. This was my first foray into creating and sharing open source software on GitHub and publishing packages on NuGet. I enjoyed it so much, I added 8 more packages to the set. And while these projects were built on my own time, one or two of them are in regular use by one or two teams at the current day job and they have been downloaded over 1,500 times.

VersionedCollections: A Shared Idea Brought to Life

Recently I shared an idea with Ayende Rahien on his blog with respect to creating a snapshot-in-time, read-only view of a collection that is being written to constantly. I'm happy to report that it turned out to be exactly what he needed. And I am honored and appreciative to Ayende for the kudos. Sharing good ideas with community friends is almost as much fun as bringing them to life yourself.

Here's to an equally fun and productive 2014.

DMD x64 with Visual D in Visual Studio 2013 on Windows 8.1

About ten days ago, I installed the latest Visual D from GitHub and ran into some problems when trying to compile a simple console app, created from the DMD/GDC console application template, as an x64 native Windows application using the “DMD | x64” config.

While I have not yet tested it, I believe the installer would have worked out of the box on my Windows 7 machine. But on my Windows 8.1 laptop, I had some trouble. Rather than boring you with all the things I tried, I will just share what finally worked for me.

Here’s the console app code I was working with:

import std.stdio;

int main(string[] argv)
{
    writeln("Hello D-World!");
    readln();
    return 0;
}

I had downloaded and installed Visual D with VisualD-v0.3.37.exe. I had also downloaded and installed the latest DMD compiler from dlang.org. In the process of figuring things out, I also reinstalled Visual D.

In Visual Studio, I created a new DMD/GDC console app from the D Language tab in the new project dialog. This console app template comes preconfigured with a configuration called "Debug DMD|x64" and I switched to that. But when I hit F6 to build, I would get the following error:

------ Build started: Project: ConsoleApp1, Configuration: Debug DMD x64 ------
Building Debug DMD x64\ConsoleApp1.exe...
LINK : fatal error LNK1181: cannot open input file 'user32.lib'
Building Debug DMD x64\ConsoleApp1.exe failed!
------

After some browsing and searching, I found the known issues page. The fix described there did not work, but it did lead me to more digging and experimenting until I found that some very specific changes to the sc.ini file solved the problem. Here are the relevant lines, as modified, in my sc.ini file that finally made it possible for my little program to compile. (Update: only the Environment64 changes are required. While Rainer Schuetze suggests that the paths can be modified in Visual Studio, I’ve not been able to make that work.)

[Environment64]
   ; original LIB="%@P%\..\lib64"
   LIB="%@P%\..\lib64";\dm\lib;"C:\Program Files (x86)\Windows Kits\8.0\Lib\win8\um\x64";%DMD_LIB%

This post will be my saved notes for the next time I have to configure a Windows 8.1 machine for the D programming language in Visual Studio 2013 (and beyond). And I hope you find it useful too.

Doors or Rooms?

Do you build rooms with doors or doors with rooms?

Should every room be the same size with doors of different size, shape and placement?

Are some doors in the floor or in the ceiling, at floor level or in the middle of the wall?

Do your doors open in or out? Are they too small? Too big?

Rooms are code that does stuff. Does work. Holds stuff. Creates stuff.

Doors are code that gets stuff into and out of rooms.

Which one do you spend more time working on?

Making Distributed Computing Relevant and Accessible

First, let us assume that distributed computing is generally that area of developing and running software designed to process large numbers of long running tasks on servers that are optimally proximal to the data being processed.

Second, let us agree, if for this discussion only, that distributed computing is NOT your collection of services on back end servers that support your service oriented architecture (SOA) for your web and mobile apps.

Third, let us presume that you are NOT already blessed with a job where you write distributed computing software.

How then can distributed computing be relevant to you? And how can you take advantage of distributed computing without becoming an expert in one of the several well known distributed computing platforms on the market today?

Both are excellent questions. Thank you for asking. Let’s try a practical approach.

Imagine you are at your desk and your boss comes to you and asks how fast your web servers respond to the customer. Of course, your first instinct is to write this program to find out:

private static void DoTenUrlsInParallel()
{
   Console.WriteLine("Do 10 urls in parallel");
   var sw = Stopwatch.StartNew();
   ISpeedTest test = new SpeedTest();
   Parallel.ForEach(TestUrls, (url) =>
   {
      var result = test.GetSpeed(url);
      Console.WriteLine("r:{0}, s:{1}, b:{2}, u:{3}",
         result.ResponseTimeMs, result.ReadStreamTimeMs, 
         result.ResponseLength, result.Url);
   });
   sw.Stop();
   Console.WriteLine("Total elapsed time: {0}", 
      sw.ElapsedMilliseconds);
   Console.WriteLine(string.Empty);
}

You take him the results and he says, “But isn’t this from your desk? I want to know what these numbers look like from all around the world. East and west U.S. North and west Europe. And east and south east Asia. And I want a regular stream of these numbers fed into a spreadsheet for me every day.”

Do you say, “No problem”? You do if you have a Windows Azure account and you know about the distributed task parallel library from DuoVia called DuoVia.Net.Distributed. You go back to your desk and modify the code to look like this:

private static void DoTenUrlsThreeTimesEachAroundTheWorldInParallel(bool runLocal = false)
{
   var serverEndpoints = new IPEndPoint[0];
   if (runLocal)
   {
      serverEndpoints = new IPEndPoint[] { new IPEndPoint(IPAddress.Parse("127.0.0.1"), 9096) };
   }
   else
   {
      //these server names are temporary - to run this test use your own
      var servers = new string[]
      {
         "myaz-westus.cloudapp.net",
         "myaz-eastus.cloudapp.net",
         "myaz-northeu.cloudapp.net",
         "myaz-westeu.cloudapp.net",
         "myaz-soeastasia.cloudapp.net",
         "myaz-eastasia.cloudapp.net"
      };

      serverEndpoints = new IPEndPoint[servers.Length];
      for (int i = 0; i < servers.Length; i++)
      {
         var host = Dns.GetHostAddresses(servers[i]);
         var ip = (from n in host 
                   where n.AddressFamily == AddressFamily.InterNetwork 
                   select n).First();
         serverEndpoints[i] = new IPEndPoint(ip, 9096);
      }
   }

   float subscriptionRate = 2.0f; //oversubscribed 
   int logPollingIntervalSeconds = 2;
   using (DistributedClient<ISpeedTest> client = 
          Distributor.Connect<ISpeedTest>(typeof(SpeedTest),
          subscriptionRate,
          logPollingIntervalSeconds,
          LogLevel.Debug,
          serverEndpoints))
   {
      for (int i = 0; i < 3; i++)
      {
         var sw = Stopwatch.StartNew();
         Console.WriteLine(@"round:{0}", i + 1);
         var loopResult = client.ForEach(TestUrls, (url, proxy) => proxy.GetSpeed(url));
         foreach (var result in loopResult.Results)
         {
            Console.WriteLine(@"r:{0}, s:{1}, b:{2}, on: {3}, u:{4}",
               result.ResponseTimeMs, result.ReadStreamTimeMs, 
               result.ResponseLength, result.MachineName, result.Url);
         }
         sw.Stop();
         Console.WriteLine("Total elapsed time: {0}", sw.ElapsedMilliseconds);
         Console.WriteLine(string.Empty);
      }
   }
}

And you and your boss are happy.

Sometimes distributed computing is more about location and proximity to data or infrastructure than it is to getting massive amounts of data processed in as little time as possible.

You can find the full demo source code here.
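Neither listing above shows ISpeedTest or SpeedTest themselves. What follows is only a guess at their shape, inferred from the properties the loops print; it is not the demo's actual code. The SpeedTestResult type name, the injectable opener delegate and the use of WebRequest are all assumptions made for this sketch.

```csharp
using System;
using System.Diagnostics;
using System.IO;
using System.Net;

// Hypothetical contract, inferred from the calls in the listings above.
public interface ISpeedTest
{
    SpeedTestResult GetSpeed(string url);
}

// Hypothetical result type; the property names come from the printed output.
[Serializable]
public class SpeedTestResult
{
    public string Url { get; set; }
    public string MachineName { get; set; }
    public long ResponseTimeMs { get; set; }
    public long ReadStreamTimeMs { get; set; }
    public long ResponseLength { get; set; }
}

public class SpeedTest : ISpeedTest
{
    private readonly Func<string, Stream> _open;

    // Fetches over HTTP by default. A different opener can be injected
    // so the timing logic can be exercised without a network.
    public SpeedTest() : this(OpenHttp) { }

    public SpeedTest(Func<string, Stream> open)
    {
        if (open == null) throw new ArgumentNullException("open");
        _open = open;
    }

    private static Stream OpenHttp(string url)
    {
        return WebRequest.Create(url).GetResponse().GetResponseStream();
    }

    public SpeedTestResult GetSpeed(string url)
    {
        var result = new SpeedTestResult
        {
            Url = url,
            MachineName = Environment.MachineName
        };
        var sw = Stopwatch.StartNew();
        using (Stream body = _open(url))
        {
            // Time until the response stream is available.
            result.ResponseTimeMs = sw.ElapsedMilliseconds;
            sw.Restart();

            // Time to drain the body, counting bytes as we go.
            var buffer = new byte[8192];
            int read;
            while ((read = body.Read(buffer, 0, buffer.Length)) > 0)
            {
                result.ResponseLength += read;
            }
            result.ReadStreamTimeMs = sw.ElapsedMilliseconds;
        }
        return result;
    }
}
```

Recording the machine name inside GetSpeed is what would let the distributed version report where each measurement ran, since that method executes on whichever server handles the call.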

Diversions in the D Programming Language

I am not a systems programmer, meaning I do not write operating system device drivers or file systems or operating system modules, etc, all written in a language that will compile down to raw machine code. I write in C# primarily which is arguably an applications programming language, running in the much loved .NET Common Language Runtime.

The vast majority of systems programming is done in C and C++. And for some reason, C++ has always been a daunting mess of libraries, odd syntax and pointer and memory allocation madness to me. Even setting up an environment to get the right build libraries, the right compiler and linker, etc., has always led me to fits of impatience. And for that reason and many others, I have stuck to C# and applications development.

But every once in a while, I look in on systems programming to see if anyone has really solved the problems I love to hate with respect to C and C++. And for a few years I’ve read a little about the D programming language here and there. A week ago, over the weekend, I decided to give it a try and really see what I could learn.

I have to say, I have been impressed. The D programming language offers a few things that I would dearly love to see in C#.

1. Exception Safety – the scope keyword

void abc() 
{ 
  auto resource = getresource();  // acquire some resource 
  scope(exit) resource.close();   // close the resource 
  doSomeProcessing();             // do processing
}

As C# programmers, we’re used to the try..catch..finally blocks. And we clean up a resource in the finally block. The trouble with that is many lines of code can end up separating your resource acquisition code from your resource cleanup code. Yes, with vigilance and well written tests, this is okay. But wouldn’t it be cool to be able to tell the compiler, “Hey, when I’m done with this thing I just now created, clean it up for me, no matter what code comes after this in this method.” I would love to see the scope keyword added to C#.
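The closest thing C# offers today is the using statement over an IDisposable. As a rough sketch of how far that gets you, the hypothetical Defer helper below (not part of .NET) runs an arbitrary cleanup action when its using block exits, whether normally or through an exception.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical helper: runs the supplied action once, when disposed.
public sealed class Defer : IDisposable
{
    private Action _onExit;

    public Defer(Action onExit)
    {
        if (onExit == null) throw new ArgumentNullException("onExit");
        _onExit = onExit;
    }

    public void Dispose()
    {
        Action action = _onExit;
        _onExit = null; // guard against running the cleanup twice
        if (action != null) action();
    }
}

public static class ScopeExitDemo
{
    // Returns the order in which the steps ran.
    public static List<string> Run(bool fail)
    {
        var log = new List<string>();
        try
        {
            log.Add("acquire");
            // Cleanup is declared right next to the acquisition.
            using (new Defer(() => log.Add("close")))
            {
                log.Add("process");
                if (fail) throw new InvalidOperationException("boom");
                log.Add("done");
            }
        }
        catch (InvalidOperationException)
        {
            log.Add("caught");
        }
        return log;
    }
}
```

ScopeExitDemo.Run(false) yields acquire, process, done, close, and Run(true) yields acquire, process, close, caught. The cleanup still needs its own block and an extra level of nesting, which is exactly the ceremony that D's scope(exit) avoids.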

2. Concurrency approach

int perThread;
shared int PerProcess;

In C#, when you declare a class level variable, it is automatically shared between threads. You can use the [ThreadStatic] attribute to get a per thread instance of a given value or object. But it then has to be static. With the D programming language, you get thread safety in class variables. To override that safety, you have to explicitly tell the compiler you want the value shared. While I’m not advocating a change to C# in this regard, I would love to have a way to assure that a variable cannot be modified across thread boundaries.
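For comparison, here is a small sketch of the C# side of that contrast. ThreadLocalDemo is a made-up class for illustration: one static field is marked [ThreadStatic] and the other is an ordinary static, and each new thread increments both once.

```csharp
using System;
using System.Threading;

public static class ThreadLocalDemo
{
    [ThreadStatic]
    private static int _perThread;   // like D's default: each thread gets its own copy

    private static int _perProcess;  // like D's shared: one copy for all threads

    // Bumps both counters once on each of `threads` new threads.
    // Returns the final shared count; perThreadValues holds what each
    // thread saw in its own copy of _perThread.
    public static int Run(int threads, out int[] perThreadValues)
    {
        _perProcess = 0;
        var seen = new int[threads];
        var workers = new Thread[threads];
        for (int i = 0; i < threads; i++)
        {
            int index = i; // stable copy for the lambda to capture
            workers[i] = new Thread(() =>
            {
                _perThread++;                           // starts at 0 on every new thread
                seen[index] = _perThread;
                Interlocked.Increment(ref _perProcess); // shared, so synchronize
            });
            workers[i].Start();
        }
        foreach (Thread worker in workers) worker.Join();
        perThreadValues = seen;
        return _perProcess;
    }
}
```

With four threads, every entry in perThreadValues is 1 while the shared counter ends at 4. Note that the opt-in runs in the opposite direction from D: in C# you ask for isolation, in D you ask for sharing.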

3. Message based threads

import std.concurrency, std.exception, std.stdio;
void main() {
   auto low = 0, high = 100;
   auto tid = spawn(&writer);
   foreach (i; low .. high) {
      writeln("Main thread: ", i);
      tid.send(thisTid, i);
      enforce(receiveOnly!Tid() == tid);
   }
}

void writer() {
   for (;;) {
      auto msg = receiveOnly!(Tid, int)();
      writeln("Secondary thread: ", msg[1]);
      msg[0].send(thisTid);
   }
}

For me, this is perhaps the coolest part of the D programming language’s standard library, which they call Phobos. Note that main spawns a thread running writer. The loop in main then sends a message to writer, and the loop in writer receives each message, operates on it and sends a message back to the original thread.
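C# has no built-in thread mailboxes, but the same ping-pong can be approximated with two BlockingCollection queues. This is only a sketch of the pattern under that assumption, not an equivalent of std.concurrency; MessagePassingDemo and its queue names are invented for the example.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;

public static class MessagePassingDemo
{
    // Sends low..high-1 to a writer thread one at a time, waiting for an
    // acknowledgement after each, and returns what the writer received.
    public static List<int> Run(int low, int high)
    {
        var toWriter = new BlockingCollection<int>(); // main -> writer mailbox
        var toMain = new BlockingCollection<bool>();  // writer -> main acknowledgements
        var received = new List<int>();               // touched only by the writer until Join

        var writer = new Thread(() =>
        {
            // Blocks waiting for messages; ends once CompleteAdding is called.
            foreach (int msg in toWriter.GetConsumingEnumerable())
            {
                received.Add(msg);
                toMain.Add(true); // reply to the sender
            }
        });
        writer.Start();

        for (int i = low; i < high; i++)
        {
            toWriter.Add(i);
            toMain.Take(); // wait for the writer's reply before sending more
        }

        toWriter.CompleteAdding(); // lets the writer's loop finish
        writer.Join();
        return received;
    }
}
```

MessagePassingDemo.Run(0, 5) returns 0 through 4 in order. The D version gets the same behavior with far less plumbing, because every spawned thread already owns a mailbox addressed by its Tid.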

You can learn a lot about D on www.dlang.org and read more about D concurrency on Informit. And if you want to play with D in Visual Studio, hop on over to see VisualD on dsource.org.

PooledDictionary<TKey, TValue> - A Thread Safe Object Pool by Key

Work on my DuoVia.Net and DuoVia.MpiVisor projects has been progressing well. I now have an opportunity to use the libraries in a significant use case at Ancestry.com, my day job, which has been very useful in finding ways to improve the libraries. (Disclaimer: Ancestry.com does not endorse the DuoVia library and I am not a spokesman for Ancestry.)

In its first incarnation, the ProxyFactory created a new dynamic assembly each time it created a proxy. This was expensive in terms of creation time but more so in terms of memory once many thousands of proxies had been created. Assemblies are kept in memory for the life of the process.

First I tried a Dictionary of ProxyBuilder objects, the container class for the objects needed to create an instance of the dynamically generated assembly’s proxy type that implements the target interface. I used a lock on this Dictionary but of course that created a bottleneck and a many threaded application trying to create many proxy connections would run into that bottleneck.

Next I tried a ThreadStatic instance of that Dictionary, keeping a ProxyBuilder for each key type. This eliminated the bottleneck but necessitated the creation of many more ProxyBuilder objects than was necessary and no guarantee could be made that these objects would ever be utilized more than once. In a multithreaded client using thread pool threads or its own threads, over time, memory usage and performance would be negatively impacted.

Pooling was the answer. But how can you pool objects of the same base type by key rather than type? There are many object pool examples to be found but all those that I found were based on type alone. Time to roll my own. And PooledDictionary<TKey, TValue> is what I came up with.

First, the code that uses it, so you can get a feel for how easy it is to use. Note the Request method’s Func<TValue> parameter, which here calls CreateProxyBuilder. CreateProxyBuilder is the costly and complex method that creates the dynamic assembly and collects into the ProxyBuilder object everything required to create an instance of the proxy for the target interface. The function is used to create a new ProxyBuilder if the pool is depleted.

Using the PooledDictionary<TKey, TValue>

private static PooledDictionary<string, ProxyBuilder> _proxies = 
  new PooledDictionary<string, ProxyBuilder>();

public static TInterface CreateProxy<TInterface>(Type channelType, 
  Type ctorArgType, object channelCtorValue) where TInterface : class
{
  if (!channelType.InheritsFrom(typeof(Channel))) 
  {
    throw new ArgumentException("channelType does not inherit from Channel");
  }
  Type interfaceType = typeof(TInterface);
  var proxyName = interfaceType.FullName + channelType.FullName + ctorArgType.FullName;

  //get pooled proxy builder
  var localChannelType = channelType;
  var localCtorArgType = ctorArgType;
  ProxyBuilder proxyBuilder = _proxies.Request(proxyName, () => 
    CreateProxyBuilder(proxyName, interfaceType, localChannelType, localCtorArgType));

  //create proxy
  var proxy = CreateProxy<TInterface>(proxyBuilder, channelCtorValue);

  //return builder to the pool
  _proxies.Release(proxyName, proxyBuilder);

  return proxy;
}

And now the code that makes the magic happen. Note the use of the System.Collections.Concurrent namespace. While sometimes heavy, these collections really do have their place on the parallel programmer’s palette.

PooledDictionary<TKey, TValue>

public class PooledDictionary<TKey, TValue> 
{
  private readonly ConcurrentDictionary<TKey, ConcurrentQueue<TValue>> _dq;
  private readonly int _concurrencyLevel;
  private readonly int _size;

  public PooledDictionary()
  {
    _concurrencyLevel = Environment.ProcessorCount * 8;
    _size = _concurrencyLevel * _concurrencyLevel;
    _dq = new ConcurrentDictionary<TKey, ConcurrentQueue<TValue>>(_concurrencyLevel, _size);
  }

  public void Add(TKey key, TValue value)
  {
    if (!_dq.ContainsKey(key)) _dq.TryAdd(key, new ConcurrentQueue<TValue>());
    ConcurrentQueue<TValue> q;
    if (_dq.TryGetValue(key, out q))
    {
      q.Enqueue(value);
    }
    else
    {
      throw new ArgumentException("Unable to add value");
    }
  }

  public int Count(TKey key)
  {
    if (!_dq.ContainsKey(key)) _dq.TryAdd(key, new ConcurrentQueue<TValue>());
    ConcurrentQueue<TValue> q;
    if (_dq.TryGetValue(key, out q))
    {
      return q.Count;
    }
    return 0;
  }

  public TValue Request(TKey key, Func<TValue> creator = null)
  {
    if (!_dq.ContainsKey(key)) _dq.TryAdd(key, new ConcurrentQueue<TValue>());
    ConcurrentQueue<TValue> q;
    if (_dq.TryGetValue(key, out q))
    {
      TValue v;
      if (q.TryDequeue(out v)) return v;
      if (null != creator) return creator();
    }
    return default(TValue);
  }

  public void Release(TKey key, TValue value)
  {
    Add(key, value); //just adds it back to key's queue
  }
}
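The ContainsKey, TryAdd and TryGetValue sequence repeated in Add, Count and Request can also be collapsed into a single ConcurrentDictionary.GetOrAdd call. Here is a sketch of that variant; the class name CompactPooledDictionary is hypothetical, this is not the library's code, and unlike the original its Release path has no failure branch to throw from.

```csharp
using System;
using System.Collections.Concurrent;

// Sketch only: same Request/Release/Count surface as the class above.
public class CompactPooledDictionary<TKey, TValue>
{
    private readonly ConcurrentDictionary<TKey, ConcurrentQueue<TValue>> _dq =
        new ConcurrentDictionary<TKey, ConcurrentQueue<TValue>>();

    // Returns the queue for the key, creating it on first use.
    private ConcurrentQueue<TValue> QueueFor(TKey key)
    {
        return _dq.GetOrAdd(key, k => new ConcurrentQueue<TValue>());
    }

    public int Count(TKey key)
    {
        return QueueFor(key).Count;
    }

    // Hands out a pooled value, or builds one when the pool is empty.
    public TValue Request(TKey key, Func<TValue> creator = null)
    {
        TValue v;
        if (QueueFor(key).TryDequeue(out v)) return v;
        return null != creator ? creator() : default(TValue);
    }

    // Returns a value to the key's pool for reuse.
    public void Release(TKey key, TValue value)
    {
        QueueFor(key).Enqueue(value);
    }
}
```

Under contention GetOrAdd may run the factory more than once, but only one queue is ever stored per key, so the extra empty queue is simply discarded.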

I hope this little gem is as useful to you as it has been to me. In tests and production, I have found it to be as fast or faster than the ThreadStatic collection approach. And memory consumption in production has returned to satisfactory levels because we are now creating only the number of ProxyBuilders that we need and using those efficiently.