|
Quick Lists
|
|
Bug ID:
|
6663476
|
|
Votes
|
0
|
|
Synopsis
|
FutureTask.get() may return null if set() is not called from run()
|
|
Category
|
java:classes_util_concurrent
|
|
Reported Against
|
|
|
Release Fixed
|
|
|
State
|
5-Cause Known,
bug
|
|
Priority:
|
4-Low
|
|
Related Bugs
|
6665818
|
|
Submit Date
|
14-FEB-2008
|
|
Description
|
SYNOPSIS:
The FutureTask.Sync.runner variable is not managed in the same way that it is for FutureTask.run().
OPERATING SYSTEM(S):
Windows
FULL JDK VERSION(S):
Sun Java 1.5.0_14 and Sun Java 6 update4
DESCRIPTION:
The problem is observed only on multiprocessor machines, not on single-processor machines. We ran the program across several JDKs and observed the issue.
If needed, the output of the program runs can be provided.
Sample Program:
//package testFutureTask;
import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
/**
* Demonstrates that {@link FutureTask#set} doesn't safely publish the
* value. testSet failed on my Intellistation z-Pro (dual processor, each
* hyperthreaded) with the following message:
*
* Failure counts by method (of 1642125 attempts):
* get, with infinite wait: 8189
* get, with timeout: 6268
* get, with zero wait: 4194
*
* @author James Synge
*/
public class TestFutureTaskSubClass {
    /**
     * Runs the experiment twice on fresh instances: first completing the
     * future through {@code set}, then through {@code run()}.
     */
    public static void main(String[] args) throws Exception {
        TestFutureTaskSubClass test = new TestFutureTaskSubClass();
        test.test(true);
        test = new TestFutureTaskSubClass();
        test.test(false);
    }

    /** Number of retrieval strategies the reader threads rotate through. */
    protected static final int TOTAL_METHODS = 3;

    /** Background reader threads; {@code threads[i]} executes {@code tasks[i]}. */
    private Thread[] threads;
    /** Reader tasks; each one yields its per-method failure counts. */
    private FutureTask<int[]>[] tasks;
    /** The instance the current round's future is expected to return. */
    protected volatile Integer mt_expectedV = null;
    /** The future published for the current round; {@code null} tells readers to exit. */
    protected volatile AFutureValue mt_fv = null;
    /** Counted down once by every reader when it has finished the current round. */
    protected volatile CountDownLatch mt_doneGetting = null;
    /** Set by a reader that could not complete a round; ends the main loop. */
    protected volatile boolean mt_failed = false;
    /** Set by the main thread to ask the readers to stop. */
    protected volatile boolean mt_doStop = false;

