Only every third object is consumed in BlockingCollection<string>(ConcurrentQueue<string>)

Magnus Vinterhav 61 Reputation points
2021-01-07T20:32:50.747+00:00

Hi.

I'm building a multi-threaded simulator and need to create and consume IDs in an orderly fashion.
I am doing this in C# on .NET Core 3.1, and it is a WPF application.
Please note that I am just learning about multi-threading AND queues, so it is possible I made some silly mistake I am not aware of.

using System;
using System.Collections.Generic;
using System.Collections.Concurrent;
using System.Text;
using StructureLibrary;
using System.Threading;
using System.Linq;
using System.ComponentModel;

namespace WorldEngine
{
    public class IdIssuer // ID Issuing Authority
    {
        // This must be run in its own thread and is designed to issue IDs to all objects that come into creation.
        // Here it is also possible to see how many IDs have been issued.

        private BlockingCollection<string> logger;
        string loggerPrefix;
        ulong id;
        BlockingCollection<ulong> idQueue;
        bool stopCommand;

        public IdIssuer(ref BlockingCollection<string> loggerIn)
        {
            logger = loggerIn;
            loggerPrefix = TypeDescriptor.GetClassName(this);

            id = 0;
            this.idQueue = new BlockingCollection<ulong>(new ConcurrentQueue<ulong>());
            stopCommand = false;
        }

        public void StartIssuingIds()
        {
            while (stopCommand == false)
            {
                if(idQueue.Count() < 20)
                {
                    for(int x = 0; x < 20; x++)
                    {
                        idQueue.Add(IssueANewIdPlease());
                    }
                }
            }
        }

        public ref BlockingCollection<ulong> GetQueue()
        {
            return ref this.idQueue;
        }
        public ulong IssueANewIdPlease()
        {
            id++;
            logger.Add(loggerPrefix + " ID was created: " + id);
            return id;
        }

        public ulong GimmeCountOfIds()
        {
            return id;
        }

        public void StopIssuingIds()
        {
            stopCommand = true;
        }

    }

}
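
As a side note on the producer loop above: `StartIssuingIds` spins continuously while the queue is full. One possible alternative, sketched here under the assumption that a bounded `BlockingCollection` fits the design, is to let `Add` block whenever 20 IDs are already waiting. The names `BoundedIssuerDemo` and `IssueIds` are hypothetical, and the logging from the original class is left out for brevity.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

public static class BoundedIssuerDemo
{
    // Issues IDs 1, 2, 3, ... until cancelled.
    // Add blocks while the queue already holds its bounded capacity of items.
    public static void IssueIds(BlockingCollection<ulong> idQueue, CancellationToken token)
    {
        ulong id = 0;
        try
        {
            while (true)
            {
                idQueue.Add(++id, token); // throws OperationCanceledException on cancel
            }
        }
        catch (OperationCanceledException)
        {
            // Expected when the caller requests a stop.
        }
        finally
        {
            idQueue.CompleteAdding(); // signals consumers that no more IDs will arrive
        }
    }

    // Starts a producer, takes `count` IDs on the calling thread, then stops the producer.
    public static ulong TakeSome(int count)
    {
        var idQueue = new BlockingCollection<ulong>(new ConcurrentQueue<ulong>(), boundedCapacity: 20);
        using var cts = new CancellationTokenSource();
        Task producer = Task.Run(() => IssueIds(idQueue, cts.Token));

        ulong last = 0;
        for (int i = 0; i < count; i++)
        {
            last = idQueue.Take(); // one removal per ID
        }

        cts.Cancel();
        producer.Wait();
        return last;
    }

    public static void Main()
    {
        Console.WriteLine(TakeSome(50)); // 50
    }
}
```

With a single consumer the IDs arrive in issue order, so the fiftieth `Take()` returns 50. The cancellation token stands in for the `stopCommand` flag in the original class.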

My problem is that it seems that two out of every three IDs disappear.

I get the IDs in the consumers by this method:

public ulong GetIdFromQueue()
{
    ulong tempId = idQueue.Take();
    tempId = idQueue.Take();
    bool hasReceivedId = idQueue.TryTake(out tempId);
    if(hasReceivedId)
    {
        logger.Add(loggerPrefix + " Success! Id " + tempId + " was received from idQueue.");
    }
    else
    {
        logger.Add(loggerPrefix + " could not receive id from idQueue.");
        tempId = (ulong)0;
    }

    return tempId;
}

When I run the program and put a breakpoint in the code, I can clearly see that there are two out of three ids missing.
I have four threads as consumers and these four consumers are four threads of the same class.
I think that if the breakpoint showed just one out of every four threads, then the missing IDs would be three out of four, not two out of three.

I am not sure how to use the BlockingCollection together with the ConcurrentQueue, and my suspicion is that this is what is causing this problem.

Interestingly, I had the same problem with a logging process based on exactly the same type of BlockingCollection and ConcurrentQueue, and when the consuming logger used the BlockingCollection.Take method, the logger also logged only every third line coming from the producers.
When I changed the way the logger consumed the BlockingCollection<string>(ConcurrentQueue<string>) from "BlockingCollection.Take()" to "foreach (string s in BlockingCollection.GetConsumingEnumerable())", this weird behavior stopped and every line was logged.

Now, in the case of the logger it was easy to solve, because in that example there were several producers and one consumer, so just taking an enumerable and printing out all of it worked fine. In the ID example we have the reverse situation, with one producer and several consumers. I can't use GetConsumingEnumerable there, because that would defeat the purpose of issuing IDs.
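
On the multiple-consumer case: `GetConsumingEnumerable()` removes each item from the collection as it yields it, so several threads can enumerate the same `BlockingCollection` and each ID still reaches exactly one of them. Whether that fits here depends on the design; a consumer that needs a single ID on demand is probably better served by one `Take()` call. A minimal sketch, assuming a standalone console program (`MultiConsumerDemo` and `Run` are hypothetical names):

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading.Tasks;

public static class MultiConsumerDemo
{
    // Runs `consumerCount` consumers over one queue and returns every ID they received, sorted.
    public static ulong[] Run(int consumerCount, ulong idCount)
    {
        var idQueue = new BlockingCollection<ulong>(new ConcurrentQueue<ulong>());
        var received = new ConcurrentBag<ulong>();

        Task[] consumers = Enumerable.Range(0, consumerCount)
            .Select(_ => Task.Run(() =>
            {
                // Each item yielded here has already been removed from the queue,
                // so no other consumer will receive it.
                foreach (ulong id in idQueue.GetConsumingEnumerable())
                {
                    received.Add(id);
                }
            }))
            .ToArray();

        for (ulong id = 1; id <= idCount; id++)
        {
            idQueue.Add(id);
        }
        idQueue.CompleteAdding(); // lets the foreach loops end once the queue drains

        Task.WaitAll(consumers);
        return received.OrderBy(id => id).ToArray();
    }

    public static void Main()
    {
        ulong[] ids = Run(consumerCount: 4, idCount: 100);
        Console.WriteLine(ids.Length);             // 100
        Console.WriteLine(ids.Distinct().Count()); // 100
    }
}
```

All 100 IDs are received and none is delivered twice, although which consumer gets which ID varies from run to run.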

I hope I have explained this problem well enough.

Thank you.


Accepted answer
  1. Viorel 113.7K Reputation points
    2021-01-07T21:11:35.133+00:00

    Maybe replace the beginning of the function:

    public ulong GetIdFromQueue( )
    {
       ulong tempId;
       bool hasReceivedId = idQueue.TryTake(out tempId);
       if(hasReceivedId)
       . . .
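
To see why the original method loses IDs: every `Take()` removes one item from the queue, so two `Take()` calls followed by `TryTake` consume three IDs per call and return only the last one. A minimal sketch of the effect, assuming a standalone console program (`TakeDemo`, `BuggyGet`, `FixedGet`, and `MakeQueue` are hypothetical names, not from the question):

```csharp
using System;
using System.Collections.Concurrent;

public static class TakeDemo
{
    // Mirrors the question's method: three removals, only the last one is returned.
    public static ulong BuggyGet(BlockingCollection<ulong> queue)
    {
        ulong id = queue.Take();   // removes an ID; the value is overwritten below
        id = queue.Take();         // removes a second ID; also overwritten
        queue.TryTake(out id);     // removes a third ID; this is the one returned
        return id;
    }

    // One removal per call. Take() blocks until an item is available.
    public static ulong FixedGet(BlockingCollection<ulong> queue)
    {
        return queue.Take();
    }

    // Builds a queue pre-filled with the IDs 1..count.
    public static BlockingCollection<ulong> MakeQueue(ulong count)
    {
        var queue = new BlockingCollection<ulong>(new ConcurrentQueue<ulong>());
        for (ulong i = 1; i <= count; i++)
        {
            queue.Add(i);
        }
        return queue;
    }

    public static void Main()
    {
        var a = MakeQueue(6);
        Console.WriteLine(BuggyGet(a)); // 3
        Console.WriteLine(BuggyGet(a)); // 6

        var b = MakeQueue(6);
        Console.WriteLine(FixedGet(b)); // 1
        Console.WriteLine(FixedGet(b)); // 2
    }
}
```

Whether to use the blocking `Take()` or the non-blocking `TryTake(out ...)` depends on whether a consumer should wait for the producer; `TryTake` returns `false` immediately when the queue happens to be empty.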
    

1 additional answer

  1. Magnus Vinterhav 61 Reputation points
    2021-01-07T21:54:26.81+00:00

    Oh, for crying out loud!

    Thank you. A lot.
