C# - Producer Consumer queue does not dispose
I have built a producer-consumer queue that wraps the ConcurrentQueue of .NET 4.0, with ManualResetEventSlim signaling between producing (Enqueue) and consuming (a while(true) thread-based loop). The queue looks like this:
    public class ProducerConsumerQueue<T> : IDisposable, IProducerConsumerQueue<T>
    {
        private bool _isActive = true;

        public int Count
        {
            get { return this._workerQueue.Count; }
        }

        public bool IsActive
        {
            get { return _isActive; }
            set { _isActive = value; }
        }

        public event Dequeued<T> OnDequeued = delegate { };
        public event LoggedHandler OnLogged = delegate { };

        private ConcurrentQueue<T> _workerQueue = new ConcurrentQueue<T>();
        private object _locker = new object();
        Thread[] _workers;

        #region IDisposable Members

        int _workerCount = 0;
        ManualResetEventSlim _mres = new ManualResetEventSlim();

        public void Dispose()
        {
            _isActive = false;
            _mres.Set();
            LogWriter.Write("55555555555");

            // Wait for the consumer's thread to finish.
            for (int i = 0; i < _workerCount; i++)
            {
                _workers[i].Join();
            }

            LogWriter.Write("6666666666");
            // Release any OS resources.
        }

        public ProducerConsumerQueue(int workerCount)
        {
            try
            {
                _workerCount = workerCount;
                _workers = new Thread[workerCount];

                // Create and start a separate thread for each worker.
                for (int i = 0; i < workerCount; i++)
                    (_workers[i] = new Thread(Work)).Start();
            }
            catch (Exception ex)
            {
                OnLogged(ex.Message + ex.StackTrace);
            }
        }

        #endregion

        #region IProducerConsumerQueue<T> Members

        public void EnqueueTask(T task)
        {
            if (_isActive)
            {
                _workerQueue.Enqueue(task);
                //Monitor.Pulse(_locker);
                _mres.Set();
            }
        }

        public void Work()
        {
            while (_isActive)
            {
                try
                {
                    T item = Dequeue();
                    if (item != null)
                        OnDequeued(item);
                }
                catch (Exception ex)
                {
                    OnLogged(ex.Message + ex.StackTrace);
                }
            }
        }

        #endregion

        private T Dequeue()
        {
            try
            {
                T dequeueItem;
                //if (_workerQueue.Count > 0)
                //{
                _workerQueue.TryDequeue(out dequeueItem);
                if (dequeueItem != null)
                    return dequeueItem;
                //}
                if (_isActive)
                {
                    _mres.Wait();
                    _mres.Reset();
                }
                //_workerQueue.TryDequeue(out dequeueItem);
                return dequeueItem;
            }
            catch (Exception ex)
            {
                OnLogged(ex.Message + ex.StackTrace);
                T dequeueItem;
                //if (_workerQueue.Count > 0)
                //{
                _workerQueue.TryDequeue(out dequeueItem);
                return dequeueItem;
            }
        }

        public void Clear()
        {
            _workerQueue = new ConcurrentQueue<T>();
        }
    }
When I call Dispose, it blocks on Join (there is one consuming thread), so the Dispose method gets stuck. I guess it gets stuck on the Wait of the reset event, but I do call Set in Dispose. Any suggestions?
Update: I understand your point about needing a queue internally. My suggestion to use BlockingCollection<T> is based on the fact that your code contains a lot of logic to provide the blocking behavior. Writing such logic yourself is very prone to bugs (I know this from experience), so when there's an existing class within the framework that does at least some of the work for you, it's generally preferable to go with that.
A complete example of how you can implement this class using BlockingCollection<T> is a little bit too large to include in this answer, so I've posted a working example on pastebin.com; feel free to take a look and see what you think.
I also wrote an example program demonstrating the above example here.
Is my code correct? I wouldn't say yes with too much confidence; after all, I haven't written any unit tests, run any diagnostics on it, etc. It's just a basic draft to give you an idea of how using BlockingCollection<T> instead of ConcurrentQueue<T> cleans up a lot of your logic (in my opinion) and makes it easier to focus on the main purpose of your class (consuming items from a queue and notifying subscribers) rather than on a somewhat difficult aspect of its implementation (the blocking behavior of the internal queue).
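To give a rough idea of the shape such a class might take, here is a minimal sketch. It is not the pastebin code referred to above; the class name BlockingConsumerQueue<T> and the Action<T> callback (used in place of the OnDequeued event) are assumptions made purely for illustration. It relies on GetConsumingEnumerable to block while the collection is empty and on CompleteAdding to let the workers exit, so no hand-written wait/reset signaling is needed.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;

// Hypothetical sketch only; names and the callback design are assumptions.
public sealed class BlockingConsumerQueue<T> : IDisposable
{
    private readonly BlockingCollection<T> _items = new BlockingCollection<T>();
    private readonly Thread[] _workers;
    private readonly Action<T> _handler;

    public BlockingConsumerQueue(int workerCount, Action<T> handler)
    {
        _handler = handler;
        _workers = new Thread[workerCount];

        // Create and start a separate thread for each worker.
        for (int i = 0; i < workerCount; i++)
        {
            _workers[i] = new Thread(Work);
            _workers[i].Start();
        }
    }

    public void EnqueueTask(T task)
    {
        // Throws InvalidOperationException once CompleteAdding has been called.
        _items.Add(task);
    }

    private void Work()
    {
        // Blocks while the collection is empty; the loop ends once
        // CompleteAdding has been called and all items have been consumed.
        foreach (T item in _items.GetConsumingEnumerable())
        {
            try
            {
                _handler(item);
            }
            catch (Exception ex)
            {
                Console.Error.WriteLine(ex);
            }
        }
    }

    public void Dispose()
    {
        _items.CompleteAdding();            // wakes every blocked worker
        foreach (Thread worker in _workers)
            worker.Join();                  // wait for the consumers to finish
        _items.Dispose();
    }
}
```

With this shape, Dispose lets the workers drain whatever is still queued before it returns. This sketch does not guard against Dispose being called twice.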
Question posed in a comment:
Any reason you're not using BlockingCollection<T>?
Your answer:
[...] I needed a queue.
From the MSDN documentation on the default constructor for the BlockingCollection<T> class:
The default underlying collection is a ConcurrentQueue<T>.
If the only reason you opted to implement your own class instead of using BlockingCollection<T> is that you need a FIFO queue, then... you might want to rethink your decision. A BlockingCollection<T> instantiated using the default parameterless constructor is a FIFO queue.
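A quick way to check the FIFO behavior for yourself is a sketch like the following. The class and method names (FifoDemo, TakeInOrder) are made up for the example; only BlockingCollection<T>, Add, and Take come from the framework.

```csharp
using System;
using System.Collections.Concurrent;

public static class FifoDemo
{
    // Adds three items and takes them back out, on a single thread.
    public static int[] TakeInOrder()
    {
        // Parameterless constructor: backed by a ConcurrentQueue<int>.
        using (var queue = new BlockingCollection<int>())
        {
            queue.Add(1);
            queue.Add(2);
            queue.Add(3);

            // Items come out in the order in which they were added.
            return new[] { queue.Take(), queue.Take(), queue.Take() };
        }
    }
}
```

Calling FifoDemo.TakeInOrder() returns the items in insertion order: 1, 2, 3.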
That said, while I don't think I can offer a comprehensive analysis of the code you've posted, I can at least offer a couple of pointers:
- I'd be very hesitant to use events in the way you have here in a class that deals with such tricky multithreaded behavior. Calling code can attach any event handlers it wants, and these can in turn throw exceptions (which you don't catch), block for long periods of time, or possibly even deadlock for reasons completely outside your control, which is very bad in the case of a blocking queue.
- There's a race condition in your Dequeue and Dispose methods.
Look at these lines of your Dequeue method:
    if (_isActive) // point A
    {
        _mres.Wait();  // point C
        _mres.Reset(); // point D
    }

And take a look at these two lines from Dispose:

    _isActive = false;
    _mres.Set(); // point B

Let's say you have three threads, T1, T2, and T3. T1 and T2 are both at point A, where each checks _isActive and finds it true. Then Dispose is called, and T3 sets _isActive to false (but T1 and T2 have already passed point A) and then reaches point B, where it calls _mres.Set(). Then T1 gets to point C, moves on to point D, and calls _mres.Reset(). Now T2 reaches point C and is stuck forever, since _mres.Set will not be called again (any thread executing EnqueueTask will find _isActive == false and return immediately, and the thread executing Dispose has already passed point B).
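The interleaving described above can be replayed deterministically on a single thread, which may make the lost signal easier to see. This is only a sketch: the names LostSignalDemo and SecondWaiterIsSignaled are invented for the example, and a 200 ms timeout stands in for "waits forever".

```csharp
using System;
using System.Threading;

public static class LostSignalDemo
{
    // Replays the T3 -> T1 -> T2 ordering step by step.
    // Returns true if the simulated second consumer (T2) gets woken up.
    public static bool SecondWaiterIsSignaled()
    {
        using (var mres = new ManualResetEventSlim())
        {
            mres.Set();   // T3 at point B: Dispose signals the event once
            mres.Wait();  // T1 at point C: returns immediately, event is set
            mres.Reset(); // T1 at point D: clears the signal T2 still needs

            // T2 at point C: nobody will call Set again, so without the
            // timeout this Wait would never return.
            return mres.Wait(TimeSpan.FromMilliseconds(200));
        }
    }
}
```

SecondWaiterIsSignaled() returns false: the single Set from Dispose was consumed by the first waiter's Reset, so the second waiter times out, which in the original code is the thread that Join never sees finish.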
I'd be happy to try and offer some help on solving this race condition, but I'm skeptical that BlockingCollection<T> isn't in fact exactly the class you need for this. If you can provide some more information to convince me that this isn't the case, maybe I'll take another look.