From ac3e1dfd9075f049b6af7e0a6efe658f8853960d Mon Sep 17 00:00:00 2001 From: Martin Oberhuber < martin.oberhuber@windriver.com> Date: Thu, 10 Aug 2006 16:16:55 +0000 Subject: [PATCH] Fix bug 149179 - use Mutex to serialize parallel sftp requests where necessary --- .../eclipse/rse/services/ssh/files/Mutex.java | 148 ++++++++++++ .../services/ssh/files/SftpFileService.java | 227 ++++++++++-------- 2 files changed, 275 insertions(+), 100 deletions(-) create mode 100644 rse/plugins/org.eclipse.rse.services.ssh/src/org/eclipse/rse/services/ssh/files/Mutex.java diff --git a/rse/plugins/org.eclipse.rse.services.ssh/src/org/eclipse/rse/services/ssh/files/Mutex.java b/rse/plugins/org.eclipse.rse.services.ssh/src/org/eclipse/rse/services/ssh/files/Mutex.java new file mode 100644 index 00000000000..416c914b483 --- /dev/null +++ b/rse/plugins/org.eclipse.rse.services.ssh/src/org/eclipse/rse/services/ssh/files/Mutex.java @@ -0,0 +1,148 @@ +/******************************************************************************* + * Copyright (c) 2006 Wind River Systems, Inc. + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * which accompanies this distribution, and is available at + * http://www.eclipse.org/legal/epl-v10.html + * + * Contributors: + * Martin Oberhuber (Wind River) - initial API and implementation + *******************************************************************************/ + +package org.eclipse.rse.services.ssh.files; + +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; + +import org.eclipse.core.runtime.IProgressMonitor; + +import org.eclipse.rse.services.ssh.Activator; + +/** + * A Mutual Exclusion Lock for Threads that need to access a resource + * in a serialized manner. + * + * Usage Example: + * + * private Mutex fooMutex; + * boolean doFooSerialized()(IProgressMonitor monitor) { + * if (fooMutex.waitForLock(monitor, 1000)) { + * try { + * return doFoo(); + * } finally { + * fooMutex.release(); + * } + * } + * return false; + * } + * + */ +public class Mutex { + + private boolean fLocked = false; + private List fWaitQueue = new LinkedList(); + + /** + * Try to acquire the lock maintained by this mutex. + * + * If the thread needs to wait before it can acquire the mutex, it + * will wait in a first-come-first-serve fashion. In case a progress + * monitor was given, it will be updated and checked for cancel every + * second. + * + * @param monitor Eclipse Progress Monitor. May be null. + * @param timeout Maximum wait time given in milliseconds. + * @return true if the lock was obtained successfully. + */ + public synchronized boolean waitForLock(IProgressMonitor monitor, long timeout) { + if (Thread.interrupted()) { + return false; + } + if (fLocked) { + //need to wait for the lock. + boolean canceled = false; + final Thread myself = Thread.currentThread(); + try { + fWaitQueue.add(myself); + Activator.trace("Mutex: added "+myself+", size="+fWaitQueue.size()); //$NON-NLS-1$ //$NON-NLS-2$ + long start = System.currentTimeMillis(); + long timeLeft = timeout; + long pollTime = (monitor!=null) ? 1000 : timeLeft; + long nextProgressUpdate = start+500; + while (timeLeft>0 && !canceled) { + try { + wait(timeLeft > pollTime ? pollTime : timeLeft); + Activator.trace("Mutex: wakeup "+myself+" ?"); //$NON-NLS-1$ //$NON-NLS-2$ + //I'm still in the list, nobody is allowed to take me out! + assert !fWaitQueue.isEmpty(); + if (!fLocked && fWaitQueue.get(0) == myself) { + break; //gee it's my turn! + } + long curTime = System.currentTimeMillis(); + timeLeft = start + timeout - curTime; + if (monitor!=null) { + canceled = monitor.isCanceled(); + if (!canceled && (curTime>nextProgressUpdate)) { + monitor.worked(1); + nextProgressUpdate+=1000; + } + } + } catch(InterruptedException e) { + canceled = true; + } + } + } finally { + fWaitQueue.remove(myself); + Activator.trace("Mutex: removed "+myself+", size="+fWaitQueue.size()); //$NON-NLS-1$ //$NON-NLS-2$ + } + if (fLocked || canceled) { + //we were not able to acquire the lock due to an exception, + //or because the wait was canceled. + return false; + } + } + //acquire the lock myself now. + fLocked = true; + return true; + } + + /** + * Release this mutex's lock. + * + * May only be called by the same thread that originally acquired + * the Mutex. + */ + public synchronized void release() { + fLocked=false; + if (!fWaitQueue.isEmpty()) { + Object nextOneInQueue = fWaitQueue.get(0); + Activator.trace("Mutex: releasing "+nextOneInQueue); //$NON-NLS-1$ + notifyAll(); + } + } + + /** + * Return this Mutex's lock status. + * @return true if this mutex is currently acquired by a thread. + */ + public synchronized boolean isLocked() { + return fLocked; + } + + /** + * Interrupt all threads waiting for the Lock, causing their + * {@link #waitForLock(IProgressMonitor, long)} method to return + * false. + * This should be called if the resource that the Threads are + * contending for, becomes unavailable for some other reason. + */ + public void interruptAll() { + Iterator it = fWaitQueue.iterator(); + while (it.hasNext()) { + Thread aThread = (Thread)it.next(); + aThread.interrupt(); + } + } + +} diff --git a/rse/plugins/org.eclipse.rse.services.ssh/src/org/eclipse/rse/services/ssh/files/SftpFileService.java b/rse/plugins/org.eclipse.rse.services.ssh/src/org/eclipse/rse/services/ssh/files/SftpFileService.java index 024f3fa6c3f..9e717c50281 100644 --- a/rse/plugins/org.eclipse.rse.services.ssh/src/org/eclipse/rse/services/ssh/files/SftpFileService.java +++ b/rse/plugins/org.eclipse.rse.services.ssh/src/org/eclipse/rse/services/ssh/files/SftpFileService.java @@ -49,6 +49,8 @@ public class SftpFileService extends AbstractFileService implements IFileService private ISshSessionProvider fSessionProvider; private ChannelSftp fChannelSftp; private String fUserHome; + private Mutex fDirChannelMutex = new Mutex(); + private long fDirChannelTimeout = 5000; //max.5 seconds to obtain dir channel // public SftpFileService(SshConnectorService conn) { // fConnector = conn; @@ -130,6 +132,7 @@ public class SftpFileService extends AbstractFileService implements IFileService if (fChannelSftp!=null && fChannelSftp.isConnected()) { fChannelSftp.disconnect(); } + fDirChannelMutex.interruptAll(); fChannelSftp = null; } @@ -140,11 +143,15 @@ public class SftpFileService extends AbstractFileService implements IFileService //the API docs. SftpHostFile node = null; SftpATTRS attrs = null; - try { - attrs = getChannel("SftpFileService.getFile").stat(remoteParent+'/'+fileName); //$NON-NLS-1$ - Activator.trace("SftpFileService.getFile done"); //$NON-NLS-1$ - } catch(Exception e) { - Activator.trace("SftpFileService.getFile failed: "+e.toString()); //$NON-NLS-1$ + if (fDirChannelMutex.waitForLock(monitor, fDirChannelTimeout)) { + try { + attrs = getChannel("SftpFileService.getFile").stat(remoteParent+'/'+fileName); //$NON-NLS-1$ + Activator.trace("SftpFileService.getFile done"); //$NON-NLS-1$ + } catch(Exception e) { + Activator.trace("SftpFileService.getFile failed: "+e.toString()); //$NON-NLS-1$ + } finally { + fDirChannelMutex.release(); + } } if (attrs!=null) { node = makeHostFile(remoteParent, fileName, attrs); @@ -171,48 +178,52 @@ public class SftpFileService extends AbstractFileService implements IFileService } NamePatternMatcher filematcher = new NamePatternMatcher(fileFilter, true, true); List results = new ArrayList(); - try { - java.util.Vector vv=getChannel("SftpFileService.internalFetch").ls(parentPath); //$NON-NLS-1$ - for(int ii=0; ii