    /**
     * Test method for {@link java.util.concurrent.FutureTask#set(java.lang.Object)}.
     *
     * <p>For about ten seconds, repeatedly publishes a new future, completes it
     * (via {@code mySet} when {@code useSet} is true, otherwise via {@code run()}),
     * and waits for every reader thread to fetch the value. Afterwards the
     * per-method counts of {@code null} results are summed and reported.
     *
     * @param useSet {@code true} to complete the future with {@code mySet},
     *               {@code false} to complete it with {@code run()}
     * @throws Exception if a reader task failed, the thread was interrupted,
     *                   or an internal assertion did not hold
     */
    public void test(final boolean useSet) throws Exception {
        System.out.println("--------------------------------------------");
        if (useSet) {
            System.out.println("Testing FutureValue.set");
        } else {
            System.out.println("Testing FutureValue.run");
        }

        // Create background threads that will get the value of a
        // FutureTask provided by this foreground thread.
        createTasksAndThreads();
        System.out.println("Started " + tasks.length + " threads to read from the FutureValue");

        // Run for some amount of time.
        final int DURATION = 10 * 1000; // 10 seconds
        System.out.println("Starting main loop...");
        final long startTime = System.currentTimeMillis();
        final long endTime = startTime + DURATION;
        int counter = 0;
        mt_failed = false;
        try {
            while (!mt_failed) {
                long now = System.currentTimeMillis();
                if (now >= endTime) {
                    System.out.println("Main loop duration reached");
                    break;
                }

                // Objects that the threads will need after they get.
                // `new Integer` (not valueOf) is deliberate: the checks below
                // compare by identity, so every round needs a fresh object.
                mt_expectedV = new Integer((int) (now & 0xffffff));
                mt_doneGetting = new CountDownLatch(threads.length);

                // The FutureValue that the threads will try to get the
                // expected value from. Publishing it releases the readers.
                AFutureValue current = useSet
                        ? new AFutureValue()
                        : new AFutureValue(new FixedResult<Integer>(mt_expectedV));
                mt_fv = current;

                // Yield the processor before setting the value... sometimes.
                if (counter % 10 == 0) {
                    Thread.yield();
                }

                // Set the value.
                if (useSet) {
                    assertTrue(current.mySet(mt_expectedV));
                } else {
                    current.run();
                    assertTrue(current.isDone());
                    assertTrue(!current.isCancelled());
                }
                assertSame(mt_expectedV, current.get());

                // Wait for the threads to finish getting the value.
                if (!mt_doneGetting.await(DURATION, TimeUnit.MILLISECONDS)) {
                    throw new AssertionError("waited too long for reader threads to read");
                }
                counter++;
                if (counter % 100000 == 0) {
                    System.out.println("Completed " + counter + " loops");
                }
            }
        } finally {
            // Always release the readers, even when the loop ends with an
            // exception; they are non-daemon threads and would otherwise
            // keep spinning and prevent the JVM from exiting.
            mt_doStop = true;
            mt_fv = null;
        }

        // Wait for all threads to finish.
        System.out.println("Waiting for reader threads to finish");
        for (Thread thread : threads) {
            thread.join();
        }

        // Did any of the threads fail?
        boolean didFail = false;
        int[] sumFailuresByMethod = new int[TOTAL_METHODS];
        for (FutureTask<int[]> task : tasks) {
            int[] failuresByMethod = task.get();
            for (int method = 0; method < TOTAL_METHODS; method++) {
                sumFailuresByMethod[method] += failuresByMethod[method];
                if (failuresByMethod[method] != 0) {
                    didFail = true;
                }
            }
        }
        if (!didFail) {
            System.out.println("No problems encountered");
            return;
        }

        // Widen before multiplying so the product cannot overflow an int.
        long attempts = (long) threads.length * counter;
        String msg = String.format(
                "Failure counts by method (of %d attempts):\n" +
                " get, with infinite wait:\t%d\n" +
                " get, with timeout:\t%d\n" +
                " get, with zero wait:\t%d",
                attempts,
                sumFailuresByMethod[0],
                sumFailuresByMethod[1],
                sumFailuresByMethod[2]);
        System.err.println(msg);
    }

    /**
     * Fails unless both references denote the very same object.
     *
     * @throws AssertionError if {@code expected} and {@code actual} are different instances
     */
    private void assertSame(Integer expected, Integer actual) {
        if (expected == actual) {
            return;
        }
        String msg = String.format(
                "Expected instances to be the same, but they aren't\n" +
                " expected identityHashCode: %d\n" +
                " actual identityHashCode: %d\n" +
                " expected value: %s\n" +
                " actual value: %s",
                System.identityHashCode(expected),
                System.identityHashCode(actual),
                expected, actual);
        throw new AssertionError(msg);
    }

    /**
     * Fails unless the condition holds.
     *
     * @throws AssertionError if {@code v} is {@code false}
     */
    private void assertTrue(boolean v) {
        if (!v) {
            throw new AssertionError("Expected true, was false");
        }
    }

