
I am using a .NET 4.0 BlockingCollection to handle a queue of items, each of which must be processed by an operation that can take up to a second. Items can be added to this queue from different threads.

I have a couple of questions about this:

a) How can I allow multiple consumers to work on this BlockingCollection? I noticed GetConsumingEnumerable(), which seems to be intended for single-consumer scenarios. The reason for having multiple consumers is that the processing, done via a named pipe instance, can handle up to three of these items at a time, so I thought I could have three consumers.

b) Is there a way to check whether an item is already on this queue and, if it is, make the caller that performed the check block until that item has been processed?

EDIT:

Based on Jon Skeet's answer, here's some sample code illustrating multiple consumers, each using GetConsumingEnumerable(), acting on a BlockingCollection populated by a single producer:

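using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

class Program
{
    static BlockingCollection<string> coll = new BlockingCollection<string>();

    static void Consume()
    {
        // Each consumer gets its own consuming enumerable; every item is taken by exactly one consumer.
        foreach (var i in coll.GetConsumingEnumerable())
        {
            Console.WriteLine("Thread {0} Consuming: {1}", Thread.CurrentThread.ManagedThreadId, i);
            Thread.Sleep(1000); // simulate the slow processing step
        }
    }

    static void Main(string[] args)
    {
        int item = 0;

        // Single producer: adds a new item every 500 ms.
        Task.Factory.StartNew(() =>
        {
            while (true)
            {
                coll.Add(string.Format("Item {0}", item++));
                Thread.Sleep(500);
            }
        });

        // Two consumers enumerating the same collection concurrently.
        for (int i = 0; i < 2; i++)
        {
            Task.Factory.StartNew(() => Consume());
        }

        // Keep the process alive until Enter is pressed, without busy-waiting on a CPU core.
        Console.ReadLine();
    }
}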

The items are processed in an interleaved manner by the two consumers running on different threads, e.g.:

Thread 4 Consuming: Item 0
Thread 5 Consuming: Item 1
Thread 4 Consuming: Item 2
Thread 5 Consuming: Item 3
Thread 4 Consuming: Item 4
  • When using GetConsumingEnumerable, it is important that the producer calls CompleteAdding() so that consumers know when to stop. Commented Apr 26, 2019 at 7:05
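To make the comment above concrete, here is a bounded variant of the demo in which the producer stops and calls CompleteAdding(), so that every consumer's foreach loop ends on its own. This is a sketch under stated assumptions: CompleteAddingDemo and Run are hypothetical names, the three consumers mirror the named pipe's capacity of three from the question, and the 10 ms sleep is only a stand-in for the real work.

```csharp
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical demo class: a bounded producer plus several GetConsumingEnumerable() consumers.
static class CompleteAddingDemo
{
    // Adds itemCount items, calls CompleteAdding(), and waits for consumerCount
    // consumers to drain the collection. Returns every item that was consumed.
    public static List<string> Run(int itemCount, int consumerCount)
    {
        using (var coll = new BlockingCollection<string>())
        {
            var processed = new ConcurrentBag<string>();
            var consumers = new Task[consumerCount];

            for (int c = 0; c < consumerCount; c++)
            {
                consumers[c] = Task.Factory.StartNew(() =>
                {
                    // Ends only after CompleteAdding() has been called and the collection is empty.
                    foreach (var item in coll.GetConsumingEnumerable())
                    {
                        Thread.Sleep(10); // stand-in for the slow named-pipe call
                        processed.Add(item);
                    }
                }, TaskCreationOptions.LongRunning);
            }

            for (int i = 0; i < itemCount; i++)
                coll.Add("Item " + i);

            // Without this call every consumer would block forever once the queue is empty.
            coll.CompleteAdding();

            Task.WaitAll(consumers);
            return new List<string>(processed);
        }
    }
}
```

Calling CompleteAddingDemo.Run(30, 3) should return 30 distinct items, with no item consumed twice.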

2 Answers


Multiple consumers can just call Take or TryTake concurrently - each item will only be consumed by a single consumer.
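A minimal sketch of consumers calling TryTake directly, assuming the producer eventually calls CompleteAdding(); TryTakeConsumers and Run are hypothetical names. TryTake(out item, Timeout.Infinite) blocks while the collection is empty and returns false once it is empty and marked complete, which gives each worker a clean exit condition.

```csharp
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical demo: several workers pulling from one BlockingCollection with TryTake.
static class TryTakeConsumers
{
    // Starts consumerCount workers and returns how many items each one took.
    public static int[] Run(int itemCount, int consumerCount)
    {
        using (var coll = new BlockingCollection<int>())
        {
            var counts = new int[consumerCount];
            var workers = new Task[consumerCount];

            for (int c = 0; c < consumerCount; c++)
            {
                int id = c; // copy the loop variable so each closure gets its own index
                workers[id] = Task.Factory.StartNew(() =>
                {
                    int item;
                    // Blocks while empty; returns false once the collection is empty and completed.
                    while (coll.TryTake(out item, Timeout.Infinite))
                    {
                        counts[id]++; // only this worker writes counts[id]
                    }
                }, TaskCreationOptions.LongRunning);
            }

            for (int i = 0; i < itemCount; i++)
                coll.Add(i);
            coll.CompleteAdding();

            Task.WaitAll(workers);
            return counts;
        }
    }
}
```

Take would also work inside the loop, but it throws InvalidOperationException once the collection is empty and marked complete, so TryTake with Timeout.Infinite is the simpler way to end the loop.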

However, I believe GetConsumingEnumerable will also do what you want: if each caller calls it, each gets a separate consuming enumerable, which again ensures that each item is only consumed once. I'm not sure offhand what happens when the queue becomes empty; I don't know whether MoveNext() then blocks or returns false.
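The empty-queue question can be checked with a small experiment: start a consumer on an empty collection, see whether it finishes, then add an item and call CompleteAdding(). This is a sketch; EmptyQueueBehavior and Run are hypothetical names, and the 200 ms wait is an arbitrary grace period.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

// Hypothetical experiment: does MoveNext() block or return false on an empty collection?
static class EmptyQueueBehavior
{
    // Item1: whether the enumeration finished while the collection was empty but not complete.
    // Item2: how many items the enumeration consumed in total.
    public static Tuple<bool, int> Run()
    {
        using (var coll = new BlockingCollection<string>())
        {
            int seen = 0;
            var consumer = Task.Factory.StartNew(() =>
            {
                foreach (var item in coll.GetConsumingEnumerable())
                    seen++;
            }, TaskCreationOptions.LongRunning);

            // Give the consumer time to reach MoveNext() on the empty collection.
            bool finishedWhileEmpty = consumer.Wait(200);

            coll.Add("late item");  // wakes the blocked MoveNext()
            coll.CompleteAdding();  // lets MoveNext() return false once drained
            consumer.Wait();

            return Tuple.Create(finishedWhileEmpty, seen);
        }
    }
}
```

Run() returns (false, 1): MoveNext() blocks while the collection is empty, and only returns false after CompleteAdding() has been called and the remaining item has been consumed.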

I didn't really follow your second question though...


Comments

Sorry, it seems a little vague on a second read. The problem is that I want to be able to determine whether an item is already on the queue for processing (which is simple, since I can just write a LINQ query to check this), so that I don't add duplicate items to the queue and trigger unnecessary duplicate processing. This queue is the input to a PDF writer over a named pipe, which writes PDFs to a shared location.
Now, if an item that is already on the queue is requested (say, by an HttpHandler I've written), I want the request in the HttpHandler to block until that item has been processed, so I can guarantee that the task has completed and the PDF file exists on disk before serving it. Hoping the context helps!
@pkiddie: Wouldn't you also want to know if the item has already been processed, too?
Yes. What I do is check whether the PDF exists on disk before adding the item representing it to the queue, and just serve the file if it exists.
@pkiddie: I see. Could your processing part atomically try to create the file instead? That way it wouldn't matter if the same item was on the queue several times - only one thing would try to process it, because after that the processor could say "It's either been done or in progress".
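For part b) and the requirements in these comments, one possible approach is sketched below. It does not come from either answer: DedupingWorkQueue, Request, Consume and the process delegate are hypothetical names, and it assumes the caller has already checked whether the PDF exists on disk. Each pending key maps to one TaskCompletionSource, so only the first request enqueues work and every duplicate request waits on the same Task.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

// Hypothetical helper: one pending Task per key, so duplicate requests share the
// same work item and can all wait for it to finish.
class DedupingWorkQueue
{
    private readonly BlockingCollection<string> queue = new BlockingCollection<string>();
    private readonly ConcurrentDictionary<string, TaskCompletionSource<bool>> pending =
        new ConcurrentDictionary<string, TaskCompletionSource<bool>>();
    private readonly Action<string> process;

    public DedupingWorkQueue(Action<string> process)
    {
        this.process = process;
    }

    // Enqueues key only if it is not already pending. Every caller gets a Task
    // that completes once the key has been processed.
    public Task Request(string key)
    {
        var fresh = new TaskCompletionSource<bool>();
        var current = pending.GetOrAdd(key, fresh); // atomic insert-or-get
        if (current == fresh)
            queue.Add(key); // only the first requester enqueues the work
        return current.Task;
    }

    // Run this on each consumer thread (e.g. three, one per named-pipe slot).
    public void Consume()
    {
        foreach (var key in queue.GetConsumingEnumerable())
        {
            TaskCompletionSource<bool> tcs;
            try
            {
                process(key);
                if (pending.TryRemove(key, out tcs))
                    tcs.SetResult(true); // release every caller waiting on this key
            }
            catch (Exception ex)
            {
                if (pending.TryRemove(key, out tcs))
                    tcs.SetException(ex); // waiters observe the failure
            }
        }
    }

    public void CompleteAdding()
    {
        queue.CompleteAdding();
    }
}
```

An HttpHandler could call Request(key).Wait() before serving the file. Once a key has been processed it is removed from the dictionary, so a later request for the same key would be enqueued again; the on-disk check described above is what prevents that. Jon's alternative of having the processor create the file atomically (for example, FileMode.CreateNew throws an IOException if the file already exists) would remove the need for the dictionary, but not the need for a way to wait.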

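GetConsumingEnumerable is in fact safe to call from multiple consumers simultaneously, and each item is only consumed once. Each enumerable only completes once the collection has been marked as complete (via CompleteAdding()) and all remaining items have been taken; while the collection is merely empty, MoveNext() blocks.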

GetConsumingEnumerable is essentially equivalent to:

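// Sketch only (hypothetical name): roughly what GetConsumingEnumerable() does internally.
static IEnumerable<T> ConsumingSketch<T>(BlockingCollection<T> coll)
{
    while (!coll.IsCompleted)
    {
        // Blocks while empty; returns false if the collection is completed during the wait.
        if (coll.TryTake(out var item, Timeout.Infinite))
            yield return item;
    }
}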

plus a bit of cancellation/cleanup logic.
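The cancellation part is exposed through the GetConsumingEnumerable(CancellationToken) overload, which throws OperationCanceledException when the token is cancelled, even while MoveNext() is blocked on an empty collection. A minimal sketch, where CancellableConsumer and RunUntilCancelled are hypothetical names:

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;

// Hypothetical consumer that can be stopped with a token instead of CompleteAdding().
static class CancellableConsumer
{
    // Returns true if the loop ended because the token was cancelled,
    // false if the collection was completed and drained.
    public static bool RunUntilCancelled(BlockingCollection<string> coll, CancellationToken token)
    {
        try
        {
            foreach (var item in coll.GetConsumingEnumerable(token))
            {
                Console.WriteLine("Consuming: {0}", item);
            }
            return false; // CompleteAdding() was called and every item was taken
        }
        catch (OperationCanceledException)
        {
            return true; // MoveNext() threw because the token was cancelled
        }
    }
}
```

This is useful for shutting consumers down (for example, when the host process stops) without requiring the producer to call CompleteAdding().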
