Java Solaris Communities Sun Store Join SDN My Profile Why Join?
 
Bug Database
Bug Detail
Quick Lists
Top 25 Bugs
Top 25 RFE's
Recently Closed Bugs
Printable Page Printable Page


Bug Database
Bug ID: 6609775
Votes 0
Synopsis Reduce context switches in DelayQueue due to signalAll
Category java:classes_util_concurrent
Reported Against
Release Fixed 7(b25)
State 10-Fix Delivered, bug
Priority: 3-Medium
Related Bugs
Submit Date 26-SEP-2007
Description
DelayQueue makes extensive use of signalAll, which may cause excessive
context switches when many threads are waiting for the next element.
All threads in take() or timed poll will wake when a delay expires
or the head of the queue is changed, although only one will succeed in
accomplishing anything.

Here's a stress test:


import java.util.concurrent.*;
import static java.util.concurrent.TimeUnit.*;

public class StressDelayQueue {

     public static void main(String[] args) throws Throwable {

	final DelayQueue<Delayed> q = new DelayQueue<Delayed>();
	final long t0 = System.nanoTime();
	for (long i = 0; i < 1000; i++) {
	    final long expiry = t0 + i*10L*1000L*1000L;
	    q.add(new Delayed() {
		    public long getDelay(TimeUnit unit) {
			return unit.convert(expiry - System.nanoTime(),
					    NANOSECONDS);
		    }
		    public int compareTo(Delayed x) {
			long d = getDelay(NANOSECONDS)
			    - x.getDelay(NANOSECONDS);
			return d < 0 ? -1 : d > 0 ? 1 : 0; }});
	}

	for (int i = 0; i < 300; i++)
	    new Thread() { public void run() {
		try {
		    while (!q.isEmpty())
			q.poll(10L, TimeUnit.SECONDS);
		} catch (Throwable t) { t.printStackTrace(); }
	    }}.start();
    }
}
Posted Date : 2007-09-26 16:20:23.0
Work Around
N/A
Evaluation
Yes.

--- /u/martin/ws/dolphin/src/share/classes/java/util/concurrent/DelayQueue.java	2007-05-03 10:46:07.250884000 -0700
+++ /u/martin/ws/concurrent/src/share/classes/java/util/concurrent/DelayQueue.java	2007-09-26 00:36:36.240464000 -0700
@@ -69,10 +69,34 @@
     implements BlockingQueue<E> {
 
     private transient final ReentrantLock lock = new ReentrantLock();
-    private transient final Condition available = lock.newCondition();
     private final PriorityQueue<E> q = new PriorityQueue<E>();
 
     /**
+     * Thread designated to wait for the element at the head of
+     * the queue.  This variant of the Leader-Follower pattern
+     * (http://www.cs.wustl.edu/~schmidt/POSA/POSA2/) serves to
+     * minimize unnecessary timed waiting.  When a thread becomes
+     * the leader, it waits only for the next delay to elapse, but
+     * other threads await indefinitely.  The leader thread must
+     * signal some other thread before returning from take() or
+     * poll(...), unless some other thread becomes leader in the
+     * interim.  Whenever the head of the queue is replaced with
+     * an element with an earlier expiration time, the leader
+     * field is invalidated by being reset to null, and some
+     * waiting thread, but not necessarily the current leader, is
+     * signalled.  So waiting threads must be prepared to acquire
+     * and lose leadership while waiting.
+     */
+    private Thread leader = null;
+
+    /**
+     * Condition signalled when a newer element becomes available
+     * at the head of the queue or a new thread may need to
+     * become leader.
+     */
+    private final Condition available = lock.newCondition();
+
+    /**
      * Creates a new <tt>DelayQueue</tt> that is initially empty.
      */
     public DelayQueue() {}
@@ -111,10 +135,11 @@
         final ReentrantLock lock = this.lock;
         lock.lock();
         try {
-            E first = q.peek();
             q.offer(e);
-            if (first == null || e.compareTo(first) < 0)
-                available.signalAll();
+            if (q.peek() == e) {
+		leader = null;
+                available.signal();
+	    }
             return true;
         } finally {
             lock.unlock();
@@ -160,13 +185,8 @@
             E first = q.peek();
             if (first == null || first.getDelay(TimeUnit.NANOSECONDS) > 0)
                 return null;
-            else {
-                E x = q.poll();
-                assert x != null;
-                if (q.size() != 0)
-                    available.signalAll();
-                return x;
-            }
+            else
+                return q.poll();
         } finally {
             lock.unlock();
         }
@@ -185,23 +205,29 @@
         try {
             for (;;) {
                 E first = q.peek();
-                if (first == null) {
+                if (first == null)
                     available.await();
-                } else {
+		else {
                     long delay =  first.getDelay(TimeUnit.NANOSECONDS);
-                    if (delay > 0) {
-                        long tl = available.awaitNanos(delay);
-                    } else {
-                        E x = q.poll();
-                        assert x != null;
-                        if (q.size() != 0)
-                            available.signalAll(); // wake up other takers
-                        return x;
-
+		    if (delay <= 0)
+			return q.poll();
+		    else if (leader != null)
+			available.await();
+		    else {
+			Thread thisThread = Thread.currentThread();
+			leader = thisThread;
+			try {
+			    available.awaitNanos(delay);
+			} finally {
+			    if (leader == thisThread)
+				leader = null;
+			}
                     }
                 }
             }
         } finally {
+	    if (leader == null && q.peek() != null)
+		available.signal();
             lock.unlock();
         }
     }
@@ -230,23 +256,28 @@
                         nanos = available.awaitNanos(nanos);
                 } else {
                     long delay = first.getDelay(TimeUnit.NANOSECONDS);
-                    if (delay > 0) {
+		    if (delay <= 0)
+			return q.poll();
                         if (nanos <= 0)
                             return null;
-                        if (delay > nanos)
-                            delay = nanos;
+		    if (nanos < delay || leader != null)
+			nanos = available.awaitNanos(nanos);
+		    else {
+			Thread thisThread = Thread.currentThread();
+			leader = thisThread;
+			try {
                         long timeLeft = available.awaitNanos(delay);
                         nanos -= delay - timeLeft;
-                    } else {
-                        E x = q.poll();
-                        assert x != null;
-                        if (q.size() != 0)
-                            available.signalAll();
-                        return x;
+			} finally {
+			    if (leader == thisThread)
+				leader = null;
+			}
                     }
                 }
             }
         } finally {
+	    if (leader == null && q.peek() != null)
+		available.signal();
             lock.unlock();
         }
     }