    /**
     * Need at least one background thread, and ideally want one thread
     * (including the foreground testing thread) on each processor.
     * Every reader thread is started before this method returns.
     */
    private void createTasksAndThreads() {
        int numProcessors = Runtime.getRuntime().availableProcessors();
        final int numThreads = Math.max(1, numProcessors - 1);
        Thread[] newThreads = new Thread[numThreads];
        // Java cannot create a generic array directly; only FutureTask<int[]>
        // instances are ever stored here, so the unchecked conversion is safe.
        @SuppressWarnings("unchecked")
        FutureTask<int[]>[] newTasks = new FutureTask[numThreads];
        threads = newThreads;
        tasks = newTasks;
        for (int i = 0; i < newThreads.length; i++) {
            FutureTask<int[]> task = new FutureTask<int[]>(new BackgroundReader());
            newTasks[i] = task;
            Thread thread = new Thread(task);
            thread.start();
            newThreads[i] = thread;
        }
    }

    /**
     * Define the block that will be executed by the background threads:
     * wait for each newly published future, fetch its value with one of
     * {@link #TOTAL_METHODS} strategies, and count {@code null} results.
     */
    class BackgroundReader implements Callable<int[]> {
        /**
         * @return the number of {@code null} results seen, indexed by retrieval method
         * @throws Exception if a get failed or a wrong instance was returned
         */
        public int[] call() throws Exception {
            int gets = 0;
            FutureTask<Integer> prevFV = null;
            FutureTask<Integer> fv = null;
            int method = new Random().nextInt(TOTAL_METHODS);
            int[] failuresByMethod = new int[TOTAL_METHODS];
            int spinLoops = 0;
            while (!mt_doStop) {
                // Spin loop to get the next FutureValue.
                fv = mt_fv;
                if (fv == prevFV) {
                    spinLoops++;
                    if (spinLoops % 10000000 == 0) {
                        System.out.println(
                                "Long FV update loop in thread " +
                                Thread.currentThread().getName() +
                                "; count = " +
                                spinLoops +
                                "; gets = " +
                                gets);
                    }
                    continue;
                }
                if (fv == null) {
                    break;
                }
                prevFV = fv;
                spinLoops = 0;
                gets++;

                boolean roundCompleted = false;
                try {
                    Integer value = fetch(fv, method);
                    Integer expectedV = mt_expectedV;
                    if (value == null) {
                        // This is the bug being demonstrated: count it, keep going.
                        failuresByMethod[method]++;
                    } else if (expectedV != value) {
                        throw new AssertionError(
                                "Gets #" + gets +
                                ", Method #" + method +
                                ": wrong instance returned; expected customer " +
                                System.identityHashCode(expectedV) +
                                ", but got " +
                                System.identityHashCode(value));
                    }
                    roundCompleted = true;
                } finally {
                    if (!roundCompleted) {
                        // This reader is about to terminate; tell the main
                        // loop to stop instead of waiting on a dead thread.
                        mt_failed = true;
                    }
                    // Always release the main thread, whatever happened above.
                    mt_doneGetting.countDown();
                }
                method = (method + 1) % TOTAL_METHODS;
            }
            return failuresByMethod;
        }

        /** Retrieves the future's value using the strategy selected by {@code method}. */
        private Integer fetch(FutureTask<Integer> fv, int method)
                throws InterruptedException, ExecutionException, TimeoutException {
            if (method == 0) {
                // Wait until value is set.
                return fv.get();
            }
            if (method == 1) {
                // Wait a reasonable amount of time.
                return fv.get(100, TimeUnit.MILLISECONDS);
            }
            // Poll for the value (i.e. don't wait, but instead spin).
            while (true) {
                try {
                    return fv.get(0, TimeUnit.SECONDS);
                } catch (TimeoutException ignored) {
                    // Not completed yet; poll again.
                }
            }
        }
    }

    /**
     * A sub-class of {@link FutureTask} that attempts to use FutureTask.set.
     * It remembers the thread that created it: only that thread may call
     * {@link #mySet}, and that thread must not call get before completion.
     */
    class AFutureValue extends FutureTask<Integer> {
        /** Creating thread; cleared once {@link #mySet} has been attempted. */
        private volatile Thread creatorThread = Thread.currentThread();

