C# - Producer Consumer queue does not dispose


I have built a producer-consumer queue that wraps the ConcurrentQueue of .NET 4.0, with a ManualResetEventSlim signaling between producing (Enqueue) and consuming (a while(true) thread-based loop). The queue looks like this:

    public class ProducerConsumerQueue<T> : IDisposable, IProducerConsumerQueue<T>
    {
        private bool _isActive = true;

        public int Count
        {
            get
            {
                return this._workerQueue.Count;
            }
        }

        public bool IsActive
        {
            get { return _isActive; }
            set { _isActive = value; }
        }

        public event Dequeued<T> OnDequeued = delegate { };
        public event LoggedHandler OnLogged = delegate { };

        private ConcurrentQueue<T> _workerQueue = new ConcurrentQueue<T>();

        private object _locker = new object();

        Thread[] _workers;

        #region IDisposable Members

        int _workerCount = 0;

        ManualResetEventSlim _mres = new ManualResetEventSlim();

        public void Dispose()
        {
            _isActive = false;

            _mres.Set();

            LogWriter.Write("55555555555");

            // Wait for the consumer's thread to finish.
            for (int i = 0; i < _workerCount; i++)
            {
                _workers[i].Join();
            }

            LogWriter.Write("6666666666");
            // Release any OS resources.
        }

        public ProducerConsumerQueue(int workerCount)
        {
            try
            {
                _workerCount = workerCount;
                _workers = new Thread[workerCount];
                // Create and start a separate thread for each worker.
                for (int i = 0; i < workerCount; i++)
                    (_workers[i] = new Thread(Work)).Start();
            }
            catch (Exception ex)
            {
                OnLogged(ex.Message + ex.StackTrace);
            }
        }

        #endregion

        #region IProducerConsumerQueue<T> Members

        public void EnqueueTask(T task)
        {
            if (_isActive)
            {
                _workerQueue.Enqueue(task);
                //Monitor.Pulse(_locker);
                _mres.Set();
            }
        }

        public void Work()
        {
            while (_isActive)
            {
                try
                {
                    T item = Dequeue();
                    if (item != null)
                        OnDequeued(item);
                }
                catch (Exception ex)
                {
                    OnLogged(ex.Message + ex.StackTrace);
                }
            }
        }

        #endregion

        private T Dequeue()
        {
            try
            {
                T dequeueItem;
                //if (_workerQueue.Count > 0)
                //{
                _workerQueue.TryDequeue(out dequeueItem);
                if (dequeueItem != null)
                    return dequeueItem;
                //}
                if (_isActive)
                {
                    _mres.Wait();
                    _mres.Reset();
                }
                //_workerQueue.TryDequeue(out dequeueItem);
                return dequeueItem;
            }
            catch (Exception ex)
            {
                OnLogged(ex.Message + ex.StackTrace);
                T dequeueItem;
                //if (_workerQueue.Count > 0)
                //{
                _workerQueue.TryDequeue(out dequeueItem);
                return dequeueItem;
            }
        }

        public void Clear()
        {
            _workerQueue = new ConcurrentQueue<T>();
        }
    }

When I call Dispose, it blocks on Join (there is one consuming thread), so the Dispose method gets stuck. I guess it gets stuck on the Wait of the reset event, but I do call Set in Dispose. Any suggestions?

Update: I understand your point about needing a queue internally. My suggestion to use a BlockingCollection<T> is based on the fact that your code contains a lot of logic to provide the blocking behavior. Writing such logic yourself is very prone to bugs (I know this from experience); so when there's an existing class within the framework that does at least some of the work for you, it's generally preferable to go with that.

A complete example of how you can implement this class using a BlockingCollection<T> is a little bit too large to include in this answer, so I've posted a working example on pastebin.com; feel free to take a look and see what you think.

I also wrote an example program demonstrating the above example here.

Is my code correct? I wouldn't say yes with too much confidence; after all, I haven't written any unit tests, run any diagnostics on it, etc. It's just a basic draft to give you an idea of how using BlockingCollection<T> instead of ConcurrentQueue<T> cleans up a lot of your logic (in my opinion) and makes it easier to focus on the main purpose of your class (consuming items from a queue and notifying subscribers) rather than a somewhat difficult aspect of its implementation (the blocking behavior of the internal queue).
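To give a rough idea of the shape such a class can take, here is a minimal sketch. It is not the pastebin code mentioned above; the class name `BlockingProducerConsumerQueue<T>` and the `Action<T>` handler (used in place of the events) are assumptions made for illustration. The only framework calls it relies on are `BlockingCollection<T>.Add`, `CompleteAdding`, and `GetConsumingEnumerable`.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;

// Hypothetical sketch: the class name and the Action<T> handler are illustrative assumptions.
public sealed class BlockingProducerConsumerQueue<T> : IDisposable
{
    private readonly BlockingCollection<T> _queue = new BlockingCollection<T>();
    private readonly Thread[] _workers;
    private readonly Action<T> _handler;

    public BlockingProducerConsumerQueue(int workerCount, Action<T> handler)
    {
        _handler = handler;
        _workers = new Thread[workerCount];
        for (int i = 0; i < workerCount; i++)
        {
            _workers[i] = new Thread(Work);
            _workers[i].Start();
        }
    }

    public void EnqueueTask(T task)
    {
        // Add throws InvalidOperationException once CompleteAdding has been called.
        _queue.Add(task);
    }

    private void Work()
    {
        // Blocks while the queue is empty. The loop ends once CompleteAdding
        // has been called and every remaining item has been consumed.
        foreach (T item in _queue.GetConsumingEnumerable())
        {
            try { _handler(item); }
            catch (Exception ex) { Console.Error.WriteLine(ex); }
        }
    }

    public void Dispose()
    {
        _queue.CompleteAdding();          // wakes every blocked worker
        foreach (Thread worker in _workers)
            worker.Join();                // workers exit after draining the queue
        _queue.Dispose();
    }
}

public static class Program
{
    public static void Main()
    {
        int processed = 0;
        using (var queue = new BlockingProducerConsumerQueue<int>(
            2, _ => Interlocked.Increment(ref processed)))
        {
            for (int i = 0; i < 100; i++)
                queue.EnqueueTask(i);
        } // Dispose drains the queue and joins both workers

        Console.WriteLine(processed); // 100
    }
}
```

Note that there is no flag, no reset event, and no manual signaling: shutdown is expressed through `CompleteAdding`, which is what lets every worker leave its loop. This sketch does not guard against `Dispose` being called twice.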


The question I posed in a comment:

Any reason you're not using BlockingCollection<T>?

Your answer:

[...] I needed a queue.

From the MSDN documentation on the default constructor for the BlockingCollection<T> class:

The default underlying collection is a ConcurrentQueue<T>.

If the only reason you opted to implement your own class instead of using BlockingCollection<T> is that you need a FIFO queue, then... you might want to rethink your decision. A BlockingCollection<T> instantiated using the default parameterless constructor is a FIFO queue.
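A quick way to check the FIFO claim is a small console program like the following sketch (the class name `FifoDemo` is just an illustrative choice). It adds three items to a default-constructed BlockingCollection<T> and prints them in the order they are consumed.

```csharp
using System;
using System.Collections.Concurrent;

public static class FifoDemo
{
    public static void Main()
    {
        // The parameterless constructor uses a ConcurrentQueue<T> underneath.
        using (var collection = new BlockingCollection<int>())
        {
            collection.Add(1);
            collection.Add(2);
            collection.Add(3);
            collection.CompleteAdding(); // lets the consuming enumeration terminate

            // Items come out in the order they were added.
            Console.WriteLine(string.Join(", ", collection.GetConsumingEnumerable()));
            // prints "1, 2, 3"
        }
    }
}
```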

That said, while I don't think I can offer a comprehensive analysis of the code you've posted, I can at least offer a couple of pointers:

  1. I'd be very hesitant to use events in the way you are here for a class that deals with such tricky multithreaded behavior. Calling code can attach any event handlers it wants, and these can in turn throw exceptions (which you don't catch), block for long periods of time, or possibly even deadlock for reasons completely outside your control, which is very bad in the case of a blocking queue.
  2. There's a race condition in your Dequeue and Dispose methods.

Look at these lines of your Dequeue method:

    if (_isActive) // point A
    {
        _mres.Wait(); // point C
        _mres.Reset(); // point D
    }

And now take a look at these two lines from Dispose:

    _isActive = false;

    _mres.Set(); // point B

Let's say you have three threads, T1, T2, and T3. T1 and T2 are both at point A, where each checks _isActive and finds it true. Then Dispose is called, and T3 sets _isActive to false (but T1 and T2 have already passed point A) and then reaches point B, where it calls _mres.Set(). Then T1 gets to point C, moves on to point D, and calls _mres.Reset(). Now T2 reaches point C and is stuck forever, since _mres.Set will not be called again (any thread executing EnqueueTask will find _isActive == false and return immediately, and the thread executing Dispose has already passed point B).
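The interleaving can be replayed deterministically on a single thread, which makes the lost signal easy to see. The following is only a sketch: the class name `LostSignalDemo` is made up, and the 200 ms timeout on the last Wait is an assumption added so that the program terminates instead of hanging the way T2 does.

```csharp
using System;
using System.Threading;

public static class LostSignalDemo
{
    public static void Main()
    {
        using (var mres = new ManualResetEventSlim())
        {
            // T1 and T2 have both passed point A at this stage.
            mres.Set();    // T3 in Dispose, point B
            mres.Wait();   // T1, point C: returns immediately, the event is set
            mres.Reset();  // T1, point D: clears the signal T2 still needs

            // T2, point C. A timeout is used here only so the demo ends;
            // the original code calls Wait() with no timeout and never returns.
            bool woke = mres.Wait(TimeSpan.FromMilliseconds(200));
            Console.WriteLine(woke); // False
        }
    }
}
```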

I'd be happy to try to offer some help on solving this race condition, but I'm skeptical that BlockingCollection<T> isn't in fact exactly the class you need for this. If you can provide some more information to convince me that this isn't the case, maybe I'll take another look.

