From 58513ccf2d81c50492e2087a87d31cd15e9e60df Mon Sep 17 00:00:00 2001 From: Pawel Piech Date: Wed, 30 Nov 2011 13:40:41 -0800 Subject: [PATCH] Bug 310345 - [concurrent] Asynchronous Cache Programming Model (ACPM) utilities for DSF - Added an ACPM example to DSF examples plugin. --- .../cdt/dsf/concurrent/AbstractCache.java | 66 ++++++--- .../cdt/dsf/concurrent/RangeCache.java | 18 +-- .../cdt/dsf/concurrent/RequestCache.java | 9 ++ .../cdt/dsf/concurrent/Transaction.java | 32 ++-- .../dsf/dataviewer/AsyncDataViewer.java | 65 ++++++--- .../dataviewer/DataGeneratorWithExecutor.java | 138 +++++++++--------- .../dataviewer/DataGeneratorWithThread.java | 62 +++++--- .../dsf/dataviewer/IDataGenerator.java | 14 +- .../dsf/dataviewer/SyncDataViewer.java | 70 ++++++--- 9 files changed, 293 insertions(+), 181 deletions(-) diff --git a/dsf/org.eclipse.cdt.dsf/src/org/eclipse/cdt/dsf/concurrent/AbstractCache.java b/dsf/org.eclipse.cdt.dsf/src/org/eclipse/cdt/dsf/concurrent/AbstractCache.java index 1f22902065a..84ad02dd935 100644 --- a/dsf/org.eclipse.cdt.dsf/src/org/eclipse/cdt/dsf/concurrent/AbstractCache.java +++ b/dsf/org.eclipse.cdt.dsf/src/org/eclipse/cdt/dsf/concurrent/AbstractCache.java @@ -154,6 +154,27 @@ public abstract class AbstractCache implements ICache { rm.done(); } } + + private void completeWaitingRms() { + Object waiting = null; + synchronized(this) { + waiting = fWaitingList; + fWaitingList = null; + } + if (waiting != null) { + if (waiting instanceof RequestMonitor) { + completeWaitingRm((RequestMonitor)waiting); + } else if (waiting instanceof RequestMonitor[]) { + RequestMonitor[] waitingList = (RequestMonitor[])waiting; + for (int i = 0; i < waitingList.length; i++) { + if (waitingList[i] != null) { + completeWaitingRm(waitingList[i]); + } + } + } + waiting = null; + } + } private void completeWaitingRm(RequestMonitor rm) { rm.setStatus(fStatus); @@ -300,23 +321,32 @@ public abstract class AbstractCache implements ICache { fStatus = status; fValid = true; - Object waiting = null; - synchronized(this) { - waiting = fWaitingList; - fWaitingList = null; - } - if (waiting != null) { - if (waiting instanceof RequestMonitor) { - completeWaitingRm((RequestMonitor)waiting); - } else if (waiting instanceof RequestMonitor[]) { - RequestMonitor[] waitingList = (RequestMonitor[])waiting; - for (int i = 0; i < waitingList.length; i++) { - if (waitingList[i] != null) { - completeWaitingRm(waitingList[i]); - } - } - } - waiting = null; - } + completeWaitingRms(); } + + /** + * Performs the set and reset operations in one step This allows the cache to + * remain in invalid state, but to notify any waiting listeners that the state of + * the cache has changed. + * + * @param data + * The data that should be returned to any clients waiting for + * cache data and for clients requesting data until the cache is + * invalidated. + * @status The status that should be returned to any clients waiting for + * cache data and for clients requesting data until the cache is + * invalidated + * + * @see #reset(Object, IStatus) + */ + protected void setAndReset(V data, IStatus status) { + assert fExecutor.getDsfExecutor().isInExecutorThread(); + + fData = data; + fStatus = status; + fValid = false; + + completeWaitingRms(); + } + } diff --git a/dsf/org.eclipse.cdt.dsf/src/org/eclipse/cdt/dsf/concurrent/RangeCache.java b/dsf/org.eclipse.cdt.dsf/src/org/eclipse/cdt/dsf/concurrent/RangeCache.java index 9f8863c3da8..8c5c09eb979 100644 --- a/dsf/org.eclipse.cdt.dsf/src/org/eclipse/cdt/dsf/concurrent/RangeCache.java +++ b/dsf/org.eclipse.cdt.dsf/src/org/eclipse/cdt/dsf/concurrent/RangeCache.java @@ -107,8 +107,7 @@ abstract public class RangeCache { protected List process() throws InvalidCacheException, CoreException { clearCanceledRequests(); - List> transactionRequests = getRequests(fOffset, fCount); - + List transactionRequests = getRequests(fOffset, fCount); validate(transactionRequests); return makeElementsListFromRequests(transactionRequests, fOffset, fCount); @@ -156,7 +155,7 @@ abstract public class RangeCache { public ICache> getRange(final long offset, final int count) { assert fExecutor.getDsfExecutor().isInExecutorThread(); - List> requests = getRequests(offset, count); + List requests = getRequests(offset, count); RequestCache> range = new RequestCache>(fExecutor) { @Override @@ -232,8 +231,8 @@ abstract public class RangeCache { } } - private List> getRequests(long fOffset, int fCount) { - List> requests = new ArrayList>(1); + private List getRequests(long fOffset, int fCount) { + List requests = new ArrayList(1); // Create a new request for the data to retrieve. Request current = new Request(fOffset, fCount); @@ -252,7 +251,7 @@ abstract public class RangeCache { // Adjust the beginning of the requested range of data. If there // is already an overlapping range in front of the requested range, // then use it. - private Request adjustRequestHead(Request request, List> transactionRequests, long offset, int count) { + private Request adjustRequestHead(Request request, List transactionRequests, long offset, int count) { SortedSet headRequests = fRequests.headSet(request); if (!headRequests.isEmpty()) { Request headRequest = headRequests.last(); @@ -276,7 +275,7 @@ abstract public class RangeCache { * @param transactionRequests * @return */ - private Request adjustRequestTail(Request current, List> transactionRequests, long offset, int count) { + private Request adjustRequestTail(Request current, List transactionRequests, long offset, int count) { // Create a duplicate of the tailSet, in order to avoid a concurrent modification exception. List tailSet = new ArrayList(fRequests.tailSet(current)); @@ -313,14 +312,13 @@ abstract public class RangeCache { return current; } - private List makeElementsListFromRequests(List> requests, long offset, int count) { + private List makeElementsListFromRequests(List requests, long offset, int count) { List retVal = new ArrayList(count); long index = offset; long end = offset + count; int requestIdx = 0; while (index < end ) { - @SuppressWarnings("unchecked") - Request request = (Request)requests.get(requestIdx); + Request request = requests.get(requestIdx); if (index < request.fOffset + request.fCount) { retVal.add( request.getData().get((int)(index - request.fOffset)) ); index ++; diff --git a/dsf/org.eclipse.cdt.dsf/src/org/eclipse/cdt/dsf/concurrent/RequestCache.java b/dsf/org.eclipse.cdt.dsf/src/org/eclipse/cdt/dsf/concurrent/RequestCache.java index 76b94d6e145..636db58b525 100644 --- a/dsf/org.eclipse.cdt.dsf/src/org/eclipse/cdt/dsf/concurrent/RequestCache.java +++ b/dsf/org.eclipse.cdt.dsf/src/org/eclipse/cdt/dsf/concurrent/RequestCache.java @@ -94,4 +94,13 @@ public abstract class RequestCache extends AbstractCache { } super.set(data, status); } + + @Override + protected void reset() { + if (fRm != null) { + fRm.cancel(); + fRm = null; + } + super.reset(); + } } diff --git a/dsf/org.eclipse.cdt.dsf/src/org/eclipse/cdt/dsf/concurrent/Transaction.java b/dsf/org.eclipse.cdt.dsf/src/org/eclipse/cdt/dsf/concurrent/Transaction.java index ef72971b5ab..2b047bdbb96 100644 --- a/dsf/org.eclipse.cdt.dsf/src/org/eclipse/cdt/dsf/concurrent/Transaction.java +++ b/dsf/org.eclipse.cdt.dsf/src/org/eclipse/cdt/dsf/concurrent/Transaction.java @@ -70,8 +70,10 @@ public abstract class Transaction { * logic once the cache object has been updated from the source. * * @return the cached data if it's valid, otherwise an exception is thrown - * @throws InvalidCacheException - * @throws CoreException + * @throws Transaction.InvalidCacheException Exception indicating that a + * cache is not valid and transaction will need to be rescheduled. + * @throws CoreException Exception indicating that one of the caches is + * in error state and transaction cannot be processed. */ abstract protected V process() throws InvalidCacheException, CoreException; @@ -149,19 +151,20 @@ public abstract class Transaction { * See {@link #validate(RequestCache)}. This variant simply validates * multiple cache objects. */ - public void validate(ICache ... caches) throws InvalidCacheException, CoreException { + public void validate(ICache ... caches) throws InvalidCacheException, CoreException { validate(Arrays.asList(caches)); } - - /** - * See {@link #validate(RequestCache)}. This variant simply validates - * multiple cache objects. - */ - public void validate(Iterable> caches) throws InvalidCacheException, CoreException { + + /** + * See {@link #validate(RequestCache)}. This variant simply validates + * multiple cache objects. + */ + public void validate(@SuppressWarnings("rawtypes") Iterable caches) throws InvalidCacheException, CoreException { // Check if any of the caches have errors: boolean allValid = true; - for (ICache cache : caches) { + for (Object cacheObj : caches) { + ICache cache = (ICache)cacheObj; if (cache.isValid()) { if (!cache.getStatus().isOK()) { throw new CoreException(cache.getStatus()); @@ -171,9 +174,9 @@ public abstract class Transaction { } } if (!allValid) { - // Throw the invalid cache exception, but first schedule a - // re-attempt of the transaction logic, to occur when the - // stale/unset cache objects have been updated + // Throw the invalid cache exception, but first schedule a + // re-attempt of the transaction logic, to occur when the + // stale/unset cache objects have been updated CountingRequestMonitor countringRm = new CountingRequestMonitor(ImmediateExecutor.getInstance(), fRm) { @Override protected void handleCompleted() { @@ -181,7 +184,8 @@ public abstract class Transaction { } }; int count = 0; - for (ICache cache : caches) { + for (Object cacheObj : caches) { + ICache cache = (ICache)cacheObj; if (!cache.isValid()) { cache.update(countringRm); count++; diff --git a/dsf/org.eclipse.cdt.examples.dsf/src_preprocess/org/eclipse/cdt/examples/dsf/dataviewer/AsyncDataViewer.java b/dsf/org.eclipse.cdt.examples.dsf/src_preprocess/org/eclipse/cdt/examples/dsf/dataviewer/AsyncDataViewer.java index 01e29901fb2..d19ef9920ae 100644 --- a/dsf/org.eclipse.cdt.examples.dsf/src_preprocess/org/eclipse/cdt/examples/dsf/dataviewer/AsyncDataViewer.java +++ b/dsf/org.eclipse.cdt.examples.dsf/src_preprocess/org/eclipse/cdt/examples/dsf/dataviewer/AsyncDataViewer.java @@ -1,5 +1,5 @@ /******************************************************************************* - * Copyright (c) 2006, 2009 Wind River Systems and others. + * Copyright (c) 2006, 2011 Wind River Systems and others. * All rights reserved. This program and the accompanying materials * are made available under the terms of the Eclipse Public License v1.0 * which accompanies this distribution, and is available at @@ -66,13 +66,15 @@ public class AsyncDataViewer final private IDataGenerator fDataGenerator; // Fields used in request cancellation logic. - private List fItemDataRequestMonitors = new LinkedList(); + private List fItemDataRequestMonitors = + new LinkedList(); private Set fIndexesToCancel = new HashSet(); private int fCancelCallsPending = 0; public AsyncDataViewer(TableViewer viewer, IDataGenerator generator) { fViewer = viewer; - fDisplayExecutor = DisplayDsfExecutor.getDisplayDsfExecutor(fViewer.getTable().getDisplay()); + fDisplayExecutor = DisplayDsfExecutor.getDisplayDsfExecutor( + fViewer.getTable().getDisplay()); fDataGenerator = generator; fDataGenerator.addListener(this); } @@ -104,10 +106,18 @@ public class AsyncDataViewer 1, TimeUnit.MILLISECONDS); } + /** + * Calculates the number of visible items based on the top item index and + * table bounds. + * @param top Index of top item. + * @return calculated number of items in viewer + */ private int getVisibleItemCount(int top) { Table table = fViewer.getTable(); int itemCount = table.getItemCount(); - return Math.min((table.getBounds().height / table.getItemHeight()) + 2, itemCount - top); + return Math.min( + (table.getBounds().height / table.getItemHeight()) + 2, + itemCount - top); } @ThreadSafe @@ -131,7 +141,10 @@ public class AsyncDataViewer }}); } - + /** + * Retrieve the up to date count. When a new count is set to viewer, the + * viewer will refresh all items as well. + */ private void queryItemCount() { // Request count from data provider. When the count is returned, we // have to re-dispatch into the display thread to avoid calling @@ -150,13 +163,25 @@ public class AsyncDataViewer } } }); - } - - // Dedicated class for data item requests. This class holds the index - // argument so it can be examined when canceling stale requests. - private class ValueDataRequestMonitor extends DataRequestMonitor { + + /** + * Retrieves value of an element at given index. When complete the value + * is written to the viewer. + * @param index Index of value to retrieve. + */ + private void queryValue(final int index) { + ValueDataRequestMonitor rm = new ValueDataRequestMonitor(index); + fItemDataRequestMonitors.add(rm); + fDataGenerator.getValue(index, rm); + } + + /** + * Dedicated class for data item requests. This class holds the index + * argument so it can be examined when canceling stale requests. + */ + private class ValueDataRequestMonitor extends DataRequestMonitor { /** Index is used when canceling stale requests. */ int fIndex; @@ -170,7 +195,8 @@ public class AsyncDataViewer protected void handleCompleted() { fItemDataRequestMonitors.remove(this); - // Check if the request completed successfully, otherwise ignore it. + // Check if the request completed successfully, otherwise ignore + // it. if (isSuccess()) { if (!fViewer.getTable().isDisposed()) { fViewer.replace(getData(), fIndex); @@ -178,12 +204,6 @@ public class AsyncDataViewer } } } - - private void queryValue(final int index) { - ValueDataRequestMonitor rm = new ValueDataRequestMonitor(index); - fItemDataRequestMonitors.add(rm); - fDataGenerator.getValue(index, rm); - } private void cancelStaleRequests(int topIdx, int botIdx) { // Decrement the count of outstanding cancel calls. @@ -194,7 +214,10 @@ public class AsyncDataViewer // Go through the outstanding requests and cancel any that // are not visible anymore. - for (Iterator itr = fItemDataRequestMonitors.iterator(); itr.hasNext();) { + for (Iterator itr = + fItemDataRequestMonitors.iterator(); + itr.hasNext();) + { ValueDataRequestMonitor item = itr.next(); if (item.fIndex < topIdx || item.fIndex > botIdx) { // Set the item to canceled status, so that the data provider @@ -237,14 +260,16 @@ public class AsyncDataViewer Font font = new Font(display, "Courier", 10, SWT.NORMAL); // Create the table viewer. - TableViewer tableViewer = new TableViewer(shell, SWT.BORDER | SWT.VIRTUAL); + TableViewer tableViewer = + new TableViewer(shell, SWT.BORDER | SWT.VIRTUAL); tableViewer.getControl().setLayoutData(data); // Create the data generator. final IDataGenerator generator = new DataGeneratorWithExecutor(); // Create the content provider which will populate the viewer. - AsyncDataViewer contentProvider = new AsyncDataViewer(tableViewer, generator); + AsyncDataViewer contentProvider = + new AsyncDataViewer(tableViewer, generator); tableViewer.setContentProvider(contentProvider); tableViewer.setInput(new Object()); diff --git a/dsf/org.eclipse.cdt.examples.dsf/src_preprocess/org/eclipse/cdt/examples/dsf/dataviewer/DataGeneratorWithExecutor.java b/dsf/org.eclipse.cdt.examples.dsf/src_preprocess/org/eclipse/cdt/examples/dsf/dataviewer/DataGeneratorWithExecutor.java index 3a7393c7fd9..d1ee0b542ed 100644 --- a/dsf/org.eclipse.cdt.examples.dsf/src_preprocess/org/eclipse/cdt/examples/dsf/dataviewer/DataGeneratorWithExecutor.java +++ b/dsf/org.eclipse.cdt.examples.dsf/src_preprocess/org/eclipse/cdt/examples/dsf/dataviewer/DataGeneratorWithExecutor.java @@ -1,5 +1,5 @@ /******************************************************************************* - * Copyright (c) 2006, 2009 Wind River Systems and others. + * Copyright (c) 2006, 2011 Wind River Systems and others. * All rights reserved. This program and the accompanying materials * are made available under the terms of the Eclipse Public License v1.0 * which accompanies this distribution, and is available at @@ -14,14 +14,14 @@ package org.eclipse.cdt.examples.dsf.dataviewer; //#package org.eclipse.cdt.examples.dsf.dataviewer.answers; //#endif -import java.util.HashSet; +import java.util.HashMap; //#ifdef answers //#import java.util.Iterator; //#endif import java.util.LinkedList; import java.util.List; import java.util.Random; -import java.util.Set; +import java.util.Map; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.TimeUnit; @@ -70,6 +70,16 @@ public class DataGeneratorWithExecutor implements IDataGenerator { Request(RequestMonitor rm) { fRequestMonitor = rm; + + rm.addCancelListener(new RequestMonitor.ICanceledListener() { + public void requestCanceled(RequestMonitor rm) { + fExecutor.execute(new DsfRunnable() { + public void run() { + fQueue.remove(Request.this); + } + }); + } + }); } } @@ -93,7 +103,7 @@ public class DataGeneratorWithExecutor implements IDataGenerator { //#endif class ItemRequest extends Request { final int fIndex; - ItemRequest(int index, DataRequestMonitor rm) { + ItemRequest(int index, DataRequestMonitor rm) { super(rm); fIndex = index; } @@ -156,24 +166,20 @@ public class DataGeneratorWithExecutor implements IDataGenerator { //#else //# @ConfinedToDsfExecutor("fExecutor") //#endif - private Set fChangedIndexes = new HashSet(); + private Map fChangedValues = + new HashMap(); - // Flag used to ensure that requests are processed sequentially. - //#ifdef exercises - // TODO Exercise 4 - Add an annotation (ThreadSafe/ConfinedToDsfExecutor) - // indicating allowed thread access to this class/method/member - //#else -//# @ConfinedToDsfExecutor("fExecutor") - //#endif - private boolean fServiceQueueInProgress = false; - - //#ifdef exercises - // TODO Exercise 4 - Add an annotation (ThreadSafe/ConfinedToDsfExecutor) - // indicating allowed thread access to this class/method/member - //#endif public DataGeneratorWithExecutor() { // Create the executor - fExecutor = new DefaultDsfExecutor("Supplier Executor"); + this(new DefaultDsfExecutor("Supplier Executor")); + } + //#ifdef exercises + // TODO Exercise 4 - Add an annotation (ThreadSafe/ConfinedToDsfExecutor) + // indicating allowed thread access to this class/method/member + //#endif + public DataGeneratorWithExecutor(DsfExecutor executor) { + // Create the executor + fExecutor = executor; // Schedule a runnable to make the random changes. fExecutor.scheduleAtFixedRate( @@ -182,8 +188,8 @@ public class DataGeneratorWithExecutor implements IDataGenerator { randomChanges(); } }, - RANDOM_CHANGE_INTERVAL, - RANDOM_CHANGE_INTERVAL, + new Random().nextInt() % RANDOM_CHANGE_INTERVAL, + RANDOM_CHANGE_INTERVAL, //Add a 10% variance to the interval. TimeUnit.MILLISECONDS); } @@ -197,8 +203,9 @@ public class DataGeneratorWithExecutor implements IDataGenerator { public void run() { // Empty the queue of requests and fail them. for (Request request : fQueue) { - request.fRequestMonitor.setStatus( - new Status(IStatus.ERROR, DsfExamplesPlugin.PLUGIN_ID, "Supplier shut down")); + request.fRequestMonitor.setStatus(new Status( + IStatus.ERROR, DsfExamplesPlugin.PLUGIN_ID, + "Supplier shut down")); request.fRequestMonitor.done(); } fQueue.clear(); @@ -209,7 +216,8 @@ public class DataGeneratorWithExecutor implements IDataGenerator { } }); } catch (RejectedExecutionException e) { - rm.setStatus(new Status(IStatus.ERROR, DsfExamplesPlugin.PLUGIN_ID, "Supplier shut down")); + rm.setStatus(new Status(IStatus.ERROR, DsfExamplesPlugin.PLUGIN_ID, + "Supplier shut down")); rm.done(); } } @@ -227,7 +235,9 @@ public class DataGeneratorWithExecutor implements IDataGenerator { } }); } catch (RejectedExecutionException e) { - rm.setStatus(new Status(IStatus.ERROR, DsfExamplesPlugin.PLUGIN_ID, "Supplier shut down")); + rm.setStatus(new Status( + IStatus.ERROR, DsfExamplesPlugin.PLUGIN_ID, + "Supplier shut down")); rm.done(); } } @@ -236,7 +246,7 @@ public class DataGeneratorWithExecutor implements IDataGenerator { // TODO Exercise 4 - Add an annotation (ThreadSafe/ConfinedToDsfExecutor) // indicating allowed thread access to this class/method/member //#endif - public void getValue(final int index, final DataRequestMonitor rm) { + public void getValue(final int index, final DataRequestMonitor rm) { try { fExecutor.execute( new DsfRunnable() { public void run() { @@ -245,7 +255,8 @@ public class DataGeneratorWithExecutor implements IDataGenerator { } }); } catch (RejectedExecutionException e) { - rm.setStatus(new Status(IStatus.ERROR, DsfExamplesPlugin.PLUGIN_ID, "Supplier shut down")); + rm.setStatus(new Status(IStatus.ERROR, DsfExamplesPlugin.PLUGIN_ID, + "Supplier shut down")); rm.done(); } } @@ -286,7 +297,16 @@ public class DataGeneratorWithExecutor implements IDataGenerator { //# @ConfinedToDsfExecutor("fExecutor") //#endif private void serviceQueue() { - + fExecutor.schedule( + new DsfRunnable() { + public void run() { + doServiceQueue(); + } + }, + PROCESSING_DELAY, TimeUnit.MILLISECONDS); + } + + private void doServiceQueue() { //#ifdef exercises // TODO Exercise 3 - Add logic to discard cancelled requests from queue. // Hint: Since serviceQueue() is called using the executor, and the @@ -305,33 +325,16 @@ public class DataGeneratorWithExecutor implements IDataGenerator { //# } //#endif - // If a queue servicing is already scheduled, do nothing. - if (fServiceQueueInProgress) { - return; - } - - if (fQueue.size() != 0) { + while (fQueue.size() != 0) { // If there are requests to service, remove one from the queue and // schedule a runnable to process the request after a processing // delay. - fServiceQueueInProgress = true; - final Request request = fQueue.remove(0); - fExecutor.schedule( - new DsfRunnable() { - public void run() { - if (request instanceof CountRequest) { - processCountRequest((CountRequest)request); - } else if (request instanceof ItemRequest) { - processItemRequest((ItemRequest)request); - } - - // Reset the processing flag and process next - // request. - fServiceQueueInProgress = false; - serviceQueue(); - } - }, - PROCESSING_DELAY, TimeUnit.MILLISECONDS); + Request request = fQueue.remove(0); + if (request instanceof CountRequest) { + processCountRequest((CountRequest)request); + } else if (request instanceof ItemRequest) { + processItemRequest((ItemRequest)request); + } } } @@ -343,7 +346,8 @@ public class DataGeneratorWithExecutor implements IDataGenerator { //#endif private void processCountRequest(CountRequest request) { @SuppressWarnings("unchecked") // Suppress warning about lost type info. - DataRequestMonitor rm = (DataRequestMonitor)request.fRequestMonitor; + DataRequestMonitor rm = + (DataRequestMonitor)request.fRequestMonitor; rm.setData(fCount); rm.done(); @@ -357,12 +361,13 @@ public class DataGeneratorWithExecutor implements IDataGenerator { //#endif private void processItemRequest(ItemRequest request) { @SuppressWarnings("unchecked") // Suppress warning about lost type info. - DataRequestMonitor rm = (DataRequestMonitor)request.fRequestMonitor; + DataRequestMonitor rm = + (DataRequestMonitor)request.fRequestMonitor; - if (fChangedIndexes.contains(request.fIndex)) { - rm.setData("Changed: " + request.fIndex); + if (fChangedValues.containsKey(request.fIndex)) { + rm.setData(fChangedValues.get(request.fIndex)); } else { - rm.setData(Integer.toString(request.fIndex)); + rm.setData(request.fIndex); } rm.done(); } @@ -398,10 +403,11 @@ public class DataGeneratorWithExecutor implements IDataGenerator { private void randomCountReset() { // Calculate the new count. Random random = new java.util.Random(); - fCount = MIN_COUNT + Math.abs(random.nextInt()) % (MAX_COUNT - MIN_COUNT); + fCount = MIN_COUNT + + Math.abs(random.nextInt()) % (MAX_COUNT - MIN_COUNT); // Reset the changed values. - fChangedIndexes.clear(); + fChangedValues.clear(); // Notify listeners for (Listener listener : fListeners) { @@ -421,17 +427,19 @@ public class DataGeneratorWithExecutor implements IDataGenerator { private void randomDataChange() { // Calculate the indexes to change. Random random = new java.util.Random(); - Set set = new HashSet(); + Map changed = new HashMap(); for (int i = 0; i < fCount * RANDOM_CHANGE_SET_PERCENTAGE / 100; i++) { - set.add( new Integer(Math.abs(random.nextInt()) % fCount) ); - } + int randomIndex = Math.abs(random.nextInt()) % fCount; + int randomValue = Math.abs(random.nextInt()) % fCount; + changed.put(randomIndex, randomValue); + } // Add the indexes to an overall set of changed indexes. - fChangedIndexes.addAll(set); + fChangedValues.putAll(changed); // Notify listeners - for (Listener listener : fListeners) { - listener.valuesChanged(set); + for (Object listener : fListeners) { + ((Listener)listener).valuesChanged(changed.keySet()); } } } diff --git a/dsf/org.eclipse.cdt.examples.dsf/src_preprocess/org/eclipse/cdt/examples/dsf/dataviewer/DataGeneratorWithThread.java b/dsf/org.eclipse.cdt.examples.dsf/src_preprocess/org/eclipse/cdt/examples/dsf/dataviewer/DataGeneratorWithThread.java index e4e736a14ea..3b4a909e884 100644 --- a/dsf/org.eclipse.cdt.examples.dsf/src_preprocess/org/eclipse/cdt/examples/dsf/dataviewer/DataGeneratorWithThread.java +++ b/dsf/org.eclipse.cdt.examples.dsf/src_preprocess/org/eclipse/cdt/examples/dsf/dataviewer/DataGeneratorWithThread.java @@ -1,5 +1,5 @@ /******************************************************************************* - * Copyright (c) 2006, 2009 Wind River Systems and others. + * Copyright (c) 2006, 2011 Wind River Systems and others. * All rights reserved. This program and the accompanying materials * are made available under the terms of the Eclipse Public License v1.0 * which accompanies this distribution, and is available at @@ -15,9 +15,9 @@ package org.eclipse.cdt.examples.dsf.dataviewer; //#endif import java.util.Collections; -import java.util.HashSet; +import java.util.HashMap; import java.util.Random; -import java.util.Set; +import java.util.Map; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; @@ -41,7 +41,9 @@ import org.eclipse.cdt.examples.dsf.DsfExamplesPlugin; * synchronization. *

*/ -public class DataGeneratorWithThread extends Thread implements IDataGenerator { +public class DataGeneratorWithThread extends Thread + implements IDataGenerator +{ // Request objects are used to serialize the interface calls into objects // which can then be pushed into a queue. @@ -61,7 +63,7 @@ public class DataGeneratorWithThread extends Thread implements IDataGenerator { class ItemRequest extends Request { final int fIndex; - ItemRequest(int index, DataRequestMonitor rm) { + ItemRequest(int index, DataRequestMonitor rm) { super(rm); fIndex = index; } @@ -76,7 +78,8 @@ public class DataGeneratorWithThread extends Thread implements IDataGenerator { // Main request queue of the data generator. The getValue(), getCount(), // and shutdown() methods write into the queue, while the run() method // reads from it. - private final BlockingQueue fQueue = new LinkedBlockingQueue(); + private final BlockingQueue fQueue = + new LinkedBlockingQueue(); // ListenerList class provides thread safety. private ListenerList fListeners = new ListenerList(); @@ -88,7 +91,8 @@ public class DataGeneratorWithThread extends Thread implements IDataGenerator { private int fCountResetTrigger = 0; // Elements which were modified since the last reset. - private Set fChangedIndexes = Collections.synchronizedSet(new HashSet()); + private Map fChangedValues = + Collections.synchronizedMap(new HashMap()); // Used to determine when to make changes in data. private long fLastChangeTime = System.currentTimeMillis(); @@ -108,7 +112,8 @@ public class DataGeneratorWithThread extends Thread implements IDataGenerator { fQueue.add(new ShutdownRequest(rm)); } else { // - rm.setStatus(new Status(IStatus.ERROR, DsfExamplesPlugin.PLUGIN_ID, "Supplier shut down")); + rm.setStatus(new Status(IStatus.ERROR, DsfExamplesPlugin.PLUGIN_ID, + "Supplier shut down")); rm.done(); } } @@ -117,16 +122,18 @@ public class DataGeneratorWithThread extends Thread implements IDataGenerator { if (!fShutdown.get()) { fQueue.add(new CountRequest(rm)); } else { - rm.setStatus(new Status(IStatus.ERROR, DsfExamplesPlugin.PLUGIN_ID, "Supplier shut down")); + rm.setStatus(new Status(IStatus.ERROR, DsfExamplesPlugin.PLUGIN_ID, + "Supplier shut down")); rm.done(); } } - public void getValue(int index, DataRequestMonitor rm) { + public void getValue(int index, DataRequestMonitor rm) { if (!fShutdown.get()) { fQueue.add(new ItemRequest(index, rm)); } else { - rm.setStatus(new Status(IStatus.ERROR, DsfExamplesPlugin.PLUGIN_ID, "Supplier shut down")); + rm.setStatus(new Status(IStatus.ERROR, DsfExamplesPlugin.PLUGIN_ID, + "Supplier shut down")); rm.done(); } } @@ -150,7 +157,6 @@ public class DataGeneratorWithThread extends Thread implements IDataGenerator { // If a request was dequeued, process it. if (request != null) { // Simulate a processing delay. - Thread.sleep(PROCESSING_DELAY); if (request instanceof CountRequest) { processCountRequest((CountRequest)request); @@ -162,6 +168,8 @@ public class DataGeneratorWithThread extends Thread implements IDataGenerator { request.fRequestMonitor.done(); break; } + } else { + Thread.sleep(PROCESSING_DELAY); } // Simulate data changes. @@ -173,7 +181,8 @@ public class DataGeneratorWithThread extends Thread implements IDataGenerator { private void processCountRequest(CountRequest request) { @SuppressWarnings("unchecked") // Suppress warning about lost type info. - DataRequestMonitor rm = (DataRequestMonitor)request.fRequestMonitor; + DataRequestMonitor rm = + (DataRequestMonitor)request.fRequestMonitor; rm.setData(fCount); rm.done(); @@ -181,12 +190,13 @@ public class DataGeneratorWithThread extends Thread implements IDataGenerator { private void processItemRequest(ItemRequest request) { @SuppressWarnings("unchecked") // Suppress warning about lost type info. - DataRequestMonitor rm = (DataRequestMonitor)request.fRequestMonitor; + DataRequestMonitor rm = + (DataRequestMonitor)request.fRequestMonitor; - if (fChangedIndexes.contains(request.fIndex)) { - rm.setData("Changed: " + request.fIndex); + if (fChangedValues.containsKey(request.fIndex)) { + rm.setData(fChangedValues.get(request.fIndex)); } else { - rm.setData(Integer.toString(request.fIndex)); + rm.setData(request.fIndex); } rm.done(); } @@ -194,12 +204,14 @@ public class DataGeneratorWithThread extends Thread implements IDataGenerator { private void randomChanges() { // Check if enough time is elapsed. - if (System.currentTimeMillis() > fLastChangeTime + RANDOM_CHANGE_INTERVAL) { + if (System.currentTimeMillis() > + fLastChangeTime + RANDOM_CHANGE_INTERVAL) + { fLastChangeTime = System.currentTimeMillis(); // Once every number of changes, reset the count, the rest of the // times just change certain values. - if (++fCountResetTrigger % RANDOM_COUNT_CHANGE_INTERVALS == 0){ + if (++fCountResetTrigger % RANDOM_COUNT_CHANGE_INTERVALS == 0) { randomCountReset(); } else { randomDataChange(); @@ -213,7 +225,7 @@ public class DataGeneratorWithThread extends Thread implements IDataGenerator { fCount = MIN_COUNT + Math.abs(random.nextInt()) % (MAX_COUNT - MIN_COUNT); // Reset the changed values. - fChangedIndexes.clear(); + fChangedValues.clear(); // Notify listeners for (Object listener : fListeners.getListeners()) { @@ -224,17 +236,19 @@ public class DataGeneratorWithThread extends Thread implements IDataGenerator { private void randomDataChange() { // Calculate the indexes to change. Random random = new java.util.Random(); - Set set = new HashSet(); + Map changed = new HashMap(); for (int i = 0; i < fCount * RANDOM_CHANGE_SET_PERCENTAGE / 100; i++) { - set.add( new Integer(Math.abs(random.nextInt()) % fCount) ); + int randomIndex = Math.abs(random.nextInt()) % fCount; + int randomValue = Math.abs(random.nextInt()) % fCount; + changed.put(randomIndex, randomValue); } // Add the indexes to an overall set of changed indexes. - fChangedIndexes.addAll(set); + fChangedValues.putAll(changed); // Notify listeners for (Object listener : fListeners.getListeners()) { - ((Listener)listener).valuesChanged(set); + ((Listener)listener).valuesChanged(changed.keySet()); } } } diff --git a/dsf/org.eclipse.cdt.examples.dsf/src_preprocess/org/eclipse/cdt/examples/dsf/dataviewer/IDataGenerator.java b/dsf/org.eclipse.cdt.examples.dsf/src_preprocess/org/eclipse/cdt/examples/dsf/dataviewer/IDataGenerator.java index 398b09a4e88..ef7eb53f27a 100644 --- a/dsf/org.eclipse.cdt.examples.dsf/src_preprocess/org/eclipse/cdt/examples/dsf/dataviewer/IDataGenerator.java +++ b/dsf/org.eclipse.cdt.examples.dsf/src_preprocess/org/eclipse/cdt/examples/dsf/dataviewer/IDataGenerator.java @@ -1,5 +1,5 @@ /******************************************************************************* - * Copyright (c) 2006, 2009 Wind River Systems and others. + * Copyright (c) 2006, 2011 Wind River Systems and others. * All rights reserved. This program and the accompanying materials * are made available under the terms of the Eclipse Public License v1.0 * which accompanies this distribution, and is available at @@ -41,11 +41,11 @@ public interface IDataGenerator { // Changing the count range can stress the scalability of the system, while // changing of the process delay and random change interval can stress // its performance. - final static int MIN_COUNT = 100; - final static int MAX_COUNT = 200; - final static int PROCESSING_DELAY = 10; - final static int RANDOM_CHANGE_INTERVAL = 10000; - final static int RANDOM_COUNT_CHANGE_INTERVALS = 3; + final static int MIN_COUNT = 50; + final static int MAX_COUNT = 100; + final static int PROCESSING_DELAY = 500; + final static int RANDOM_CHANGE_INTERVAL = 4000; + final static int RANDOM_COUNT_CHANGE_INTERVALS = 5; final static int RANDOM_CHANGE_SET_PERCENTAGE = 10; @@ -58,7 +58,7 @@ public interface IDataGenerator { // Data access methods. void getCount(DataRequestMonitor rm); - void getValue(int index, DataRequestMonitor rm); + void getValue(int index, DataRequestMonitor rm); // Method used to shutdown the data generator including any threads that // it may use. diff --git a/dsf/org.eclipse.cdt.examples.dsf/src_preprocess/org/eclipse/cdt/examples/dsf/dataviewer/SyncDataViewer.java b/dsf/org.eclipse.cdt.examples.dsf/src_preprocess/org/eclipse/cdt/examples/dsf/dataviewer/SyncDataViewer.java index 6ce9c06ef4e..335d198415f 100644 --- a/dsf/org.eclipse.cdt.examples.dsf/src_preprocess/org/eclipse/cdt/examples/dsf/dataviewer/SyncDataViewer.java +++ b/dsf/org.eclipse.cdt.examples.dsf/src_preprocess/org/eclipse/cdt/examples/dsf/dataviewer/SyncDataViewer.java @@ -1,5 +1,5 @@ /******************************************************************************* - * Copyright (c) 2008, 2009 Wind River Systems and others. + * Copyright (c) 2008, 2011 Wind River Systems and others. * All rights reserved. This program and the accompanying materials * are made available under the terms of the Eclipse Public License v1.0 * which accompanies this distribution, and is available at @@ -14,8 +14,11 @@ package org.eclipse.cdt.examples.dsf.dataviewer; //#package org.eclipse.cdt.examples.dsf.dataviewer.answers; //#endif +import java.util.Arrays; +import java.util.List; import java.util.Set; +import org.eclipse.cdt.dsf.concurrent.CountingRequestMonitor; import org.eclipse.cdt.dsf.concurrent.DataRequestMonitor; import org.eclipse.cdt.dsf.concurrent.ImmediateExecutor; import org.eclipse.cdt.dsf.concurrent.Query; @@ -35,8 +38,8 @@ import org.eclipse.swt.widgets.Shell; * This viewer implements the {@link IStructuredContentProvider} interface * which is used by the JFace TableViewer class to populate a Table. This * interface contains one principal methods for reading data {@link #getElements(Object)}, - * which synchronously returns an array of elements. In order to implement this - * method using the asynchronous data generator, this provider uses the + * which synchronously returns an array of elements. In order to implement + * this method using the asynchronous data generator, this provider uses the * {@link Query} object. *

*/ @@ -84,27 +87,43 @@ public class SyncDataViewer return new Object[0]; } - // Create the array that will be filled with elements. - // For each index in the array execute a query to get the element at - // that index. - final Object[] elements = new Object[count]; - - for (int i = 0; i < count; i++) { - final int index = i; - Query valueQuery = new Query() { - @Override - protected void execute(DataRequestMonitor rm) { - fDataGenerator.getValue(index, rm); + final int finalCount = count; + Query> valueQuery = new Query>() { + @Override + protected void execute(final DataRequestMonitor> rm) { + final Integer[] retVal = new Integer[finalCount]; + final CountingRequestMonitor crm = new CountingRequestMonitor( + ImmediateExecutor.getInstance(), rm) + { + @Override + protected void handleSuccess() { + rm.setData(Arrays.asList(retVal)); + rm.done(); + }; + }; + for (int i = 0; i < finalCount; i++) { + final int finalI = i; + fDataGenerator.getValue( + i, + new DataRequestMonitor( + ImmediateExecutor.getInstance(), crm) + { + @Override + protected void handleSuccess() { + retVal[finalI] = getData(); + crm.done(); + } + }); } - }; - ImmediateExecutor.getInstance().execute(valueQuery); - try { - elements[i] = valueQuery.get(); - } catch (Exception e) { - elements[i] = "error"; - } + crm.setDoneCount(finalCount); + } + }; + ImmediateExecutor.getInstance().execute(valueQuery); + try { + return valueQuery.get().toArray(new Integer[0]); + } catch (Exception e) { } - return elements; + return new Object[0]; } public void dispose() { @@ -140,6 +159,10 @@ public class SyncDataViewer }); } + /** + * The entry point for the example. + * @param args Program arguments. + */ public static void main(String[] args) { // Create the shell to hold the viewer. Display display = new Display(); @@ -162,7 +185,8 @@ public class SyncDataViewer //#endif // Create the content provider which will populate the viewer. - SyncDataViewer contentProvider = new SyncDataViewer(tableViewer, generator); + SyncDataViewer contentProvider = + new SyncDataViewer(tableViewer, generator); tableViewer.setContentProvider(contentProvider); tableViewer.setInput(new Object());