1
0
Fork 0
mirror of https://github.com/eclipse-cdt/cdt synced 2025-04-29 19:45:01 +02:00

[220446] Updated the "dataviewer" example and excercises for EclipseCon tutorial.

This commit is contained in:
Pawel Piech 2008-02-28 18:42:09 +00:00
parent f3cd935a6c
commit 642ea06516
18 changed files with 1151 additions and 1957 deletions

View file

@ -6,12 +6,12 @@
point="org.eclipse.ui.views">
<category
name="DSF Examples"
id="org.eclipse.dd.dsf.examples.timers">
id="org.eclipse.dd.examples.dsf">
</category>
<view
name="Timers View"
icon="icons/timer.gif"
category="org.eclipse.dd.dsf.examples.timers"
category="org.eclipse.dd.examples.dsf"
class="org.eclipse.dd.examples.dsf.timers.TimersView"
id="org.eclipse.dd.dsf.examples.model.TimersAlarmsView">
</view>
@ -22,34 +22,16 @@
id="org.eclipse.dd.dsf.test.actionSet"
label="DSF Examples">
<menu
id="org.eclipse.dd.dsf.examples.timers"
id="org.eclipse.dd.examples.dsf"
label="DSF Examples"
path="additions">
<groupMarker name="concurrent"/>
</menu>
<action
class="org.eclipse.dd.examples.dsf.dataviewer.CancellableInputCoalescingSlowDataProviderAction"
id="org.eclipse.dd.dsf.test.cancellableInputCoalescingSlowDataProvider"
label="Open Dialog with Cancellable Input-Coalescing Slow Data Provider"
menubarPath="org.eclipse.dd.dsf.examples.timers/concurrent"
style="push"/>
<action
class="org.eclipse.dd.examples.dsf.dataviewer.InputCoalescingSlowDataProviderAction"
id="org.eclipse.dd.dsf.test.inputCoalescingSlowDataProvider"
label="Open Dialog with Input-Coalescing Slow Data Provider"
menubarPath="org.eclipse.dd.dsf.examples.timers/concurrent"
style="push"/>
<action
class="org.eclipse.dd.examples.dsf.dataviewer.SlowDataProviderAction"
id="org.eclipse.dd.dsf.test.slowDataProvider"
label="Open Dialog with Slow Data Provider"
menubarPath="org.eclipse.dd.dsf.examples.timers/concurrent"
style="push"/>
<action
class="org.eclipse.dd.examples.dsf.filebrowser.FileBrowserAction"
id="org.eclipse.dd.dsf.test.fileBrowser"
label="Open File Browser Dialog"
menubarPath="org.eclipse.dd.dsf.examples.timers/concurrent"
menubarPath="org.eclipse.dd.examples.dsf/concurrent"
style="push"/>
</actionSet>
</extension>

View file