@@ -303,8 +334,6 @@
                 c.add(q.poll());
                 ++n;
             }
-            if (n > 0)
-                available.signalAll();
             return n;
         } finally {
             lock.unlock();
@@ -335,8 +364,6 @@
                 c.add(q.poll());
                 ++n;
             }
-            if (n > 0)
-                available.signalAll();
             return n;
         } finally {
             lock.unlock();
@@ -485,6 +512,7 @@
             return cursor < array.length;
         }
 
+	@SuppressWarnings("unchecked")
         public E next() {
             if (cursor >= array.length)
                 throw new NoSuchElementException();
Posted Date : 2007-09-26 16:20:23.0

--- /u/martin/ws/dolphin/src/share/classes/java/util/concurrent/DelayQueue.java	2007-05-03 10:46:07.250884000 -0700
+++ /u/martin/ws/concurrent/src/share/classes/java/util/concurrent/DelayQueue.java	2007-09-26 00:36:36.240464000 -0700
@@ -69,10 +69,34 @@
     implements BlockingQueue<E> {
 
     private transient final ReentrantLock lock = new ReentrantLock();
-    private transient final Condition available = lock.newCondition();
     private final PriorityQueue<E> q = new PriorityQueue<E>();
 
     /**
+     * Thread designated to wait for the element at the head of
+     * the queue.  This variant of the Leader-Follower pattern
+     * (http://www.cs.wustl.edu/~schmidt/POSA/POSA2/) serves to
+     * minimize unnecessary timed waiting.  When a thread becomes
+     * the leader, it waits only for the next delay to elapse, but
+     * other threads await indefinitely.  The leader thread must
+     * signal some other thread before returning from take() or
+     * poll(...), unless some other thread becomes leader in the
+     * interim.  Whenever the head of the queue is replaced with
+     * an element with an earlier expiration time, the leader
+     * field is invalidated by being reset to null, and some
+     * waiting thread, but not necessarily the current leader, is
+     * signalled.  So waiting threads must be prepared to acquire
+     * and lose leadership while waiting.
+     */
+    private Thread leader = null;
+
+    /**
+     * Condition signalled when a newer element becomes available
+     * at the head of the queue or a new thread may need to
+     * become leader.
+     */
+    private final Condition available = lock.newCondition();
+
+    /**
      * Creates a new <tt>DelayQueue</tt> that is initially empty.
      */
     public DelayQueue() {}
@@ -111,10 +135,11 @@
         final ReentrantLock lock = this.lock;
         lock.lock();
         try {
-            E first = q.peek();
             q.offer(e);
-            if (first == null || e.compareTo(first) < 0)
-                available.signalAll();
+            if (q.peek() == e) {
+		leader = null;
+                available.signal();
+	    }
             return true;
         } finally {
             lock.unlock();
@@ -160,13 +185,8 @@
             E first = q.peek();
             if (first == null || first.getDelay(TimeUnit.NANOSECONDS) > 0)
                 return null;
-            else {
-                E x = q.poll();
-                assert x != null;
-                if (q.size() != 0)
-                    available.signalAll();
-                return x;
-            }
+            else
+                return q.poll();
         } finally {
             lock.unlock();
         }
@@ -185,23 +205,29 @@
         try {
             for (;;) {
                 E first = q.peek();
-                if (first == null) {
+                if (first == null)
                     available.await();
-                } else {
+		else {
                     long delay =  first.getDelay(TimeUnit.NANOSECONDS);
-                    if (delay > 0) {
-                        long tl = available.awaitNanos(delay);
-                    } else {
-                        E x = q.poll();
-                        assert x != null;
-                        if (q.size() != 0)
-                            available.signalAll(); // wake up other takers
-                        return x;
-
+		    if (delay <= 0)
+			return q.poll();
+		    else if (leader != null)
+			available.await();
+		    else {
+			Thread thisThread = Thread.currentThread();
+			leader = thisThread;
+			try {
+			    available.awaitNanos(delay);
+			} finally {
+			    if (leader == thisThread)
+				leader = null;
+			}
                     }
                 }
             }
         } finally {
+	    if (leader == null && q.peek() != null)
+		available.signal();
             lock.unlock();
         }
     }
@@ -230,23 +256,28 @@
                         nanos = available.awaitNanos(nanos);
                 } else {
                     long delay = first.getDelay(TimeUnit.NANOSECONDS);
-                    if (delay > 0) {
+		    if (delay <= 0)
+			return q.poll();
                         if (nanos <= 0)
                             return null;
-                        if (delay > nanos)
-                            delay = nanos;
+		    if (nanos < delay || leader != null)
+			nanos = available.awaitNanos(nanos);
+		    else {
+			Thread thisThread = Thread.currentThread();
+			leader = thisThread;
+			try {
                         long timeLeft = available.awaitNanos(delay);
                         nanos -= delay - timeLeft;
-                    } else {
-                        E x = q.poll();
-                        assert x != null;
-                        if (q.size() != 0)
-                            available.signalAll();
-                        return x;
+			} finally {
+			    if (leader == thisThread)
+				leader = null;
+			}
                     }
                 }
             }
         } finally {
+	    if (leader == null && q.peek() != null)
+		available.signal();
             lock.unlock();
         }
     }