        /** Creates a future that is meant to be completed through {@link #mySet}. */
        public AFutureValue() {
            super(new NeverCalled<Integer>());
        }

        /** Creates a future that is meant to be completed by running {@code callable}. */
        public AFutureValue(FixedResult<Integer> callable) {
            super(callable);
        }

        /**
         * Completes this future directly with {@code value}, bypassing {@code run()}.
         *
         * @return {@code false} if the future was already done or turns out to be
         *         cancelled, {@code true} otherwise
         * @throws IllegalStateException if called from a thread other than the creator
         */
        public boolean mySet(Integer value) {
            if (isDone()) {
                return false;
            }
            if (creatorThread != Thread.currentThread()) {
                throw new IllegalStateException("Must only call from creator's thread");
            }
            try {
                super.set(value);
                return !isCancelled();
            } finally {
                creatorThread = null;
            }
        }

        @Override
        public boolean cancel(boolean mayInterruptIfRunning) {
            return super.cancel(mayInterruptIfRunning);
        }

        /**
         * Same as {@link FutureTask#get()}, except that the creator thread may not
         * wait on an incomplete future and an {@link ExecutionException} is
         * rethrown as {@link IllegalStateException}.
         */
        @Override
        public Integer get() throws InterruptedException {
            if (creatorThread == Thread.currentThread() && !isDone()) {
                throw new IllegalStateException("Must not call from creator's thread");
            }
            try {
                return super.get();
            } catch (ExecutionException e) {
                // Can't happen.
                throw new IllegalStateException(e);
            }
        }

        /**
         * Timed variant of {@link #get()} with the same creator-thread restriction
         * and the same translation of {@link ExecutionException}.
         */
        @Override
        public Integer get(long timeout, TimeUnit unit) throws InterruptedException, TimeoutException {
            if (creatorThread == Thread.currentThread() && !isDone()) {
                throw new IllegalStateException("Must not call from creator's thread");
            }
            try {
                return super.get(timeout, unit);
            } catch (ExecutionException e) {
                // Can't happen.
                throw new IllegalStateException(e);
            }
        }
    }

    /** Placeholder callable for futures that are completed without being run. */
    class NeverCalled<V> implements Callable<V> {
        public V call() throws Exception {
            throw new UnsupportedOperationException();
        }
    }

    /** Callable that returns the instance it was constructed with. */
    class FixedResult<V> implements Callable<V> {
        private final V result;

        FixedResult(V result) {
            this.result = result;
        }

        public V call() throws Exception {
            return result;
        }
    }
}
Posted Date : 2008-02-14 22:37:43.0
|
|
Work Around
|
NONE
Do not call set or setException directly.
Use them only as callbacks invoked by run(),
as intended by the designers of this API.
It seems possible to wrap tasks with an object
that provides set and setException methods,
so that when run() is subsequently executed,
it returns immediately in accordance with
the values provided by set and/or setException.
|
|
Evaluation
|
The submitter is correct that there is a serious
concurrency bug here. In this fragment from
FutureTask.java:innerSet,
if (compareAndSetState(s, RAN)) {
result = v;
releaseShared(0);
done();
return;
}
if another thread calls get after the compareAndSet succeeds,
but before result is assigned, get will return null instead of the correct value.
Posted Date : 2008-02-16 23:40:21.0
Here is a somewhat reduced version of the submitter's test case demonstrating the
problem on all versions of the JDK:
import java.util.*;
import java.util.concurrent.*;
public class Bug2 {
    /** Builds the harness (which starts the reader threads) and runs the test once. */
    public static void main(String[] args) throws Throwable {
        new Bug2().test();
    }

    /** Number of retrieval strategies the reader threads rotate through. */
    static final int TOTAL_METHODS = 3;

