1
0
Fork 0
mirror of https://github.com/eclipse-cdt/cdt synced 2025-08-04 06:45:43 +02:00

Bug 310335 - [concurrent] Query utility does not propagate a cancel requests

This commit is contained in:
Pawel Piech 2010-04-23 20:21:03 +00:00
parent ded34fbdbb
commit 8a353a1758
2 changed files with 229 additions and 170 deletions

View file

@ -15,9 +15,11 @@ import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future; import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException; import java.util.concurrent.TimeoutException;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;
import org.eclipse.cdt.dsf.internal.DsfPlugin;
import org.eclipse.core.runtime.CoreException; import org.eclipse.core.runtime.CoreException;
import org.eclipse.core.runtime.IStatus;
import org.eclipse.core.runtime.Status;
/** /**
@ -54,18 +56,90 @@ import org.eclipse.core.runtime.CoreException;
abstract public class Query<V> extends DsfRunnable abstract public class Query<V> extends DsfRunnable
implements Future<V> implements Future<V>
{ {
/** The synchronization object for this query */ private class QueryRm extends DataRequestMonitor<V> {
private final Sync fSync = new Sync();
boolean fExecuted = false;
boolean fCompleted = false;
private QueryRm() {
super(ImmediateExecutor.getInstance(), null);
}
@Override
public synchronized void handleCompleted() {
fCompleted = true;
notifyAll();
}
public synchronized boolean isCompleted() {
return fCompleted;
}
public synchronized boolean setExecuted() {
if (fExecuted || isCanceled()) {
// already executed or canceled
return false;
}
fExecuted = true;
return true;
}
public synchronized boolean isExecuted() {
return fExecuted;
}
};
private final QueryRm fRm = new QueryRm();
/** /**
* The no-argument constructor * The no-argument constructor
*/ */
public Query() {} public Query() {}
public V get() throws InterruptedException, ExecutionException { return fSync.doGet(); } public V get() throws InterruptedException, ExecutionException {
IStatus status;
V data;
synchronized (fRm) {
while (!isDone()) {
fRm.wait();
}
status = fRm.getStatus();
data = fRm.getData();
}
if (status.getSeverity() == IStatus.CANCEL) {
throw new CancellationException();
} else if (status.getSeverity() != IStatus.OK) {
throw new ExecutionException(new CoreException(status));
}
return data;
}
public V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { public V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
return fSync.doGet(unit.toNanos(timeout)); long timeLeft = unit.toMillis(timeout);
long timeoutTime = System.currentTimeMillis() + unit.toMillis(timeout);
IStatus status;
V data;
synchronized (fRm) {
while (!isDone()) {
if (timeLeft <= 0) {
throw new TimeoutException();
}
fRm.wait(timeLeft);
timeLeft = timeoutTime - System.currentTimeMillis();
}
status = fRm.getStatus();
data = fRm.getData();
}
if (status.getSeverity() == IStatus.CANCEL) {
throw new CancellationException();
} else if (status.getSeverity() != IStatus.OK) {
throw new ExecutionException(new CoreException(status));
}
return data;
} }
/** /**
@ -73,139 +147,44 @@ abstract public class Query<V> extends DsfRunnable
* if set. * if set.
*/ */
public boolean cancel(boolean mayInterruptIfRunning) { public boolean cancel(boolean mayInterruptIfRunning) {
return fSync.doCancel(); boolean completed = false;
synchronized (fRm) {
completed = fRm.isCompleted();
if (!completed) {
fRm.cancel();
}
}
return !completed;
} }
public boolean isCancelled() { return fSync.doIsCancelled(); } public boolean isCancelled() { return fRm.isCanceled(); }
public boolean isDone() { return fSync.doIsDone(); } public boolean isDone() {
synchronized (fRm) {
return fRm.isCompleted() || (fRm.isCanceled() && !fRm.isExecuted());
protected void doneException(Throwable t) { }
fSync.doSetException(t);
} }
abstract protected void execute(DataRequestMonitor<V> rm); abstract protected void execute(DataRequestMonitor<V> rm);
public void run() { public void run() {
if (fSync.doRun()) { if (fRm.setExecuted()) {
try { execute(fRm);
/*
* Create the executor which is going to handle the completion of the
* request monitor. Normally a DSF executor is supplied here which
* causes the request monitor to be invoked in a new dispatch loop.
* But since the query is a synchronization object, it can handle
* the completion of the request in any thread.
* Avoiding the use of a DSF executor is very useful because queries are
* meant to be used by clients calling from non-dispatch thread, and there
* is a chance that a client may execute a query just as a session is being
* shut down. In that case, the DSF executor may throw a
* RejectedExecutionException which would have to be handled by the query.
*/
execute(new DataRequestMonitor<V>(ImmediateExecutor.getInstance(), null) {
@Override
public void handleCompleted() {
if (isSuccess()) fSync.doSet(getData());
else fSync.doSetException(new CoreException(getStatus()));
}
});
} catch(Throwable t) {
/*
* Catching the exception here will only work if the exception
* happens within the execute. It will not work in cases when
* the execute submits other runnables, and the other runnables
* encounter the exception.
*/
fSync.doSetException(t);
/*
* Since we caught the exception, it will not be logged by
* DefaultDsfExecutable.afterExecution(). So log it here.
*/
DefaultDsfExecutor.logException(t);
}
} }
} }
@SuppressWarnings("serial") /**
final class Sync extends AbstractQueuedSynchronizer { * Completes the query with the given exception.
private static final int STATE_RUNNING = 1; *
private static final int STATE_DONE = 2; * @deprecated Query implementations should call the request monitor to
private static final int STATE_CANCELLED = 4; * set the exception status directly.
*/
private V fResult; protected void doneException(Throwable t) {
private Throwable fException; fRm.setStatus(new Status(IStatus.ERROR, DsfPlugin.PLUGIN_ID, IDsfStatusConstants.INTERNAL_ERROR, "Exception", t)); //$NON-NLS-1$
fRm.done();
private boolean ranOrCancelled(int state) {
return (state & (STATE_DONE | STATE_CANCELLED)) != 0;
}
@Override
protected int tryAcquireShared(int ignore) {
return doIsDone()? 1 : -1;
}
@Override
protected boolean tryReleaseShared(int ignore) {
return true;
}
boolean doIsCancelled() {
return getState() == STATE_CANCELLED;
}
boolean doIsDone() {
return ranOrCancelled(getState());
}
V doGet() throws InterruptedException, ExecutionException {
acquireSharedInterruptibly(0);
if (getState() == STATE_CANCELLED) throw new CancellationException();
if (fException != null) throw new ExecutionException(fException);
return fResult;
}
V doGet(long nanosTimeout) throws InterruptedException, ExecutionException, TimeoutException {
if (!tryAcquireSharedNanos(0, nanosTimeout)) throw new TimeoutException();
if (getState() == STATE_CANCELLED) throw new CancellationException();
if (fException != null) throw new ExecutionException(fException);
return fResult;
}
void doSet(V v) {
while(true) {
int s = getState();
if (ranOrCancelled(s)) return;
if (compareAndSetState(s, STATE_DONE)) break;
}
fResult = v;
releaseShared(0);
}
void doSetException(Throwable t) {
while(true) {
int s = getState();
if (ranOrCancelled(s)) return;
if (compareAndSetState(s, STATE_DONE)) break;
}
fException = t;
fResult = null;
releaseShared(0);
}
boolean doCancel() {
while(true) {
int s = getState();
if (ranOrCancelled(s)) return false;
if (compareAndSetState(s, STATE_CANCELLED)) break;
}
releaseShared(0);
return true;
}
boolean doRun() {
return compareAndSetState(0, STATE_RUNNING);
}
} }
} }

