-
-
Save dougnukem/1241317 to your computer and use it in GitHub Desktop.
| public class BlockingQueue implements Queue { | |
| private java.util.Queue queue = new java.util.LinkedList(); | |
| /** | |
| * Make a blocking Dequeue call so that we'll only return when the queue has | |
| * something on it, otherwise we'll wait until something is put on it. | |
| * | |
| * @returns This will return null if the thread wait() call is interrupted. | |
| */ | |
| public synchronized Object dequeue() { | |
| Object msg = null; | |
| while (queue.isEmpty()) { | |
| try { | |
| wait(); | |
| } catch (InterruptedException e) { | |
| // Error return the client a null item | |
| return msg; | |
| } | |
| } | |
| msg = queue.remove(); | |
| return msg; | |
| } | |
| /** | |
| * Enqueue will add an object to this queue, and will notify any waiting | |
| * threads that there is an object available. | |
| */ | |
| public synchronized void enqueue(Object o) { | |
| queue.add(o); | |
| // Wake up anyone waiting for something to be put on the queue. | |
| notifyAll(); | |
| } | |
| } |
| public class Consumer implements Runnable { | |
| // This will be assigned in the constructor | |
| private Queue queue = null; | |
| public void process(Object msg) { | |
| try { | |
| //process message non-trivially (IE: it takes awhile). | |
| Thread.sleep(2000); | |
| } catch (InterruptedException e) { | |
| // TODO Auto-generated catch block | |
| e.printStackTrace(); | |
| } | |
| } | |
| public void run() { | |
| while(true) { | |
| doStuff(); | |
| } | |
| } | |
| public void doStuff() { | |
| Object msg = queue.dequeue(); | |
| process(msg); | |
| } | |
| } |
| public class Producer implements Runnable { | |
| // This will be assigned in the constructor | |
| private Queue queue = null; | |
| public void run() { | |
| // Binds to socket, reads messages in | |
| // packages message calls doSomething() | |
| // doSomething(Object msg); | |
| } | |
| public void doSomething(Object msg) { | |
| queue.enqueue(msg); | |
| } | |
| } |
@jeremyshi is correct, you need a limit attribute in the class of BlockingQueue
does while loop is required?? having just if condition should serve the purpose rite??
@vikranthpatoju is right. In enqueue method it is enough to do notify instead of notifyAll. The while(isEmpty) is not safe, since if there are 2 consumers simultaneously read the same element in the queue, you will have a race condition.
public class BoundedList<T> {
private final int capcity;
private final Queue<T> list;
private final Object lock = new Object();
public BoundedList(int capcity) {
this.capcity = capcity;
this.list = new ArrayDeque<>(capcity);
}
T poll() throws InterruptedException {
synchronized (lock) {
while (list.isEmpty()) {
lock.wait();
}
T e = list.poll();
lock.notify();
return e;
}
}
void add(T e) throws InterruptedException {
synchronized (lock) {
while (list.size() == capcity) {
lock.wait();
}
list.add(e);
lock.notify();
}
}
}
will this work
@shankyty No, unfortunately that won't work, because it can result in the lost wakeup problem. Check out this stack overflow article: https://stackoverflow.com/a/3186336/7602403
One solution is to replace your calls to lock.notify() with lock.notifyAll()
@chrislzm its depends on the situation notifyAll will cause the thread contentation eg if a lot of threads read or get the data from the queue and few of threads writing the data to the queue
I think you also need to consider the case that queue is full. In the real world, queue cannot has unlimited space.