    final int numProcessors = Runtime.getRuntime().availableProcessors();
    /** One reader per processor, leaving one for the main thread, but at least one. */
    final int numThreads = Math.max(1, numProcessors - 1);
    final Thread[] threads = new Thread[numThreads];
    final List<FutureTask<int[]>> tasks =
        new ArrayList<FutureTask<int[]>>();

    /** The instance the current round's future is expected to return. */
    volatile Integer mt_expectedV = null;
    /** The future published for the current round; {@code null} tells readers to exit. */
    volatile MyFutureTask mt_fv = null;
    /** Counted down once by every reader when it has finished the current round. */
    volatile CountDownLatch doneGetting = null;
    /** Set by a reader that could not complete a round; ends the main loop. */
    volatile boolean failed = false;
    /** Set by the main thread to ask the readers to stop. */
    volatile boolean doStop = false;

    /**
     * Create background threads that will get the value of a FutureTask
     * provided by the foreground thread. The threads are started here.
     */
    Bug2() {
        for (int i = 0; i < threads.length; i++) {
            FutureTask<int[]> task = new FutureTask<int[]>(new BackgroundReader());
            tasks.add(task);
            Thread thread = new Thread(task);
            thread.start();
            threads[i] = thread;
        }
    }

    /**
     * Test method for {@link java.util.concurrent.FutureTask#set(java.lang.Object)}.
     *
     * <p>For about ten seconds, repeatedly publishes a new future, completes it
     * by calling {@code set} directly, and waits for all readers to fetch the
     * value. Non-zero counts of {@code null} results are printed to stderr.
     *
     * @throws Throwable if a reader task failed, the thread was interrupted,
     *                   or an internal assertion did not hold
     */
    void test() throws Throwable {
        System.out.println("Started " + tasks.size() + " threads to read from the FutureTask");

        // Run for some amount of time.
        final long DURATION = 10 * 1000; // 10 seconds
        final long startTime = System.currentTimeMillis();
        final long endTime = startTime + DURATION;
        long now;
        int counter = 0;
        failed = false;
        try {
            while (!failed && (now = System.currentTimeMillis()) < endTime) {
                // Objects that the threads will need after they get.
                mt_expectedV = (int) (now & 0xffffff);
                doneGetting = new CountDownLatch(threads.length);

                // The FutureValue that the threads will try to get the
                // expected value from. Publishing it releases the readers.
                MyFutureTask current = new MyFutureTask(
                    new Callable<Integer>() {
                        public Integer call() {
                            return mt_expectedV;
                        }
                    });
                mt_fv = current;

                // Yield the processor before setting the value... sometimes.
                if (counter % 10 == 0) {
                    Thread.yield();
                }

                // Set the value.
                current.set(mt_expectedV);
                assertTrue(current.isDone());
                assertTrue(!current.isCancelled());
                assertSame(mt_expectedV, current.get());

                // Wait for the threads to finish getting the value.
                if (!doneGetting.await(DURATION, TimeUnit.MILLISECONDS)) {
                    throw new AssertionError("waited too long for reader threads to read");
                }
                counter++;
                if (counter % 100000 == 0) {
                    System.out.println("Completed " + counter + " loops");
                }
            }
        } finally {
            // Always release the readers, even when the loop ends with an
            // exception; they are non-daemon threads and would otherwise
            // keep spinning and prevent the JVM from exiting.
            doStop = true;
            mt_fv = null;
        }

        // Wait for all threads to finish.
        System.out.println("Waiting for reader threads to finish");
        for (Thread thread : threads) {
            thread.join();
        }

        // Did any of the threads fail?
        int[] sumFailuresByMethod = new int[TOTAL_METHODS];
        for (FutureTask<int[]> task : tasks) {
            int[] failuresByMethod = task.get();
            if (failuresByMethod == null) {
                throw new AssertionError("reader task returned null instead of its failure counts");
            }
            for (int method = 0; method < TOTAL_METHODS; method++) {
                sumFailuresByMethod[method] += failuresByMethod[method];
            }
        }
        if (!Arrays.equals(sumFailuresByMethod, new int[] { 0, 0, 0 })) {
            System.err.printf(
                "Failure counts by method (of %d attempts):%n" +
                " get, with infinite wait:\t%d%n" +
                " get, with timeout:\t%d%n" +
                " get, with zero wait:\t%d%n",
                // Widen before multiplying so the product cannot overflow an int.
                (long) threads.length * counter,
                sumFailuresByMethod[0],
                sumFailuresByMethod[1],
                sumFailuresByMethod[2]);
        }
    }

