1
0
Fork 0
mirror of https://github.com/eclipse-cdt/cdt synced 2025-08-07 08:15:48 +02:00

Refactored DsfSequence class (bug# 159048).

Also added tracing of executables that were garbage-disposed without ever being run.
This commit is contained in:
Pawel Piech 2006-10-13 22:34:59 +00:00
parent 6ed5cc8d0e
commit 3f86dad9d5
8 changed files with 619 additions and 300 deletions

View file

@ -77,5 +77,23 @@ public class DsfPlugin extends Plugin {
} }
} }
public static String getDebugTime() {
StringBuilder traceBuilder = new StringBuilder();
// Record the time
long time = System.currentTimeMillis();
long seconds = (time / 1000) % 1000;
if (seconds < 100) traceBuilder.append('0');
if (seconds < 10) traceBuilder.append('0');
traceBuilder.append(seconds);
traceBuilder.append(',');
long millis = time % 1000;
if (millis < 100) traceBuilder.append('0');
if (millis < 10) traceBuilder.append('0');
traceBuilder.append(millis);
return traceBuilder.toString();
}
} }

View file

@ -30,7 +30,7 @@ import java.lang.annotation.Target;
* It should be null if it cannot be determined from the given object. * It should be null if it cannot be determined from the given object.
*/ */
@Retention(RetentionPolicy.RUNTIME) @Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.PACKAGE, ElementType.TYPE, ElementType.METHOD, ElementType.FIELD}) @Target({ElementType.PACKAGE, ElementType.TYPE, ElementType.METHOD, ElementType.FIELD, ElementType.CONSTRUCTOR})
@Inherited @Inherited
@Documented @Documented
public @interface ConfinedToDsfExecutor { public @interface ConfinedToDsfExecutor {

View file

@ -92,7 +92,6 @@ public class DefaultDsfExecutor extends ScheduledThreadPoolExecutor
} }
} }
// //
// Utilities used for tracing. // Utilities used for tracing.
// //
@ -132,7 +131,7 @@ public class DefaultDsfExecutor extends ScheduledThreadPoolExecutor
int fSequenceNumber = -1; int fSequenceNumber = -1;
/** Trace of where the runnable/callable was submitted to the executor */ /** Trace of where the runnable/callable was submitted to the executor */
StackTraceElement[] fSubmittedAt = null; StackTraceWrapper fSubmittedAt = null;
/** Reference to the runnable/callable that submitted this runnable/callable to the executor */ /** Reference to the runnable/callable that submitted this runnable/callable to the executor */
TracingWrapper fSubmittedBy = null; TracingWrapper fSubmittedBy = null;
@ -142,8 +141,8 @@ public class DefaultDsfExecutor extends ScheduledThreadPoolExecutor
*/ */
TracingWrapper(int offset) { TracingWrapper(int offset) {
StackTraceElement[] stackTrace = Thread.currentThread().getStackTrace(); StackTraceElement[] stackTrace = Thread.currentThread().getStackTrace();
fSubmittedAt = new StackTraceElement[stackTrace.length - offset]; fSubmittedAt = new StackTraceWrapper(new StackTraceElement[stackTrace.length - offset]);
System.arraycopy(stackTrace, offset - 1, fSubmittedAt, 0, fSubmittedAt.length); System.arraycopy(stackTrace, offset - 1, fSubmittedAt.fStackTraceElements, 0, fSubmittedAt.fStackTraceElements.length);
if (isInExecutorThread() && fCurrentlyExecuting != null) { if (isInExecutorThread() && fCurrentlyExecuting != null) {
fSubmittedBy = fCurrentlyExecuting; fSubmittedBy = fCurrentlyExecuting;
} }
@ -158,16 +157,7 @@ public class DefaultDsfExecutor extends ScheduledThreadPoolExecutor
StringBuilder traceBuilder = new StringBuilder(); StringBuilder traceBuilder = new StringBuilder();
// Record the time // Record the time
long time = System.currentTimeMillis(); traceBuilder.append(DsfPlugin.getDebugTime());
long seconds = (time / 1000) % 1000;
if (seconds < 100) traceBuilder.append('0');
if (seconds < 10) traceBuilder.append('0');
traceBuilder.append(seconds);
traceBuilder.append(',');
long millis = time % 1000;
if (millis < 100) traceBuilder.append('0');
if (millis < 10) traceBuilder.append('0');
traceBuilder.append(millis);
traceBuilder.append(' '); traceBuilder.append(' ');
// Record the executor # // Record the executor #
@ -187,7 +177,7 @@ public class DefaultDsfExecutor extends ScheduledThreadPoolExecutor
} }
if (dsfExecutable.fCreatedAt != null) { if (dsfExecutable.fCreatedAt != null) {
traceBuilder.append(" at "); traceBuilder.append(" at ");
traceBuilder.append(dsfExecutable.fCreatedAt[0].toString()); traceBuilder.append(dsfExecutable.fCreatedAt.fStackTraceElements[0].toString());
} }
} }
} }
@ -200,7 +190,7 @@ public class DefaultDsfExecutor extends ScheduledThreadPoolExecutor
traceBuilder.append(fSubmittedBy.fSequenceNumber); traceBuilder.append(fSubmittedBy.fSequenceNumber);
} }
traceBuilder.append(" at "); traceBuilder.append(" at ");
traceBuilder.append(fSubmittedAt[0].toString()); traceBuilder.append(fSubmittedAt.fStackTraceElements[0].toString());
// Finally, the executable's toString(). // Finally, the executable's toString().
traceBuilder.append("\n "); traceBuilder.append("\n ");
@ -214,6 +204,7 @@ public class DefaultDsfExecutor extends ScheduledThreadPoolExecutor
abstract protected Object getExecutable(); abstract protected Object getExecutable();
} }
class TracingWrapperRunnable extends TracingWrapper implements Runnable { class TracingWrapperRunnable extends TracingWrapper implements Runnable {
final Runnable fRunnable; final Runnable fRunnable;
public TracingWrapperRunnable(Runnable runnable, int offset) { public TracingWrapperRunnable(Runnable runnable, int offset) {
@ -225,6 +216,13 @@ public class DefaultDsfExecutor extends ScheduledThreadPoolExecutor
public void run() { public void run() {
traceExecution(); traceExecution();
// If debugging a DSF exeutable, mark that it was executed.
if (DEBUG_EXECUTOR && fRunnable instanceof DsfExecutable) {
((DsfExecutable)fRunnable).setExecuted();
}
// Finally invoke the runnable code.
fRunnable.run(); fRunnable.run();
} }
} }
@ -240,6 +238,13 @@ public class DefaultDsfExecutor extends ScheduledThreadPoolExecutor
public T call() throws Exception { public T call() throws Exception {
traceExecution(); traceExecution();
// If debugging a DSF exeutable, mark that it was executed.
if (DEBUG_EXECUTOR && fCallable instanceof DsfExecutable) {
((DsfExecutable)fCallable).setExecuted();
}
// Finally invoke the runnable code.
return fCallable.call(); return fCallable.call();
} }
} }