@@ -303,8 +334,6 @@
                 c.add(q.poll());
                 ++n;
             }
-            if (n > 0)
-                available.signalAll();
             return n;
         } finally {
             lock.unlock();
@@ -335,8 +364,6 @@
                 c.add(q.poll());
                 ++n;
             }
-            if (n > 0)
-                available.signalAll();
             return n;
         } finally {
             lock.unlock();
@@ -485,6 +512,7 @@
             return cursor < array.length;
         }
 
+	@SuppressWarnings("unchecked")
         public E next() {
             if (cursor >= array.length)
                 throw new NoSuchElementException();
Posted Date : 2007-09-26 16:20:23.0

$ for v in dolphin concurrent; do echo $v; time jver $v jr StressDelayQueue.java; done
dolphin
==> javac -Xlint:all StressDelayQueue.java
==> java -esa -ea StressDelayQueue
jver $v jr StressDelayQueue.java  7.85s user 7.38s system 65% cpu 23.423 total
concurrent
==> javac -Xlint:all StressDelayQueue.java
==> java -esa -ea StressDelayQueue
jver $v jr StressDelayQueue.java  2.12s user 0.69s system 16% cpu 16.833 total
Posted Date : 2007-09-26 16:20:23.0
Comments
  
  Include a link with my name & email   


PLEASE NOTE: JDK6 is formerly known as Project Mustang