c# - Producer Consumer queue does not dispose -
i have built producer consumer queue wrapping concurrentqueue of .net 4.0 slimmanualresetevent signaling between producing (enqueue) , consuming (while(true) thread based. queue looks like:
public class producerconsumerqueue<t> : idisposable, iproducerconsumerqueue<t> { private bool _isactive=true; public int count { { return this._workerqueue.count; } } public bool isactive { { return _isactive; } set { _isactive = value; } } public event dequeued<t> ondequeued = delegate { }; public event loggedhandler onlogged = delegate { }; private concurrentqueue<t> _workerqueue = new concurrentqueue<t>(); private object _locker = new object(); thread[] _workers; #region idisposable members int _workercount=0; manualreseteventslim _mres = new manualreseteventslim(); public void dispose() { _isactive = false; _mres.set(); logwriter.write("55555555555"); (int = 0; < _workercount; i++) // wait consumer's thread finish. { _workers[i].join(); } logwriter.write("6666666666"); // release os resources. } public producerconsumerqueue(int workercount) { try { _workercount = workercount; _workers = new thread[workercount]; // create , start separate thread each worker (int = 0; < workercount; i++) (_workers[i] = new thread(work)).start(); } catch (exception ex) { onlogged(ex.message + ex.stacktrace); } } #endregion #region iproducerconsumerqueue<t> members public void enqueuetask(t task) { if (_isactive) { _workerqueue.enqueue(task); //monitor.pulse(_locker); _mres.set(); } } public void work() { while (_isactive) { try { t item = dequeue(); if (item != null) ondequeued(item); } catch (exception ex) { onlogged(ex.message + ex.stacktrace); } } } #endregion private t dequeue() { try { t dequeueitem; //if (_workerqueue.count > 0) //{ _workerqueue.trydequeue(out dequeueitem); if (dequeueitem != null) return dequeueitem; //} if (_isactive) { _mres.wait(); _mres.reset(); } //_workerqueue.trydequeue(out dequeueitem); return dequeueitem; } catch (exception ex) { onlogged(ex.message + ex.stacktrace); t dequeueitem; //if (_workerqueue.count > 0) //{ _workerqueue.trydequeue(out dequeueitem); return dequeueitem; } } public void clear() { _workerqueue = new concurrentqueue<t>(); } }
}
when calling dispose blocks on join (one thread consuming) , dispose method stuck. guess get's stuck on wait of resetevents call set on dispose. suggestions?
update: understand point needing queue internally. suggestion use blockingcollection<t>
based on fact code contains lot of logic provide blocking behavior. writing such logic prone bugs (i know experience); when there's existing class within framework @ least some of work you, it's preferable go that.
a complete example of how can implement class using blockingcollection<t>
little bit large include in answer, i've posted working example on pastebin.com; feel free take , see think.
i wrote example program demonstrating above example here.
is code correct? wouldn't yes too confidence; after all, haven't written unit tests, run diagnostics on it, etc. it's basic draft give idea how using blockingcollection<t>
instead of concurrentqueue<t>
cleans lot of logic (in opinion) , makes easier focus on main purpose of class (consuming items queue , notifying subscribers) rather difficult aspect of implementation (the blocking behavior of internal queue).
question posed in comment:
any reason you're not using
blockingcollection<t>
?
your answer:
[...] needed queue.
from msdn documentation on default constructor blockingcollection<t>
class:
the default underlying collection
concurrentqueue<t>
.
if only reason opted implement own class instead of using blockingcollection<t>
need fifo queue, then... might want rethink decision. blockingcollection<t>
instantiated using default parameterless constructor is fifo queue.
that said, while don't think can offer comprehensive analysis of code you've posted, can @ least offer couple of pointers:
- i'd hesitant use events in way here class deals such tricky multithreaded behavior. calling code can attach event handlers wants, , these can in turn throw exceptions (which don't catch), block long periods of time, or possibly deadlock reasons outside control--which bad in case of blocking queue.
- there's race condition in
dequeue
,dispose
methods.
look @ these lines of dequeue
method:
if (_isactive) // point { _mres.wait(); // point c _mres.reset(); // point d }
and take @ these 2 lines dispose
:
_isactive = false; _mres.set(); // point b
let's have 3 threads, t1, t2, , t3. t1 , t2 both @ point a, each checks _isactive
, finds true
. dispose
called, , t3 sets _isactive
false
(but t1 , t2 have passed point a) , reaches point b, calls _mres.set()
. t1 gets point c, moves on point d, , calls _mres.reset()
. t2 reaches point c , stuck forever since _mres.set
not called again (any thread executing enqueue
find _isactive == false
, return immediately, , thread executing dispose
has passed point b).
i'd happy try , offer on solving race condition, i'm skeptical blockingcollection<t>
isn't in fact class need this. if can provide more information convince me isn't case, maybe i'll take look.
Comments
Post a Comment