    /**
     * Fails unless both references denote the very same object.
     *
     * @throws AssertionError if {@code expected} and {@code actual} are different instances
     */
    private void assertSame(Integer expected, Integer actual) {
        if (expected == actual) {
            return;
        }
        throw new AssertionError(
            String.format(
                "Expected instances to be the same, but they aren't%n" +
                " expected identityHashCode: %d%n" +
                " actual identityHashCode: %d%n" +
                " expected value: %s%n" +
                " actual value: %s",
                System.identityHashCode(expected),
                System.identityHashCode(actual),
                expected, actual));
    }

    /**
     * Fails unless the condition holds.
     *
     * @throws AssertionError if {@code v} is {@code false}
     */
    private void assertTrue(boolean v) {
        if (!v) {
            throw new AssertionError();
        }
    }

    /**
     * Define the block that will be executed by the background threads:
     * wait for each newly published future, fetch its value with one of
     * {@link #TOTAL_METHODS} strategies, and count {@code null} results.
     */
    class BackgroundReader implements Callable<int[]> {
        /**
         * @return the number of {@code null} results seen, indexed by retrieval method
         * @throws Exception if a get failed or a wrong instance was returned
         */
        public int[] call() throws Exception {
            int gets = 0;
            FutureTask<Integer> prevFV = null;
            FutureTask<Integer> fv = null;
            int method = new Random().nextInt(TOTAL_METHODS);
            int[] failuresByMethod = new int[TOTAL_METHODS];
            int spinLoops = 0;
            while (!doStop) {
                // Spin loop to get the next FutureValue.
                fv = mt_fv;
                if (fv == prevFV) {
                    spinLoops++;
                    if (spinLoops % 10000000 == 0) {
                        System.out.printf(
                            "Long FV update loop in thread %s; count = %d; gets = %d%n",
                            Thread.currentThread().getName(), spinLoops, gets);
                    }
                    continue;
                }
                if (fv == null) {
                    break;
                }
                prevFV = fv;
                spinLoops = 0;
                gets++;

                boolean roundCompleted = false;
                try {
                    Integer value = fetch(fv, method);
                    Integer expectedV = mt_expectedV;
                    if (value == null) {
                        // This is the bug being demonstrated: count it, keep going.
                        failuresByMethod[method]++;
                    } else if (expectedV != value) {
                        throw new AssertionError(
                            "Gets #" + gets +
                            ", Method #" + method +
                            ": wrong instance returned; expected object " +
                            System.identityHashCode(expectedV) +
                            ", but got " +
                            System.identityHashCode(value));
                    }
                    roundCompleted = true;
                } finally {
                    if (!roundCompleted) {
                        // This reader is about to terminate; tell the main
                        // loop to stop instead of waiting on a dead thread.
                        failed = true;
                    }
                    // Always release the main thread, whatever happened above.
                    doneGetting.countDown();
                }
                method = (method + 1) % TOTAL_METHODS;
            }
            return failuresByMethod;
        }

