Version 1.0
Pawel Piech
© 2006, Wind River Systems. Released under EPL version 1.0.
public interface DsfExecutor extends ScheduledExecutorService
{
/**
* Checks if the thread that this method is called in is the same as the
* executor's dispatch thread.
* @return true if in DSF executor's dispatch thread
*/
public boolean isInExecutorThread();
}
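To make the contract concrete, here is a minimal, self-contained sketch of how an isInExecutorThread() check can be implemented on top of a standard single-threaded ScheduledExecutorService. The class and field names are illustrative assumptions, not the actual DSF implementation:

```java
import java.util.concurrent.*;

// Hypothetical sketch: a DsfExecutor-style isInExecutorThread() built on a
// single-threaded ScheduledExecutorService. A custom thread factory lets us
// capture the one dispatch thread so it can be compared against the caller.
public class DispatchThreadCheck {
    private final ScheduledExecutorService fExecutor;
    private volatile Thread fDispatchThread;

    public DispatchThreadCheck() {
        fExecutor = Executors.newSingleThreadScheduledExecutor(r -> {
            // Record the single dispatch thread as it is created.
            fDispatchThread = new Thread(r, "DSF Dispatch");
            return fDispatchThread;
        });
    }

    public boolean isInExecutorThread() {
        return Thread.currentThread() == fDispatchThread;
    }

    public ScheduledExecutorService getExecutor() { return fExecutor; }

    public static void main(String[] args) throws Exception {
        DispatchThreadCheck check = new DispatchThreadCheck();
        // From the caller's thread the check is false...
        System.out.println(check.isInExecutorThread());            // false
        // ...but inside a dispatched runnable it is true.
        Future<Boolean> inside = check.getExecutor().submit(check::isInExecutorThread);
        System.out.println(inside.get());                          // true
        check.getExecutor().shutdown();
    }
}
```

The same comparison is what makes assertions like `assert isInExecutorThread();` cheap enough to sprinkle through service code.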
Service: The service performs the asynchronous operation on a background thread, but it can still submit the Done runnable with the executor. In other words, the Done and other runnables can be submitted from any thread, but they will always execute in the single dispatch thread. Also, if the implementation of asyncMethod() is non-blocking, it does not need to start a job; it could simply perform the operation in the dispatch thread. On the client side, care has to be taken to save the appropriate state before the asynchronous method is called, because by the time the Done is executed, the client state may have changed.
public class Service {
    void asyncMethod(final Done done) {
        new Job("Async method") {
            protected IStatus run(IProgressMonitor monitor) {
                // perform calculation
                ...
                done.setStatus(new Status(IStatus.ERROR, ...));
                fExecutor.execute(done);
                return Status.OK_STATUS;
            }
        }.schedule();
    }
}
Client:
...
Service service = new Service();
final String clientData = "xyz";
...
service.asyncMethod(new Done() {
public void run() {
if (getStatus().isOK()) {
// Handle return data
...
} else {
// Handle error
...
}
}
});
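The round trip above can be sketched as a runnable program. This uses a minimal stand-in for DSF's Done class (the real one carries an IStatus), a plain thread in place of an Eclipse Job, and a single-threaded executor as the dispatch thread; all names here are illustrative assumptions:

```java
import java.util.concurrent.*;

// Self-contained sketch of the Done pattern: the "service" does its work on
// a background thread, then submits the Done back to the dispatch thread.
public class AsyncServiceDemo {
    // Minimal Done stand-in: records success/failure, then runs as a callback.
    static abstract class Done implements Runnable {
        private volatile boolean fOk;
        void setOk(boolean ok) { fOk = ok; }
        boolean isOk() { return fOk; }
    }

    static final ExecutorService fExecutor = Executors.newSingleThreadExecutor();

    static void asyncMethod(Done done) {
        new Thread(() -> {
            // perform calculation on a background thread...
            done.setOk(true);
            // ...then submit the Done back to the single dispatch thread.
            fExecutor.execute(done);
        }).start();
    }

    public static void main(String[] args) throws Exception {
        final CountDownLatch latch = new CountDownLatch(1);
        asyncMethod(new Done() {
            public void run() {
                // Executes in the dispatch thread, whatever thread set the status.
                System.out.println("done, ok=" + isOk());
                latch.countDown();
            }
        });
        latch.await();
        fExecutor.shutdown();
    }
}
```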
The biggest drawback to using Future with DSF services is that it does not work with asynchronous methods, because the Callable.call() implementation has to return a value within a single dispatch cycle. To get around this, DSF has an additional object called DsfQuery, which works like a Future combined with a Callable, but allows the implementation to make multiple dispatches before setting the return value for the client. The DsfQuery object works as follows:
Service:
public class Service {
int syncMethod() {
// perform calculation
...
return result;
}
}
Client:
...
DsfExecutor executor = new DsfExecutor();
final Service service = new Service(executor);
Future<Integer> future = executor.submit(new Callable<Integer>() {
public Integer call() {
return service.syncMethod();
}
});
int result = future.get();
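The multi-dispatch idea behind DsfQuery can be sketched with a standard CompletableFuture: the query body runs in the dispatch thread and may re-submit itself for further dispatches before it finally sets the result, while the client blocks on get() as with an ordinary Future. The names below are illustrative, not DSF's actual API:

```java
import java.util.concurrent.*;

// Sketch of a DsfQuery-like computation that spans several dispatch cycles.
public class MultiDispatchQuery {
    static final ScheduledExecutorService fExecutor =
        Executors.newSingleThreadScheduledExecutor();

    // Accumulates a result over three dispatches, then completes the future.
    static CompletableFuture<Integer> query() {
        final CompletableFuture<Integer> future = new CompletableFuture<>();
        fExecutor.execute(new Runnable() {
            int fSum = 0;
            int fStep = 0;
            public void run() {
                fSum += ++fStep;              // partial work in this dispatch
                if (fStep < 3) {
                    fExecutor.execute(this);  // result not ready: re-dispatch
                } else {
                    future.complete(fSum);    // 1 + 2 + 3
                }
            }
        });
        return future;
    }

    public static void main(String[] args) throws Exception {
        // Like Future.get(), this blocks the client (non-dispatch) thread.
        System.out.println(query().get());    // prints 6
        fExecutor.shutdown();
    }
}
```

The key difference from the plain Callable example: nothing forces the result to be produced within the first run() invocation.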
In detail, these components look like this:
Table Viewer
The table viewer is the standard org.eclipse.jface.viewers.TableViewer, created with the SWT.VIRTUAL flag. It has an associated content provider (SlowDataProviderContentProvider), which handles all the interactions with the data provider. The lazy content provider operates in a very simple cycle:
public void updateElement(final int index) {
    assert fTableViewer != null;
    if (fDataProvider == null) return;
    fDataProvider.getExecutor().execute(
        new Runnable() { public void run() {
            // Must check again, in case disposed while redispatching.
            if (fDataProvider == null) return;
            queryItemData(index);
        }});
}
Likewise, when the content provider calls the table viewer, it has to switch back into the display thread, as in the following example, where the content provider receives an event from the data provider that an item value has changed.
public void dataChanged(final Set<Integer> indexes) {
    // Check for dispose.
    if (fDataProvider == null) return;
    // Clear changed items in table viewer.
    if (fTableViewer != null) {
        final TableViewer tableViewer = fTableViewer;
        tableViewer.getTable().getDisplay().asyncExec(
            new Runnable() { public void run() {
                // Check again if table wasn't disposed when
                // switching to the display thread.
                if (tableViewer.getTable().isDisposed()) return;
                for (Integer index : indexes) {
                    tableViewer.clear(index);
                }
            }});
    }
}
All of this switching back and forth between threads makes the code look more complicated than it really is, and it takes some getting used to, but this is the price to be paid for multi-threading. Whether the participants use semaphores or the dispatch thread, the logic is equally complicated; we believe that using a single dispatch thread makes the synchronization very explicit and thus less error-prone.
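The two-thread hand-off above can be illustrated without SWT at all. In this sketch, a second single-threaded executor stands in for the display thread, and the asyncExec() call is mirrored by a plain execute(); everything here is an illustrative assumption:

```java
import java.util.concurrent.*;

// Sketch of work hopping from the DSF "dispatch" thread to a stand-in for
// the SWT display thread, mirroring getDisplay().asyncExec(...).
public class ThreadHopDemo {
    static final ExecutorService fDispatch = Executors.newSingleThreadExecutor();
    static final ExecutorService fDisplay  = Executors.newSingleThreadExecutor();

    public static void main(String[] args) throws Exception {
        final CountDownLatch done = new CountDownLatch(1);
        // A data-changed "event" arrives in the dispatch thread...
        fDispatch.execute(() -> {
            // ...and the UI update is re-dispatched to the display thread.
            fDisplay.execute(() -> {
                System.out.println("updating viewer in display thread");
                done.countDown();
            });
        });
        done.await();
        fDispatch.shutdown();
        fDisplay.shutdown();
    }
}
```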
Data Provider Service
The data provider service interface, DataProvider, is very similar to that of the lazy content provider. It has methods to:
Slow Data Provider
The data provider is actually implemented as a thread, an inner class of the SlowDataProvider service. The provider thread communicates with the service by reading Request objects from a shared queue, and by posting Runnable objects directly to the DsfExecutor, with a simulated transmission delay in each direction. An additional flag is used to control shutdown of the provider thread.
To simulate a real back end, the data provider randomly invalidates a set of items and notifies the listeners to update themselves. It also periodically invalidates the whole table and forces the clients to re-query all items.
Input Buffer
The main feature of this pattern is a buffer for holding the requests before sending them to the data provider. In this example the user requests are buffered in two arrays: fGetItemIndexesBuffer and fGetItemDonesBuffer. The DataProvider.getItem() implementation is changed as follows:
public void getItem(final int index, final GetDataDone<String> done) {
    // Schedule a buffer-servicing call, if one is needed.
    if (fGetItemIndexesBuffer.isEmpty()) {
        fExecutor.schedule(
            new Runnable() { public void run() {
                fileBufferedRequests();
            }},
            COALESCING_DELAY_TIME,
            TimeUnit.MILLISECONDS);
    }
    // Add the call data to the buffer.
    // Note: it doesn't matter that the items were added to the buffer
    // after the buffer-servicing request was scheduled. This is because
    // the buffers are guaranteed not to be modified until this dispatch
    // cycle is over.
    fGetItemIndexesBuffer.add(index);
    fGetItemDonesBuffer.add(done);
}
The method that services the buffer looks like this:
public void fileBufferedRequests() {
    // Remove a number of getItem() calls from the buffer, and combine them
    // into a request.
    int numToCoalesce = Math.min(fGetItemIndexesBuffer.size(), COALESCING_COUNT_LIMIT);
    final ItemRequest request = new ItemRequest(new Integer[numToCoalesce], new GetDataDone[numToCoalesce]);
    for (int i = 0; i < numToCoalesce; i++) {
        request.fIndexes[i] = fGetItemIndexesBuffer.remove(0);
        request.fDones[i] = fGetItemDonesBuffer.remove(0);
    }
    // Queue the coalesced request, with the appropriate transmission delay.
    fQueue.add(request);
    // If there are still calls left in the buffer, execute another
    // buffer-servicing call, but without any delay.
    if (!fGetItemIndexesBuffer.isEmpty()) {
        fExecutor.execute(new Runnable() { public void run() {
            fileBufferedRequests();
        }});
    }
}
The most interesting feature of this implementation is that there are no semaphores anywhere to control access to the input buffers. Even though the buffers are serviced with a delay, and multiple clients can call the getItem() method, the use of a single dispatch thread prevents any race conditions that could corrupt the buffer data. In real-world implementations, the buffers and caches are far more sophisticated, with much more complicated logic, and that is where managing access to them through the dispatch thread becomes even more important.
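The coalescing mechanism can be demonstrated end to end in a small runnable sketch. The constants and field names mirror the example above, but the simplified types (indexes only, a list of lists as the request queue) are assumptions made for brevity:

```java
import java.util.*;
import java.util.concurrent.*;

// Runnable sketch of the coalescing buffer: getItem() calls that arrive
// within COALESCING_DELAY_TIME of each other are combined into one request.
public class CoalescingDemo {
    static final int COALESCING_DELAY_TIME = 10;  // ms
    static final ScheduledExecutorService fExecutor =
        Executors.newSingleThreadScheduledExecutor();
    static final List<Integer> fGetItemIndexesBuffer = new ArrayList<>();
    static final List<List<Integer>> fQueue = new ArrayList<>();

    static void getItem(final int index) {
        fExecutor.execute(() -> {
            // Schedule a buffer-servicing call only for the first item.
            if (fGetItemIndexesBuffer.isEmpty()) {
                fExecutor.schedule(CoalescingDemo::fileBufferedRequests,
                    COALESCING_DELAY_TIME, TimeUnit.MILLISECONDS);
            }
            fGetItemIndexesBuffer.add(index);
        });
    }

    // Runs in the dispatch thread, so no locking of the buffer is needed.
    static void fileBufferedRequests() {
        fQueue.add(new ArrayList<>(fGetItemIndexesBuffer));
        fGetItemIndexesBuffer.clear();
    }

    public static void main(String[] args) throws Exception {
        for (int i = 0; i < 5; i++) getItem(i);   // calls arrive "at once"
        Thread.sleep(100);                        // let the delay elapse
        // Typically all five indexes were coalesced into a single request,
        // e.g. [[0, 1, 2, 3, 4]].
        fExecutor.submit(() -> System.out.println(fQueue)).get();
        fExecutor.shutdown();
    }
}
```

Note that, as in the article's version, the buffer is only ever touched from the dispatch thread, which is what makes the lock-free bookkeeping safe.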
Table Viewer
Unlike coalescing, which can be implemented entirely within the service, cancellability requires that the client be modified as well to take advantage of this capability. For the table viewer content provider, this means that additional features have to be added. In CancellingSlowDataProviderContentProvider.java, ILazyContentProvider.updateElement() was changed as follows:
public void updateElement(final int index) {
    assert fTableViewer != null;
    if (fDataProvider == null) return;
    // Calculate the visible index range.
    final int topIdx = fTableViewer.getTable().getTopIndex();
    final int botIdx = topIdx + getVisibleItemCount(topIdx);
    fCancelCallsPending.incrementAndGet();
    fDataProvider.getExecutor().execute(
        new Runnable() { public void run() {
            // Must check again, in case disposed while redispatching.
            if (fDataProvider == null || fTableViewer.getTable().isDisposed()) return;
            if (index >= topIdx && index <= botIdx) {
                queryItemData(index);
            }
            cancelStaleRequests(topIdx, botIdx);
        }});
}
Now the client keeps track of the requests it has made to the service in fItemDataDones, and cancelStaleRequests() iterates through all the outstanding requests, cancelling the ones that are no longer in the visible range.
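The bookkeeping behind cancelStaleRequests() can be sketched in isolation. The Map-based structure and the ItemDone stand-in below are assumptions for illustration; the real client stores its GetDataDone callbacks in fItemDataDones:

```java
import java.util.*;

// Sketch of cancelStaleRequests(): the client remembers a cancellable
// callback per requested index, and cancels those outside the visible range.
public class CancelStaleDemo {
    // Minimal cancellable callback, standing in for DSF's GetDataDone.
    static class ItemDone {
        private boolean fCanceled;
        void cancel() { fCanceled = true; }
        boolean isCanceled() { return fCanceled; }
    }

    static final Map<Integer, ItemDone> fItemDataDones = new HashMap<>();

    static void queryItemData(int index) {
        fItemDataDones.put(index, new ItemDone());
    }

    // Cancel outstanding requests that fell out of the visible range.
    static void cancelStaleRequests(int topIdx, int botIdx) {
        for (Iterator<Map.Entry<Integer, ItemDone>> it =
                 fItemDataDones.entrySet().iterator(); it.hasNext(); ) {
            Map.Entry<Integer, ItemDone> e = it.next();
            if (e.getKey() < topIdx || e.getKey() > botIdx) {
                e.getValue().cancel();
                it.remove();
            }
        }
    }

    public static void main(String[] args) {
        for (int i = 0; i < 10; i++) queryItemData(i);
        cancelStaleRequests(3, 6);                    // user scrolled to rows 3..6
        System.out.println(fItemDataDones.keySet());  // only 3..6 remain pending
    }
}
```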
Data Provider Service
The data provider implementation (CancellableInputCoalescingSlowDataProvider.java) builds on top of the coalescing data provider. To make the cancelling feature useful, the data provider service has to limit the size of the request queue. This is because this example simulates communication with a target: once requests are filed into the request queue they cannot be cancelled, just as a client can't cancel requests once it has sent them over a socket. So instead, if a flood of getItem() calls comes in, the service has to hold most of them in the coalescing buffer, in case the client decides to cancel them. Therefore, the fileBufferedRequests() method includes a simple check before servicing the buffer: if the request queue is full, the buffer-servicing call is delayed.
if (fQueue.size() >= REQUEST_QUEUE_SIZE_LIMIT) {
    if (fGetItemIndexesBuffer.isEmpty()) {
        fExecutor.schedule(
            new Runnable() { public void run() {
                fileBufferedRequests();
            }},
            REQUEST_BUFFER_FULL_RETRY_DELAY,
            TimeUnit.MILLISECONDS);
    }
    return;
}
Beyond this change, the only other significant difference is that before the requests are queued, they are checked for cancellation.
Coalescing and cancellability are both optimizations: neither affected the original interface of the service, and one of them required only a service-side modification. But as with all optimizations, it is often better to first make sure that the whole system is working correctly, and only then add optimizations where they can make the biggest difference in user experience.
Optimizations like the ones above can take many forms; as mentioned with coalescing, caching the data retrieved from the data provider is the most common one. As for cancellation, many services in DSF build on top of other services, which means that even a low-level event can cause a higher-level service to retrieve data, while another event might cause it to cancel those requests. A good example is a Variables service, responsible for calculating the values of expressions shown in the Variables view. When the Run Control service issues a suspended event, the Variables service reacts by requesting that a set of variables be evaluated by the debugger back end. But as soon as a resumed event is issued by Run Control, the Variables service needs to cancel the pending evaluation requests.