@ -1,440 +0,0 @@
/*******************************************************************************
* Copyright (c) 2006 Wind River Systems and others.
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* which accompanies this distribution, and is available at
* http://www.eclipse.org/legal/epl-v10.html
*
* Contributors:
* Wind River Systems - initial API and implementation
*******************************************************************************/
package org.eclipse.dd.examples.dsf.concurrent;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;
import org.eclipse.core.runtime.IStatus;
import org.eclipse.core.runtime.Status;
import org.eclipse.dd.dsf.concurrent.DataRequestMonitor;
import org.eclipse.dd.dsf.concurrent.DsfExecutor;
import org.eclipse.dd.dsf.concurrent.DsfRunnable;
import org.eclipse.dd.dsf.concurrent.RequestMonitor;
import org.eclipse.dd.examples.dsf.DsfExamplesPlugin;
/**
* Example data provider which has a built-in delay when fetching data. This
* data provider simulates a service which retrieves data from an external
* source such as a networked target, which incurs a considerable delay when
* retrieving data. The data items are simulated values which consist of the
* time when data is being retrieved followed by the item's index.
* <p>
* This version of the data provider builds on the input-coalescing feature,
* it adds a test to check if the requests were cancelled by the user, before
* they are coalesced and queued to be processed by the provider thread.
*/
public class CancellableInputCoalescingSlowDataProvider implements DataProvider {
/** Minimum count of data items */
private final static int MIN_COUNT = 1000;
/** Maximum count of data items */
private final static int MAX_COUNT = 2000;
/** Time interval how often random changes occur. */
private final static int RANDOM_CHANGE_MILIS = 10000;
/** Number of times random changes are made, before count is changed. */
private final static int RANDOM_COUNT_CHANGE_INTERVALS = 3;
/** Percentage of values that is changed upon random change (0-100). */
private final static int RANDOM_CHANGE_SET_PERCENTAGE = 10;
/**
* Amount of time (in miliseconds) how long the requests to provider, and
* events from provider are delayed by.
*/
private final static int TRANSMISSION_DELAY_TIME = 500;
/**
* Amount of time (in milliseconds) how long the provider takes to process
* a request.
*/
private final static int PROCESSING_TIME = 100;
/**
* Maximum number of item requests that can be coalesced into a single
* request.
*/
private final static int COALESCING_COUNT_LIMIT = 10;
/**
* Delay in processing the buffer of getItem() calls. This delay helps
* to ensure that a meaningful number of items is present in the buffer
* before the buffer data is coalesced into a request.
*/
private final static int COALESCING_DELAY_TIME = 100;
/**
* Maximum allowed number of requests in transmission to provider thread.
* This limit causes most of the client calls to be buffered at the input
* rather than in the request queue, which in truns allows the stale
* requests to be cancelled, before they are sent to the provider thread
* for processing.
*/
private final static int REQUEST_QUEUE_SIZE_LIMIT = 100;
/** Delay before processing calls buffer, if the request queue is full */
private final static int REQUEST_BUFFER_FULL_RETRY_DELAY = PROCESSING_TIME;
/** Dispatch-thread executor that this provider uses. */
private DsfExecutor fExecutor;
/** List of listeners registered for events from provider. */
private List<Listener> fListeners = new LinkedList<Listener>();
/** Thread that handles data requests. */
private ProviderThread fProviderThread;
/** Queue of currently pending data requests. */
private final BlockingQueue<Request> fQueue = new DelayQueue<Request>();
/**
* Runnable to be submitted when the data provider thread is shut down.
* This variable acts like a flag: when client want to shut down the
* provider, it sets this runnable, and when the backgroun thread sees
* that it's set, it shuts itself down, and posts this runnable with
* the executor.
*/
private RequestMonitor fShutdownRequestMonitor = null;
/**
* Buffers for coalescing getItem() calls into a single request.
*/
private List<Integer> fGetItemIndexesBuffer = new LinkedList<Integer>();
private List<DataRequestMonitor<String>> fGetItemRequestMonitorsBuffer = new LinkedList<DataRequestMonitor<String>>();
/**
* Base class for requests that are queued by the data provider. It
* implements java.util.concurrent.Delayed to allow for use of DelayedQueue.
* Every request into the queue is delayed by the simulated transmission
* time.
*/
private static abstract class Request implements Delayed {
/** Sequence counter and number are used to ensure FIFO order **/
private static int fSequenceCounter = 0;
private int fSequenceNumber = fSequenceCounter++;
/** Time delay tracks how items will be delayed. **/
private long fTime = System.currentTimeMillis() + TRANSMISSION_DELAY_TIME;
// @see java.util.concurrent.Delayed
public long getDelay(TimeUnit unit) {
return unit.convert(fTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
}
// @see java.lang.Comparable
public int compareTo(Delayed other) {
if (other == this) // compare zero ONLY if same object
return 0;
Request x = (Request)other;
long diff = fTime - x.fTime;
if (diff < 0) return -1;
else if (diff > 0) return 1;
else if (fSequenceNumber < x.fSequenceNumber) return -1;
else return 1;
}
/** All requests have an associated array of RequestMonitor tokens **/
abstract RequestMonitor[] getRequestMonitors();
}
/**
* Object used to encapsulate the "getItemCount" requests. Instances of it
* are queued till processed.
*/
private static class CountRequest extends Request
{
DataRequestMonitor<Integer> fRequestMonitor;
CountRequest(DataRequestMonitor<Integer> rm) { fRequestMonitor = rm; }
@Override
DataRequestMonitor<?>[] getRequestMonitors() { return new DataRequestMonitor[] { fRequestMonitor }; }
}
/**
* Object used to encapsulate the "getItem" requests. Instances of it
* are queued till processed.
*/
private static class ItemRequest extends Request
{
DataRequestMonitor<String>[] fRequestMonitors;
Integer[] fIndexes;
ItemRequest(Integer[] indexes, DataRequestMonitor<String>[] rms) { fIndexes = indexes; fRequestMonitors = rms; }
@Override
DataRequestMonitor<?>[] getRequestMonitors() { return fRequestMonitors; }
}
/**
* The background thread of data provider. This thread retrieves the
* requests from the provider's queue and processes them. It also
* initiates random changes in the data set and issues corresponding
* events.
*/
private class ProviderThread extends Thread
{
/**
* Current count of items in the data set. It is changed
* periodically for simulation purposes.
*/
private int fCount = MIN_COUNT;
/**
* Incremented with every data change, it causes the count to be reset
* every four random changes.
*/
private int fCountTrigger = 0;
/** Time when the last change was performed. */
private long fLastChangeTime = System.currentTimeMillis();
/** Random number generator */
private Random fRandom = new java.util.Random();
@Override
public void run() {
try {
// Initialize the count.
randomCount();
// Perform the loop until the shutdown runnable is set.
while(fShutdownRequestMonitor == null) {
// Get the next request from the queue. The time-out
// ensures that that we get to process the random changes.
final Request request = fQueue.poll(RANDOM_CHANGE_MILIS / 10, TimeUnit.MILLISECONDS);
// If a request was dequeued, process it.
if (request != null) {
// Simulate a processing delay.
Thread.sleep(PROCESSING_TIME);
if (request instanceof CountRequest) {
processCountRequest((CountRequest)request);
} else if (request instanceof ItemRequest) {
processItemRequest((ItemRequest)request);
}
// Whatever the results, post it to dispatch thread
// executor (with transmission delay).
fExecutor.schedule(
new DsfRunnable() {
public void run() {
for (RequestMonitor requestMonitor : request.getRequestMonitors()) {
requestMonitor.done();
}
}
},
TRANSMISSION_DELAY_TIME, TimeUnit.MILLISECONDS);
}
// Simulate data changes.
randomChanges();
}
}
catch (InterruptedException x) {
DsfExamplesPlugin.getDefault().getLog().log( new Status(
IStatus.ERROR, DsfExamplesPlugin.PLUGIN_ID, 0, "Interrupted exception in slow data provider thread.", x )); //$NON-NLS-1$
}
// Notify the client that requested shutdown, that shutdown is complete.
fShutdownRequestMonitor.done();
fShutdownRequestMonitor = null;
}
private void processCountRequest(CountRequest request) {
// Calculate the simulated values.
request.fRequestMonitor.setData(fCount);
}
private void processItemRequest(ItemRequest request) {
// Check to make sure that the number of indexes matches the number
// of return tokens.
assert request.fRequestMonitors.length == request.fIndexes.length;
// Calculate the simulated values for each index in request.
for (int i = 0; i < request.fIndexes.length; i++) {
request.fRequestMonitors[i].setData(Long.toHexString(fLastChangeTime) + "." + request.fIndexes[i]); //$NON-NLS-1$
}
}
/**
* This method simulates changes in provider's data set.
*/
private void randomChanges()
{
if (System.currentTimeMillis() > fLastChangeTime + RANDOM_CHANGE_MILIS) {
fLastChangeTime = System.currentTimeMillis();
// once in every 30 seconds broadcast item count change
if (++fCountTrigger % RANDOM_COUNT_CHANGE_INTERVALS == 0) randomCount();
else randomDataChange();
}
}
/**
* Calculates new size for provider's data set.
*/
private void randomCount()
{
fCount = MIN_COUNT + Math.abs(fRandom.nextInt()) % (MAX_COUNT - MIN_COUNT);
// Generate the event that the count has changed, and post it to
// dispatch thread with transmission delay.
fExecutor.schedule(
new Runnable() { public void run() {
for (Listener listener : fListeners) {
listener.countChanged();
}
}},
TRANSMISSION_DELAY_TIME, TimeUnit.MILLISECONDS);
}
/**
* Invalidates a random range of indexes.
*/
private void randomDataChange()
{
final Set<Integer> set = new HashSet<Integer>();
// Change one in ten values.
for (int i = 0; i < fCount * RANDOM_CHANGE_SET_PERCENTAGE / 100; i++) {
set.add( new Integer(Math.abs(fRandom.nextInt()) % fCount) );
}
// Generate the event that the data has changed.
// Post dispatch thread with transmission delay.
fExecutor.schedule(
new Runnable() { public void run() {
for (Listener listener : fListeners) {
listener.dataChanged(set);
}
}},
TRANSMISSION_DELAY_TIME, TimeUnit.MILLISECONDS);
}
}
public CancellableInputCoalescingSlowDataProvider(DsfExecutor executor) {
fExecutor = executor;
fProviderThread = new ProviderThread();
fProviderThread.start();
}
/**
* Requests shutdown of this data provider.
* @param requestMonitor Monitor to call when shutdown is complete.
*/
public void shutdown(RequestMonitor requestMonitor) {
fShutdownRequestMonitor = requestMonitor;
}
///////////////////////////////////////////////////////////////////////////
// DataProvider
public DsfExecutor getDsfExecutor() {
return fExecutor;
}
public void getItemCount(final DataRequestMonitor<Integer> rm) {
fQueue.add(new CountRequest(rm));
}
public void getItem(final int index, final DataRequestMonitor<String> rm) {
// Schedule a buffer-servicing call, if one is needed.
if (fGetItemIndexesBuffer.isEmpty()) {
fExecutor.schedule(
new Runnable() { public void run() {
fileBufferedRequests();
}},
COALESCING_DELAY_TIME,
TimeUnit.MILLISECONDS);
}
// Add the call data to the buffer.
// Note: it doesn't matter that the items were added to the buffer
// after the buffer-servicing request was scheduled. This is because
// the buffers are guaranteed not to be modified until this dispatch
// cycle is over.
fGetItemIndexesBuffer.add(index);
fGetItemRequestMonitorsBuffer.add(rm);
}
@SuppressWarnings("unchecked")
public void fileBufferedRequests() {
// Check if there is room in the request queue. If not, re-schedule the
// buffer-servicing for later.
if (fQueue.size() >= REQUEST_QUEUE_SIZE_LIMIT) {
if (fGetItemIndexesBuffer.isEmpty()) {
fExecutor.schedule(
new Runnable() { public void run() {
fileBufferedRequests();
}},
REQUEST_BUFFER_FULL_RETRY_DELAY,
TimeUnit.MILLISECONDS);
}
return;
}
// Remove a number of getItem() calls from the buffer, and combine them
// into a request.
List<Integer> indexes = new LinkedList<Integer>();
List<DataRequestMonitor> rms = new LinkedList<DataRequestMonitor>();
int numToCoalesce = 0;
while (!fGetItemIndexesBuffer.isEmpty() && numToCoalesce < COALESCING_COUNT_LIMIT) {
// Get the next call from buffer.
Integer index = fGetItemIndexesBuffer.remove(0);
DataRequestMonitor rm = fGetItemRequestMonitorsBuffer.remove(0);
// If call is already cancelled, ignore it.
if (rm.getStatus().getSeverity() == IStatus.CANCEL) continue;
// Otherwise add it to the request.
indexes.add(index);
rms.add(rm);
numToCoalesce++;
}
// Queue the coalesced request.
fQueue.add( new ItemRequest(
indexes.toArray(new Integer[indexes.size()]),
rms.toArray(new DataRequestMonitor[rms.size()]))
);
// If there are still calls left in the buffer, execute another
// buffer-servicing call, but without any delay.
if (!fGetItemIndexesBuffer.isEmpty()) {
fExecutor.execute(new Runnable() { public void run() {
fileBufferedRequests();
}});
}
}
public void addListener(Listener listener) {
assert fExecutor.isInExecutorThread();
fListeners.add(listener);
}
public void removeListener(Listener listener) {
assert fExecutor.isInExecutorThread();
fListeners.remove(listener);
}
//
///////////////////////////////////////////////////////////////////////////
}

View file

@ -1,54 +0,0 @@
/*******************************************************************************
* Copyright (c) 2006 Wind River Systems and others.
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* which accompanies this distribution, and is available at
* http://www.eclipse.org/legal/epl-v10.html
*
* Contributors:
* Wind River Systems - initial API and implementation
*******************************************************************************/
package org.eclipse.dd.examples.dsf.concurrent;
import org.eclipse.dd.dsf.concurrent.DefaultDsfExecutor;
import org.eclipse.dd.dsf.concurrent.RequestMonitor;
import org.eclipse.jface.action.IAction;
import org.eclipse.jface.dialogs.Dialog;
import org.eclipse.ui.IWorkbenchWindow;
import org.eclipse.ui.IWorkbenchWindowActionDelegate;
import org.eclipse.ui.actions.ActionDelegate;
public class CancellableInputCoalescingSlowDataProviderAction extends ActionDelegate
implements IWorkbenchWindowActionDelegate
{
private IWorkbenchWindow fWindow;
@Override
public void run(IAction action) {
if (fWindow != null) {
// Create the standard data provider.
final CancellableInputCoalescingSlowDataProvider dataProvider =
new CancellableInputCoalescingSlowDataProvider(new DefaultDsfExecutor());
// Create the dialog and open it.
Dialog dialog = new SlowDataProviderDialog(
fWindow.getShell(), new CancellingSlowDataProviderContentProvider(), dataProvider);
dialog.open();
// Shut down the data provider thread and the DSF executor thread.
// Note, since data provider is running in background thread, we have to
// wait until this background thread has completed shutdown before
// killing the executor thread itself.
dataProvider.shutdown(new RequestMonitor(dataProvider.getDsfExecutor(), null) {
@Override
public void handleCompleted() {
dataProvider.getDsfExecutor().shutdown();
}
});
}
}
public void init(IWorkbenchWindow window) {
fWindow = window;
}
}

View file

@ -1,292 +0,0 @@
/*******************************************************************************
* Copyright (c) 2006 Wind River Systems and others.
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* which accompanies this distribution, and is available at
* http://www.eclipse.org/legal/epl-v10.html
*
* Contributors:
* Wind River Systems - initial API and implementation
*******************************************************************************/
package org.eclipse.dd.examples.dsf.concurrent;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import org.eclipse.core.runtime.IStatus;
import org.eclipse.core.runtime.Status;
import org.eclipse.dd.dsf.concurrent.DataRequestMonitor;
import org.eclipse.dd.dsf.concurrent.DsfExecutor;
import org.eclipse.dd.examples.dsf.DsfExamplesPlugin;
import org.eclipse.jface.viewers.ILazyContentProvider;
import org.eclipse.jface.viewers.TableViewer;
import org.eclipse.jface.viewers.Viewer;
import org.eclipse.swt.widgets.Display;
import org.eclipse.swt.widgets.Table;
public class CancellingSlowDataProviderContentProvider
implements ILazyContentProvider, DataProvider.Listener
{
TableViewer fTableViewer;
DataProvider fDataProvider;
Display fDisplay;
List<ItemGetDataRequestMonitor> fItemDataRequestMonitors = new LinkedList<ItemGetDataRequestMonitor>();
Set<Integer> fCancelledIdxs = new HashSet<Integer>();
AtomicInteger fCancelCallsPending = new AtomicInteger();
///////////////////////////////////////////////////////////////////////////
// ILazyContentProvider
public void dispose() {
if (fDataProvider != null) {
final DataProvider dataProvider = fDataProvider;
dataProvider.getDsfExecutor().execute(
new Runnable() { public void run() {
dataProvider.removeListener(CancellingSlowDataProviderContentProvider.this);
fTableViewer = null;
fDisplay = null;
fDataProvider = null;
}});
} else {
fTableViewer = null;
fDisplay = null;
}
}
public void inputChanged(final Viewer viewer, Object oldInput, final Object newInput) {
// If old data provider is not-null, unregister from it as listener.
if (fDataProvider != null) {
final DataProvider dataProvider = fDataProvider;
dataProvider.getDsfExecutor().execute(
new Runnable() { public void run() {
dataProvider.removeListener(CancellingSlowDataProviderContentProvider.this);
}});
}
// Register as listener with new data provider.
// Note: if old data provider and new data provider use different executors,
// there is a chance of a race condition here.
if (newInput != null) {
((DataProvider)newInput).getDsfExecutor().execute(
new Runnable() { public void run() {
if ( ((TableViewer)viewer).getTable().isDisposed() ) return;
fTableViewer = (TableViewer)viewer;
fDisplay = fTableViewer.getTable().getDisplay();
fDataProvider = (DataProvider)newInput;
fDataProvider.addListener(CancellingSlowDataProviderContentProvider.this);
queryItemCount();
}});
}
}
public void updateElement(final int index) {
assert fTableViewer != null;
if (fDataProvider == null) return;
// Calculate the visible index range.
final int topIdx = fTableViewer.getTable().getTopIndex();
final int botIdx = topIdx + getVisibleItemCount(topIdx);
fCancelCallsPending.incrementAndGet();
fDataProvider.getDsfExecutor().execute(
new Runnable() { public void run() {
// Must check again, in case disposed while re-dispatching.
if (fDataProvider == null || fTableViewer.getTable().isDisposed()) return;
if (index >= topIdx && index <= botIdx) {
queryItemData(index);
}
cancelStaleRequests(topIdx, botIdx);
}});
}
protected int getVisibleItemCount(int top) {
Table table = fTableViewer.getTable();
int itemCount = table.getItemCount();
return Math.min((table.getBounds().height / table.getItemHeight()) + 2, itemCount - top);
}
///////////////////////////////////////////////////////////////////////////
// DataProvider.Listener
public void countChanged() {
// Check for dispose.
if (fDataProvider == null) return;
// Request new count.
queryItemCount();
}
public void dataChanged(final Set<Integer> indexes) {
// Check for dispose.
if (fDataProvider == null) return;
// Clear changed items in table viewer.
final TableViewer tableViewer = fTableViewer;
fDisplay.asyncExec(
new Runnable() { public void run() {
// Check again if table wasn't disposed when
// switching to the display thread.
if (fTableViewer == null || fTableViewer.getTable().isDisposed()) return;
for (Integer index : indexes) {
tableViewer.clear(index);
}
}});
}
//
///////////////////////////////////////////////////////////////////////////
/**
* Convenience extension to standard data return runnable. This extension
* automatically checks for errors and asynchronous dispose.
* @param <V>
*/
private abstract class CPGetDataRequestMonitor<V> extends DataRequestMonitor<V> {
public CPGetDataRequestMonitor(DsfExecutor executor) { super(executor, null); }
abstract protected void doRun();
@Override
public void handleCompleted() {
// If there is an error processing request, return.
if (!getStatus().isOK()) return;
// If content provider was disposed, return.
if (fTableViewer == null || fTableViewer.getTable().isDisposed()) return;
// Otherwise execute runnable.
doRun();
}
}
/**
* Executes the item count query with DataProvider. Must be called on
* data provider's dispatch thread.
*/
private void queryItemCount() {
assert fDataProvider.getDsfExecutor().isInExecutorThread();
// Request coumt from data provider. When the count is returned, we
// have to re-dispatch into the display thread to avoid calling
// the table widget on the DSF dispatch thread.
fCancelledIdxs.clear();
fDataProvider.getItemCount(
new CPGetDataRequestMonitor<Integer>(fDataProvider.getDsfExecutor()) {
@Override
protected void doRun() {
final TableViewer tableViewer = fTableViewer;
tableViewer.getTable().getDisplay().asyncExec(
new Runnable() { public void run() {
// Check again if table wasn't disposed when
// switching to the display thread.
if (tableViewer.getTable().isDisposed()) return; // disposed
tableViewer.setItemCount(getData());
tableViewer.getTable().clearAll();
}});
}});
}
/**
* Dedicated class for data item requests. This class holds the index
* argument so it can be examined when cancelling stale requests.
*/
// Request data from data provider. Likewise, when the data is
// returned, we have to re-dispatch into the display thread to
// call the table widget.
class ItemGetDataRequestMonitor extends CPGetDataRequestMonitor<String> {
/** Index is used when cancelling stale requests. */
int fIndex;
ItemGetDataRequestMonitor(DsfExecutor executor, int index) {
super(executor);
fIndex = index;
}
// Remove the request from list of outstanding requests. This has
// to be done in run() because doRun() is not always called.
@Override
public void handleCompleted() {
fItemDataRequestMonitors.remove(this);
super.handleCompleted();
}
// Process the result as usual.
@Override
protected void doRun() {
final TableViewer tableViewer = fTableViewer;
tableViewer.getTable().getDisplay().asyncExec(
new Runnable() { public void run() {
// Check again if table wasn't disposed when
// switching to the display thread.
if (tableViewer.getTable().isDisposed()) return; // disposed
tableViewer.replace(getData(), fIndex);
}});
}
}
/**
* Executes the data query with DataProvider. Must be called on dispatch
* thread.
* @param index Index of item to fetch.
*/
private void queryItemData(final int index) {
assert fDataProvider.getDsfExecutor().isInExecutorThread();
ItemGetDataRequestMonitor rm = new ItemGetDataRequestMonitor(fDataProvider.getDsfExecutor(), index);
fItemDataRequestMonitors.add(rm);
fDataProvider.getItem(index, rm);
}
/**
* Iterates through the outstanding requests to data provider and
* cancells any that are nto visible any more.
* @param topIdx Top index of the visible items
* @param botIdx Bottom index of the visible items
*/
private void cancelStaleRequests(int topIdx, int botIdx) {
// Go through the outstanding requests and cencel any that
// are not visible anymore.
for (Iterator<ItemGetDataRequestMonitor> itr = fItemDataRequestMonitors.iterator(); itr.hasNext();) {
ItemGetDataRequestMonitor item = itr.next();
if (item.fIndex < topIdx || item.fIndex > botIdx) {
// Set the item to cancelled status, so that the data provider
// will ignore it.
item.setStatus(new Status(IStatus.CANCEL, DsfExamplesPlugin.PLUGIN_ID, 0, "Cancelled", null)); //$NON-NLS-1$
// Add the item index to list of indexes that were cancelled,
// which will be sent to the table widget.
fCancelledIdxs.add(item.fIndex);
// Remove the item from the outstanding cancel requests.
itr.remove();
}
}
int cancelRequestsPending = fCancelCallsPending.decrementAndGet();
if (!fCancelledIdxs.isEmpty() && cancelRequestsPending == 0) {
final Set<Integer> cancelledIdxs = fCancelledIdxs;
fCancelledIdxs = new HashSet<Integer>();
final TableViewer tableViewer = fTableViewer;
tableViewer.getTable().getDisplay().asyncExec(
new Runnable() { public void run() {
// Check again if table wasn't disposed when
// switching to the display thread.
if (tableViewer.getTable().isDisposed()) return;
// Clear the indexes of the cancelled request, so that the
// viewer knows to request them again when needed.
// Note: clearing using TableViewer.clear(int) seems very
// inefficient, it's better to use Table.clear(int[]).
int[] cancelledIdxsArray = new int[cancelledIdxs.size()];
int i = 0;
for (Integer index : cancelledIdxs) {
cancelledIdxsArray[i++] = index;
}
tableViewer.getTable().clear(cancelledIdxsArray);
}});
}
}
}

View file

@ -1,64 +0,0 @@
/*******************************************************************************
* Copyright (c) 2006 Wind River Systems and others.
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* which accompanies this distribution, and is available at
* http://www.eclipse.org/legal/epl-v10.html
*
* Contributors:
* Wind River Systems - initial API and implementation
*******************************************************************************/
package org.eclipse.dd.examples.dsf.concurrent;
import java.util.Set;
import org.eclipse.dd.dsf.concurrent.DsfExecutor;
import org.eclipse.dd.dsf.concurrent.DataRequestMonitor;
public interface DataProvider {
/**
* Interface for listeners for changes in Provider's data.
*/
public interface Listener {
/**
* Indicates that the count of data items has changed.
*/
void countChanged();
/**
* Indicates that some of the data values have changed.
* @param indexes Indexes of the changed items.
*/
void dataChanged(Set<Integer> indexes);
}
/**
* Returns the DSF executor that has to be used to call this data
* provider.
*/
DsfExecutor getDsfExecutor();
/**
* Retrieves the current item count.
* @param rm Request monitor, to be filled in with the Integer value.
*/
void getItemCount(DataRequestMonitor<Integer> rm);
/**
* Retrieves data value for given index.
* @param index Index of the item to retrieve
* @param rm Return data token, to be filled in with a String value
*/
void getItem(int index, DataRequestMonitor<String> rm);
/**
* Registers given listener with data provider.
*/
void addListener(Listener listener);
/**
* Removes given listener from data provider.
*/
void removeListener(Listener listener);
}

View file

@ -1,408 +0,0 @@
/*******************************************************************************
* Copyright (c) 2006 Wind River Systems and others.
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* which accompanies this distribution, and is available at
* http://www.eclipse.org/legal/epl-v10.html
*
* Contributors:
* Wind River Systems - initial API and implementation
*******************************************************************************/
package org.eclipse.dd.examples.dsf.concurrent;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;
import org.eclipse.core.runtime.IStatus;
import org.eclipse.core.runtime.Status;
import org.eclipse.dd.dsf.concurrent.DsfRunnable;
import org.eclipse.dd.dsf.concurrent.RequestMonitor;
import org.eclipse.dd.dsf.concurrent.DsfExecutor;
import org.eclipse.dd.dsf.concurrent.DataRequestMonitor;
import org.eclipse.dd.examples.dsf.DsfExamplesPlugin;
/**
* Example data provider which has a built-in delay when fetching data. This
* data provider simulates a service which retrieves data from an external
* source such as a networked target, which incurs a considerable delay when
* retrieving data. The data items are simulated values which consist of the
* time when data is being retrieved followed by the item's index.
* <p>
* This version of the data provider features an optimization which causes
* item requests to be grouped together even before they are filed into the
* processing queue. This example demonstrates how the service can implement
* coalescing impelemntation in a situation where the provider has an
* interface which only accepts aggregate requests, so the requests have to be
* coalesed before they are sent to the provider.
*/
public class InputCoalescingSlowDataProvider implements DataProvider {
/** Minimum count of data items */
private final static int MIN_COUNT = 1000;
/** Maximum count of data items */
private final static int MAX_COUNT = 2000;
/** Time interval how often random changes occur. */
private final static int RANDOM_CHANGE_MILIS = 10000;
/** Number of times random changes are made, before count is changed. */
private final static int RANDOM_COUNT_CHANGE_INTERVALS = 3;
/** Percentage of values that is changed upon random change (0-100). */
private final static int RANDOM_CHANGE_SET_PERCENTAGE = 10;
/**
* Amount of time (in miliseconds) how long the requests to provider, and
* events from provider are delayed by.
*/
private final static int TRANSMISSION_DELAY_TIME = 500;
/**
* Amount of time (in milliseconds) how long the provider takes to process
* a request.
*/
private final static int PROCESSING_TIME = 100;
/**
* Maximum number of item requests that can be coalesced into a single
* request.
*/
private final static int COALESCING_COUNT_LIMIT = 10;
/**
* Delay in processing the buffer of getItem() calls. This delay helps
* to ensure that a meaningful number of items is present in the buffer
* before the buffer data is coalesced into a request.
*/
private final static int COALESCING_DELAY_TIME = 10;
/** Dispatch-thread executor that this provider uses. */
private DsfExecutor fExecutor;
/** List of listeners registered for events from provider. */
private List<Listener> fListeners = new LinkedList<Listener>();
/** Thread that handles data requests. */
private ProviderThread fProviderThread;
/** Queue of currently pending data requests. */
private final BlockingQueue<Request> fQueue = new DelayQueue<Request>();
/**
* Runnable to be submitted when the data provider thread is shut down.
* This variable acts like a flag: when client want to shut down the
* provider, it sets this runnable, and when the backgroun thread sees
* that it's set, it shuts itself down, and posts this runnable with
* the executor.
*/
private RequestMonitor fShutdownRequestMonitor = null;
/**
* Buffers for coalescing getItem() calls into a single request.
*/
private List<Integer> fGetItemIndexesBuffer = new LinkedList<Integer>();
private List<DataRequestMonitor<String>> fGetItemRequestMonitorsBuffer = new LinkedList<DataRequestMonitor<String>>();
/**
* Base class for requests that are queued by the data provider. It
* implements java.util.concurrent.Delayed to allow for use of DelayedQueue.
* Every request into the queue is delayed by the simulated transmission
* time.
*/
private static abstract class Request implements Delayed {
/** Sequence counter and number are used to ensure FIFO order **/
private static int fSequenceCounter = 0;
private int fSequenceNumber = fSequenceCounter++;
/** Time delay tracks how items will be delayed. **/
private long fTime = System.currentTimeMillis() + TRANSMISSION_DELAY_TIME;
// @see java.util.concurrent.Delayed
public long getDelay(TimeUnit unit) {
return unit.convert(fTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
}
// @see java.lang.Comparable
public int compareTo(Delayed other) {
if (other == this) // compare zero ONLY if same object
return 0;
Request x = (Request)other;
long diff = fTime - x.fTime;
if (diff < 0) return -1;
else if (diff > 0) return 1;
else if (fSequenceNumber < x.fSequenceNumber) return -1;
else return 1;
}
/** All requests have an associated array of RequestMonitor tokens **/
abstract RequestMonitor[] getRequestMonitors();
}
/**
* Object used to encapsulate the "getItemCount" requests. Instances of it
* are queued till processed.
*/
private static class CountRequest extends Request
{
DataRequestMonitor<Integer> fRequestMonitors;
CountRequest(DataRequestMonitor<Integer> rms) { fRequestMonitors = rms; }
@Override
DataRequestMonitor<?>[] getRequestMonitors() { return new DataRequestMonitor[] { fRequestMonitors }; }
}
/**
* Object used to encapsulate the "getItem" requests. Instances of it
* are queued till processed.
*/
private static class ItemRequest extends Request
{
DataRequestMonitor<String>[] fRequestMonitors;
Integer[] fIndexes;
ItemRequest(Integer[] indexes, DataRequestMonitor<String>[] rms) { fIndexes = indexes; fRequestMonitors = rms; }
@Override
DataRequestMonitor<?>[] getRequestMonitors() { return fRequestMonitors; }
}
/**
* The background thread of data provider. This thread retrieves the
* requests from the provider's queue and processes them. It also
* initiates random changes in the data set and issues corresponding
* events.
*/
private class ProviderThread extends Thread
{
/**
* Current count of items in the data set. It is changed
* periodically for simulation purposes.
*/
private int fCount = MIN_COUNT;
/**
* Incremented with every data change, it causes the count to be reset
* every four random changes.
*/
private int fCountTrigger = 0;
/** Time when the last change was performed. */
private long fLastChangeTime = System.currentTimeMillis();
/** Random number generator */
private Random fRandom = new java.util.Random();
@Override
public void run() {
try {
// Initialize the count.
randomCount();
// Perform the loop until the shutdown runnable is set.
while(fShutdownRequestMonitor == null) {
// Get the next request from the queue. The time-out
// ensures that that we get to process the random changes.
final Request request = fQueue.poll(RANDOM_CHANGE_MILIS / 10, TimeUnit.MILLISECONDS);
// If a request was dequeued, process it.
if (request != null) {
// Simulate a processing delay.
Thread.sleep(PROCESSING_TIME);
if (request instanceof CountRequest) {
processCountRequest((CountRequest)request);
} else if (request instanceof ItemRequest) {
processItemRequest((ItemRequest)request);
}
// Whatever the results, post it to dispatch thread
// executor (with transmission delay).
fExecutor.schedule(
new DsfRunnable() {
public void run() {
for (RequestMonitor requestMonitor : request.getRequestMonitors()) {
requestMonitor.done();
}
}
},
TRANSMISSION_DELAY_TIME, TimeUnit.MILLISECONDS);
}
// Simulate data changes.
randomChanges();
}
}
catch (InterruptedException x) {
DsfExamplesPlugin.getDefault().getLog().log( new Status(
IStatus.ERROR, DsfExamplesPlugin.PLUGIN_ID, 0, "Interrupted exception in slow data provider thread.", x )); //$NON-NLS-1$
}
// Notify the client that requested shutdown, that shutdown is complete.
fShutdownRequestMonitor.done();
fShutdownRequestMonitor = null;
}
private void processCountRequest(CountRequest request) {
// Calculate the simulated values.
request.fRequestMonitors.setData(fCount);
}
private void processItemRequest(ItemRequest request) {
// Check to make sure that the number of indexes matches the number
// of return tokens.
assert request.fRequestMonitors.length == request.fIndexes.length;
// Calculate the simulated values for each index in request.
for (int i = 0; i < request.fIndexes.length; i++) {
request.fRequestMonitors[i].setData(Long.toHexString(fLastChangeTime) + "." + request.fIndexes[i]); //$NON-NLS-1$
}
}
/**
* This method simulates changes in provider's data set.
*/
private void randomChanges()
{
if (System.currentTimeMillis() > fLastChangeTime + RANDOM_CHANGE_MILIS) {
fLastChangeTime = System.currentTimeMillis();
// once in every 30 seconds broadcast item count change
if (++fCountTrigger % RANDOM_COUNT_CHANGE_INTERVALS == 0) randomCount();
else randomDataChange();
}
}
/**
* Calculates new size for provider's data set.
*/
private void randomCount()
{
fCount = MIN_COUNT + Math.abs(fRandom.nextInt()) % (MAX_COUNT - MIN_COUNT);
// Generate the event that the count has changed, and post it to
// dispatch thread with transmission delay.
fExecutor.schedule(
new Runnable() { public void run() {
for (Listener listener : fListeners) {
listener.countChanged();
}
}},
TRANSMISSION_DELAY_TIME, TimeUnit.MILLISECONDS);
}
/**
* Invalidates a random range of indexes.
*/
private void randomDataChange()
{
final Set<Integer> set = new HashSet<Integer>();
// Change one in ten values.
for (int i = 0; i < fCount * RANDOM_CHANGE_SET_PERCENTAGE / 100; i++) {
set.add( new Integer(Math.abs(fRandom.nextInt()) % fCount) );
}
// Generate the event that the data has changed.
// Post dispatch thread with transmission delay.
fExecutor.schedule(
new Runnable() { public void run() {
for (Listener listener : fListeners) {
listener.dataChanged(set);
}
}},
TRANSMISSION_DELAY_TIME, TimeUnit.MILLISECONDS);
}
}
public InputCoalescingSlowDataProvider(DsfExecutor executor) {
fExecutor = executor;
fProviderThread = new ProviderThread();
fProviderThread.start();
}
/**
* Requests shutdown of this data provider.
* @param requestMonitor Monitor to call when shutdown is complete.
*/
public void shutdown(RequestMonitor requestMonitor) {
fShutdownRequestMonitor = requestMonitor;
}
///////////////////////////////////////////////////////////////////////////
// DataProvider
public DsfExecutor getDsfExecutor() {
return fExecutor;
}
public void getItemCount(final DataRequestMonitor<Integer> rm) {
fExecutor.schedule(
new Runnable() { public void run() {
fQueue.add(new CountRequest(rm));
}},
TRANSMISSION_DELAY_TIME,
TimeUnit.MILLISECONDS);
}
public void getItem(final int index, final DataRequestMonitor<String> rm) {
// Schedule a buffer-servicing call, if one is needed.
if (fGetItemIndexesBuffer.isEmpty()) {
fExecutor.schedule(
new Runnable() { public void run() {
fileBufferedRequests();
}},
COALESCING_DELAY_TIME,
TimeUnit.MILLISECONDS);
}
// Add the call data to the buffer.
// Note: it doesn't matter that the items were added to the buffer
// after the buffer-servicing request was scheduled. This is because
// the buffers are guaranteed not to be modified until this dispatch
// cycle is over.
fGetItemIndexesBuffer.add(index);
fGetItemRequestMonitorsBuffer.add(rm);
}
@SuppressWarnings("unchecked")
public void fileBufferedRequests() {
// Remove a number of getItem() calls from the buffer, and combine them
// into a request.
int numToCoalesce = Math.min(fGetItemIndexesBuffer.size(), COALESCING_COUNT_LIMIT);
final ItemRequest request = new ItemRequest(new Integer[numToCoalesce], new DataRequestMonitor[numToCoalesce]);
for (int i = 0; i < numToCoalesce; i++) {
request.fIndexes[i] = fGetItemIndexesBuffer.remove(0);
request.fRequestMonitors[i] = fGetItemRequestMonitorsBuffer.remove(0);
}
// Queue the coalesced request, with the appropriate transmission delay.
fQueue.add(request);
// If there are still calls left in the buffer, execute another
// buffer-servicing call, but without any delay.
if (!fGetItemIndexesBuffer.isEmpty()) {
fExecutor.execute(new Runnable() { public void run() {
fileBufferedRequests();
}});
}
}
public void addListener(Listener listener) {
assert fExecutor.isInExecutorThread();
fListeners.add(listener);
}
public void removeListener(Listener listener) {
assert fExecutor.isInExecutorThread();
fListeners.remove(listener);
}
//
///////////////////////////////////////////////////////////////////////////
}

View file

@ -1,54 +0,0 @@
/*******************************************************************************
* Copyright (c) 2006 Wind River Systems and others.
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* which accompanies this distribution, and is available at
* http://www.eclipse.org/legal/epl-v10.html
*
* Contributors:
* Wind River Systems - initial API and implementation
*******************************************************************************/
package org.eclipse.dd.examples.dsf.concurrent;
import org.eclipse.dd.dsf.concurrent.DefaultDsfExecutor;
import org.eclipse.dd.dsf.concurrent.RequestMonitor;
import org.eclipse.jface.action.IAction;
import org.eclipse.jface.dialogs.Dialog;
import org.eclipse.ui.IWorkbenchWindow;
import org.eclipse.ui.IWorkbenchWindowActionDelegate;
import org.eclipse.ui.actions.ActionDelegate;
public class InputCoalescingSlowDataProviderAction extends ActionDelegate
implements IWorkbenchWindowActionDelegate
{
private IWorkbenchWindow fWindow;
@Override
public void run(IAction action) {
if (fWindow != null) {
// Create the standard data provider.
final InputCoalescingSlowDataProvider dataProvider =
new InputCoalescingSlowDataProvider(new DefaultDsfExecutor());
// Create the dialog and open it.
Dialog dialog = new SlowDataProviderDialog(
fWindow.getShell(), new SlowDataProviderContentProvider(), dataProvider);
dialog.open();
// Shut down the data provider thread and the DSF executor thread.
// Note, since data provider is running in background thread, we have to
// wait until this background thread has completed shutdown before
// killing the executor thread itself.
dataProvider.shutdown(new RequestMonitor(dataProvider.getDsfExecutor(), null) {
@Override
public void handleCompleted() {
dataProvider.getDsfExecutor().shutdown();
}
});
}
}
public void init(IWorkbenchWindow window) {
fWindow = window;
}
}

View file

@ -1,330 +0,0 @@
/*******************************************************************************
* Copyright (c) 2006 Wind River Systems and others.
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* which accompanies this distribution, and is available at
* http://www.eclipse.org/legal/epl-v10.html
*
* Contributors:
* Wind River Systems - initial API and implementation
*******************************************************************************/
package org.eclipse.dd.examples.dsf.concurrent;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;
import org.eclipse.core.runtime.IStatus;
import org.eclipse.core.runtime.Status;
import org.eclipse.dd.dsf.concurrent.DsfRunnable;
import org.eclipse.dd.dsf.concurrent.RequestMonitor;
import org.eclipse.dd.dsf.concurrent.DsfExecutor;
import org.eclipse.dd.dsf.concurrent.DataRequestMonitor;
import org.eclipse.dd.examples.dsf.DsfExamplesPlugin;
/**
* Example data provider which has a built-in delay when fetching data. This
* data provider simulates a service which retrieves data from an external
* source such as a networked target, which incurs a considerable delay when
* retrieving data. The data items are simulated values which consist of the
* time when data is being retrieved followed by the item's index.
*/
public class SlowDataProvider implements DataProvider {
/** Minimum count of data items */
private final static int MIN_COUNT = 1000;
/** Maximum count of data items */
private final static int MAX_COUNT = 2000;
/** Time interval how often random changes occur. */
private final static int RANDOM_CHANGE_MILIS = 10000;
/** Number of times random changes are made, before count is changed. */
private final static int RANDOM_COUNT_CHANGE_INTERVALS = 3;
/** Percentage of values that is changed upon random change (0-100). */
private final static int RANDOM_CHANGE_SET_PERCENTAGE = 10;
/**
* Amount of time (in miliseconds) how long the requests to provider, and
* events from provider are delayed by.
*/
private final static int TRANSMISSION_DELAY_TIME = 500;
/**
* Amount of time (in milliseconds) how long the provider takes to process
* a request.
*/
private final static int PROCESSING_TIME = 100;
/** Dispatch-thread executor that this provider uses. */
private DsfExecutor fExecutor;
/** List of listeners registered for events from provider. */
private List<Listener> fListeners = new LinkedList<Listener>();
/** Thread that handles data requests. */
private ProviderThread fProviderThread;
/** Queue of currently pending data requests. */
private final BlockingQueue<Request> fQueue = new DelayQueue<Request>();
/**
* Runnable to be submitted when the data provider thread is shut down.
* This variable acts like a flag: when client want to shut down the
* provider, it sets this runnable, and when the backgroun thread sees
* that it's set, it shuts itself down, and posts this runnable with
* the executor.
*/
private RequestMonitor fShutdownRequestMonitor = null;
/**
* Base class for requests that are queued by the data provider. It
* implements java.util.concurrent.Delayed to allow for use of DelayedQueue.
* Every request into the queue is delayed by the simulated transmission
* time.
*/
private static abstract class Request implements Delayed {
/** Sequence counter and number are used to ensure FIFO order **/
private static int fSequenceCounter = 0;
private int fSequenceNumber = fSequenceCounter++;
/** Time delay tracks how items will be delayed. **/
private long fTime = System.currentTimeMillis() + TRANSMISSION_DELAY_TIME;
// @see java.util.concurrent.Delayed
public long getDelay(TimeUnit unit) {
return unit.convert(fTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
}
// @see java.lang.Comparable
public int compareTo(Delayed other) {
if (other == this) // compare zero ONLY if same object
return 0;
Request x = (Request)other;
long diff = fTime - x.fTime;
if (diff < 0) return -1;
else if (diff > 0) return 1;
else if (fSequenceNumber < x.fSequenceNumber) return -1;
else return 1;
}
/** All requests have an associated RequestMonitor token **/
abstract RequestMonitor getRequestMonitor();
}
/**
* Object used to encapsulate the "getItemCount" requests. Instances of it
* are queued till processed.
*/
private static class CountRequest extends Request
{
DataRequestMonitor<Integer> fRequestMonitor;
CountRequest(DataRequestMonitor<Integer> rm) { fRequestMonitor = rm; }
@Override
DataRequestMonitor<?> getRequestMonitor() { return fRequestMonitor; }
}
/**
* Object used to encapsulate the "getItem" requests. Instances of it
* are queued till processed.
*/
private static class ItemRequest extends Request
{
DataRequestMonitor<String> fRequestMonitor;
int fIndex;
ItemRequest(int index, DataRequestMonitor<String> rm) { fIndex = index; fRequestMonitor = rm; }
@Override
DataRequestMonitor<?> getRequestMonitor() { return fRequestMonitor; }
}
/**
* The background thread of data provider. This thread retrieves the
* requests from the provider's queue and processes them. It also
* initiates random changes in the data set and issues corresponding
* events.
*/
private class ProviderThread extends Thread
{
/**
* Current count of items in the data set. It is changed
* periodically for simulation purposes.
*/
private int fCount = MIN_COUNT;
/**
* Incremented with every data change, it causes the count to be reset
* every four random changes.
*/
private int fCountTrigger = 0;
/** Time when the last change was performed. */
private long fLastChangeTime = System.currentTimeMillis();
/** Random number generator */
private Random fRandom = new java.util.Random();
@Override
public void run() {
try {
// Initialize the count.
randomCount();
// Perform the loop until the shutdown runnable is set.
while(fShutdownRequestMonitor == null) {
// Get the next request from the queue. The time-out
// ensures that that we get to process the random changes.
final Request request = fQueue.poll(RANDOM_CHANGE_MILIS / 10, TimeUnit.MILLISECONDS);
// If a request was dequeued, process it.
if (request != null) {
// Simulate a processing delay.
Thread.sleep(PROCESSING_TIME);
if (request instanceof CountRequest) {
processCountRequest((CountRequest)request);
} else if (request instanceof ItemRequest) {
processItemRequest((ItemRequest)request);
}
// Whatever the result, post it to dispatch thread
// executor (with transmission delay).
fExecutor.schedule(
new DsfRunnable() {
public void run() {
request.getRequestMonitor().done();
}
},
TRANSMISSION_DELAY_TIME, TimeUnit.MILLISECONDS);
}
// Simulate data changes.
randomChanges();
}
}
catch (InterruptedException x) {
DsfExamplesPlugin.getDefault().getLog().log( new Status(
IStatus.ERROR, DsfExamplesPlugin.PLUGIN_ID, 0, "Interrupted exception in slow data provider thread.", x )); //$NON-NLS-1$
}
// Notify the client that requested shutdown, that shutdown is complete.
fShutdownRequestMonitor.done();
fShutdownRequestMonitor = null;
}
private void processCountRequest(CountRequest request) {
// Calculate the simulated values.
request.fRequestMonitor.setData(fCount);
}
private void processItemRequest(ItemRequest request) {
// Calculate the simulated values.
request.fRequestMonitor.setData(Long.toHexString(fLastChangeTime) + "." + request.fIndex); //$NON-NLS-1$
}
/**
* This method simulates changes in provider's data set.
*/
private void randomChanges()
{
if (System.currentTimeMillis() > fLastChangeTime + RANDOM_CHANGE_MILIS) {
fLastChangeTime = System.currentTimeMillis();
// once in every 30 seconds broadcast item count change
if (++fCountTrigger % RANDOM_COUNT_CHANGE_INTERVALS == 0) randomCount();
else randomDataChange();
}
}
/**
* Calculates new size for provider's data set.
*/
private void randomCount()
{
fCount = MIN_COUNT + Math.abs(fRandom.nextInt()) % (MAX_COUNT - MIN_COUNT);
// Generate the event that the count has changed, and post it to
// dispatch thread with transmission delay.
fExecutor.schedule(
new Runnable() { public void run() {
for (Listener listener : fListeners) {
listener.countChanged();
}
}},
TRANSMISSION_DELAY_TIME, TimeUnit.MILLISECONDS);
}
/**
* Invalidates a random range of indexes.
*/
private void randomDataChange()
{
final Set<Integer> set = new HashSet<Integer>();
// Change one in ten values.
for (int i = 0; i < fCount * RANDOM_CHANGE_SET_PERCENTAGE / 100; i++) {
set.add( new Integer(Math.abs(fRandom.nextInt()) % fCount) );
}
// Generate the event that the data has changed.
// Post dispatch thread with transmission delay.
fExecutor.schedule(
new Runnable() { public void run() {
for (Listener listener : fListeners) {
listener.dataChanged(set);
}
}},
TRANSMISSION_DELAY_TIME, TimeUnit.MILLISECONDS);
}
}
public SlowDataProvider(DsfExecutor executor) {
fExecutor = executor;
fProviderThread = new ProviderThread();
fProviderThread.start();
}
/**
* Requests shutdown of this data provider.
* @param requestMonitor Request completion monitor.
*/
public void shutdown(RequestMonitor requestMonitor) {
fShutdownRequestMonitor = requestMonitor;
}
///////////////////////////////////////////////////////////////////////////
// DataProvider
public DsfExecutor getDsfExecutor() {
return fExecutor;
}
public void getItemCount(final DataRequestMonitor<Integer> rm) {
fQueue.add(new CountRequest(rm));
}
public void getItem(final int index, final DataRequestMonitor<String> rm) {
fQueue.add(new ItemRequest(index, rm));
}
public void addListener(Listener listener) {
assert fExecutor.isInExecutorThread();
fListeners.add(listener);
}
public void removeListener(Listener listener) {
assert fExecutor.isInExecutorThread();
fListeners.remove(listener);
}
//
///////////////////////////////////////////////////////////////////////////
}

View file

@ -1,53 +0,0 @@
/*******************************************************************************
* Copyright (c) 2006 Wind River Systems and others.
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* which accompanies this distribution, and is available at
* http://www.eclipse.org/legal/epl-v10.html
*
* Contributors:
* Wind River Systems - initial API and implementation
*******************************************************************************/
package org.eclipse.dd.examples.dsf.concurrent;
import org.eclipse.dd.dsf.concurrent.DefaultDsfExecutor;
import org.eclipse.dd.dsf.concurrent.RequestMonitor;
import org.eclipse.jface.action.IAction;
import org.eclipse.jface.dialogs.Dialog;
import org.eclipse.ui.IWorkbenchWindow;
import org.eclipse.ui.IWorkbenchWindowActionDelegate;
import org.eclipse.ui.actions.ActionDelegate;
public class SlowDataProviderAction extends ActionDelegate
implements IWorkbenchWindowActionDelegate
{
private IWorkbenchWindow fWindow;
@Override
public void run(IAction action) {
if (fWindow != null) {
// Create the standard data provider.
final SlowDataProvider dataProvider = new SlowDataProvider(new DefaultDsfExecutor());
// Create the dialog and open it.
Dialog dialog = new SlowDataProviderDialog(
fWindow.getShell(), new SlowDataProviderContentProvider(), dataProvider);
dialog.open();
// Shut down the data provider thread and the DSF executor thread.
// Note, since data provider is running in background thread, we have to
// wait until this background thread has completed shutdown before
// killing the executor thread itself.
dataProvider.shutdown(new RequestMonitor(dataProvider.getDsfExecutor(), null) {
@Override
public void handleCompleted() {
dataProvider.getDsfExecutor().shutdown();
}
});
}
}
public void init(IWorkbenchWindow window) {
fWindow = window;
}
}

View file

@ -1,190 +0,0 @@
/*******************************************************************************
* Copyright (c) 2006 Wind River Systems and others.
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* which accompanies this distribution, and is available at
* http://www.eclipse.org/legal/epl-v10.html
*
* Contributors:
* Wind River Systems - initial API and implementation
*******************************************************************************/
package org.eclipse.dd.examples.dsf.concurrent;
import java.util.Set;
import org.eclipse.dd.dsf.concurrent.DataRequestMonitor;
import org.eclipse.dd.dsf.concurrent.DsfExecutor;
import org.eclipse.jface.viewers.ILazyContentProvider;
import org.eclipse.jface.viewers.TableViewer;
import org.eclipse.jface.viewers.Viewer;
public class SlowDataProviderContentProvider
implements ILazyContentProvider, DataProvider.Listener
{
TableViewer fTableViewer;
DataProvider fDataProvider;
///////////////////////////////////////////////////////////////////////////
// ILazyContentProvider
public void dispose() {
if (fDataProvider != null) {
final DataProvider dataProvider = fDataProvider;
dataProvider.getDsfExecutor().execute(
new Runnable() { public void run() {
dataProvider.removeListener(SlowDataProviderContentProvider.this);
fTableViewer = null;
fDataProvider = null;
}});
} else {
fTableViewer = null;
}
}
public void inputChanged(final Viewer viewer, Object oldInput, final Object newInput) {
// If old data provider is not-null, unregister from it as listener.
if (fDataProvider != null) {
final DataProvider dataProvider = fDataProvider;
dataProvider.getDsfExecutor().execute(
new Runnable() { public void run() {
dataProvider.removeListener(SlowDataProviderContentProvider.this);
}});
}
// Register as listener with new data provider.
// Note: if old data provider and new data provider use different executors,
// there is a chance of a race condition here.
if (newInput != null) {
((DataProvider)newInput).getDsfExecutor().execute(
new Runnable() { public void run() {
fTableViewer = (TableViewer)viewer;
fDataProvider = (DataProvider)newInput;
fDataProvider.addListener(SlowDataProviderContentProvider.this);
queryItemCount();
}});
}
}
public void updateElement(final int index) {
assert fTableViewer != null;
if (fDataProvider == null) return;
fDataProvider.getDsfExecutor().execute(
new Runnable() { public void run() {
// Must check again, in case disposed while re-dispatching.
if (fDataProvider == null) return;
queryItemData(index);
}});
}
///////////////////////////////////////////////////////////////////////////
// DataProvider.Listener
public void countChanged() {
// Check for dispose.
if (fDataProvider == null) return;
// Request new count.
queryItemCount();
}
public void dataChanged(final Set<Integer> indexes) {
// Check for dispose.
if (fDataProvider == null) return;
// Clear changed items in table viewer.
if (fTableViewer != null) {
final TableViewer tableViewer = fTableViewer;
tableViewer.getTable().getDisplay().asyncExec(
new Runnable() { public void run() {
// Check again if table wasn't disposed when
// switching to the display thread.
if (tableViewer.getTable().isDisposed()) return; // disposed
for (Integer index : indexes) {
tableViewer.clear(index);
}
}});
}
}
//
///////////////////////////////////////////////////////////////////////////
/**
* Convenience extension to standard data return runnable. This extension
* automatically checks for errors and asynchronous dipose.
* @param <V>
*/
private abstract class CPGetDataRequestMonitor<V> extends DataRequestMonitor<V> {
CPGetDataRequestMonitor(DsfExecutor executor) { super(executor, null); }
abstract protected void doRun();
@Override
final public void handleCompleted() {
// If there is an error processing request, return.
if (!getStatus().isOK()) return;
// If content provider was disposed, return.
if (fTableViewer == null) return;
// Otherwise execute runnable.
doRun();
}
}
/**
* Executes the item count query with DataProvider. Must be called on
* data provider's dispatch thread.
*/
private void queryItemCount() {
assert fDataProvider.getDsfExecutor().isInExecutorThread();
// Request coumt from data provider. When the count is returned, we
// have to re-dispatch into the display thread to avoid calling
// the table widget on the DSF dispatch thread.
fDataProvider.getItemCount(
new CPGetDataRequestMonitor<Integer>(fDataProvider.getDsfExecutor()) {
@Override
protected void doRun() {
final TableViewer tableViewer = fTableViewer;
tableViewer.getTable().getDisplay().asyncExec(
new Runnable() { public void run() {
// Check again if table wasn't disposed when
// switching to the display thread.
if (tableViewer.getTable().isDisposed()) return; // disposed
tableViewer.setItemCount(getData());
tableViewer.getTable().clearAll();
}});
}});
}
/**
* Executes the data query with DataProvider. Must be called on dispatch
* thread.
* @param index Index of item to fetch.
*/
private void queryItemData(final int index) {
assert fDataProvider.getDsfExecutor().isInExecutorThread();
// Request data from data provider. Likewise, when the data is
// returned, we have to re-dispatch into the display thread to
// call the table widget.
fDataProvider.getItem(
index,
new CPGetDataRequestMonitor<String>(fDataProvider.getDsfExecutor()) {
@Override
protected void doRun() {
final TableViewer tableViewer = fTableViewer;
tableViewer.getTable().getDisplay().asyncExec(
new Runnable() { public void run() {
// Check again if table wasn't disposed when
// switching to the display thread.
if (tableViewer.getTable().isDisposed()) return; // disposed
tableViewer.replace(getData(), index);
}});
}});
}
}

View file

@ -1,50 +0,0 @@
/*******************************************************************************
* Copyright (c) 2006 Wind River Systems and others.
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* which accompanies this distribution, and is available at
* http://www.eclipse.org/legal/epl-v10.html
*
* Contributors:
* Wind River Systems - initial API and implementation
*******************************************************************************/
package org.eclipse.dd.examples.dsf.concurrent;
import org.eclipse.jface.dialogs.Dialog;
import org.eclipse.jface.viewers.IContentProvider;
import org.eclipse.jface.viewers.TableViewer;
import org.eclipse.swt.SWT;
import org.eclipse.swt.layout.GridData;
import org.eclipse.swt.widgets.Composite;
import org.eclipse.swt.widgets.Control;
import org.eclipse.swt.widgets.Shell;
/**
* Dialog shared by all slow data provider examples. It accepts the data
* provider and the content provider as arguments to the constructor. So the
* only thing that the dialog does is to create the table viewer and
* initialize it with the providers.
*/
public class SlowDataProviderDialog extends Dialog {
private TableViewer fDataViewer;
private DataProvider fDataProvider;
private IContentProvider fContentProvider;
public SlowDataProviderDialog(Shell parent, IContentProvider contentProvider, DataProvider dataProvider) {
super(parent);
setShellStyle(getShellStyle() | SWT.RESIZE);
fContentProvider = contentProvider;
fDataProvider = dataProvider;
}
@Override
protected Control createDialogArea(Composite parent) {
Composite area = (Composite) super.createDialogArea(parent);
fDataViewer = new TableViewer(area, SWT.VIRTUAL);
fDataViewer.getTable().setLayoutData(new GridData(GridData.FILL_BOTH));
fDataViewer.setContentProvider(fContentProvider);
fDataViewer.setInput(fDataProvider);
return area;
}
}

View file

@ -0,0 +1,266 @@
/*******************************************************************************
* Copyright (c) 2006, 2008 Wind River Systems and others.
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* which accompanies this distribution, and is available at
* http://www.eclipse.org/legal/epl-v10.html
*
* Contributors:
* Wind River Systems - initial API and implementation
*******************************************************************************/
package org.eclipse.dd.examples.dsf.dataviewer;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.eclipse.dd.dsf.concurrent.DataRequestMonitor;
import org.eclipse.dd.dsf.concurrent.DsfExecutor;
import org.eclipse.dd.dsf.concurrent.ImmediateExecutor;
import org.eclipse.dd.dsf.concurrent.Query;
import org.eclipse.dd.dsf.ui.concurrent.DisplayDsfExecutor;
import org.eclipse.jface.viewers.ILazyContentProvider;
import org.eclipse.jface.viewers.TableViewer;
import org.eclipse.jface.viewers.Viewer;
import org.eclipse.swt.SWT;
import org.eclipse.swt.graphics.Font;
import org.eclipse.swt.layout.GridData;
import org.eclipse.swt.layout.GridLayout;
import org.eclipse.swt.widgets.Display;
import org.eclipse.swt.widgets.Shell;
import org.eclipse.swt.widgets.Table;
/**
* Data viewer based on a table, which reads data using asynchronous methods.
* <p>
* This viewer implements the {@link ILazyContentProvider} interface
* which is used by the JFace TableViewer class to populate a Table. This
* interface contains separate asynchronous methods for requesting the count
* and values for individual indexes, which neatly correspond to the methods
* in {@link IDataGenerator}. As an added optimization, this viewer
* implementation checks for the range of visible items in the view upon each
* request, and it cancels old requests which scroll out of view but have not
* been completed yet. However, it is up to the data generator implementation
* to check the canceled state of the requests and ignore them.
* </p>
*/
public class AsyncDataViewer
implements ILazyContentProvider, IDataGenerator.Listener
{
// Executor to use instead of Display.asyncExec().
final private DsfExecutor fDisplayExecutor;
// The viewer and generator that this content provider using.
final private TableViewer fViewer;
final private IDataGenerator fDataGenerator;
// Fields used in request cancellation logic.
private List<ValueDataRequestMonitor> fItemDataRequestMonitors = new LinkedList<ValueDataRequestMonitor>();
private Set<Integer> fIndexesToCancel = new HashSet<Integer>();
private int fCancelCallsPending = 0;
public AsyncDataViewer(TableViewer viewer, IDataGenerator generator) {
fViewer = viewer;
fDisplayExecutor = DisplayDsfExecutor.getDisplayDsfExecutor(fViewer.getTable().getDisplay());
fDataGenerator = generator;
fDataGenerator.addListener(this);
}
public void dispose() {
fDataGenerator.removeListener(this);
}
public void inputChanged(Viewer viewer, Object oldInput, Object newInput) {
// Set the initial count to the viewer after the input is set.
queryItemCount();
}
public void updateElement(final int index) {
// Calculate the visible index range.
final int topIdx = fViewer.getTable().getTopIndex();
final int botIdx = topIdx + getVisibleItemCount(topIdx);
// Request the item for the given index.
queryValue(index);
// Invoke a cancel task with a delay. The delay allows multiple cancel
// calls to be combined together improving performance of the viewer.
fCancelCallsPending++;
fDisplayExecutor.schedule(
new Runnable() { public void run() {
cancelStaleRequests(topIdx, botIdx);
}},
1, TimeUnit.MILLISECONDS);
}
private int getVisibleItemCount(int top) {
Table table = fViewer.getTable();
int itemCount = table.getItemCount();
return Math.min((table.getBounds().height / table.getItemHeight()) + 2, itemCount - top);
}
public void countChanged() {
queryItemCount();
}
public void valuesChanged(final Set<Integer> indexes) {
// Mark the changed items in table viewer as dirty, this will
// trigger update requests for these indexes if they are
// visible in the viewer.
final TableViewer tableViewer = fViewer;
fDisplayExecutor.execute( new Runnable() {
public void run() {
if (!fViewer.getTable().isDisposed()) {
for (Integer index : indexes) {
tableViewer.clear(index);
}
}
}});
}
private void queryItemCount() {
// Request count from data provider. When the count is returned, we
// have to re-dispatch into the display thread to avoid calling
// the table widget on the DSF dispatch thread.
fIndexesToCancel.clear();
fDataGenerator.getCount(
// Use the display executor to construct the request monitor, this
// will cause the handleCompleted() method to be automatically
// called on the display thread.
new DataRequestMonitor<Integer>(fDisplayExecutor, null) {
@Override
protected void handleCompleted() {
if (!fViewer.getTable().isDisposed()) {
fViewer.setItemCount(getData());
fViewer.getTable().clearAll();
}
}
});
}
// Dedicated class for data item requests. This class holds the index
// argument so it can be examined when canceling stale requests.
private class ValueDataRequestMonitor extends DataRequestMonitor<String> {
/** Index is used when canceling stale requests. */
int fIndex;
ValueDataRequestMonitor(int index) {
super(fDisplayExecutor, null);
fIndex = index;
}
@Override
protected void handleCompleted() {
fItemDataRequestMonitors.remove(this);
// Check if the request completed successfully, otherwise ignore it.
if (getStatus().isOK()) {
if (!fViewer.getTable().isDisposed()) {
fViewer.replace(getData(), fIndex);
}
}
}
}
private void queryValue(final int index) {
ValueDataRequestMonitor rm = new ValueDataRequestMonitor(index);
fItemDataRequestMonitors.add(rm);
fDataGenerator.getValue(index, rm);
}
private void cancelStaleRequests(int topIdx, int botIdx) {
// Decrement the count of outstanding cancel calls.
fCancelCallsPending--;
// Must check again, in case disposed while re-dispatching.
if (fDataGenerator == null || fViewer.getTable().isDisposed()) return;
// Go through the outstanding requests and cancel any that
// are not visible anymore.
for (Iterator<ValueDataRequestMonitor> itr = fItemDataRequestMonitors.iterator(); itr.hasNext();) {
ValueDataRequestMonitor item = itr.next();
if (item.fIndex < topIdx || item.fIndex > botIdx) {
// Set the item to canceled status, so that the data provider
// will ignore it.
item.setCanceled(true);
// Add the item index to list of indexes that were canceled,
// which will be sent to the table widget.
fIndexesToCancel.add(item.fIndex);
// Remove the item from the outstanding cancel requests.
itr.remove();
}
}
if (!fIndexesToCancel.isEmpty() && fCancelCallsPending == 0) {
Set<Integer> canceledIdxs = fIndexesToCancel;
fIndexesToCancel = new HashSet<Integer>();
// Clear the indexes of the canceled request, so that the
// viewer knows to request them again when needed.
// Note: clearing using TableViewer.clear(int) seems very
// inefficient, it's better to use Table.clear(int[]).
int[] canceledIdxsArray = new int[canceledIdxs.size()];
int i = 0;
for (Integer index : canceledIdxs) {
canceledIdxsArray[i++] = index;
}
fViewer.getTable().clear(canceledIdxsArray);
}
}
public static void main(String[] args) {
// Create the shell to hold the viewer.
Display display = new Display();
Shell shell = new Shell(display, SWT.SHELL_TRIM);
shell.setLayout(new GridLayout());
GridData data = new GridData(GridData.FILL_BOTH);
shell.setLayoutData(data);
Font font = new Font(display, "Courier", 10, SWT.NORMAL);
// Create the table viewer.
TableViewer tableViewer = new TableViewer(shell, SWT.BORDER | SWT.VIRTUAL);
tableViewer.getControl().setLayoutData(data);
// Create the data generator.
final IDataGenerator generator = new DataGeneratorWithExecutor();
// Create the content provider which will populate the viewer.
AsyncDataViewer contentProvider = new AsyncDataViewer(tableViewer, generator);
tableViewer.setContentProvider(contentProvider);
tableViewer.setInput(new Object());
// Open the shell and service the display dispatch loop until user
// closes the shell.
shell.open();
while (!shell.isDisposed()) {
if (!display.readAndDispatch())
display.sleep();
}
// The IDataGenerator.shutdown() method is asynchronous, this requires
// using a query again in order to wait for its completion.
Query<Object> shutdownQuery = new Query<Object>() {
@Override
protected void execute(DataRequestMonitor<Object> rm) {
generator.shutdown(rm);
}
};
ImmediateExecutor.getInstance().execute(shutdownQuery);
try {
shutdownQuery.get();
} catch (Exception e) {}
// Shut down the display.
font.dispose();
display.dispose();
}
}

View file

@ -0,0 +1,404 @@
/*******************************************************************************
* Copyright (c) 2006 Wind River Systems and others.
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* which accompanies this distribution, and is available at
* http://www.eclipse.org/legal/epl-v10.html
*
* Contributors:
* Wind River Systems - initial API and implementation
*******************************************************************************/
//#ifdef excercises
package org.eclipse.dd.examples.dsf.dataviewer;
//#else
//#package org.eclipse.dd.examples.dsf.dataviewer.answers;
//#endif
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import org.eclipse.core.runtime.IStatus;
import org.eclipse.core.runtime.Status;
import org.eclipse.dd.dsf.concurrent.ConfinedToDsfExecutor;
import org.eclipse.dd.dsf.concurrent.DataRequestMonitor;
import org.eclipse.dd.dsf.concurrent.DefaultDsfExecutor;
import org.eclipse.dd.dsf.concurrent.DsfExecutor;
import org.eclipse.dd.dsf.concurrent.DsfRunnable;
import org.eclipse.dd.dsf.concurrent.Immutable;
import org.eclipse.dd.dsf.concurrent.RequestMonitor;
import org.eclipse.dd.dsf.concurrent.ThreadSafe;
import org.eclipse.dd.examples.dsf.DsfExamplesPlugin;
/**
* DSF Executor-based implementation of the data generator.
* <p>
* This generator uses a queue of client requests and processes these
* requests periodically using a DSF executor. The main feature of this
* generator is that it uses the executor as its only synchronization object.
* This means that all the fields with the exception of the executor can only
* be accessed while running in the executor thread.
* </p>
*/
//#ifdef excercises
// TODO Excercise 3 - Add an annotationindicating allowed concurrency access
//#else
//#@ThreadSafe
//#endif
public class DataGeneratorWithExecutor implements IDataGenerator {
// Request objects are used to serialize the interface calls into objects
// which can then be pushed into a queue.
//#ifdef excercises
// TODO Excercise 3 - Add an annotationindicating allowed concurrency access
//#else
//# @Immutable
//#endif
abstract class Request {
final RequestMonitor fRequestMonitor;
Request(RequestMonitor rm) {
fRequestMonitor = rm;
}
}
//#ifdef excercises
// TODO Excercise 3 - Add an annotationindicating allowed concurrency access
//#else
//# @Immutable
//#endif
class CountRequest extends Request {
CountRequest(DataRequestMonitor<Integer> rm) {
super(rm);
}
}
//#ifdef excercises
// TODO Excercise 3 - Add an annotationindicating allowed concurrency access
//#else
//# @Immutable
//#endif
class ItemRequest extends Request {
final int fIndex;
ItemRequest(int index, DataRequestMonitor<String> rm) {
super(rm);
fIndex = index;
}
}
// The executor used to access all internal data of the generator.
//#ifdef excercises
// TODO Excercise 3 - Add an annotation indicating allowed concurrency access
//#endif
private DsfExecutor fExecutor;
// Main request queue of the data generator. The getValue(), getCount(),
// and shutdown() methods write into the queue, while the serviceQueue()
// method reads from it.
// The executor used to access all internal data of the generator.
//#ifdef excercises
// TODO Excercise 3 - Add an annotationindicating allowed concurrency access
//#else
//# @ConfinedToDsfExecutor("fExecutor")
//#endif
private List<Request> fQueue = new LinkedList<Request>();
// List of listeners is not synchronized, it also has to be accessed
// using the executor.
//#ifdef excercises
// TODO Excercise 3 - Add an annotationindicating allowed concurrency access
//#else
//# @ConfinedToDsfExecutor("fExecutor")
//#endif
private List<Listener> fListeners = new LinkedList<Listener>();
// Current number of elements in this generator.
//#ifdef excercises
// TODO Excercise 3 - Add an annotationindicating allowed concurrency access
//#else
//# @ConfinedToDsfExecutor("fExecutor")
//#endif
private int fCount = MIN_COUNT;
// Counter used to determine when to reset the element count.
//#ifdef excercises
// TODO Excercise 3 - Add an annotationindicating allowed concurrency access
//#else
//# @ConfinedToDsfExecutor("fExecutor")
//#endif
private int fCountResetTrigger = 0;
// Elements which were modified since the last reset.
//#ifdef excercises
// TODO Excercise 3 - Add an annotationindicating allowed concurrency access
//#else
//# @ConfinedToDsfExecutor("fExecutor")
//#endif
private Set<Integer> fChangedIndexes = new HashSet<Integer>();
// Flag used to ensure that requests are processed sequentially.
//#ifdef excercises
// TODO Excercise 3 - Add an annotationindicating allowed concurrency access
//#else
//# @ConfinedToDsfExecutor("fExecutor")
//#endif
private boolean fServiceQueueInProgress = false;
//#ifdef excercises
// TODO Excercise 3 - Add an annotation indicating allowed concurrency access
//#endif
public DataGeneratorWithExecutor() {
// Create the executor
fExecutor = new DefaultDsfExecutor("Supplier Executor");
// Schedule a runnable to make the random changes.
fExecutor.scheduleAtFixedRate(
new DsfRunnable() {
public void run() {
randomChanges();
}
},
RANDOM_CHANGE_INTERVAL,
RANDOM_CHANGE_INTERVAL,
TimeUnit.MILLISECONDS);
}
//#ifdef excercises
// TODO Excercise 3 - Add an annotation indicating allowed concurrency access
//#endif
public void shutdown(final RequestMonitor rm) {
try {
fExecutor.execute( new DsfRunnable() {
public void run() {
// Empty the queue of requests and fail them.
for (Request request : fQueue) {
request.fRequestMonitor.setStatus(
new Status(IStatus.ERROR, DsfExamplesPlugin.PLUGIN_ID, "Supplier shut down"));
request.fRequestMonitor.done();
}
fQueue.clear();
// Kill executor.
fExecutor.shutdown();
rm.done();
}
});
} catch (RejectedExecutionException e) {
rm.setStatus(new Status(IStatus.ERROR, DsfExamplesPlugin.PLUGIN_ID, "Supplier shut down"));
rm.done();
}
}
//#ifdef excercises
// TODO Excercise 3 - Add an annotation indicating allowed concurrency access
//#endif
public void getCount(final DataRequestMonitor<Integer> rm) {
try {
fExecutor.execute( new DsfRunnable() {
public void run() {
fQueue.add(new CountRequest(rm));
serviceQueue();
}
});
} catch (RejectedExecutionException e) {
rm.setStatus(new Status(IStatus.ERROR, DsfExamplesPlugin.PLUGIN_ID, "Supplier shut down"));
rm.done();
}
}
//#ifdef excercises
// TODO Excercise 3 - Add an annotation indicating allowed concurrency access
//#endif
public void getValue(final int index, final DataRequestMonitor<String> rm) {
try {
fExecutor.execute( new DsfRunnable() {
public void run() {
fQueue.add(new ItemRequest(index, rm));
serviceQueue();
}
});
} catch (RejectedExecutionException e) {
rm.setStatus(new Status(IStatus.ERROR, DsfExamplesPlugin.PLUGIN_ID, "Supplier shut down"));
rm.done();
}
}
//#ifdef excercises
// TODO Excercise 3 - Add an annotation indicating allowed concurrency access
//#endif
public void addListener(final Listener listener) {
try {
fExecutor.execute( new DsfRunnable() {
public void run() {
fListeners.add(listener);
}
});
} catch (RejectedExecutionException e) {}
}
//#ifdef excercises
// TODO Excercise 3 - Add an annotation indicating allowed concurrency access
//#endif
public void removeListener(final Listener listener) {
try {
fExecutor.execute( new DsfRunnable() {
public void run() {
fListeners.remove(listener);
}
});
} catch (RejectedExecutionException e) {}
}
// Main processing function of this generator.
//#ifdef excercises
// TODO Excercise 3 - Add an annotationindicating allowed concurrency access
//#else
//# @ConfinedToDsfExecutor("fExecutor")
//#endif
private void serviceQueue() {
//#ifdef excercises
// TODO Excercise 4 - Add logic to discard requests from queue.
//#else
//# for (Iterator<Request> requestItr = fQueue.iterator(); requestItr.hasNext();) {
//# Request request = requestItr.next();
//# if (request.fRequestMonitor.isCanceled()) {
//# request.fRequestMonitor.setStatus(
//# new Status(IStatus.CANCEL, DsfExamplesPlugin.PLUGIN_ID, "Request canceled"));
//# request.fRequestMonitor.done();
//# requestItr.remove();
//# }
//# }
//#endif
// If a queue servicing is already scheduled, do nothing.
if (fServiceQueueInProgress) {
return;
}
if (fQueue.size() != 0) {
// If there are requests to service, remove one from the queue and
// schedule a runnable to process the request after a processing
// delay.
fServiceQueueInProgress = true;
final Request request = fQueue.remove(0);
fExecutor.schedule(
new DsfRunnable() {
public void run() {
if (request instanceof CountRequest) {
processCountRequest((CountRequest)request);
} else if (request instanceof ItemRequest) {
processItemRequest((ItemRequest)request);
}
// Reset the processing flag and process next
// request.
fServiceQueueInProgress = false;
serviceQueue();
}
},
PROCESSING_DELAY, TimeUnit.MILLISECONDS);
}
}
//#ifdef excercises
// TODO Excercise 3 - Add an annotationindicating allowed concurrency access
//#else
//# @ConfinedToDsfExecutor("fExecutor")
//#endif
private void processCountRequest(CountRequest request) {
@SuppressWarnings("unchecked") // Suppress warning about lost type info.
DataRequestMonitor<Integer> rm = (DataRequestMonitor<Integer>)request.fRequestMonitor;
rm.setData(fCount);
rm.done();
}
//#ifdef excercises
// TODO Excercise 3 - Add an annotationindicating allowed concurrency access
//#else
//# @ConfinedToDsfExecutor("fExecutor")
//#endif
private void processItemRequest(ItemRequest request) {
@SuppressWarnings("unchecked") // Suppress warning about lost type info.
DataRequestMonitor<String> rm = (DataRequestMonitor<String>)request.fRequestMonitor;
if (fChangedIndexes.contains(request.fIndex)) {
rm.setData("Changed: " + request.fIndex);
} else {
rm.setData(Integer.toString(request.fIndex));
}
rm.done();
}
/**
* This method simulates changes in the supplier's data set.
*/
//#ifdef excercises
// TODO Excercise 3 - Add an annotationindicating allowed concurrency access
//#else
//# @ConfinedToDsfExecutor("fExecutor")
//#endif
private void randomChanges() {
// Once every number of changes, reset the count, the rest of the
// times just change certain values.
if (++fCountResetTrigger % RANDOM_COUNT_CHANGE_INTERVALS == 0){
randomCountReset();
} else {
randomDataChange();
}
}
/**
* Calculates new size for provider's data set.
*/
//#ifdef excercises
// TODO Excercise 3 - Add an annotationindicating allowed concurrency access
//#else
//# @ConfinedToDsfExecutor("fExecutor")
//#endif
private void randomCountReset() {
// Calculate the new count.
Random random = new java.util.Random();
fCount = MIN_COUNT + Math.abs(random.nextInt()) % (MAX_COUNT - MIN_COUNT);
// Reset the changed values.
fChangedIndexes.clear();
// Notify listeners
for (Listener listener : fListeners) {
listener.countChanged();
}
}
/**
* Invalidates a random range of indexes.
*/
//#ifdef excercises
// TODO Excercise 3 - Add an annotationindicating allowed concurrency access
//#else
//# @ConfinedToDsfExecutor("fExecutor")
//#endif
private void randomDataChange() {
// Calculate the indexes to change.
Random random = new java.util.Random();
Set<Integer> set = new HashSet<Integer>();
for (int i = 0; i < fCount * RANDOM_CHANGE_SET_PERCENTAGE / 100; i++) {
set.add( new Integer(Math.abs(random.nextInt()) % fCount) );
}
// Add the indexes to an overall set of changed indexes.
fChangedIndexes.addAll(set);
// Notify listeners
for (Listener listener : fListeners) {
listener.valuesChanged(set);
}
}
}

View file

@ -0,0 +1,238 @@
/*******************************************************************************
* Copyright (c) 2006 Wind River Systems and others.
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* which accompanies this distribution, and is available at
* http://www.eclipse.org/legal/epl-v10.html
*
* Contributors:
* Wind River Systems - initial API and implementation
*******************************************************************************/
package org.eclipse.dd.examples.dsf.dataviewer;
import java.util.Collections;
import java.util.HashSet;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.eclipse.core.runtime.IStatus;
import org.eclipse.core.runtime.ListenerList;
import org.eclipse.core.runtime.Status;
import org.eclipse.dd.dsf.concurrent.DataRequestMonitor;
import org.eclipse.dd.dsf.concurrent.RequestMonitor;
import org.eclipse.dd.examples.dsf.DsfExamplesPlugin;
/**
* Thread-based implementation of the data generator.
* <p>
* This generator is based around a queue of client requests and a thread which
* reads the requests from the queue and processes them. The distinguishing
* feature of this generator is that it uses a a blocking queue as the main
* synchronization object. However, fListeners, fShutdown, and fChangedIndexes
* fields also need to be thread-safe and so they implement their own
* synchronization.
* </p>
*/
public class DataGeneratorWithThread extends Thread implements IDataGenerator {
// Request objects are used to serialize the interface calls into objects
// which can then be pushed into a queue.
abstract class Request {
final RequestMonitor fRequestMonitor;
Request(RequestMonitor rm) {
fRequestMonitor = rm;
}
}
class CountRequest extends Request {
CountRequest(DataRequestMonitor<Integer> rm) {
super(rm);
}
}
class ItemRequest extends Request {
final int fIndex;
ItemRequest(int index, DataRequestMonitor<String> rm) {
super(rm);
fIndex = index;
}
}
class ShutdownRequest extends Request {
ShutdownRequest(RequestMonitor rm) {
super(rm);
}
}
// Main request queue of the data generator. The getValue(), getCount(),
// and shutdown() methods write into the queue, while the run() method
// reads from it.
private final BlockingQueue<Request> fQueue = new LinkedBlockingQueue<Request>();
// ListenerList class provides thread safety.
private ListenerList fListeners = new ListenerList();
// Current number of elements in this generator.
private int fCount = MIN_COUNT;
// Counter used to determine when to reset the element count.
private int fCountResetTrigger = 0;
// Elements which were modified since the last reset.
private Set<Integer> fChangedIndexes = Collections.synchronizedSet(new HashSet<Integer>());
// Used to determine when to make changes in data.
private long fLastChangeTime = System.currentTimeMillis();
// Flag indicating when the generator has been shut down.
private AtomicBoolean fShutdown = new AtomicBoolean(false);
public DataGeneratorWithThread() {
// Immediately kick off the request processing thread.
start();
}
public void shutdown(RequestMonitor rm) {
// Mark the generator as shut down. After the fShutdown flag is set,
// all new requests should be shut down.
if (!fShutdown.getAndSet(true)) {
fQueue.add(new ShutdownRequest(rm));
} else {
//
rm.setStatus(new Status(IStatus.ERROR, DsfExamplesPlugin.PLUGIN_ID, "Supplier shut down"));
rm.done();
}
}
public void getCount(DataRequestMonitor<Integer> rm) {
if (!fShutdown.get()) {
fQueue.add(new CountRequest(rm));
} else {
rm.setStatus(new Status(IStatus.ERROR, DsfExamplesPlugin.PLUGIN_ID, "Supplier shut down"));
rm.done();
}
}
public void getValue(int index, DataRequestMonitor<String> rm) {
if (!fShutdown.get()) {
fQueue.add(new ItemRequest(index, rm));
} else {
rm.setStatus(new Status(IStatus.ERROR, DsfExamplesPlugin.PLUGIN_ID, "Supplier shut down"));
rm.done();
}
}
public void addListener(Listener listener) {
fListeners.add(listener);
}
public void removeListener(Listener listener) {
fListeners.remove(listener);
}
@Override
public void run() {
try {
while(true) {
// Get the next request from the queue. The time-out
// ensures that that the random changes get processed.
final Request request = fQueue.poll(100, TimeUnit.MILLISECONDS);
// If a request was dequeued, process it.
if (request != null) {
// Simulate a processing delay.
Thread.sleep(PROCESSING_DELAY);
if (request instanceof CountRequest) {
processCountRequest((CountRequest)request);
} else if (request instanceof ItemRequest) {
processItemRequest((ItemRequest)request);
} else if (request instanceof ShutdownRequest) {
// If shutting down, just break out of the while(true)
// loop and thread will exit.
request.fRequestMonitor.done();
break;
}
}
// Simulate data changes.
randomChanges();
}
}
catch (InterruptedException x) {}
}
private void processCountRequest(CountRequest request) {
@SuppressWarnings("unchecked") // Suppress warning about lost type info.
DataRequestMonitor<Integer> rm = (DataRequestMonitor<Integer>)request.fRequestMonitor;
rm.setData(fCount);
rm.done();
}
private void processItemRequest(ItemRequest request) {
@SuppressWarnings("unchecked") // Suppress warning about lost type info.
DataRequestMonitor<String> rm = (DataRequestMonitor<String>)request.fRequestMonitor;
if (fChangedIndexes.contains(request.fIndex)) {
rm.setData("Changed: " + request.fIndex);
} else {
rm.setData(Integer.toString(request.fIndex));
}
rm.done();
}
private void randomChanges() {
// Check if enough time is elapsed.
if (System.currentTimeMillis() > fLastChangeTime + RANDOM_CHANGE_INTERVAL) {
fLastChangeTime = System.currentTimeMillis();
// Once every number of changes, reset the count, the rest of the
// times just change certain values.
if (++fCountResetTrigger % RANDOM_COUNT_CHANGE_INTERVALS == 0){
randomCountReset();
} else {
randomDataChange();
}
}
}
private void randomCountReset() {
// Calculate the new count.
Random random = new java.util.Random();
fCount = MIN_COUNT + Math.abs(random.nextInt()) % (MAX_COUNT - MIN_COUNT);
// Reset the changed values.
fChangedIndexes.clear();
// Notify listeners
for (Object listener : fListeners.getListeners()) {
((Listener)listener).countChanged();
}
}
private void randomDataChange() {
// Calculate the indexes to change.
Random random = new java.util.Random();
Set<Integer> set = new HashSet<Integer>();
for (int i = 0; i < fCount * RANDOM_CHANGE_SET_PERCENTAGE / 100; i++) {
set.add( new Integer(Math.abs(random.nextInt()) % fCount) );
}
// Add the indexes to an overall set of changed indexes.
fChangedIndexes.addAll(set);
// Notify listeners
for (Object listener : fListeners.getListeners()) {
((Listener)listener).valuesChanged(set);
}
}
}

View file

@ -0,0 +1,59 @@
/*******************************************************************************
* Copyright (c) 2006 Wind River Systems and others.
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* which accompanies this distribution, and is available at
* http://www.eclipse.org/legal/epl-v10.html
*
* Contributors:
* Wind River Systems - initial API and implementation
*******************************************************************************/
package org.eclipse.dd.examples.dsf.dataviewer;
import java.util.Set;
import org.eclipse.dd.dsf.concurrent.DataRequestMonitor;
import org.eclipse.dd.dsf.concurrent.RequestMonitor;
import org.eclipse.dd.dsf.concurrent.ThreadSafe;
/**
* Data generator is simple source of data used to populate the example table
* view. It contains two asynchronous methods for retrieving the data
* parameters: the count and the value for a given index. It also allows the
* view to receive events indicating when the data supplied by the generator
* is changed.
*/
@ThreadSafe
public interface IDataGenerator {
// Constants which control the data generator behavior.
// Changing the count range can stress the scalability of the system, while
// changing of the process delay and random change interval can stress
// its performance.
final static int MIN_COUNT = 50;
final static int MAX_COUNT = 100;
final static int PROCESSING_DELAY = 10;
final static int RANDOM_CHANGE_INTERVAL = 10000;
final static int RANDOM_COUNT_CHANGE_INTERVALS = 3;
final static int RANDOM_CHANGE_SET_PERCENTAGE = 10;
// Listener interface that the view needs to implement to react
// to the changes in data.
public interface Listener {
void countChanged();
void valuesChanged(Set<Integer> indexes);
}
// Data access methods.
void getCount(DataRequestMonitor<Integer> rm);
void getValue(int index, DataRequestMonitor<String> rm);
// Method used to shutdown the data generator including any threads that
// it may use.
void shutdown(RequestMonitor rm);
// Methods for registering change listeners.
void addListener(Listener listener);
void removeListener(Listener listener);
}

View file

@ -0,0 +1,180 @@
/*******************************************************************************
* Copyright (c) 2008 Wind River Systems and others.
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* which accompanies this distribution, and is available at
* http://www.eclipse.org/legal/epl-v10.html
*
* Contributors:
* Wind River Systems - initial API and implementation
*******************************************************************************/
package org.eclipse.dd.examples.dsf.dataviewer;
import java.util.Set;
import org.eclipse.dd.dsf.concurrent.DataRequestMonitor;
import org.eclipse.dd.dsf.concurrent.ImmediateExecutor;
import org.eclipse.dd.dsf.concurrent.Query;
import org.eclipse.jface.viewers.IStructuredContentProvider;
import org.eclipse.jface.viewers.TableViewer;
import org.eclipse.jface.viewers.Viewer;
import org.eclipse.swt.SWT;
import org.eclipse.swt.graphics.Font;
import org.eclipse.swt.layout.GridData;
import org.eclipse.swt.layout.GridLayout;
import org.eclipse.swt.widgets.Display;
import org.eclipse.swt.widgets.Shell;
/**
* Data viewer based on a table, which reads data using synchronous methods.
* <p>
* This viewer implements the {@link IStructuredContentProvider} interface
* which is used by the JFace TableViewer class to populate a Table. This
* interface contains one principal methods for reading data {@link #getElements(Object)},
* which synchronously returns an array of elements. In order to implement this
* method using the asynchronous data generator, this provider uses the
* {@link Query} object.
* </p>
*/
public class SyncDataViewer
implements IStructuredContentProvider, IDataGenerator.Listener
{
// The viewer and generator that this content provider using.
final private TableViewer fViewer;
final private IDataGenerator fDataGenerator;
public SyncDataViewer(TableViewer viewer, IDataGenerator generator) {
fViewer = viewer;
fDataGenerator = generator;
fDataGenerator.addListener(this);
}
public void inputChanged(Viewer viewer, Object oldInput, Object newInput) {
// Not used
}
public Object[] getElements(Object inputElement) {
// Create the query object for reading data count.
Query<Integer> countQuery = new Query<Integer>() {
@Override
protected void execute(DataRequestMonitor<Integer> rm) {
fDataGenerator.getCount(rm);
}
};
// Submit the query to be executed. A query implements a runnable
// interface and it has to be executed in order to do its work.
ImmediateExecutor.getInstance().execute(countQuery);
int count = 0;
// Block until the query completes, which will happen when the request
// monitor of the execute() method is marked done.
try {
count = countQuery.get();
} catch (Exception e) {
// InterruptedException and ExecutionException can be thrown here.
// ExecutionException containing a CoreException will be thrown
// if an error status is set to the Query's request monitor.
return new Object[0];
}
// Create the array that will be filled with elements.
// For each index in the array execute a query to get the element at
// that index.
final Object[] elements = new Object[count];
for (int i = 0; i < count; i++) {
final int index = i;
Query<String> valueQuery = new Query<String>() {
@Override
protected void execute(DataRequestMonitor<String> rm) {
fDataGenerator.getValue(index, rm);
}
};
ImmediateExecutor.getInstance().execute(valueQuery);
try {
elements[i] = valueQuery.get();
} catch (Exception e) {
elements[i] = "error";
}
}
return elements;
}
public void dispose() {
fDataGenerator.removeListener(this);
}
public void countChanged() {
// For any event from the generator, refresh the whole viewer.
refreshViewer();
}
public void valuesChanged(Set<Integer> indexes) {
// For any event from the generator, refresh the whole viewer.
refreshViewer();
}
private void refreshViewer() {
// This method may be called on any thread, switch to the display
// thread before calling the viewer.
Display display = fViewer.getControl().getDisplay();
display.asyncExec( new Runnable() {
public void run() {
if (!fViewer.getControl().isDisposed()) {
fViewer.refresh();
}
}
});
}
public static void main(String[] args) {
// Create the shell to hold the viewer.
Display display = new Display();
Shell shell = new Shell(display, SWT.SHELL_TRIM);
shell.setLayout(new GridLayout());
GridData data = new GridData(GridData.FILL_BOTH);
shell.setLayoutData(data);
Font font = new Font(display, "Courier", 10, SWT.NORMAL);
// Create the table viewer.
TableViewer tableViewer = new TableViewer(shell, SWT.BORDER);
tableViewer.getControl().setLayoutData(data);
// Create the data generator.
final IDataGenerator generator = new DataGeneratorWithThread();
// Create the content provider which will populate the viewer.
SyncDataViewer contentProvider = new SyncDataViewer(tableViewer, generator);
tableViewer.setContentProvider(contentProvider);
tableViewer.setInput(new Object());
// Open the shell and service the display dispatch loop until user
// closes the shell.
shell.open();
while (!shell.isDisposed()) {
if (!display.readAndDispatch())
display.sleep();
}
// The IDataGenerator.shutdown() method is asynchronous, this requires
// using a query again in order to wait for its completion.
Query<Object> shutdownQuery = new Query<Object>() {
@Override
protected void execute(DataRequestMonitor<Object> rm) {
generator.shutdown(rm);
}
};
ImmediateExecutor.getInstance().execute(shutdownQuery);
try {
shutdownQuery.get();
} catch (Exception e) {}
// Shut down the display.
font.dispose();
display.dispose();
}
}