        /** Retrieves the future's value using the strategy selected by {@code method}. */
        private Integer fetch(FutureTask<Integer> fv, int method)
                throws InterruptedException, ExecutionException, TimeoutException {
            if (method == 0) {
                // Wait until value is set.
                return fv.get();
            }
            if (method == 1) {
                // Wait a reasonable amount of time.
                return fv.get(100, TimeUnit.MILLISECONDS);
            }
            // Poll for the value (i.e. don't wait, but instead spin).
            while (true) {
                try {
                    return fv.get(0, TimeUnit.SECONDS);
                } catch (TimeoutException ignored) {
                    // Not completed yet; poll again.
                }
            }
        }
    }

    /**
     * A subclass of {@link FutureTask} that exposes set and setException.
     */
    static class MyFutureTask extends FutureTask<Integer> {
        public MyFutureTask(Callable<Integer> callable) {
            super(callable);
        }

        /** Widens {@link FutureTask#set} to public so it can be called directly. */
        @Override
        public void set(Integer value) {
            super.set(value);
        }

        /** Widens {@link FutureTask#setException} to public so it can be called directly. */
        @Override
        public void setException(Throwable t) {
            super.setException(t);
        }
    }
}
Posted Date : 2008-02-16 23:40:21.0
The base issue here is that run() was not used to evaluate the Future and to set its result, yet that is the sole purpose of run(). The specification for set() states:
* This method is invoked internally by the <tt>run</tt> method
* upon successful completion of the computation.
I would argue that you are not supposed to call set() directly. The only reason it is protected is to allow overriding, not to allow direct invocation. It is the role of run() to evaluate a FutureTask, not set().
Posted Date : 2008-02-17 10:14:31.0
Doug Lea writes:
------------------------------
Exactly so. [That set() should only be called from run()] is the intent,
and I think it is spelled out OK in the javadocs.
So, not a bug, modulo perhaps some further doc clarification.
But maybe an RFE -- we might consider loosening this restriction.
------------------------------
My impression (and apparently that of the submitter's)
was that set() and setException() could be called independently,
and that it was reasonable for a subclass to expose these methods as public.
The phrase "This method is invoked internally by the run method",
in my view, does not preclude independent invocation.
These methods can be seen as a kind of cancellation,
allowing client code to announce that the computation is unnecessary,
since a final result that would have been computed is already available.
Posted Date : 2008-02-17 21:25:06.0
The intended usage of FutureTask is covered by a number of statements in the javadoc, but, as is always the case with documentation, it can be made clearer:
classdoc: "This class provides a base implementation of Future, with methods to start and cancel a computation, query to see if the computation is complete, and retrieve the result of the computation. "
- No mention of methods to set a result directly
classdoc: "The result can only be retrieved when the computation has completed;"
- Associates presence of a result with the act of executing the computation
Constructor: "Creates a FutureTask that will upon running, execute the given Runnable, and arrange that get will return the given result on successful completion."
- Again associates existence of result with execution of the Runnable.
get: "Waits if necessary for the computation to complete, and then retrieves its result."
- Again associates existence of result with execution of computation
set/setException: "This method is invoked internally by the run method upon successful completion / failure of the computation."
- Specifies that these methods are for internal usage ( just like done() is a hook for internal usage - you would not export done() as a public method )
run: "Sets this Future to the result of its computation unless it has been cancelled."
- Clearly states that run() is responsible for performing the computation and setting the result
Posted Date : 2008-02-17 23:48:30.0
Doug Lea writes:
----------------------------------------------------------------------
Allowing async (wrt running thread) set() calls changes the underlying
sync logic from dealing only with races of runner vs cancellers to
those also involving one or more other setter threads.
If we wanted to support this from the beginning,
we'd have used a slower but more general lock-based scheme.
And if we decide to do it, this would probably be
the way to go. But do we want to slow down FutureTask to support
usages that don't seem correct to begin with? This is after all
the FutureTask class (emphasis on Task!) not the SettableFuture class.
Maybe we need something along those lines.
----------------------------------------------------------------------
Posted Date : 2008-02-19 02:38:31.0
|
|
Comments
|
PLEASE NOTE: JDK6 is formerly known as Project Mustang
|
|
|
 |