View file

@ -19,13 +19,14 @@ import junit.framework.Assert;
import org.eclipse.cdt.dsf.concurrent.DataRequestMonitor; import org.eclipse.cdt.dsf.concurrent.DataRequestMonitor;
import org.eclipse.cdt.dsf.concurrent.DsfRunnable; import org.eclipse.cdt.dsf.concurrent.DsfRunnable;
import org.eclipse.cdt.dsf.concurrent.IDsfStatusConstants;
import org.eclipse.cdt.dsf.concurrent.Query; import org.eclipse.cdt.dsf.concurrent.Query;
import org.eclipse.cdt.dsf.concurrent.RequestMonitor;
import org.eclipse.cdt.dsf.concurrent.RequestMonitor.ICanceledListener;
import org.eclipse.cdt.tests.dsf.DsfTestPlugin; import org.eclipse.cdt.tests.dsf.DsfTestPlugin;
import org.eclipse.cdt.tests.dsf.TestDsfExecutor; import org.eclipse.cdt.tests.dsf.TestDsfExecutor;
import org.eclipse.core.runtime.IProgressMonitor;
import org.eclipse.core.runtime.IStatus; import org.eclipse.core.runtime.IStatus;
import org.eclipse.core.runtime.Status; import org.eclipse.core.runtime.Status;
import org.eclipse.core.runtime.jobs.Job;
import org.junit.After; import org.junit.After;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
@ -75,6 +76,65 @@ public class DsfQueryTests {
} }
@Test
public void getErrorTest() throws InterruptedException, ExecutionException {
final String error_message = "Test Error";
Query<Integer> q = new Query<Integer>() {
@Override
protected void execute(DataRequestMonitor<Integer> rm) {
rm.setStatus(new Status(IStatus.ERROR, DsfTestPlugin.PLUGIN_ID, IDsfStatusConstants.INTERNAL_ERROR, error_message, null)); //$NON-NLS-1$
rm.done();
}
};
// Check initial state
Assert.assertTrue(!q.isDone());
Assert.assertTrue(!q.isCancelled());
fExecutor.execute(q);
try {
q.get();
Assert.fail("Expected exception");
} catch (ExecutionException e) {
Assert.assertEquals(e.getCause().getMessage(), error_message);
}
// Check final state
Assert.assertTrue(q.isDone());
Assert.assertTrue(!q.isCancelled());
}
@Test
public void doneExceptionTest() throws InterruptedException, ExecutionException {
Query<Integer> q = new Query<Integer>() {
@Override
protected void execute(DataRequestMonitor<Integer> rm) {
doneException(new Throwable());
}
};
// Check initial state
Assert.assertTrue(!q.isDone());
Assert.assertTrue(!q.isCancelled());
fExecutor.execute(q);
try {
q.get();
Assert.fail("Expected exception");
} catch (ExecutionException e) {
}
// Check final state
Assert.assertTrue(q.isDone());
Assert.assertTrue(!q.isCancelled());
}
@Test @Test
public void getWithMultipleDispatchesTest() throws InterruptedException, ExecutionException { public void getWithMultipleDispatchesTest() throws InterruptedException, ExecutionException {
Query<Integer> q = new Query<Integer>() { Query<Integer> q = new Query<Integer>() {
@ -116,44 +176,6 @@ public class DsfQueryTests {
} }
} }
@Test
public void cancelWhileWaitingTest() throws InterruptedException, ExecutionException {
final Query<Integer> q = new Query<Integer>() {
@Override
protected void execute(final DataRequestMonitor<Integer> rm) {
// Call done with a delay of 1 second, to avoid stalling the tests.
fExecutor.schedule(
new DsfRunnable() {
public void run() { rm.done(); }
},
1, TimeUnit.SECONDS);
}
};
fExecutor.execute(q);
// Note: no point in checking isDone() and isCancelled() here, because
// the value could change on timing.
// This does not really guarantee that the cancel will be called after
// the call to Fugure.get(), but the 1ms delay in call to schedule should
// help.
new Job("DsfQueryTests cancel job") { @Override public IStatus run(IProgressMonitor monitor) { //$NON-NLS-1$
q.cancel(false);
return Status.OK_STATUS;
}}.schedule(1);
try {
q.get();
} catch (CancellationException e) {
return; // Success
} finally {
Assert.assertTrue(q.isDone());
Assert.assertTrue(q.isCancelled());
}
Assert.assertTrue("CancellationException should have been thrown", false); //$NON-NLS-1$
}
@Test @Test
public void cancelBeforeWaitingTest() throws InterruptedException, ExecutionException { public void cancelBeforeWaitingTest() throws InterruptedException, ExecutionException {
final Query<Integer> q = new Query<Integer>() { final Query<Integer> q = new Query<Integer>() {
@ -172,6 +194,8 @@ public class DsfQueryTests {
// Start the query. // Start the query.
fExecutor.execute(q); fExecutor.execute(q);
// Block to retrieve data // Block to retrieve data
try { try {
q.get(); q.get();
@ -184,6 +208,62 @@ public class DsfQueryTests {
Assert.assertTrue("CancellationException should have been thrown", false); //$NON-NLS-1$ Assert.assertTrue("CancellationException should have been thrown", false); //$NON-NLS-1$
} }
@Test
public void cancelWhileWaitingTest() throws InterruptedException, ExecutionException {
final DataRequestMonitor<?>[] rmHolder = new DataRequestMonitor<?>[1];
final Boolean[] cancelCalled = new Boolean[] { Boolean.FALSE };
final Query<Integer> q = new Query<Integer>() {
@Override protected void execute(final DataRequestMonitor<Integer> rm) {
synchronized (rmHolder) {
rmHolder[0] = rm;
rmHolder.notifyAll();
}
}
};
// Start the query.
fExecutor.execute(q);
// Wait until the query is started
synchronized (rmHolder) {
while(rmHolder[0] == null) {
rmHolder.wait();
}
}
// Add a cancel listener to the query RM
rmHolder[0].addCancelListener(new ICanceledListener() {
public void requestCanceled(RequestMonitor rm) {
cancelCalled[0] = Boolean.TRUE;
}
});
// Cancel running request.
q.cancel(false);
Assert.assertTrue(cancelCalled[0]);
Assert.assertTrue(rmHolder[0].isCanceled());
Assert.assertTrue(q.isCancelled());
Assert.assertFalse(q.isDone());
// Complete rm and query.
rmHolder[0].done();
// Retrieve data
try {
q.get();
} catch (CancellationException e) {
return; // Success
} finally {
Assert.assertTrue(q.isDone());
Assert.assertTrue(q.isCancelled());
}
Assert.assertTrue("CancellationException should have been thrown", false); //$NON-NLS-1$
}
@Test @Test
public void getTimeoutTest() throws InterruptedException, ExecutionException { public void getTimeoutTest() throws InterruptedException, ExecutionException {
final Query<Integer> q = new Query<Integer>() { final Query<Integer> q = new Query<Integer>() {
@ -194,7 +274,7 @@ public class DsfQueryTests {
new DsfRunnable() { new DsfRunnable() {
public void run() { rm.done(); } public void run() { rm.done(); }
}, },
1, TimeUnit.SECONDS); 60, TimeUnit.SECONDS);
} }
}; };