View file

@ -53,7 +53,7 @@ abstract public class Done extends DsfRunnable {
* @return Returns true if there was an error that was propagated and * @return Returns true if there was an error that was propagated and
* the caller can stop processing result. * the caller can stop processing result.
*/ */
protected boolean propagateErrorToClient(DsfExecutor executor, Done clientDone, String message) { protected boolean propagateError(DsfExecutor executor, Done clientDone, String message) {
if (clientDone.getStatus().getSeverity() == IStatus.CANCEL) { if (clientDone.getStatus().getSeverity() == IStatus.CANCEL) {
return true; return true;
} }

View file

@ -13,6 +13,9 @@ package org.eclipse.dd.dsf.concurrent;
import java.util.HashSet; import java.util.HashSet;
import java.util.Set; import java.util.Set;
import org.eclipse.core.runtime.Platform;
import org.eclipse.dd.dsf.DsfPlugin;
/** /**
* Base class for DSF-instrumented alternative to the Runnable/Callable interfaces. * Base class for DSF-instrumented alternative to the Runnable/Callable interfaces.
* <p> * <p>
@ -23,15 +26,48 @@ import java.util.Set;
*/ */
@Immutable @Immutable
public class DsfExecutable { public class DsfExecutable {
final StackTraceElement[] fCreatedAt; /**
* Flag indicating that tracing of the DSF executor is enabled. It enables
* storing of the "creator" information as well as tracing of disposed
* runnables that have not been submitted to the executor.
*/
static boolean DEBUG_EXECUTOR = false;
/**
* Flag indicating that assertions are enabled. It enables storing of the
* "creator" executable for debugging purposes.
*/
static boolean ASSERTIONS_ENABLED = false;
static {
assert ASSERTIONS_ENABLED = true;
DEBUG_EXECUTOR = DsfPlugin.DEBUG && "true".equals( //$NON-NLS-1$
Platform.getDebugOption("org.eclipse.dd.dsf/debug/executor")); //$NON-NLS-1$
assert ASSERTIONS_ENABLED = true;
}
/**
* Field that holds the stack trace of where this executable was created.
* Used for tracing and debugging only.
*/
final StackTraceWrapper fCreatedAt;
/**
* Field holding the reference of the executable that created this
* executable. Used for tracing only.
*/
final DefaultDsfExecutor.TracingWrapper fCreatedBy; final DefaultDsfExecutor.TracingWrapper fCreatedBy;
/**
* Flag indicating whether this executable was ever executed by an
* executor. Used for tracing only.
*/
private boolean fExecuted = false;
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
public DsfExecutable() { public DsfExecutable() {
// Use assertion flag (-ea) to jre to avoid affecting performance when not debugging. // Use assertion flag (-ea) to jre to avoid affecting performance when not debugging.
boolean assertsEnabled = false; if (ASSERTIONS_ENABLED || DEBUG_EXECUTOR) {
assert assertsEnabled = true;
if (assertsEnabled || DefaultDsfExecutor.DEBUG_EXECUTOR) {
// Find the runnable/callable that is currently running. // Find the runnable/callable that is currently running.
DefaultDsfExecutor executor = DefaultDsfExecutor.fThreadToExecutorMap.get(Thread.currentThread()); DefaultDsfExecutor executor = DefaultDsfExecutor.fThreadToExecutorMap.get(Thread.currentThread());
if (executor != null) { if (executor != null) {
@ -53,11 +89,38 @@ public class DsfExecutable {
for (i = 3; i < stackTrace.length; i++) { for (i = 3; i < stackTrace.length; i++) {
if ( !classNamesSet.contains(stackTrace[i].getClassName()) ) break; if ( !classNamesSet.contains(stackTrace[i].getClassName()) ) break;
} }
fCreatedAt = new StackTraceElement[stackTrace.length - i]; fCreatedAt = new StackTraceWrapper(new StackTraceElement[stackTrace.length - i]);
System.arraycopy(stackTrace, i, fCreatedAt, 0, fCreatedAt.length); System.arraycopy(stackTrace, i, fCreatedAt.fStackTraceElements, 0, fCreatedAt.fStackTraceElements.length);
} else { } else {
fCreatedAt = null; fCreatedAt = null;
fCreatedBy = null; fCreatedBy = null;
} }
} }
/**
* Marks this executable to indicate that it has been executed by the
* executor. To be invoked only by DsfExecutor.
*/
void setExecuted() {
fExecuted = true;
}
@Override
protected void finalize() {
if (DEBUG_EXECUTOR && !fExecuted) {
StringBuilder traceBuilder = new StringBuilder();
// Record the time
traceBuilder.append(DsfPlugin.getDebugTime());
traceBuilder.append(' ');
// Record the event
traceBuilder.append("DsfExecutable was never executed:\n ");
traceBuilder.append(this);
traceBuilder.append("\nCreated at:");
traceBuilder.append(fCreatedAt);
DsfPlugin.debug(traceBuilder.toString());
}
}
} }

View file

@ -10,13 +10,13 @@
*******************************************************************************/ *******************************************************************************/
package org.eclipse.dd.dsf.concurrent; package org.eclipse.dd.dsf.concurrent;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future; import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;
import org.eclipse.core.runtime.CoreException;
import org.eclipse.core.runtime.IStatus;
import org.eclipse.core.runtime.Status;
import org.eclipse.dd.dsf.DsfPlugin;
import org.eclipse.dd.dsf.service.IDsfService;
/** /**
* A convenience class that allows a client to retrieve data from services * A convenience class that allows a client to retrieve data from services
@ -28,135 +28,124 @@ import org.eclipse.dd.dsf.service.IDsfService;
* @see java.util.concurrent.Callable * @see java.util.concurrent.Callable
*/ */
@ThreadSafe @ThreadSafe
abstract public class DsfQuery<V> { abstract public class DsfQuery<V> extends DsfRunnable
implements Future<V>
{
/** The synchronization object for this query */
final Sync fSync = new Sync();
public V get() throws InterruptedException, ExecutionException { return fSync.doGet(); }
public V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
return fSync.doGet();
}
/**
* Don't try to interrupt the DSF executor thread, just ignore the request
* if set.
*/
public boolean cancel(boolean mayInterruptIfRunning) {
return fSync.doCancel();
}
public boolean isCancelled() { return fSync.doIsCancelled(); }
public boolean isDone() { return fSync.doIsDone(); }
protected void done(V result) {
fSync.doSet(result);
}
protected void doneException(Throwable t) {
fSync.doSetException(t);
}
abstract protected void execute();
public void run() {
if (fSync.doRun()) {
execute();
}
}
@SuppressWarnings("serial")
final class Sync extends AbstractQueuedSynchronizer {
private static final int STATE_RUNNING = 1;
private static final int STATE_DONE = 2;
private static final int STATE_CANCELLED = 4;
private V fResult; private V fResult;
private boolean fValid; private Throwable fException;
private DsfExecutor fExecutor;
private Future fFuture;
private boolean fWaiting;
private IStatus fStatus = Status.OK_STATUS;
public DsfQuery(DsfExecutor executor) { private boolean ranOrCancelled(int state) {
fExecutor = executor; return (state & (STATE_DONE | STATE_CANCELLED)) != 0;
} }
/** protected int tryAcquireShared(int ignore) {
* Start data retrieval. return doIsDone()? 1 : -1;
* Client must implement this method to do whatever is needed to retrieve data.
* Retrieval can be (but does not have to be) asynchronious - it meas this method can return
* before data is retrieved. When data is ready Proxy must be notified by calling done() method.
*/
protected abstract void execute();
/**
* Allows deriving classes to implement their own snipped additional
* cancellation code.
*/
protected void revokeChildren(V result) {};
/**
* Get data associated with this proxy. This method is thread safe and
* it will block until data is ready. Because it's a blocking call and it waits
* for commands to be processed on the dispatch thread, this methods itself
* CANNOT be called on the dispatch thread.
*/
public synchronized V get() {
assert !fExecutor.isInExecutorThread();
if(!fValid) {
if (!fWaiting) {
fFuture = fExecutor.submit(new DsfRunnable() {
public void run() {
// Note: not sure if this try-catch is desirable. It might encourage
// implementors to not catch its own exceptions. If the query code takes
// more than one dispatch, then this code will not be helpful anyway.
try {
DsfQuery.this.execute();
} catch (Throwable t) {
doneException(t);
}
}
});
} }
fWaiting = true; protected boolean tryReleaseShared(int ignore) {
try { return true;
while(fWaiting) {
wait();
} }
} catch (InterruptedException e) {
fStatus = new Status(IStatus.ERROR, DsfPlugin.PLUGIN_ID, IDsfService.INTERNAL_ERROR, boolean doIsCancelled() {
"Interrupted exception while waiting for result.", e); return getState() == STATE_CANCELLED;
fValid = true;
} }
assert fValid;
boolean doIsDone() {
return ranOrCancelled(getState());
} }
V doGet() throws InterruptedException, ExecutionException {
acquireSharedInterruptibly(0);
if (getState() == STATE_CANCELLED) throw new CancellationException();
if (fException != null) throw new ExecutionException(fException);
return fResult; return fResult;
} }
/** V doGet(long nanosTimeout) throws InterruptedException, ExecutionException, TimeoutException {
* Same as get(), but with code to automatically re-threw the exception if one if (!tryAcquireSharedNanos(0, nanosTimeout)) throw new TimeoutException();
* was reported by the run() method. if (getState() == STATE_CANCELLED) throw new CancellationException();
*/ if (fException != null) throw new ExecutionException(fException);
public V getWithThrows() throws CoreException {
V retVal = get();
if (!getStatus().isOK()) {
throw new CoreException(getStatus());
}
return retVal;
}
public IStatus getStatus() { return fStatus; }
/** Abort current operation and keep old proxy data */
public synchronized void cancel() {
assert fExecutor.isInExecutorThread();
assert !fWaiting || !fValid;
if (fWaiting) {
fFuture.cancel(false);
fWaiting = false;
notifyAll();
} else if (fValid) {
revokeChildren(fResult);
}
fValid = true;
}
/** Abort current operation and set proxy data to 'result' */
public synchronized void cancel(V newResult) {
fResult = newResult;
cancel();
}
public Object getCachedResult() {
return fResult; return fResult;
} }
public boolean isValid() { return fValid; } void doSet(V v) {
while(true) {
int s = getState();
if (ranOrCancelled(s)) return;
if (compareAndSetState(s, STATE_DONE)) break;
}
fResult = v;
releaseShared(0);
}
public synchronized void done(V result) { void doSetException(Throwable t) {
// Valid could be true if request was cancelled while data was while(true) {
// being retrieved, and then done() was called. int s = getState();
if (fValid) return; if (ranOrCancelled(s)) return;
if (compareAndSetState(s, STATE_DONE)) break;
}
fException = t;
fResult = null;
releaseShared(0);
}
fResult = result; boolean doCancel() {
fValid = true; while(true) {
if (fWaiting) { int s = getState();
fWaiting = false; if (ranOrCancelled(s)) return false;
notifyAll(); if (compareAndSetState(s, STATE_CANCELLED)) break;
} }
} releaseShared(0);
return true;
public synchronized void doneError(IStatus errorStatus) { }
if (fValid) return;
fStatus = errorStatus; boolean doRun() {
done(null); return compareAndSetState(0, STATE_RUNNING);
} }
public synchronized void doneException(Throwable t) {
if (fValid) return;
doneError(new Status(IStatus.ERROR, DsfPlugin.PLUGIN_ID, IDsfService.INTERNAL_ERROR,
"Exception while computing result.", t));
} }
} }

View file

@ -10,8 +10,17 @@
*******************************************************************************/ *******************************************************************************/
package org.eclipse.dd.dsf.concurrent; package org.eclipse.dd.dsf.concurrent;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;
import org.eclipse.core.runtime.CoreException;
import org.eclipse.core.runtime.IProgressMonitor; import org.eclipse.core.runtime.IProgressMonitor;
import org.eclipse.core.runtime.IStatus; import org.eclipse.core.runtime.IStatus;
import org.eclipse.core.runtime.MultiStatus;
import org.eclipse.core.runtime.NullProgressMonitor; import org.eclipse.core.runtime.NullProgressMonitor;
import org.eclipse.core.runtime.Status; import org.eclipse.core.runtime.Status;
import org.eclipse.dd.dsf.DsfPlugin; import org.eclipse.dd.dsf.DsfPlugin;
@ -39,202 +48,406 @@ import org.eclipse.dd.dsf.DsfPlugin;
* has to be re-implemented every time. The Sequence class tries to address * has to be re-implemented every time. The Sequence class tries to address
* this problem by containing this pattern in a single class. * this problem by containing this pattern in a single class.
*/ */
abstract public class DsfSequence { @ThreadSafe
abstract public class DsfSequence extends DsfRunnable implements Future<Object> {
/** /**
* The abstract class that each step has to implement * The abstract class that each step has to implement.
*/
abstract public static class Step {
private DsfSequence fSequence;
/**
* Sets the sequence that this step belongs to. It is only accessible
* by the sequence itself, and is not meant to be called by sequence
* sub-classes.
*/
void setSequence(DsfSequence sequence) { fSequence = sequence; }
/** Returns the sequence that this step is running in. */
public DsfSequence getSequence() { return fSequence; }
/**
* Executes the next step. Overriding classes should perform the
* work in this method.
* @param done Result token to submit to executor when step is finished.
*/
public void execute(Done done) {
getSequence().getExecutor().execute(done);
}
/**
* Roll back gives the step implementation a chance to undo the
* operation that was performed by execute().
* @param done Result token to submit to executor when rolling back the step is finished.
*/
public void rollBack(Done done) {
getSequence().getExecutor().execute(done);
}
/**
* Returns the number of progress monitor ticks corresponding to this
* step.
*/ */
abstract public class Step {
public void execute() { stepFinished(); }
public void rollBack() { stepRolledBack(); }
public int getTicks() { return 1; } public int getTicks() { return 1; }
} }
private DsfExecutor fExecutor; /** The synchronization object for this future */
private Step[] fSteps; final Sync fSync = new Sync();
private Done fDoneQC;
private String fTaskName; /**
private String fRollbackTaskName; * Executor that this sequence is running in. It is used by the sequence
private IProgressMonitor fProgressMonitor = new NullProgressMonitor(); * to submit the runnables for steps, and for submitting the result.
*/
final private DsfExecutor fExecutor;
/**
* Result callback to invoke when the sequence is finished. Intended to
* be used when the sequence is created and invoked from the executor
* thread. Otherwise, the {@link Future#get()} method is the appropriate
* method of retrieving the result.
*/
final private Done fDone;
/** Status indicating the success/failure of the test. Used internally only. */
@ConfinedToDsfExecutor("getExecutor")
private IStatus fStatus = Status.OK_STATUS;
@ConfinedToDsfExecutor("getExecutor")
private int fCurrentStepIdx = 0; private int fCurrentStepIdx = 0;
boolean fCancelled = false;
/** /** Task name for this sequence used with the progress monitor */
* Default constructor. If this constructor is used, the steps need to be initialized final private String fTaskName;
* before the sequence can be invoked.
* @param executor the DSF executor which will be used to invoke all steps /** Task name used when the sequence is being rolled back. */
*/ final private String fRollbackTaskName;
final private IProgressMonitor fProgressMonitor;
/** Convenience constructor with limited arguments. */
public DsfSequence(DsfExecutor executor) { public DsfSequence(DsfExecutor executor) {
this(executor, null); this(executor, new NullProgressMonitor(), "", "", null);
}
/** Convenience constructor with limited arguments. */
public DsfSequence(DsfExecutor executor, Done done) {
this(executor, new NullProgressMonitor(), "", "", done);
} }
/** /**
* Constructor that initialized the steps. * Constructor that initialized the steps and the result callback.
* @param executor the DSF executor which will be used to invoke all steps * @param executor The DSF executor which will be used to invoke all steps.
* @param steps sequence steps * @param pm Progress monitor for monitoring this sequence. This parameter cannot be null.
* @param taskName Name that will be used in call to {@link IProgressMonitor#beginTask(String, int)},
* when the task is started.
* @param rollbackTaskName Name that will be used in call to {@link IProgressMonitor#subTask(String)}
* if the task is cancelled or aborted.
* @param Result that will be submitted to executor when sequence is finished. Can be null if calling from
* non-executor thread and using {@link Future#get()} method to wait for the sequence result.
*/ */
public DsfSequence(DsfExecutor executor, Step[] steps) {
public DsfSequence(DsfExecutor executor, IProgressMonitor pm, String taskName, String rollbackTaskName, Done done) {
fExecutor = executor; fExecutor = executor;
fSteps = steps; fProgressMonitor = pm;
fTaskName = taskName;
fRollbackTaskName = rollbackTaskName;
fDone = done;
} }
/**
* Returns the steps to be executed. It is up to the deriving class to
* supply the steps and to ensure that the list of steps will not be
* modified after the sequence is constructed.
*/
abstract public Step[] getSteps();
/** Returns the DSF executor for this sequence */ /** Returns the DSF executor for this sequence */
public DsfExecutor getExecutor() { return fExecutor; } public DsfExecutor getExecutor() { return fExecutor; }
/**
* Sets the done callback to be submitted when the sequence is finished.
* If the sequence is submitted by a caller in the dispatch thread, this is
* the way that the original caller can be notified of the sequence
* completion. If the caller blocks and waits for the sequence
* completion, the Done callback is not necessary.
* @param doneQC callback to submit when sequence completes, can be null
*/
public void setDone(Done doneQC) {
fDoneQC = doneQC;
}
/** /**
* Returns the Done callback that is registered with the Sequence * Returns the Done callback that is registered with the Sequence
* @param doneQC callback that will be submitted when sequence completes, * @param doneQC callback that will be submitted when sequence completes,
* null if there is no callback configured * null if there is no callback configured
*/ */
public Done getDone() { return fDoneQC; } public Done getDone() { return fDone; }
/** Sets the steps to be executed. */
public void setSteps(Step[] steps) {
assert fCurrentStepIdx == 0;
fSteps = steps;
}
/** Returns the steps to be executed. */
public Step[] getSteps() { return fSteps; }
/** /**
* Returns index of the step that is currently being executed. * The get method blocks until sequence is complete, but always returns null.
* <br>NOTE: After sequence is invoked, this method should be called * @see java.concurrent.Future#get
* only in the DSF executor thread.
* @return
*/ */
public int getCurrentIdx() { return fCurrentStepIdx; } public Object get() throws InterruptedException, ExecutionException {
fSync.doGet();
/** return null;
* Sets the progress monitor that will be called by the sequence with udpates.
* @param pm
*/
public void setProgressMonitor(IProgressMonitor pm) { fProgressMonitor = pm; }
/**
* Sets the task name for this sequence. To be used with progress monitor;
* @param taskName
*/
public void setTaskName(String taskName) { fTaskName = taskName; }
/**
* Sets the task name to be used with progress monitor, if this sequence needs
* to be rolled back as result of cancellation or error.
* @param taskName
*/
public void setRollBackTaskName(String n) { fRollbackTaskName = n; }
/** Submits this sequence to the executor. */
public void invokeLater() {
getExecutor().submit( new DsfRunnable() { public void run() { doInvoke(); } });
} }
/** /**
* Submits this sequence to the DSF executor, and blocks waiting for the * The get method blocks until sequence is complete or until timeout is
* sequence to complete. * reached, but always returns null.
* <br>NOTE: This method is NOT to be called on the DSF executor thread. * @see java.concurrent.Future#get
*/ */
public synchronized void invoke() { public Object get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
assert !fExecutor.isInExecutorThread() : fSync.doGet();
"Cannot be called on dispatch thread: " + this; return null;
setDone(new Done() { }
/**
* Don't try to interrupt the DSF executor thread, just ignore the request
* if set.
*/
public boolean cancel(boolean mayInterruptIfRunning) {
return fSync.doCancel();
}
public boolean isCancelled() { return fSync.doIsCancelled(); }
public boolean isDone() { return fSync.doIsDone(); }
public void run() { public void run() {
synchronized(DsfSequence.this) { DsfSequence.this.notifyAll(); } // Change the state to running.
} if (fSync.doRun()) {
}); // Set the reference to this sequence in each step.
invokeLater(); int totalTicks = 0;
try { for (Step step : getSteps()) {
wait(); step.setSequence(this);
} catch (InterruptedException e) {} totalTicks += step.getTicks();
} }
private void doInvoke() { // Set the task name
assert fCurrentStepIdx == 0;
if (fTaskName != null) { if (fTaskName != null) {
fProgressMonitor.subTask(fTaskName); fProgressMonitor.beginTask(fTaskName, totalTicks);
}
fSteps[fCurrentStepIdx].execute();
} }
/** // Call the first step
* Cancells the execution of this sequence. The roll-back will start when executeStep(0);
* the current step completes. } else {
* fSync.doFinish();
*/ }
public void cancel() {
fCancelled = true;
} }
/** /**
* To be called only by the step implementation, Tells the sequence to * To be called only by the step implementation, Tells the sequence to
* submit the next step. * submit the next step.
*/ */
public void stepFinished() { private void executeStep(int nextStepIndex) {
getExecutor().submit(new DsfRunnable() { public void run() { if (isCancelled()) {
fProgressMonitor.worked(getSteps()[fCurrentStepIdx].getTicks()); cancelExecution();
fCurrentStepIdx++;
if (fCurrentStepIdx < fSteps.length) {
if (fCancelled) {
abort(new Status(
IStatus.CANCEL, DsfPlugin.PLUGIN_ID, -1,
"Cancelled" + fTaskName != null ? ": " + fTaskName : "",
null));
}
fSteps[fCurrentStepIdx].execute();
} else { } else {
if (fDoneQC != null) getExecutor().submit(fDoneQC); if (nextStepIndex < getSteps().length) {
fCurrentStepIdx = nextStepIndex;
getSteps()[fCurrentStepIdx].execute(new Done() {
final private int fStepIdx = fCurrentStepIdx;
public void run() {
// Check if we're still the correct step.
assert fStepIdx == fCurrentStepIdx;
// Proceed to the next step.
if (getStatus().isOK()) {
fProgressMonitor.worked(getSteps()[fStepIdx].getTicks());
executeStep(fStepIdx + 1);
} else {
abortExecution(getStatus());
}
}
public String toString() {
return "DsfSequence \"" + fTaskName + "\", result for executing step #" + fStepIdx + " = " + getStatus();
}
});
} else {
finish();
}
} }
}});
} }
/** /**
* To be called only by the step implementation. Tells the sequence to * To be called only by the step implementation. Tells the sequence to
* roll back next step. * roll back next step.
*/ */
public void stepRolledBack() { private void rollBackStep(int stepIdx) {
getExecutor().submit(new DsfRunnable() { public void run() { if (stepIdx >= 0) {
fProgressMonitor.worked(getSteps()[fCurrentStepIdx].getTicks()); fCurrentStepIdx = stepIdx;
fCurrentStepIdx--; getSteps()[fCurrentStepIdx].rollBack(new Done() {
if (fCurrentStepIdx >= 0) { final private int fStepIdx = fCurrentStepIdx;
fSteps[fCurrentStepIdx].rollBack(); public void run() {
// Check if we're still the correct step.
assert fStepIdx == fCurrentStepIdx;
// Proceed to the next step.
if (getStatus().isOK()) {
fProgressMonitor.worked(getSteps()[fStepIdx].getTicks());
rollBackStep(fStepIdx - 1);
} else { } else {
if (fDoneQC != null) getExecutor().submit(fDoneQC); abortRollBack(getStatus());
}
};
@Override
public String toString() {
return "DsfSequence \"" + fTaskName + "\", result for rolling back step #" + fStepIdx + " = " + getStatus();
}
});
} else {
finish();
} }
}});
} }
/** /**
* To be called only by step implementation. Tells the sequence * Tells the sequence that its execution is to be aborted and it
* that its execution is to be aborted and it should start rolling back * should start rolling back the sequence as if it was cancelled by user.
* the sequence as if it was cancelled by user.
* @param error
*/ */
public void abort(final IStatus error) { private void cancelExecution() {
getExecutor().submit(new DsfRunnable() { public void run() {
if (fRollbackTaskName != null) { if (fRollbackTaskName != null) {
fProgressMonitor.subTask(fRollbackTaskName); fProgressMonitor.subTask(fRollbackTaskName);
} }
fDoneQC.setStatus(error); fStatus = new Status(IStatus.CANCEL, DsfPlugin.PLUGIN_ID, "Sequence \"" + fTaskName + "\" cancelled.");
fCurrentStepIdx--; if (fDone != null) {
if (fCurrentStepIdx >= 0) { fDone.setStatus(fStatus);
fSteps[fCurrentStepIdx].rollBack(); }
} else {
if (fDoneQC != null) getExecutor().submit(fDoneQC); /*
* No need to call fSync, it should have been taken care of by
* Future#cancel method.
*
* Note that we're rolling back starting with the current step,
* because the current step was fully executed. This is unlike
* abortExecution() where the current step caused the roll-back.
*/
rollBackStep(fCurrentStepIdx);
}
/**
* Tells the sequence that its execution is to be aborted and it
* should start rolling back the sequence as if it was cancelled by user.
*/
private void abortExecution(final IStatus error) {
if (fRollbackTaskName != null) {
fProgressMonitor.subTask(fRollbackTaskName);
}
fStatus = error;
if (fDone != null) {
fDone.setStatus(error);
}
fSync.doAbort(new CoreException(error));
// Roll back starting with previous step, since current step failed.
rollBackStep(fCurrentStepIdx - 1);
}
/**
* Tells the sequence that that is rolling back, to abort roll back, and
* notify the clients.
*/
private void abortRollBack(final IStatus error) {
if (fRollbackTaskName != null) {
fProgressMonitor.subTask(fRollbackTaskName);
}
/*
* Compose new status based on previous status information and new
* error information.
*/
MultiStatus newStatus =
new MultiStatus(DsfPlugin.PLUGIN_ID, error.getCode(),
"Sequence \"" + fTaskName + "\" failed while rolling back.", null);
newStatus.merge(error);
newStatus.merge(fStatus);
fStatus = newStatus;
if (fDone != null) {
fDone.setStatus(newStatus);
}
finish();
}
private void finish() {
if (fDone != null) getExecutor().submit(fDone);
fSync.doFinish();
}
@SuppressWarnings("serial")
final class Sync extends AbstractQueuedSynchronizer {
private static final int STATE_RUNNING = 1;
private static final int STATE_FINISHED = 2;
private static final int STATE_ABORTING = 4;
private static final int STATE_ABORTED = 8;
private static final int STATE_CANCELLING = 16;
private static final int STATE_CANCELLED = 32;
private Throwable fException;
private boolean isFinished(int state) {
return (state & (STATE_FINISHED | STATE_CANCELLED | STATE_ABORTED)) != 0;
}
protected int tryAcquireShared(int ignore) {
return doIsDone()? 1 : -1;
}
protected boolean tryReleaseShared(int ignore) {
return true;
}
boolean doIsCancelled() {
int state = getState();
return (state & (STATE_CANCELLING | STATE_CANCELLED)) != 0;
}
boolean doIsDone() {
return isFinished(getState());
}
void doGet() throws InterruptedException, ExecutionException {
acquireSharedInterruptibly(0);
if (getState() == STATE_CANCELLED) throw new CancellationException();
if (fException != null) throw new ExecutionException(fException);
}
void doGet(long nanosTimeout) throws InterruptedException, ExecutionException, TimeoutException {
if (!tryAcquireSharedNanos(0, nanosTimeout)) throw new TimeoutException();
if (getState() == STATE_CANCELLED) throw new CancellationException();
if (fException != null) throw new ExecutionException(fException);
}
void doAbort(Throwable t) {
while(true) {
int s = getState();
if (isFinished(s)) return;
if (compareAndSetState(s, STATE_ABORTING)) break;
}
fException = t;
}
boolean doCancel() {
while(true) {
int s = getState();
if (isFinished(s)) return false;
if (s == STATE_ABORTING) return false;
if (compareAndSetState(s, STATE_CANCELLING)) break;
}
return true;
}
void doFinish() {
while(true) {
int s = getState();
if (isFinished(s)) return;
if (s == STATE_ABORTING) {
if (compareAndSetState(s, STATE_ABORTED)) break;
} else if (s == STATE_CANCELLING) {
if (compareAndSetState(s, STATE_CANCELLED)) break;
} else {
if (compareAndSetState(s, STATE_FINISHED)) break;
}
}
releaseShared(0);
}
boolean doRun() {
return compareAndSetState(0, STATE_RUNNING);
} }
}});
} }
} }

View file

@ -0,0 +1,31 @@
/*******************************************************************************
* Copyright (c) 2006 Wind River Systems and others.
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* which accompanies this distribution, and is available at
* http://www.eclipse.org/legal/epl-v10.html
*
* Contributors:
* Wind River Systems - initial API and implementation
*******************************************************************************/
package org.eclipse.dd.dsf.concurrent;
/**
* Untility class for easy pretty-printing stack traces. Local to the
* concurrent package.
*/
@Immutable
class StackTraceWrapper {
final StackTraceElement[] fStackTraceElements;
StackTraceWrapper(StackTraceElement[] elements) { fStackTraceElements = elements; }
public String toString() {
StringBuilder builder = new StringBuilder(fStackTraceElements.length * 30);
for (int i = 0; i < fStackTraceElements.length && i < 10; i++) {
builder.append(fStackTraceElements[i]);
if (i < fStackTraceElements.length && i < 10) builder.append("\n at ");
}
return builder.toString();
}
}