57

I'm wondering if there exists an implementation/wrapper for ConcurrentQueue, similar to BlockingCollection, where taking from the collection does not block but is instead asynchronous, so that the caller awaits until an item is placed in the queue.

I've come up with my own implementation, but it does not seem to be performing as expected. I'm wondering if I'm reinventing something that already exists.

Here's my implementation:

public class MessageQueue<T>
{
    ConcurrentQueue<T> queue = new ConcurrentQueue<T>();

    ConcurrentQueue<TaskCompletionSource<T>> waitingQueue = 
        new ConcurrentQueue<TaskCompletionSource<T>>();

    object queueSyncLock = new object();

    public void Enqueue(T item)
    {
        queue.Enqueue(item);
        ProcessQueues();
    }

    public async Task<T> Dequeue()
    {
        TaskCompletionSource<T> tcs = new TaskCompletionSource<T>();
        waitingQueue.Enqueue(tcs);
        ProcessQueues();
        return tcs.Task.IsCompleted ? tcs.Task.Result : await tcs.Task;
    }

    private void ProcessQueues()
    {
        TaskCompletionSource<T> tcs=null;
        T firstItem=default(T);
        while (true)
        {
            bool ok;
            lock (queueSyncLock)
            {
                ok = waitingQueue.TryPeek(out tcs) && queue.TryPeek(out firstItem);
                if (ok)
                {
                    waitingQueue.TryDequeue(out tcs);
                    queue.TryDequeue(out firstItem);
                }
            }
            if (!ok) break;
            tcs.SetResult(firstItem);
        }
    }
}
3
  • According to our on-topic guidance, "Some questions are still off-topic, even if they fit into one of the categories listed above:...Questions asking us to recommend or find a book, tool, software library, tutorial or other off-site resource are off-topic..." Jun 29, 2018 at 10:58
  • An await-able queue is what I thought of recently too (here is my question: stackoverflow.com/questions/52775484/…)! It would solve SO MANY issues in a microservices architecture, I believe! But in that case, the queue should probably be a persistent queue and not something in-memory. Oct 15, 2018 at 7:55

10 Answers

71

I don't know of a lock-free solution, but you can take a look at the new Dataflow library, part of the Async CTP. A simple BufferBlock<T> should suffice, e.g.:

BufferBlock<int> buffer = new BufferBlock<int>();

Production and consumption are most easily done via extension methods on the dataflow block types.

Production is as simple as:

buffer.Post(13);

and consumption is async-ready:

int item = await buffer.ReceiveAsync();

I do recommend you use Dataflow if possible; making such a buffer both efficient and correct is more difficult than it first appears.
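To show the pieces above working together, here is a minimal sketch of one producer and one consumer sharing a BufferBlock<int>. The class and method names (BufferBlockDemo, RunAsync) are made up for illustration, and the sketch assumes the System.Threading.Tasks.Dataflow NuGet package is referenced.

```csharp
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow; // assumption: NuGet package System.Threading.Tasks.Dataflow

public static class BufferBlockDemo
{
    // Hypothetical helper: posts 0..count-1 and returns what the consumer received.
    public static async Task<List<int>> RunAsync(int count)
    {
        var buffer = new BufferBlock<int>();

        // Consumer: ReceiveAsync awaits (no thread is blocked) while the buffer is empty.
        Task<List<int>> consumer = Task.Run(async () =>
        {
            var received = new List<int>();
            for (int i = 0; i < count; i++)
            {
                received.Add(await buffer.ReceiveAsync());
            }
            return received;
        });

        // Producer: Post returns immediately for an unbounded BufferBlock.
        for (int i = 0; i < count; i++)
        {
            buffer.Post(i);
        }

        return await consumer;
    }
}
```

With a single consumer, the items come back in the order they were posted, since BufferBlock<T> is a FIFO buffer.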

4
  • This looks very promising... will check it out tomorrow. Thanks. It looks very much like a CCR port.
    – spender
    Oct 23, 2011 at 19:47
  • 3
    Took a peek before bedtime instead! It looks like Dataflow fits my needs very nicely. It seems to bridge the gap between what's offered by TPL and what's offered in CCR (which I have used to great success). It leaves me feeling positive that the excellent work in CCR hasn't been squandered. This is the right answer (and something shiny and new to sink my teeth into!) Thanks @StephenCleary.
    – spender
    Oct 23, 2011 at 22:39
  • Stephen Cleary's own Nito.AsyncEx library also has AsyncProducerConsumerQueue<T> which is an alternative to BufferBlock<T>.
    – Fanblade
    May 14, 2021 at 16:00
  • 2
    @Fanblade: True, but these days I point people towards System.Threading.Channels. Channels are a very efficient and very modern solution. May 14, 2021 at 17:37
39

Simple approach with C# 8.0 IAsyncEnumerable and Dataflow library

// Instantiate an async queue
var queue = new AsyncQueue<int>();

// Then, loop through the elements of queue.
// This loop won't stop until it is canceled or broken out of
// (for that, use queue.WithCancellation(..) or break;)
await foreach(int i in queue) {
    // Writes a line as soon as some other Task calls queue.Enqueue(..)
    Console.WriteLine(i);
}

With an implementation of AsyncQueue as follows:

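using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public class AsyncQueue<T> : IAsyncEnumerable<T>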
{
    private readonly SemaphoreSlim _enumerationSemaphore = new SemaphoreSlim(1);
    private readonly BufferBlock<T> _bufferBlock = new BufferBlock<T>();

    public void Enqueue(T item) => _bufferBlock.Post(item);

    public async IAsyncEnumerator<T> GetAsyncEnumerator(CancellationToken token = default)
    {
        // We lock this so we only ever enumerate once at a time.
        // That way we ensure all items are returned in a continuous
        // fashion with no 'holes' in the data when two foreach compete.
        await _enumerationSemaphore.WaitAsync(token);

        try 
        {
            // Return new elements until cancellationToken is triggered.
            while (true) 
            {
                // Make sure to throw on cancellation so the Task will transfer into a canceled state
                token.ThrowIfCancellationRequested();
                yield return await _bufferBlock.ReceiveAsync(token);
            }
        } 
        finally 
        {
            _enumerationSemaphore.Release();
        }

    }
}
7
  • 1
    I love it when an old question gets a modern update. Have an upvote. I haven't checked out IAsyncEnumerable but am very familiar with javascript's Symbol.asyncIterator which looks like more or less the same concept.
    – spender
    Apr 30, 2019 at 1:33
  • Thanks @spender! I think so, it's basically an IEnumerable, but you can asynchronously await new items so it's a non-blocking operation.
    – Bruno Zell
    Apr 30, 2019 at 1:41
  • I wonder, is there any specific reason for using SemaphoreSlim(1) instead of a lock ?
    – valorl
    May 1, 2020 at 13:05
  • 4
    @valorl inside a lock there can't be an await
    – Bruno Zell
    May 2, 2020 at 8:50
  • 1
    cancellation token could be given to WaitAsync(token) as param too
    – Vinigas
    Jan 19, 2023 at 13:33
33

There is an official way to do this now: System.Threading.Channels. It's built into the core runtime on .NET Core 3.0 and higher (including .NET 5.0 and 6.0), but it's also available as a NuGet package on .NET Standard 2.0 and 2.1. You can read through the docs here.

var channel = System.Threading.Channels.Channel.CreateUnbounded<int>();

To enqueue work:

// This will succeed and finish synchronously if the channel is unbounded.
channel.Writer.TryWrite(42);

To complete the channel:

channel.Writer.TryComplete();

To read from the channel:

var i = await channel.Reader.ReadAsync();

Or, if you have .NET Core 3.0 or higher:

await foreach (int i in channel.Reader.ReadAllAsync())
{
    // whatever processing on i...
}
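Putting the calls above together, a rough end-to-end sketch might look like the following. It uses a bounded channel so that the producer awaits instead of blocking when the buffer is full. ChannelDemo, SumAsync and the capacity of 8 are illustrative choices, not anything prescribed by the library.

```csharp
using System.Threading.Channels;
using System.Threading.Tasks;

public static class ChannelDemo
{
    // Hypothetical helper: writes 1..count into a channel and sums what is read back.
    public static async Task<int> SumAsync(int count)
    {
        // Bounded channel: WriteAsync waits asynchronously once 8 items are pending.
        Channel<int> channel = Channel.CreateBounded<int>(8);

        Task producer = Task.Run(async () =>
        {
            for (int i = 1; i <= count; i++)
            {
                await channel.Writer.WriteAsync(i);
            }
            // Signal that no more items will arrive so the reader loop can end.
            channel.Writer.Complete();
        });

        int sum = 0;
        await foreach (int item in channel.Reader.ReadAllAsync())
        {
            sum += item;
        }

        await producer;
        return sum;
    }
}
```

The await foreach loop ends on its own once the writer is completed and the channel has been drained, so no sentinel value or exception handling is needed to detect the end of the stream.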
2
  • 1
    Next time I have an opportunity to use such a structure, I'll check this out... If it works out, I'll give you the green tick. ...Writer.WaitToWriteAsync() on a bounded queue also looks super handy. Nice find... Thanks for adding this.
    – spender
    Apr 9, 2021 at 21:26
  • 3
    Useful link: An Introduction to System.Threading.Channels Apr 10, 2021 at 6:54
7

One simple and easy way to implement this is with a SemaphoreSlim:

public class AwaitableQueue<T>
{
    private SemaphoreSlim semaphore = new SemaphoreSlim(0);
    private readonly object queueLock = new object();
    private Queue<T> queue = new Queue<T>();

    public void Enqueue(T item)
    {
        lock (queueLock)
        {
            queue.Enqueue(item);
            semaphore.Release();
        }
    }

    public T WaitAndDequeue(TimeSpan timeSpan, CancellationToken cancellationToken)
    {
        semaphore.Wait(timeSpan, cancellationToken);
        lock (queueLock)
        {
            return queue.Dequeue();
        }
    }

    public async Task<T> WhenDequeue(TimeSpan timeSpan, CancellationToken cancellationToken)
    {
        await semaphore.WaitAsync(timeSpan, cancellationToken);
        lock (queueLock)
        {
            return queue.Dequeue();
        }
    }
}

The beauty of this is that the SemaphoreSlim handles all of the complexity of implementing the Wait() and WaitAsync() functionality. The downside is that queue length is tracked by both the semaphore and the queue itself, and they both magically stay in sync.
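One caveat with the timeout overloads: Wait and WaitAsync report a timeout by returning false rather than throwing, so calling Dequeue afterwards can hit an empty queue. A possible variation that surfaces the timeout to the caller is sketched below. TimeoutAwareQueue and TryDequeueAsync are hypothetical names, and returning a tuple is just one of several reasonable designs.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

public class TimeoutAwareQueue<T>
{
    private readonly SemaphoreSlim _semaphore = new SemaphoreSlim(0);
    private readonly object _queueLock = new object();
    private readonly Queue<T> _queue = new Queue<T>();

    public void Enqueue(T item)
    {
        lock (_queueLock)
        {
            _queue.Enqueue(item);
        }
        // Released after the item is stored, so a woken consumer always finds it.
        _semaphore.Release();
    }

    // Returns (false, default) on timeout instead of dequeuing from an empty queue.
    // Cancellation still surfaces as an OperationCanceledException.
    public async Task<(bool Success, T Item)> TryDequeueAsync(
        TimeSpan timeout, CancellationToken cancellationToken = default)
    {
        bool entered = await _semaphore.WaitAsync(timeout, cancellationToken);
        if (!entered)
        {
            return (false, default(T));
        }

        lock (_queueLock)
        {
            return (true, _queue.Dequeue());
        }
    }
}
```

Callers then check Success before using Item, which keeps the timeout path free of exceptions.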

5
  • Good if performance is not of the highest importance, bursts of enqueues or dequeues are not expected and time to process each item is significant. It uses locking, meaning that the collection can only be accessed by one thread at a time and all others will wait blocking when enqueuing or dequeuing an item.
    – Jordi
    Nov 4, 2020 at 12:55
  • The result of semaphore.WaitAsync() should be taken into account, and in case the timeout is reached return null, a default value or throw an exception. Jan 24, 2021 at 16:01
  • @GuillermoPrandi The semaphore.WaitAsync task does not return a value. If the timeout is reached, it will throw a TaskCanceledException which will bubble.
    – Ryan
    Feb 8, 2021 at 0:55
  • @Ryan learn.microsoft.com/en-us/dotnet/api/… "A task that will complete with a result of true if the current thread successfully entered the SemaphoreSlim, otherwise with a result of false." Feb 8, 2021 at 12:35
  • How do you use this to queue a list of awaitable jobs with returns? Mar 18, 2021 at 5:44
5

My attempt (it raises an event when a "promise" is created, which an external producer can use to know when to produce more items):

public class AsyncQueue<T>
{
    private ConcurrentQueue<T> _bufferQueue;
    private ConcurrentQueue<TaskCompletionSource<T>> _promisesQueue;
    private object _syncRoot = new object();

    public AsyncQueue()
    {
        _bufferQueue = new ConcurrentQueue<T>();
        _promisesQueue = new ConcurrentQueue<TaskCompletionSource<T>>();
    }

    /// <summary>
    /// Enqueues the specified item.
    /// </summary>
    /// <param name="item">The item.</param>
    public void Enqueue(T item)
    {
        TaskCompletionSource<T> promise;
        do
        {
            if (_promisesQueue.TryDequeue(out promise) &&
                !promise.Task.IsCanceled &&
                promise.TrySetResult(item))
            {
                return;                                       
            }
        }
        while (promise != null);

        lock (_syncRoot)
        {
            if (_promisesQueue.TryDequeue(out promise) &&
                !promise.Task.IsCanceled &&
                promise.TrySetResult(item))
            {
                return;
            }

            _bufferQueue.Enqueue(item);
        }            
    }

    /// <summary>
    /// Dequeues an item asynchronously.
    /// </summary>
    /// <param name="cancellationToken">The cancellation token.</param>
    /// <returns>A task that completes with the dequeued item.</returns>
    public Task<T> DequeueAsync(CancellationToken cancellationToken)
    {
        T item;

        if (!_bufferQueue.TryDequeue(out item))
        {
            lock (_syncRoot)
            {
                if (!_bufferQueue.TryDequeue(out item))
                {
                    var promise = new TaskCompletionSource<T>();
                    cancellationToken.Register(() => promise.TrySetCanceled());

                    _promisesQueue.Enqueue(promise);
                    // Raise the event only if there are subscribers.
                    PromiseAdded?.Invoke(this, EventArgs.Empty);

                    return promise.Task;
                }
            }
        }

        return Task.FromResult(item);
    }

    /// <summary>
    /// Gets a value indicating whether this instance has promises.
    /// </summary>
    /// <value>
    /// <c>true</c> if this instance has promises; otherwise, <c>false</c>.
    /// </value>
    public bool HasPromises
    {
        get { return _promisesQueue.Any(p => !p.Task.IsCanceled); }
    }

    /// <summary>
    /// Occurs when a new promise
    /// is generated by the queue
    /// </summary>
    public event EventHandler PromiseAdded;
}
2
  • I think this is the best solution. I've implemented this and tested it extensively. A few notes: the call to !promise.Task.IsCanceled is unnecessary. I added a ManualResetEventSlim to track when the bufferQueue is empty so that a caller can block to wait for the queue to empty. Mar 9, 2016 at 14:58
  • 1
    You should be disposing CancellationTokenRegistration you got from the cancellationToken.Register call.
    – Paya
    Mar 6, 2017 at 1:27
1

It may be overkill for your use case (given the learning curve), but Reactive Extensions provides all the glue you could ever want for asynchronous composition.

You essentially subscribe to changes and they are pushed to you as they become available, and you can have the system push the changes on a separate thread.
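As a rough illustration of that push model, the sketch below uses a Subject<int> as the message source. It assumes the System.Reactive NuGet package, and RxDemo and Run are made-up names. For simplicity the handler runs synchronously on the producing thread here. Rx's ObserveOn operator, combined with a scheduler, is the usual way to move delivery onto another thread.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Subjects; // assumption: NuGet package System.Reactive

public static class RxDemo
{
    // Hypothetical helper: pushes two items through a subject and records them.
    public static List<int> Run()
    {
        var received = new List<int>();

        using var messages = new Subject<int>();

        // The handler is invoked each time the producer calls OnNext.
        using IDisposable subscription = messages.Subscribe(item => received.Add(item));

        messages.OnNext(1);
        messages.OnNext(2);
        messages.OnCompleted(); // no further items will be pushed

        return received;
    }
}
```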

1
  • 1
    I'm at least partially versed in Reactive, but it's a little esoteric to use in production as others may have to maintain the code. I'm really digging the simplicity that async/await is bringing to a previously very complicated server product, and I'm trying to keep all the async tech under a single technology.
    – spender
    Oct 23, 2011 at 0:53
1

Check out https://github.com/somdoron/AsyncCollection, you can both dequeue asynchronously and use C# 8.0 IAsyncEnumerable.

The API is very similar to BlockingCollection.

AsyncCollection<int> collection = new AsyncCollection<int>();

var t = Task.Run(async () =>
{
    while (!collection.IsCompleted)
    {
        var item = await collection.TakeAsync();

        // process
    }
});

for (int i = 0; i < 1000; i++)
{
    collection.Add(i);
}

collection.CompleteAdding();

t.Wait();

With IAsyncEnumerable:

AsyncCollection<int> collection = new AsyncCollection<int>();

var t = Task.Run(async () =>
{
    await foreach (var item in collection)
    {
        // process
    }
});

for (int i = 0; i < 1000; i++)
{
    collection.Add(i);
}

collection.CompleteAdding();

t.Wait();
1
  • Your example var item = await collection.TakeAsync() seems suitable for a single consumer only. With multiple consumers you may get InvalidOperationExceptions. I think you should use TryTakeAsync instead of TakeAsync, to make it work correctly with multiple consumers too. Aug 8, 2019 at 22:50
0

Here's the implementation I'm currently using.

public class MessageQueue<T>
{
    ConcurrentQueue<T> queue = new ConcurrentQueue<T>();
    ConcurrentQueue<TaskCompletionSource<T>> waitingQueue = 
        new ConcurrentQueue<TaskCompletionSource<T>>();
    object queueSyncLock = new object();
    public void Enqueue(T item)
    {
        queue.Enqueue(item);
        ProcessQueues();
    }

    public async Task<T> DequeueAsync(CancellationToken ct)
    {
        TaskCompletionSource<T> tcs = new TaskCompletionSource<T>();
        ct.Register(() =>
        {
            lock (queueSyncLock)
            {
                tcs.TrySetCanceled();
            }
        });
        waitingQueue.Enqueue(tcs);
        ProcessQueues();
        return tcs.Task.IsCompleted ? tcs.Task.Result : await tcs.Task;
    }

    private void ProcessQueues()
    {
        TaskCompletionSource<T> tcs = null;
        T firstItem = default(T);
        lock (queueSyncLock)
        {
            while (true)
            {
                if (waitingQueue.TryPeek(out tcs) && queue.TryPeek(out firstItem))
                {
                    waitingQueue.TryDequeue(out tcs);
                    if (tcs.Task.IsCanceled)
                    {
                        continue;
                    }
                    queue.TryDequeue(out firstItem);
                }
                else
                {
                    break;
                }
                tcs.SetResult(firstItem);
            }
        }
    }
}

It works well enough, but there's quite a lot of contention on queueSyncLock, as I make heavy use of the CancellationToken to cancel some of the waiting tasks. Of course, this leads to considerably less blocking than I would see with a BlockingCollection, but...

I'm wondering if there is a smoother, lock-free means of achieving the same end.

0

Well, 8 years later I hit this very question and was about to implement the MS AsyncQueue<T> class found in the NuGet package/namespace Microsoft.VisualStudio.Threading.

Thanks to @Theodor Zoulias for mentioning that this API may be outdated and that the Dataflow library would be a good alternative.

So I edited my AsyncQueue<> implementation to use BufferBlock<>. Almost the same but works better.

I use this in an ASP.NET Core background thread and it runs fully async.

protected async Task MyRun()
{
    BufferBlock<MyObj> queue = new BufferBlock<MyObj>();
    Task enqueueTask = StartDataIteration(queue);

    while (await queue.OutputAvailableAsync())
    {
        var myObj = queue.Receive();
        // do something with myObj
    }

}

public async Task StartDataIteration(BufferBlock<MyObj> queue)
{
    var cursor = await RunQuery();
    while(await cursor.Next()) { 
        queue.Post(cursor.Current);
    }
    queue.Complete(); // <<< signals the consumer when queue.Count reaches 0
}

I found that using the queue.OutputAvailableAsync() fixed the issue that I had with AsyncQueue<> -- trying to determine when the queue was complete and not having to inspect the dequeue task.
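If more than one consumer reads from the same BufferBlock, another consumer may take the last item between OutputAvailableAsync completing and Receive being called. A sketch of the same loop using TryReceive, which simply returns false in that case, is shown below. MultiConsumerDemo and CountProcessedAsync are hypothetical names, and the System.Threading.Tasks.Dataflow package is assumed.

```csharp
using System.Collections.Concurrent;
using System.Linq;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow; // assumption: NuGet package System.Threading.Tasks.Dataflow

public static class MultiConsumerDemo
{
    // Hypothetical helper: several consumers drain one queue; returns how many items were processed.
    public static async Task<int> CountProcessedAsync(int itemCount, int consumerCount)
    {
        var queue = new BufferBlock<int>();
        var processed = new ConcurrentBag<int>();

        async Task ConsumeAsync()
        {
            // Completes with false once Complete() was called and the queue is drained.
            while (await queue.OutputAvailableAsync())
            {
                // Another consumer may have taken the item in the meantime,
                // so TryReceive is used instead of Receive.
                while (queue.TryReceive(out int item))
                {
                    processed.Add(item);
                }
            }
        }

        Task[] consumers = Enumerable.Range(0, consumerCount)
            .Select(_ => Task.Run(() => ConsumeAsync()))
            .ToArray();

        for (int i = 0; i < itemCount; i++)
        {
            queue.Post(i);
        }
        queue.Complete();

        await Task.WhenAll(consumers);
        return processed.Count;
    }
}
```

Each item is handed to exactly one consumer, so the processed count should match the number of posted items regardless of how the work is split.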

5
  • Awaiting both queue.DequeueAsync() and queue.Completion with Task.WhenAny is a clever trick, but it feels like a hack to overcome the shortcomings of a poor API design. The alternative classes (Dataflow BufferBlock<T> and Channel<T>) offer methods (OutputAvailableAsync and WaitToReadAsync respectively) that allow awaiting for more elements without having to handle an exception as a feedback mechanism. The problem with your trick is that you may end up with a faulted task with its exception not observed, triggering in this case the TaskScheduler.UnobservedTaskException event. Mar 10, 2020 at 16:24
  • There are other means of notification in the class - but MS didn't have an example in the docs. In my case I have multiple tasks to await so I had to use WhenAny anyway. -and if a Task throws it can be caught as an AggregateException.
    – bmiller
    Mar 11, 2020 at 17:30
  • My point is that the class Microsoft.VisualStudio.Threading.AsyncQueue<T> shouldn't be used for new projects, because today there are better alternatives available. Especially the Channel<T> class, that not only offers a better API but it also has excellent performance characteristics. Mar 11, 2020 at 18:28
  • 1
    ok, you're right, AsyncQueue is based on the TPL library and looks to be designed to work in Visual Studio extensions. I'll edit my answer with my implementation. Thanks for your comment, you may have saved me a pile of headaches.
    – bmiller
    Mar 11, 2020 at 19:29
  • There is a potential race condition in your new implementation (the one based on a BufferBlock), that could surface in case you had multiple consumers. The Receive method could be called by one consumer just after another consumer has taken the last item from the queue. For this reason it is preferable to use the TryReceive method, as a condition either in an if or in a while block, so that you don't have to review the consuming code in case you update your architecture later. Look here for an example. Mar 11, 2020 at 22:43
-6

You could just use a BlockingCollection (which uses a ConcurrentQueue by default) and wrap the call to Take in a Task so you can await it:

var bc = new BlockingCollection<T>();

T element = await Task.Run( () => bc.Take() );
1
  • 8
    Nice idea, but I'm not happy with blocking. I'm going to have a few thousand clients each with their own message queue. Any blocking will sink the ship because it will tie up threads doing nothing. The reason I want an awaitable, non-blocking Task is so I can keep all operations in the threadpool without causing threadpool starvation.
    – spender
    Oct 23, 2011 at 12:58
