[teiid-commits] teiid SVN: r664 - in trunk: common-internal/src/main/java/com/metamatrix/common/queue and 13 other directories.

teiid-commits at lists.jboss.org teiid-commits at lists.jboss.org
Mon Mar 30 16:55:34 EDT 2009


Author: shawkins
Date: 2009-03-30 16:55:34 -0400 (Mon, 30 Mar 2009)
New Revision: 664

Modified:
   trunk/common-core/src/main/java/com/metamatrix/core/BundleUtil.java
   trunk/common-internal/src/main/java/com/metamatrix/common/queue/WorkerPool.java
   trunk/common-internal/src/main/java/com/metamatrix/common/queue/WorkerPoolFactory.java
   trunk/common-internal/src/main/resources/com/metamatrix/common/i18n.properties
   trunk/common-internal/src/test/java/com/metamatrix/common/queue/TestQueueWorkerPool.java
   trunk/connector-api/src/main/java/org/teiid/connector/api/ConnectorEnvironment.java
   trunk/connectors/connector-salesforce/src/test/java/com/metamatrix/connector/salesforce/TestConnector.java
   trunk/engine/src/main/java/org/teiid/dqp/internal/datamgr/impl/ConnectorEnvironmentImpl.java
   trunk/engine/src/main/java/org/teiid/dqp/internal/datamgr/impl/ConnectorManager.java
   trunk/engine/src/main/java/org/teiid/dqp/internal/pooling/connector/ConnectionPool.java
   trunk/engine/src/main/java/org/teiid/dqp/internal/pooling/connector/PooledConnector.java
   trunk/engine/src/main/java/org/teiid/dqp/internal/process/RequestWorkItem.java
   trunk/engine/src/main/resources/com/metamatrix/dqp/i18n.properties
   trunk/engine/src/test/java/org/teiid/dqp/internal/pooling/connector/TestConnectionPool.java
   trunk/server/src/main/java/com/metamatrix/common/comm/platform/socket/SocketVMController.java
   trunk/server/src/main/java/com/metamatrix/common/comm/platform/socket/server/ServerWorkItem.java
   trunk/server/src/main/java/com/metamatrix/common/comm/platform/socket/server/SocketClientInstance.java
   trunk/server/src/main/java/com/metamatrix/common/comm/platform/socket/server/SocketListener.java
   trunk/server/src/main/resources/com/metamatrix/platform/i18n.properties
   trunk/server/src/test/java/com/metamatrix/common/comm/platform/socket/server/TestCommSockets.java
Log:
TEIID-445, TEIID-386 cleanup of workerpool and logging of expected user exceptions.

Modified: trunk/common-core/src/main/java/com/metamatrix/core/BundleUtil.java
===================================================================
--- trunk/common-core/src/main/java/com/metamatrix/core/BundleUtil.java	2009-03-30 20:04:22 UTC (rev 663)
+++ trunk/common-core/src/main/java/com/metamatrix/core/BundleUtil.java	2009-03-30 20:55:34 UTC (rev 664)
@@ -26,6 +26,7 @@
 import java.util.List;
 import java.util.MissingResourceException;
 import java.util.ResourceBundle;
+
 import com.metamatrix.core.util.ArgCheck;
 import com.metamatrix.core.util.StringUtil;
 
@@ -186,12 +187,8 @@
      */
     public String getString(final String key,
                             final Object... parameters) {
-        String text = getProductValue(key);
+    	String text = getString(key);
 
-        if (text == null) {
-            text = getString(key);
-        }
-
         // Check the trivial cases ...
         if (text == null) {
             return '<' + key + '>';
@@ -223,5 +220,4 @@
         return value;
     }
 
-
 }

Modified: trunk/common-internal/src/main/java/com/metamatrix/common/queue/WorkerPool.java
===================================================================
--- trunk/common-internal/src/main/java/com/metamatrix/common/queue/WorkerPool.java	2009-03-30 20:04:22 UTC (rev 663)
+++ trunk/common-internal/src/main/java/com/metamatrix/common/queue/WorkerPool.java	2009-03-30 20:55:34 UTC (rev 664)
@@ -24,6 +24,7 @@
 
 import java.util.List;
 import java.util.concurrent.Executor;
+import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
 
 public interface WorkerPool extends Executor {
@@ -32,7 +33,7 @@
 	
 	List<Runnable> shutdownNow();
 
-	void awaitTermination(long timeout, TimeUnit unit)
+	boolean awaitTermination(long timeout, TimeUnit unit)
 			throws InterruptedException;
 
 	boolean isTerminated();
@@ -40,5 +41,16 @@
 	WorkerPoolStats getStats();
 	
 	boolean hasWork();
+	
+	int getPoolSize();
+	
+	ScheduledFuture<?> schedule(Runnable command,
+            long delay,
+            TimeUnit unit);
+	
+	ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
+            long initialDelay,
+            long period,
+            TimeUnit unit);
 
 }
\ No newline at end of file

Modified: trunk/common-internal/src/main/java/com/metamatrix/common/queue/WorkerPoolFactory.java
===================================================================
--- trunk/common-internal/src/main/java/com/metamatrix/common/queue/WorkerPoolFactory.java	2009-03-30 20:04:22 UTC (rev 663)
+++ trunk/common-internal/src/main/java/com/metamatrix/common/queue/WorkerPoolFactory.java	2009-03-30 20:55:34 UTC (rev 664)
@@ -22,12 +22,19 @@
 
 package com.metamatrix.common.queue;
 
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.IdentityHashMap;
+import java.util.LinkedList;
 import java.util.List;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingQueue;
+import java.util.Queue;
+import java.util.Set;
+import java.util.concurrent.Delayed;
+import java.util.concurrent.FutureTask;
 import java.util.concurrent.RejectedExecutionException;
-import java.util.concurrent.RejectedExecutionHandler;
-import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.SynchronousQueue;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -39,208 +46,320 @@
 import com.metamatrix.core.util.NamedThreadFactory;
 
 /**
- * Creates WorkPools based upon {@link ThreadPoolExecutor}
+ * Creates named, queued, daemon Thread pools.
+ * <br/>
+ * The requirements are:
+ * <ol>
+ * <li>minimize thread creation</li>
+ * <li>allow for proper timeout of idle threads</li>
+ * <li>allow for queuing</li>
+ * </ol>
+ * <br/>
+ * A non-fifo (lifo) {@link SynchronousQueue} based {@link ThreadPoolExecutor} satisfies 1 and 2, but not 3.
+ * A bounded or unbounded queue based {@link ThreadPoolExecutor} allows for 3, but will tend to create 
+ * up to the maximum number of threads and makes no guarantee on thread scheduling.
+ * <br/>
+ * So the approach here is to use virtual thread pools off of a single shared {@link SynchronousQueue}
+ * backed {@link ThreadPoolExecutor}.
+ * <br/>
+ * There is also only a single master scheduling thread with actual executions deferred to the calling
+ * WorkerPool.
+ * 
+ * TODO: this probably needs to be re-thought, especially since the lifo ordering of a {@link SynchronousQueue} 
+ * is not guaranteed behavior.  Also, there's a race condition between previously retiring threads and new work - 
+ * prior to being returned to the shared pool we can create extra threads if the shared pool is exhausted.
+ * TODO: bounded queuing - we never bothered bounding in the past with our worker pools, but reasonable
+ * defaults would be a good idea.
  */
 public class WorkerPoolFactory {
 
+	private static ThreadPoolExecutor tpe = new ThreadPoolExecutor(0,
+			Integer.MAX_VALUE, 2, TimeUnit.MINUTES,
+			new SynchronousQueue<Runnable>(), new NamedThreadFactory("Worker")) { //$NON-NLS-1$ 
+		@Override
+		protected void afterExecute(Runnable r, Throwable t) {
+			if (t != null) {
+				LogManager.logError(LogCommonConstants.CTX_POOLING, t, CommonPlugin.Util.getString("WorkerPool.uncaughtException")); //$NON-NLS-1$
+			}
+		}
+	}; 
+	
+	private static ScheduledThreadPoolExecutor stpe = new ScheduledThreadPoolExecutor(1, new NamedThreadFactory("Scheduler")); //$NON-NLS-1$
+	
 	/**
-	 * Attempts to detect when new threads should be created without blocking.
-	 * 
-	 * IMPORTANT NOTE: actual execution ordering is not guaranteed.
+	 * TODO: purge user canceled scheduled tasks.
 	 */
-	static final class ThreadReuseLinkedBlockingQueue extends
-			LinkedBlockingQueue<Runnable> {
-		private StatsCapturingThreadPoolExecutor executor;
-
-		void setExecutor(StatsCapturingThreadPoolExecutor executor) {
-			this.executor = executor;
-		}
+	static class StatsCapturingSharedThreadPoolExecutor implements WorkerPool {
 		
-		@Override
-		public boolean offer(Runnable o) {
-			if (executor.getPoolSize() + executor.getCompletedCount() >= executor.getSubmittedCount()) {
-				/*
-				 * TODO: this strategy suffers from the same possible flaws as the previous implementation of
-				 * WorkerPool.  If the available threads are in the process of dying, we run the risk of
-				 * queuing without starting another thread (fortunately the ThreadPoolExecutor itself will 
-				 * try to mitigate this case as well).
-				 * 
-				 * Since this was never observed to be a problem before, we'll not initially worry about it with
-				 * this implementation.
-				 */
-				return super.offer(o);
+		class ScheduledFutureTask extends FutureTask<Void> implements ScheduledFuture<Void> {
+			private ScheduledFuture<?> scheduledFuture;
+			private boolean periodic;
+			private volatile boolean running;
+			
+			public ScheduledFutureTask(Runnable runnable, boolean periodic) {
+				super(runnable, null);
+				this.periodic = periodic;
 			}
-			return false; //trigger thread creation if possible
-		}
+			
+			public void setScheduledFuture(ScheduledFuture<?> scheduledFuture) {
+				scheduledTasks.add(this);
+				this.scheduledFuture = scheduledFuture;
+			}
+			
+			@Override
+			public long getDelay(TimeUnit unit) {
+				return this.scheduledFuture.getDelay(unit);
+			}
 
-		@Override
-		public boolean add(Runnable arg0) {
-			if (super.offer(arg0)) {
-		        return true;
+			@Override
+			public int compareTo(Delayed o) {
+				return this.scheduledFuture.compareTo(o);
 			}
-		    throw new IllegalStateException("Queue full"); //$NON-NLS-1$
+			
+			@Override
+			public boolean cancel(boolean mayInterruptIfRunning) {
+				this.scheduledFuture.cancel(false);
+				return super.cancel(mayInterruptIfRunning);
+			}
+			
+			public Runnable getParent() {
+				return new Runnable() {
+					@Override
+					public void run() {
+						if (running || terminated) {
+							return;
+						}
+						running = periodic;
+						execute(ScheduledFutureTask.this);
+					}
+				};
+			}
+			
+			@Override
+			public void run() {
+				if (periodic) {
+					if (!this.runAndReset()) {
+						this.scheduledFuture.cancel(false);
+						scheduledTasks.remove(this);
+					}
+					running = false;
+				} else {
+					scheduledTasks.remove(this);
+					super.run();
+				}
+			}
 		}
-	}
-
-	static class StatsCapturingThreadPoolExecutor extends ThreadPoolExecutor {
 		
-		private AtomicInteger activeCount = new AtomicInteger(0);
-		private AtomicInteger submittedCount = new AtomicInteger(0);
-		private AtomicInteger completedCount = new AtomicInteger(0);
+		private volatile int activeCount;
+		private volatile int maxActiveCount;
+		private volatile int maxQueueSize;
+		private volatile boolean terminated;
+		private volatile int submittedCount;
+		private volatile int completedCount;
+		private Object poolLock = new Object();
+		private AtomicInteger threadCounter = new AtomicInteger();
+		private Set<Thread> threads = Collections.synchronizedSet(Collections.newSetFromMap(new IdentityHashMap<Thread, Boolean>()));
+		private Set<ScheduledFutureTask> scheduledTasks = Collections.synchronizedSet(Collections.newSetFromMap(new IdentityHashMap<ScheduledFutureTask, Boolean>()));
 		
-		public StatsCapturingThreadPoolExecutor(int corePoolSize,
-                              int maximumPoolSize,
-                              long keepAliveTime,
-                              TimeUnit unit,
-                              BlockingQueue<Runnable> workQueue,
-                              ThreadFactory threadFactory) {
-			super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);
-		}
+		private String poolName;
+		private int maximumPoolSize;
+		private Queue<Runnable> queue = new LinkedList<Runnable>();
 		
-		@Override
-		protected void beforeExecute(Thread t, Runnable r) {
-			activeCount.getAndIncrement();
+		public StatsCapturingSharedThreadPoolExecutor(String name, int maximumPoolSize) {
+			this.maximumPoolSize = maximumPoolSize;
+			this.poolName = name;
 		}
 		
 		@Override
-		protected void afterExecute(Runnable r, Throwable t) {
-			if (t != null) {
-				LogManager.logError(LogCommonConstants.CTX_POOLING, t, CommonPlugin.Util.getString("WorkerPool.uncaughtException")); //$NON-NLS-1$
+		public void execute(final Runnable command) {
+			boolean atMaxThreads = false;
+			boolean newMaxQueueSize = false;
+			synchronized (poolLock) {
+				checkForTermination();
+				submittedCount++;
+				atMaxThreads = activeCount == maximumPoolSize;
+				if (atMaxThreads) {
+					queue.add(command);
+					int queueSize = queue.size();
+					if (queueSize > maxQueueSize) {
+						atMaxThreads = true;
+						maxQueueSize = queueSize;
+					}
+				} else {
+					activeCount++;
+					maxActiveCount = Math.max(activeCount, maxActiveCount);
+				}
 			}
-			activeCount.getAndDecrement();
-			completedCount.getAndIncrement();
+			if (atMaxThreads) {
+				if (newMaxQueueSize && maximumPoolSize > 1) {
+					LogManager.logWarning(LogCommonConstants.CTX_POOLING, CommonPlugin.Util.getString("WorkerPool.Max_thread", maximumPoolSize, poolName, maxQueueSize)); //$NON-NLS-1$
+				}
+				return;
+			}
+			tpe.execute(new Runnable() {
+				@Override
+				public void run() {
+					Thread t = Thread.currentThread();
+					threads.add(t);
+					String name = t.getName();
+					t.setName(name + "_" + poolName + threadCounter.getAndIncrement()); //$NON-NLS-1$
+					if (LogManager.isMessageToBeRecorded(LogCommonConstants.CTX_POOLING, MessageLevel.TRACE)) {
+						LogManager.logTrace(LogCommonConstants.CTX_POOLING, "Beginning work with virtual worker", t.getName()); //$NON-NLS-1$ 
+					}
+					Runnable r = command;
+					while (r != null) {
+						boolean success = false;
+						try {
+							r.run();
+							success = true;
+						} finally {
+							synchronized (poolLock) {
+								if (success) {
+									completedCount++;
+									r = queue.poll();		
+								}
+								if (r != null) {
+									continue;
+								}
+								threads.remove(t);
+								activeCount--;
+								if (activeCount == 0 && terminated) {
+									poolLock.notifyAll();
+								}		
+							}
+							t.setName(name);
+						}
+					}
+				};
+			});
 		}
-		
-		@Override
-		public void execute(Runnable command) {
-			submittedCount.getAndIncrement();
-			super.execute(command);
+
+		private void checkForTermination() {
+			if (terminated) {
+				throw new RejectedExecutionException();
+			}
 		}
 		
-		@Override
 		public int getActiveCount() {
-			return activeCount.get();
+			return activeCount;
 		}
 		
 		public int getSubmittedCount() {
-			return submittedCount.get();
+			return submittedCount;
 		}
 		
 		public int getCompletedCount() {
-			return completedCount.get();
+			return completedCount;
 		}
 		
-	}
-	
-	public static class DefaultThreadFactory extends NamedThreadFactory {
+		public int getPoolSize() {
+			return maximumPoolSize;
+		}
 		
-		public DefaultThreadFactory(String name) {
-			super(name);
+		public boolean isTerminated() {
+			return terminated;
 		}
-
-		public Thread newThread(Runnable r) {
-			Thread result = super.newThread(r);
-			if (LogManager.isMessageToBeRecorded(LogCommonConstants.CTX_POOLING, MessageLevel.TRACE)) {
-				LogManager.logTrace(LogCommonConstants.CTX_POOLING, CommonPlugin.Util.getString("WorkerPool.New_thread", result.getName())); //$NON-NLS-1$
+		
+		public void shutdown() {
+			this.terminated = true;
+			synchronized (scheduledTasks) {
+				for (ScheduledFuture<?> future : scheduledTasks) {
+					future.cancel(false);
+				}
+				scheduledTasks.clear();
 			}
-			return result;
 		}
-	}
-	
-	static class ExecutorWorkerPool implements WorkerPool {
 		
-		private String name;
-		private StatsCapturingThreadPoolExecutor executor;
-
-		public ExecutorWorkerPool(final String name, StatsCapturingThreadPoolExecutor executor) {
-			this.name = name;
-			this.executor = executor;
+		public int getLargestPoolSize() {
+			return this.maxActiveCount;
 		}
 		
-		public void execute(Runnable r) {
-			this.executor.execute(r);
-		}
-
-		public void awaitTermination(long timeout, TimeUnit unit)
-				throws InterruptedException {
-			this.executor.awaitTermination(timeout, unit);
-		}
-
+		@Override
 		public WorkerPoolStats getStats() {
 			WorkerPoolStats stats = new WorkerPoolStats();
-			stats.name = name;
-			stats.queued = executor.getQueue().size();
-			stats.threads = executor.getPoolSize();
-			stats.activeThreads = executor.getActiveCount();
-			stats.totalSubmitted = executor.getSubmittedCount();
-			//TODO: highestActiveThreads is misleading for pools that prefer to use new threads
-			stats.highestActiveThreads = executor.getLargestPoolSize();
-			stats.totalCompleted = executor.getCompletedCount();
+			stats.name = poolName;
+			stats.queued = queue.size();
+			stats.threads = getPoolSize();
+			stats.activeThreads = getActiveCount();
+			stats.totalSubmitted = getSubmittedCount();
+			stats.highestActiveThreads = getLargestPoolSize();
+			stats.totalCompleted = getCompletedCount();
 			return stats;
 		}
-
-		public boolean isTerminated() {
-			return this.executor.isTerminated();
+		
+		@Override
+		public boolean hasWork() {
+			synchronized (poolLock) {
+				return this.getSubmittedCount() - this.getCompletedCount() > 0 && !this.isTerminated();
+			}
 		}
 
-		public void shutdown() {
-			this.executor.shutdown();
+		@Override
+		public List<Runnable> shutdownNow() {
+			this.shutdown();
+			synchronized (poolLock) {
+				synchronized (threads) {
+					for (Thread t : threads) {
+						t.interrupt();
+					}
+				}
+				List<Runnable> result = new ArrayList<Runnable>(queue);
+				queue.clear();
+				return result;
+			}
 		}
+		
+		@Override
+		public boolean awaitTermination(long timeout, TimeUnit unit)
+				throws InterruptedException {
+			long timeoutMillis = unit.toMillis(timeout);
+			long finalMillis = System.currentTimeMillis() + timeoutMillis;
+			synchronized (poolLock) {
+				while (this.activeCount > 0 || !terminated) {
+					if (timeoutMillis < 1) {
+						return false;
+					}
+					poolLock.wait(timeoutMillis);
+					timeoutMillis = finalMillis - System.currentTimeMillis();
+				}
+			}
+			return true;
+		}
 
-		public boolean hasWork() {
-			return this.executor.getSubmittedCount() - this.executor.getCompletedCount() > 0 && !this.executor.isTerminated();
+		@Override
+		public ScheduledFuture<?> schedule(final Runnable command, long delay,
+				TimeUnit unit) {
+			checkForTermination();
+			ScheduledFutureTask sft = new ScheduledFutureTask(command, false);
+			synchronized (scheduledTasks) {
+				ScheduledFuture<?> future = stpe.schedule(sft.getParent(), delay, unit);
+				sft.setScheduledFuture(future);
+				return sft;
+			}
 		}
 
 		@Override
-		public List<Runnable> shutdownNow() {
-			return this.executor.shutdownNow();
+		public ScheduledFuture<?> scheduleAtFixedRate(final Runnable command,
+				long initialDelay, long period, TimeUnit unit) {
+			checkForTermination();
+			ScheduledFutureTask sft = new ScheduledFutureTask(command, true);
+			synchronized (scheduledTasks) {
+				ScheduledFuture<?> future = stpe.scheduleAtFixedRate(sft.getParent(), initialDelay, period, unit);
+				sft.setScheduledFuture(future);
+				return sft;
+			}		
 		}
 	}
-	
+			
 	/**
 	 * Creates a WorkerPool that prefers thread reuse over thread creation based upon the given parameters
 	 * 
 	 * @param name
 	 * @param numThreads the maximum number of worker threads allowed
-	 * @param keepAlive keepAlive time in milliseconds - NOT supported until JDK 1.6 for pools that don't prefer existing threads
 	 * @return 
 	 */
 	public static WorkerPool newWorkerPool(String name, int numThreads, long keepAlive) {
-		return newWorkerPool(name, numThreads, keepAlive, true);
+		return new StatsCapturingSharedThreadPoolExecutor(name, numThreads);
 	}
-	
-    public static WorkerPool newWorkerPool(String name, final int numThreads, long keepAlive, boolean preferExistingThreads) {
-		if (preferExistingThreads && numThreads > 1) {
-			final ThreadReuseLinkedBlockingQueue queue = new ThreadReuseLinkedBlockingQueue();
-			StatsCapturingThreadPoolExecutor executor = 
-				new StatsCapturingThreadPoolExecutor(0, numThreads, keepAlive, TimeUnit.MILLISECONDS, queue, new DefaultThreadFactory(name)) {
-				
-				@Override
-				public void execute(Runnable arg0) {
-					if (this.isShutdown()) {
-						//bypass the rejection handler
-						throw new RejectedExecutionException();
-					}
-					super.execute(arg0);
-				}
-				
-			};
-			queue.setExecutor(executor);
-			executor.setRejectedExecutionHandler(new RejectedExecutionHandler() {
-				public void rejectedExecution(Runnable arg0,
-						ThreadPoolExecutor arg1) {
-					try {
-						queue.add(arg0);
-					} catch (IllegalStateException e) {
-						throw new RejectedExecutionException(e);
-					}
-				}
-			});
-			return new ExecutorWorkerPool(name, executor);
-		}
-		StatsCapturingThreadPoolExecutor executor = new StatsCapturingThreadPoolExecutor(numThreads, numThreads, keepAlive, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(), new DefaultThreadFactory(name));
-		executor.allowCoreThreadTimeOut(true);
-		return new ExecutorWorkerPool(name, executor);
-	}
-	
+    
 }

Modified: trunk/common-internal/src/main/resources/com/metamatrix/common/i18n.properties
===================================================================
--- trunk/common-internal/src/main/resources/com/metamatrix/common/i18n.properties	2009-03-30 20:04:22 UTC (rev 663)
+++ trunk/common-internal/src/main/resources/com/metamatrix/common/i18n.properties	2009-03-30 20:55:34 UTC (rev 664)
@@ -3100,4 +3100,5 @@
 VDBDefnXMLHelper.Unable_to_read_defn_file=Unable to read DEF file.
 
 WorkerPool.uncaughtException=Uncaught exception in worker pool
-WorkerPool.New_thread=New work thread created with name {0}
\ No newline at end of file
+WorkerPool.New_thread=New work thread created with name {0}
+WorkerPool.Max_thread=Reached maximum thread count "{0}" for worker pool "{1}" with a queue size of "{2}".
\ No newline at end of file

Modified: trunk/common-internal/src/test/java/com/metamatrix/common/queue/TestQueueWorkerPool.java
===================================================================
--- trunk/common-internal/src/test/java/com/metamatrix/common/queue/TestQueueWorkerPool.java	2009-03-30 20:04:22 UTC (rev 663)
+++ trunk/common-internal/src/test/java/com/metamatrix/common/queue/TestQueueWorkerPool.java	2009-03-30 20:55:34 UTC (rev 664)
@@ -22,24 +22,21 @@
 
 package com.metamatrix.common.queue;
 
+import static org.junit.Assert.*;
+
+import java.util.ArrayList;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
 
-import junit.framework.*;
+import org.junit.Test;
 
 /**
  */
-public class TestQueueWorkerPool extends TestCase {
+public class TestQueueWorkerPool {
 
-    /**
-     * Constructor for TestQueueWorkerPool.
-     * @param arg0
-     */
-    public TestQueueWorkerPool(String arg0) {
-        super(arg0);
-    }
-    
-    public void testQueuing() throws Exception {
+    @Test public void testQueuing() throws Exception {
         final long SINGLE_WAIT = 50;
         final int WORK_ITEMS = 10;
         final int MAX_THREADS = 5;
@@ -58,7 +55,7 @@
         assertEquals("Expected threads to be maxed out", MAX_THREADS, stats.highestActiveThreads); //$NON-NLS-1$
     }
 
-    public void testThreadReuse() throws Exception {
+    @Test public void testThreadReuse() throws Exception {
         final long SINGLE_WAIT = 50;
         final long NUM_THREADS = 5;
 
@@ -81,15 +78,78 @@
         pool.awaitTermination(1000, TimeUnit.MILLISECONDS);
     }
     
-    public void testShutdown() throws Exception {
+    @Test(expected=RejectedExecutionException.class) public void testShutdown() throws Exception {
     	final WorkerPool pool = WorkerPoolFactory.newWorkerPool("test", 5, 120000); //$NON-NLS-1$
         pool.shutdown();
-        try {
-        	pool.execute(new FakeWorkItem(1));
-        	fail("expected exception"); //$NON-NLS-1$
-        } catch (RejectedExecutionException e) {
-        	
-        }
+    	pool.execute(new FakeWorkItem(1));
     }
+    
+    @Test public void testScheduleCancel() throws Exception {
+    	final WorkerPool pool = WorkerPoolFactory.newWorkerPool("test", 5, 120000); //$NON-NLS-1$
+    	ScheduledFuture<?> future = pool.scheduleAtFixedRate(new Runnable() {
+    		@Override
+    		public void run() {
+    		}
+    	}, 0, 5, TimeUnit.MILLISECONDS);
+    	future.cancel(true);
+    	assertFalse(future.cancel(true));    	
+    }
+    
+    @Test public void testSchedule() throws Exception {
+    	final WorkerPool pool = WorkerPoolFactory.newWorkerPool("test", 5, 120000); //$NON-NLS-1$
+        final ArrayList<String> result = new ArrayList<String>(); 
+    	ScheduledFuture<?> future = pool.schedule(new Runnable() {
+    		@Override
+    		public void run() {
+    			result.add("hello"); //$NON-NLS-1$
+    		}
+    	}, 5, TimeUnit.MILLISECONDS);
+    	future.cancel(true);
+    	Thread.sleep(10);
+    	assertEquals(0, result.size());    
+    	future = pool.schedule(new Runnable() {
+    		@Override
+    		public void run() {
+    			result.add("hello"); //$NON-NLS-1$
+    		}
+    	}, 5, TimeUnit.MILLISECONDS);
+    	Thread.sleep(10);
+    	pool.shutdown();
+    	pool.awaitTermination(1000, TimeUnit.MILLISECONDS);
+    	assertEquals(1, result.size());
+    }
+    
+    @Test(expected=ExecutionException.class) public void testScheduleException() throws Exception {
+    	final WorkerPool pool = WorkerPoolFactory.newWorkerPool("test", 5, 120000); //$NON-NLS-1$
+    	ScheduledFuture<?> future = pool.schedule(new Runnable() {
+    		@Override
+    		public void run() {
+    			throw new RuntimeException();
+    		}
+    	}, 0, TimeUnit.MILLISECONDS);
+    	future.get();
+    }
+    
+    /**
+     * Here each execution exceeds the period, so only half the number of executions are expected.
+     */
+    @Test public void testScheduleRepeated() throws Exception {
+    	final WorkerPool pool = WorkerPoolFactory.newWorkerPool("test", 5, 120000); //$NON-NLS-1$
+    	final ArrayList<String> result = new ArrayList<String>();
+    	ScheduledFuture<?> future = pool.scheduleAtFixedRate(new Runnable() {
+    		@Override
+    		public void run() {
+    			result.add("hello"); //$NON-NLS-1$
+    			try {
+					Thread.sleep(7);
+				} catch (InterruptedException e) {
+					throw new RuntimeException(e);
+				}
+    		}
+    	}, 0, 5, TimeUnit.MILLISECONDS);
+    	Thread.sleep(99);
+    	future.cancel(true);
+    	assertEquals(10, result.size());
+    }
         
 }

Modified: trunk/connector-api/src/main/java/org/teiid/connector/api/ConnectorEnvironment.java
===================================================================
--- trunk/connector-api/src/main/java/org/teiid/connector/api/ConnectorEnvironment.java	2009-03-30 20:04:22 UTC (rev 663)
+++ trunk/connector-api/src/main/java/org/teiid/connector/api/ConnectorEnvironment.java	2009-03-30 20:55:34 UTC (rev 664)
@@ -24,6 +24,9 @@
 
 import java.util.Properties;
 import java.util.concurrent.Executor;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
 
 import org.teiid.connector.language.ILanguageFactory;
 
@@ -67,4 +70,20 @@
      * conversions supplied by the Connector API.
      */
     TypeFacility getTypeFacility();
+    
+    /**
+     * Schedule a command for repeated execution with the same contract as 
+     * {@link ScheduledThreadPoolExecutor#scheduleAtFixedRate(Runnable, long, long, TimeUnit)}.
+     * Executions will not happen concurrently.  If an execution takes longer than a period,
+     * the next execution will take place on the first period interval after completion.
+     * @param command
+     * @param initialDelay
+     * @param period
+     * @param unit
+     * @return
+     */
+	ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
+            long initialDelay,
+            long period,
+            TimeUnit unit);
 }

Modified: trunk/connectors/connector-salesforce/src/test/java/com/metamatrix/connector/salesforce/TestConnector.java
===================================================================
--- trunk/connectors/connector-salesforce/src/test/java/com/metamatrix/connector/salesforce/TestConnector.java	2009-03-30 20:04:22 UTC (rev 663)
+++ trunk/connectors/connector-salesforce/src/test/java/com/metamatrix/connector/salesforce/TestConnector.java	2009-03-30 20:55:34 UTC (rev 664)
@@ -21,13 +21,12 @@
  */
 package com.metamatrix.connector.salesforce;
 
+import junit.framework.TestCase;
+
 import org.teiid.connector.api.ConnectorEnvironment;
 import org.teiid.connector.api.ConnectorException;
 import org.teiid.connector.api.ExecutionContext;
 
-import junit.framework.TestCase;
-
-import com.metamatrix.connector.salesforce.connection.SalesforceConnection;
 import com.metamatrix.connector.salesforce.test.util.ObjectFactory;
 
 public class TestConnector extends TestCase {
@@ -53,12 +52,6 @@
 		noCredConnector.start(env2);
 	}
 
-	public void testGetConnection() throws Exception {
-		ExecutionContext secContext = ObjectFactory.getDefaultSecurityContext();
-		SalesforceConnection connection = (SalesforceConnection) connector.getConnection(secContext);
-		assertNotNull("the connection is null", connection);
-	}
-
 	/*
 	public void testGetConnectionTrustedToken() {
 		ExecutionContext secContext = TestObjectFactory.getTokenExecutionContext();
@@ -136,7 +129,6 @@
 
 	public void testGetState() {
 		assertNotNull(connector.getState());
-		assertTrue(connector.getState() instanceof ConnectorState);
 	}
 
 	public void testStopNoInit() {

Modified: trunk/engine/src/main/java/org/teiid/dqp/internal/datamgr/impl/ConnectorEnvironmentImpl.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/dqp/internal/datamgr/impl/ConnectorEnvironmentImpl.java	2009-03-30 20:04:22 UTC (rev 663)
+++ trunk/engine/src/main/java/org/teiid/dqp/internal/datamgr/impl/ConnectorEnvironmentImpl.java	2009-03-30 20:55:34 UTC (rev 664)
@@ -27,6 +27,8 @@
 package org.teiid.dqp.internal.datamgr.impl;
 
 import java.util.Properties;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
 
 import org.teiid.connector.api.ConnectorEnvironment;
 import org.teiid.connector.api.ConnectorLogger;
@@ -43,7 +45,28 @@
  */
 public class ConnectorEnvironmentImpl implements ConnectorEnvironment {
     
-    private static final TypeFacility TYPE_FACILITY = new TypeFacilityImpl();
+    private final class ContextClassLoaderPreservingRunnable implements
+			Runnable {
+		private final Runnable arg0;
+		private final ClassLoader cl = Thread.currentThread().getContextClassLoader();
+
+		private ContextClassLoaderPreservingRunnable(Runnable arg0) {
+			this.arg0 = arg0;
+		}
+
+		@Override
+		public void run() {
+			ClassLoader current = Thread.currentThread().getContextClassLoader();
+			Thread.currentThread().setContextClassLoader(cl);
+			try {
+				arg0.run();
+			} finally {
+				Thread.currentThread().setContextClassLoader(current);
+			}
+		}
+	}
+
+	private static final TypeFacility TYPE_FACILITY = new TypeFacilityImpl();
     
     private ConnectorLogger logger;
     private Properties properties;
@@ -117,12 +140,20 @@
     }
 
 	@Override
-	public void execute(Runnable arg0) {
+	public void execute(Runnable command) {
 		if (this.workerPool != null) {
-			this.workerPool.execute(arg0);
+			this.workerPool.execute(new ContextClassLoaderPreservingRunnable(command));
 		} else {
-			arg0.run();
+			command.run();
 		}
-		
 	}               
+	
+	@Override
+	public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
+			long initialDelay, long period, TimeUnit unit) {
+		if (this.workerPool == null) {
+			return null; // no pool to schedule on: nothing runs and the caller gets no future
+		}
+		return this.workerPool.scheduleAtFixedRate(new ContextClassLoaderPreservingRunnable(command), initialDelay, period, unit);
+	}
 }

Modified: trunk/engine/src/main/java/org/teiid/dqp/internal/datamgr/impl/ConnectorManager.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/dqp/internal/datamgr/impl/ConnectorManager.java	2009-03-30 20:04:22 UTC (rev 663)
+++ trunk/engine/src/main/java/org/teiid/dqp/internal/datamgr/impl/ConnectorManager.java	2009-03-30 20:55:34 UTC (rev 664)
@@ -35,9 +35,8 @@
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Properties;
-import java.util.Timer;
-import java.util.TimerTask;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
 
 import javax.transaction.xa.XAResource;
 
@@ -123,9 +122,6 @@
     // known requests
     private ConcurrentHashMap<AtomicRequestID, ConnectorWorkItem> requestStates = new ConcurrentHashMap<AtomicRequestID, ConnectorWorkItem>();
 
-    // Lazily created and used for asynch query execution
-    private Timer timer;
-    
     private Properties props;
 	private ClassLoader classloader;
     
@@ -239,7 +235,7 @@
     	}
 	    workItem.requestClose();
     }
-        
+    
     /**
      * Schedule a task to be executed after the specified delay (in milliseconds) 
      * @param task The task to execute
@@ -247,17 +243,12 @@
      * @since 4.3.3
      */
     public void scheduleTask(final AsynchConnectorWorkItem state, long delay) {
-        synchronized(this) {
-            if(this.timer == null) {
-                this.timer = new Timer("AsynchRequestThread", true); //$NON-NLS-1$
-            }
-        }
-        
-        this.timer.schedule(new TimerTask() {
-			@Override
-			public void run() {
-				state.requestMore();
-			}}, delay);
+        this.connectorWorkerPool.schedule(new Runnable() {
+        	@Override
+        	public void run() {
+        		state.requestMore();
+        	}
+        }, delay, TimeUnit.MILLISECONDS);        
     }
     
     /**
@@ -294,7 +285,6 @@
     		throw new ApplicationLifecycleException("ConnectorManager.cannot_restart"); //$NON-NLS-1$
     	}
         connectorName = props.getProperty(ConnectorPropertyNames.CONNECTOR_BINDING_NAME, "Unknown_Binding_Name"); //$NON-NLS-1$
-
         String connIDStr = props.getProperty(ConnectorPropertyNames.CONNECTOR_ID);
         connectorID = new ConnectorID(connIDStr);
         
@@ -527,11 +517,6 @@
             
         }
         
-        if(this.timer != null) {
-            this.timer.cancel();
-            this.timer = null;
-        }
-        
         if (this.rsCache != null) {
         	this.rsCache.shutDown();
         	this.rsCache = null;

Modified: trunk/engine/src/main/java/org/teiid/dqp/internal/pooling/connector/ConnectionPool.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/dqp/internal/pooling/connector/ConnectionPool.java	2009-03-30 20:04:22 UTC (rev 663)
+++ trunk/engine/src/main/java/org/teiid/dqp/internal/pooling/connector/ConnectionPool.java	2009-03-30 20:55:34 UTC (rev 664)
@@ -32,13 +32,12 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
-import java.util.Timer;
-import java.util.TimerTask;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
 
 import org.teiid.connector.DataPlugin;
 import org.teiid.connector.api.Connection;
+import org.teiid.connector.api.ConnectorEnvironment;
 import org.teiid.connector.api.ConnectorException;
 import org.teiid.connector.api.ConnectorIdentity;
 import org.teiid.connector.api.ExecutionContext;
@@ -122,8 +121,6 @@
     private Map<ConnectorIdentity, ConnectionsForId> idConnections = new HashMap<ConnectorIdentity, ConnectionsForId>();
     private Map<ConnectionWrapper, ConnectorIdentity> reverseIdConnections = new IdentityHashMap<ConnectionWrapper, ConnectorIdentity>();
 
-    private Timer cleaningThread;
-    
     private Object lock = new Object();
     
     private Semaphore poolSemaphore;
@@ -148,8 +145,9 @@
      * @param poolProperties Properties of the pool as defined in this class. May be empty, but may not be null.
      * @throws ConnectionPoolException if the connection pool fails to initialize.
      */
-    public void initialize(Properties poolProperties) throws ConnectionPoolException {
-        ArgCheck.isNotNull(poolProperties);
+    public void initialize(ConnectorEnvironment env) throws ConnectionPoolException {
+    	ArgCheck.isNotNull(env);
+    	Properties poolProperties = env.getProperties();
 
         maxConnections = PropertiesUtils.getIntProperty(poolProperties, MAX_CONNECTIONS, DEFAULT_MAX_CONNECTION);
         if (maxConnections < 1) {
@@ -170,13 +168,11 @@
         testConnectInterval = PropertiesUtils.getIntProperty(poolProperties, SOURCE_CONNECTION_TEST_INTERVAL, DEFAULT_SOURCE_CONNECTION_TEST_INTERVAL);
         
         if (enableShrinking && !this.shuttingDownPool) {
-        	this.cleaningThread = new Timer("ConnectionPoolCleaningThread", true); //$NON-NLS-1$
-        	cleaningThread.schedule(new TimerTask() {
-				@Override
-				public void run() {
-					ConnectionPool.this.cleanUp(false);
-				}
-        	}, cleaningInterval, cleaningInterval);
+        	env.scheduleAtFixedRate(new Runnable() {
+        		public void run() {
+            		cleanUp(false);
+        		};
+        	}, cleaningInterval, cleaningInterval, TimeUnit.MILLISECONDS);
         }
 
         LogManager.logInfo(CTX_CONNECTOR, DataPlugin.Util.getString("ConnectionPool.Connection_pool_created_1")); //$NON-NLS-1$
@@ -216,7 +212,7 @@
         ConnectionsForId connLists = null;
 
         synchronized (this.lock) {
-            connLists = (ConnectionsForId) this.idConnections.get(id);
+            connLists = this.idConnections.get(id);
             if ( connLists == null ) {
                 connLists = new ConnectionsForId();
                 if (this.maxConnectionsForEachID < this.maxConnections) {
@@ -325,7 +321,7 @@
                     if (connsForId.unused.isEmpty()) {
                         continue;
                     }
-                    ConnectionWrapper conn = (ConnectionWrapper) connsForId.unused.removeFirst();
+                    ConnectionWrapper conn = connsForId.unused.removeFirst();
                     closeSourceConnection(conn, id);
                     break;
                 }
@@ -354,8 +350,8 @@
         ConnectionsForId connLists = null;
         ConnectorIdentity id = null;
         synchronized (this.lock) {
-            id = (ConnectorIdentity) this.reverseIdConnections.get(connection);
-            connLists = (ConnectionsForId) this.idConnections.get(id);
+            id = this.reverseIdConnections.get(connection);
+            connLists = this.idConnections.get(id);
         }
             
         if (connLists == null) {
@@ -398,11 +394,6 @@
         
         shuttingDownPool = true;
         
-        //close cleaning thread
-        if (this.cleaningThread != null) {
-	        this.cleaningThread.cancel();
-    	}        
-        
         this.cleanUp(true);
     }
 
@@ -459,8 +450,8 @@
     	ConnectorIdentity id = null;
     	ConnectionsForId connLists = null;
     	synchronized (this.lock) {
-            id = (ConnectorIdentity) this.reverseIdConnections.get(connection);
-            connLists = (ConnectionsForId) this.idConnections.get(id);
+            id = this.reverseIdConnections.get(connection);
+            connLists = this.idConnections.get(id);
 		}
         if ( connLists != null ) {
         	synchronized (connLists) {
@@ -475,8 +466,8 @@
     	ConnectorIdentity id = null;
     	ConnectionsForId connLists = null;
     	synchronized (this.lock) {
-            id = (ConnectorIdentity) this.reverseIdConnections.get(connection);
-            connLists = (ConnectionsForId) this.idConnections.get(id);
+            id = this.reverseIdConnections.get(connection);
+            connLists = this.idConnections.get(id);
 		}
         if ( connLists != null ) {
         	synchronized (connLists) {

Modified: trunk/engine/src/main/java/org/teiid/dqp/internal/pooling/connector/PooledConnector.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/dqp/internal/pooling/connector/PooledConnector.java	2009-03-30 20:04:22 UTC (rev 663)
+++ trunk/engine/src/main/java/org/teiid/dqp/internal/pooling/connector/PooledConnector.java	2009-03-30 20:55:34 UTC (rev 664)
@@ -25,7 +25,6 @@
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
-import java.util.Properties;
 
 import javax.transaction.RollbackException;
 import javax.transaction.Synchronization;
@@ -101,10 +100,9 @@
 	@Override
 	public void start(ConnectorEnvironment environment)
 			throws ConnectorException {
-		Properties p = environment.getProperties();
-		pool.initialize(p);
+		pool.initialize(environment);
 		if (xaPool != null) {
-			xaPool.initialize(p);
+			xaPool.initialize(environment);
 		}
 		super.start(environment);
 	}

Modified: trunk/engine/src/main/java/org/teiid/dqp/internal/process/RequestWorkItem.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/dqp/internal/process/RequestWorkItem.java	2009-03-30 20:04:22 UTC (rev 663)
+++ trunk/engine/src/main/java/org/teiid/dqp/internal/process/RequestWorkItem.java	2009-03-30 20:55:34 UTC (rev 664)
@@ -196,9 +196,7 @@
 	protected void process() {
 		DQPWorkContext.setWorkContext(this.dqpWorkContext);
 		
-        if (LogManager.isMessageToBeRecorded(LogConstants.CTX_DQP, MessageLevel.DETAIL)) {
-            LogManager.logDetail(LogConstants.CTX_DQP, "############# PW PROCESSING on " + requestID + " with state "+ state +" ###########"); //$NON-NLS-1$ //$NON-NLS-2$  //$NON-NLS-3$
-        }
+        LogManager.logDetail(LogConstants.CTX_DQP, "############# PW PROCESSING on", requestID, "with state", state, "###########"); //$NON-NLS-1$ //$NON-NLS-2$  //$NON-NLS-3$
         
         try {
             if (this.state == ProcessingState.NEW) {
@@ -217,32 +215,30 @@
             }                  	            
         } catch (BlockedOnMemoryException e) {
             moreWork(false);
-            if (LogManager.isMessageToBeRecorded(LogConstants.CTX_DQP, MessageLevel.DETAIL)) {
-            	LogManager.logDetail(LogConstants.CTX_DQP, "############# PW EXITING on " + requestID + " - reenqueueing for more processing due to lack of available memory ###########"); //$NON-NLS-1$ //$NON-NLS-2$
-            }
+        	LogManager.logDetail(LogConstants.CTX_DQP, "############# PW EXITING on", requestID, "- reenqueueing for more processing due to lack of available memory ###########"); //$NON-NLS-1$ //$NON-NLS-2$
         } catch (BlockedException e) {
-            if (LogManager.isMessageToBeRecorded(LogConstants.CTX_DQP, MessageLevel.DETAIL)) {
-                LogManager.logDetail(LogConstants.CTX_DQP, "############# PW EXITING on " + requestID + " - processor blocked ###########"); //$NON-NLS-1$ //$NON-NLS-2$
-            }        
+            LogManager.logDetail(LogConstants.CTX_DQP, "############# PW EXITING on", requestID, "- processor blocked ###########"); //$NON-NLS-1$ //$NON-NLS-2$
         } catch (Throwable e) {
+        	LogManager.logDetail(LogConstants.CTX_DQP, e, "############# PW EXITING on", requestID, "- error occurred ###########"); //$NON-NLS-1$ //$NON-NLS-2$
             //if there is a cache, remove temp results if there is any
             if(this.rsCache != null){
             	rsCache.removeTempResults(cid);
             }
             
-            if (isCanceled()) {
-            	if (LogManager.isMessageToBeRecorded(LogConstants.CTX_DQP, MessageLevel.DETAIL)) {
-            		LogManager.logDetail(LogConstants.CTX_DQP, e, "############# PW EXITING on " + requestID + " - error occurred ###########"); //$NON-NLS-1$ //$NON-NLS-2$
-            	}
-            } else {
+            if (!isCanceled()) {
             	logCommandError();
                 //Case 5558: Differentiate between system level errors and
                 //processing errors.  Only log system level errors as errors, 
                 //log the processing errors as warnings only
                 if(e instanceof MetaMatrixProcessingException) {                          
-                    LogManager.logWarning(LogConstants.CTX_DQP, e, "############# PW EXITING on " + requestID + " - error occurred ###########"); //$NON-NLS-1$ //$NON-NLS-2$
+                	Throwable cause = e;
+                	while (cause.getCause() != null) { // descend to the root cause
+                		cause = cause.getCause(); // advance from the current link; e.getCause() never progressed and could spin forever
+                	}
+                	StackTraceElement elem = cause.getStackTrace()[0];
+                    LogManager.logWarning(LogConstants.CTX_DQP, DQPPlugin.Util.getString("ProcessWorker.processing_error", e.getMessage(), requestID, e.getClass().getName(), elem)); //$NON-NLS-1$
                 }else {
-                    LogManager.logError(LogConstants.CTX_DQP, e, "############# PW EXITING on " + requestID + " - error occurred ###########"); //$NON-NLS-1$ //$NON-NLS-2$
+                    LogManager.logError(LogConstants.CTX_DQP, e, DQPPlugin.Util.getString("ProcessWorker.error", requestID)); //$NON-NLS-1$
                 }                                
             }
             
@@ -536,10 +532,7 @@
 	}
 
     private void sendError() {
-    	if (LogManager.isMessageToBeRecorded(LogConstants.CTX_DQP, MessageLevel.DETAIL)) {
-    		LogManager.logDetail(LogConstants.CTX_DQP, processingException, DQPPlugin.Util.getString("ProcessWorker.send_error", requestID)); //$NON-NLS-1$
-    	}
-        
+		LogManager.logDetail(LogConstants.CTX_DQP, processingException, "Sending error to client", requestID); //$NON-NLS-1$
         ResultsMessage response = new ResultsMessage(requestMsg);
         response.setException(processingException);
         setAnalysisRecords(response, analysisRecord);

Modified: trunk/engine/src/main/resources/com/metamatrix/dqp/i18n.properties
===================================================================
--- trunk/engine/src/main/resources/com/metamatrix/dqp/i18n.properties	2009-03-30 20:04:22 UTC (rev 663)
+++ trunk/engine/src/main/resources/com/metamatrix/dqp/i18n.properties	2009-03-30 20:55:34 UTC (rev 664)
@@ -151,6 +151,8 @@
 ProcessWorker.Failed_to_deliver_response_for_{0}=Failed to deliver response for {0}.  Client connection may have been closed.
 ProcessWorker.ProcessWorker.Failed_starting_processing._1=ProcessWorker.Failed_starting_processing.
 ProcessWorker.failed_rollback=Failed to properly rollback autowrap transaction properly
+ProcessWorker.error=Unexpected exception for request {0}
+ProcessWorker.processing_error=Processing exception ''{0}'' for request {1}.  Exception type {2} thrown from {3}. Enable detail logging to see the entire stacktrace.
 
 SharedCacheFinder.Didnt_find_caps=Unable to find capabilities for {0}
 

Modified: trunk/engine/src/test/java/org/teiid/dqp/internal/pooling/connector/TestConnectionPool.java
===================================================================
--- trunk/engine/src/test/java/org/teiid/dqp/internal/pooling/connector/TestConnectionPool.java	2009-03-30 20:04:22 UTC (rev 663)
+++ trunk/engine/src/test/java/org/teiid/dqp/internal/pooling/connector/TestConnectionPool.java	2009-03-30 20:55:34 UTC (rev 664)
@@ -22,31 +22,42 @@
 
 package org.teiid.dqp.internal.pooling.connector;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Properties;
 
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+import org.teiid.connector.api.ConnectorEnvironment;
+import org.teiid.connector.api.ConnectorLogger;
 import org.teiid.connector.api.ExecutionContext;
 import org.teiid.connector.api.MappedUserIdentity;
+import org.teiid.dqp.internal.datamgr.impl.ConnectorEnvironmentImpl;
 import org.teiid.dqp.internal.datamgr.impl.ConnectorWrapper;
 import org.teiid.dqp.internal.datamgr.impl.ExecutionContextImpl;
-import org.teiid.dqp.internal.pooling.connector.ConnectionPool;
-import org.teiid.dqp.internal.pooling.connector.ConnectionPoolException;
-import org.teiid.dqp.internal.pooling.connector.ConnectionWrapper;
 
-import junit.framework.TestCase;
+import com.metamatrix.common.application.ApplicationEnvironment;
+import com.metamatrix.common.queue.WorkerPool;
+import com.metamatrix.common.queue.WorkerPoolFactory;
 
 
-public class TestConnectionPool extends TestCase{
+public class TestConnectionPool {
     private ConnectionPool userIDPool;
     private ConnectionPool singleIDPool;
     private static ArrayList<Exception> EXCEPTIONS = new ArrayList<Exception>();
-        
-    public TestConnectionPool(String name) {
-        super(name);
+    private static WorkerPool pool = WorkerPoolFactory.newWorkerPool(TestConnectionPool.class.getSimpleName(), 1, 1000);
+    
+    @AfterClass public static void tearDownOnce() {
+    	pool.shutdownNow();
     }
     
-    public void setUp() throws Exception{
+    @Before public void setUp() throws Exception{
         FakeSourceConnectionFactory.alive = true;        
         
         singleIDPool = new ConnectionPool(new ConnectorWrapper(new FakeSourceConnectionFactory()));
@@ -58,12 +69,17 @@
         poolProperties.put(ConnectionPool.LIVE_AND_UNUSED_TIME, "1"); //$NON-NLS-1$
         poolProperties.put(ConnectionPool.WAIT_FOR_SOURCE_TIME, "500"); //$NON-NLS-1$
         poolProperties.put(ConnectionPool.CLEANING_INTERVAL, "1"); //$NON-NLS-1$
-        poolProperties.put(ConnectionPool.ENABLE_SHRINKING, "true"); //$NON-NLS-1$
         poolProperties.put(ConnectionPool.SOURCE_CONNECTION_TEST_INTERVAL, "-1"); //$NON-NLS-1$
-        
-        singleIDPool.initialize(poolProperties);
-        userIDPool.initialize(poolProperties);
+        ConnectorEnvironment env = createConnectorEnvironment(poolProperties);
+        singleIDPool.initialize(env);
+        userIDPool.initialize(env);
     }
+
+	private ConnectorEnvironment createConnectorEnvironment(
+			Properties poolProperties) {
+		ConnectorEnvironment env = new ConnectorEnvironmentImpl(poolProperties, Mockito.mock(ConnectorLogger.class), new ApplicationEnvironment(), pool);
+		return env;
+	}
     
     public void tearDown() throws Exception{
         singleIDPool.shutDown();
@@ -79,7 +95,7 @@
     }
     
     //=== tests ===//
-    public void testPoolUsingSingleIdentity() throws Exception {
+    @Test public void testPoolUsingSingleIdentity() throws Exception {
         ExecutionContext context = createContext("x", false);//$NON-NLS-1$
 
         ConnectionWrapper conn1 = singleIDPool.obtain(context);
@@ -99,7 +115,7 @@
         assertEquals(3, singleIDPool.getTotalConnectionCount());        
     }
 
-    public void testMaxConnectionTest() throws Exception {
+    @Test public void testMaxConnectionTest() throws Exception {
         ConnectionPool pool = new ConnectionPool(new ConnectorWrapper(new FakeSourceConnectionFactory()));
         
         Properties poolProperties = new Properties();
@@ -108,18 +124,17 @@
         poolProperties.put(ConnectionPool.LIVE_AND_UNUSED_TIME, "1"); //$NON-NLS-1$
         poolProperties.put(ConnectionPool.WAIT_FOR_SOURCE_TIME, "500"); //$NON-NLS-1$
         poolProperties.put(ConnectionPool.CLEANING_INTERVAL, "1"); //$NON-NLS-1$
-        poolProperties.put(ConnectionPool.ENABLE_SHRINKING, "true"); //$NON-NLS-1$
         poolProperties.put(ConnectionPool.SOURCE_CONNECTION_TEST_INTERVAL, "-1"); //$NON-NLS-1$
         
         try {
-            pool.initialize(poolProperties);
+            pool.initialize(createConnectorEnvironment(poolProperties));
             fail("should have failed to use 0 as max connections"); //$NON-NLS-1$
         }catch (ConnectionPoolException e) {
             // pass
         }
     }
     
-    public void testMaxConnectionTest1() throws Exception {
+    @Test public void testMaxConnectionTest1() throws Exception {
         ConnectionPool pool = new ConnectionPool(new ConnectorWrapper(new FakeSourceConnectionFactory()));
         
         Properties poolProperties = new Properties();
@@ -128,18 +143,17 @@
         poolProperties.put(ConnectionPool.LIVE_AND_UNUSED_TIME, "1"); //$NON-NLS-1$
         poolProperties.put(ConnectionPool.WAIT_FOR_SOURCE_TIME, "500"); //$NON-NLS-1$
         poolProperties.put(ConnectionPool.CLEANING_INTERVAL, "1"); //$NON-NLS-1$
-        poolProperties.put(ConnectionPool.ENABLE_SHRINKING, "true"); //$NON-NLS-1$
         poolProperties.put(ConnectionPool.SOURCE_CONNECTION_TEST_INTERVAL, "-1"); //$NON-NLS-1$
         
         try {
-            pool.initialize(poolProperties);
+            pool.initialize(createConnectorEnvironment(poolProperties));
             fail("should have failed to use 0 as max connections"); //$NON-NLS-1$
         }catch (ConnectionPoolException e) {
             // pass
         }
     }
     
-    public void testMaxConnectionTest2() throws Exception {
+    @Test public void testMaxConnectionTest2() throws Exception {
         ConnectionPool pool = new ConnectionPool(new ConnectorWrapper(new FakeSourceConnectionFactory()));
         
         Properties poolProperties = new Properties();
@@ -148,18 +162,17 @@
         poolProperties.put(ConnectionPool.LIVE_AND_UNUSED_TIME, "1"); //$NON-NLS-1$
         poolProperties.put(ConnectionPool.WAIT_FOR_SOURCE_TIME, "500"); //$NON-NLS-1$
         poolProperties.put(ConnectionPool.CLEANING_INTERVAL, "1"); //$NON-NLS-1$
-        poolProperties.put(ConnectionPool.ENABLE_SHRINKING, "true"); //$NON-NLS-1$
         poolProperties.put(ConnectionPool.SOURCE_CONNECTION_TEST_INTERVAL, "-1"); //$NON-NLS-1$
         
         try {
-            pool.initialize(poolProperties);
+            pool.initialize(createConnectorEnvironment(poolProperties));
             fail("should have failed to use 0 as max connections"); //$NON-NLS-1$
         }catch (ConnectionPoolException e) {
             // pass
         }
     }
 
-    public void testMessageWhenPoolMaxedOutPerIdentity() throws Exception {
+    @Test public void testMessageWhenPoolMaxedOutPerIdentity() throws Exception {
         ExecutionContext context = createContext("x", false);//$NON-NLS-1$
 
         // Max out the pool - 5 connections for same ID
@@ -176,7 +189,7 @@
         }
     }
     
-    public void testMessageWhenPoolTimedOut() throws Exception {
+    @Test public void testMessageWhenPoolTimedOut() throws Exception {
         FakeSourceConnectionFactory.alive = true;        
         
         singleIDPool = new ConnectionPool(new ConnectorWrapper(new FakeSourceConnectionFactory()));
@@ -186,9 +199,9 @@
         poolProperties.put(ConnectionPool.LIVE_AND_UNUSED_TIME, "1"); //$NON-NLS-1$
         poolProperties.put(ConnectionPool.WAIT_FOR_SOURCE_TIME, "1"); //$NON-NLS-1$
         poolProperties.put(ConnectionPool.CLEANING_INTERVAL, "1"); //$NON-NLS-1$
-        poolProperties.put(ConnectionPool.ENABLE_SHRINKING, "true"); //$NON-NLS-1$
+        poolProperties.put(ConnectionPool.ENABLE_SHRINKING, Boolean.FALSE.toString());
         poolProperties.put(ConnectionPool.SOURCE_CONNECTION_TEST_INTERVAL, "-1"); //$NON-NLS-1$
-        singleIDPool.initialize(poolProperties);
+        singleIDPool.initialize(createConnectorEnvironment(poolProperties));
         
         ExecutionContext context = createContext("x", false);//$NON-NLS-1$
 
@@ -204,7 +217,7 @@
         }
     }
 
-    public void testPoolUsingUserIdentity() throws Exception {
+    @Test public void testPoolUsingUserIdentity() throws Exception {
         ExecutionContext context1 = createContext("Jack", true); //$NON-NLS-1$
         ExecutionContext context2 = createContext("Tom", true); //$NON-NLS-1$
         userIDPool.obtain(context1);
@@ -223,7 +236,7 @@
         assertEquals(3, userIDPool.getTotalConnectionCount());        
     }
     
-    public void testPoolCleanUp() throws Exception {
+    @Test public void testPoolCleanUp() throws Exception {
         ExecutionContext context = createContext("x", false);       //$NON-NLS-1$ 
 
         ConnectionWrapper conn1 = singleIDPool.obtain(context);
@@ -247,7 +260,7 @@
         assertEquals(2, singleIDPool.getTotalConnectionCount());           
     }
     
-    public void testMultiThreading() throws Exception {
+    @Test public void testMultiThreading() throws Exception {
     	EXCEPTIONS.clear();
         int runnerNumber = 20;
         int connPerRunner = 20;
@@ -316,7 +329,7 @@
         }
     }
     
-    public void testMaxWithUserPool() throws Exception {
+    @Test public void testMaxWithUserPool() throws Exception {
         userIDPool = new ConnectionPool(new ConnectorWrapper(new FakeUserIdentityConnectionFactory()));   
         Properties poolProperties = new Properties();
         poolProperties.put(ConnectionPool.MAX_CONNECTIONS, "1"); //$NON-NLS-1$
@@ -326,7 +339,7 @@
         poolProperties.put(ConnectionPool.CLEANING_INTERVAL, "1000"); //$NON-NLS-1$
         poolProperties.put(ConnectionPool.ENABLE_SHRINKING, "false"); //$NON-NLS-1$
         poolProperties.put(ConnectionPool.SOURCE_CONNECTION_TEST_INTERVAL, "-1"); //$NON-NLS-1$
-        userIDPool.initialize(poolProperties);
+        userIDPool.initialize(createConnectorEnvironment(poolProperties));
         
         ConnectionWrapper conn = userIDPool.obtain(createContext("x", true)); //$NON-NLS-1$
         userIDPool.release(conn,false);

Modified: trunk/server/src/main/java/com/metamatrix/common/comm/platform/socket/SocketVMController.java
===================================================================
--- trunk/server/src/main/java/com/metamatrix/common/comm/platform/socket/SocketVMController.java	2009-03-30 20:04:22 UTC (rev 663)
+++ trunk/server/src/main/java/com/metamatrix/common/comm/platform/socket/SocketVMController.java	2009-03-30 20:55:34 UTC (rev 664)
@@ -34,8 +34,6 @@
 import com.metamatrix.common.log.LogManager;
 import com.metamatrix.common.messaging.MessageBus;
 import com.metamatrix.common.net.ServerSocketConfiguration;
-import com.metamatrix.common.queue.WorkerPool;
-import com.metamatrix.common.queue.WorkerPoolFactory;
 import com.metamatrix.common.queue.WorkerPoolStats;
 import com.metamatrix.common.util.LogCommonConstants;
 import com.metamatrix.common.util.PropertiesUtils;
@@ -43,9 +41,9 @@
 import com.metamatrix.platform.PlatformPlugin;
 import com.metamatrix.platform.registry.ClusteredRegistryState;
 import com.metamatrix.platform.util.PlatformProxyHelper;
+import com.metamatrix.platform.vm.controller.ProcessController;
 import com.metamatrix.platform.vm.controller.ServerEvents;
 import com.metamatrix.platform.vm.controller.SocketListenerStats;
-import com.metamatrix.platform.vm.controller.ProcessController;
 import com.metamatrix.server.Configuration;
 import com.metamatrix.server.HostManagement;
 import com.metamatrix.server.Main;
@@ -60,20 +58,16 @@
 	
     private static final String SERVER_PORT = VMComponentDefnType.SERVER_PORT; 
     private static final String MAX_THREADS = VMComponentDefnType.MAX_THREADS; 
-    private static final String TIMETOLIVE = VMComponentDefnType.TIMETOLIVE;      
     private static final String INPUT_BUFFER_SIZE = VMComponentDefnType.INPUT_BUFFER_SIZE;       
     private static final String OUTPUT_BUFFER_SIZE = VMComponentDefnType.OUTPUT_BUFFER_SIZE;       
     
     private static final int DEFAULT_SERVER_PORT = 31000;
     private static final int DEFAULT_MAX_THREADS = 15;
-    private static final long DEFAULT_TIMETOLIVE = 15000;
     private static final long DEFAULT_WAITFORSERVICES = 500;
     private static final int DEFAULT_INPUT_BUFFER_SIZE = 0;
     private static final int DEFAULT_OUTPUT_BUFFER_SIZE = 0;
 
-    private static final String SOCKET_WORKER_POOL_NAME = "SocketWorkerQueue"; //$NON-NLS-1$
     private SocketListener listener; 
-    private WorkerPool workerPool;
         
     @Inject
     public SocketVMController(@Named(Configuration.HOST) Host host, @Named(Configuration.PROCESSNAME) String processName, ClusteredRegistryState registry, ServerEvents serverEvents, MessageBus bus, HostManagement hostManagement) throws Exception {
@@ -94,17 +88,6 @@
     // so that the port can be made available sooner on bounces.
     @Override
     public void shutdown(boolean now) {
-        if (workerPool != null) {
-            try {
-                workerPool.shutdownNow();                
-            } catch (Exception e) {
-                //ignore
-            } finally {
-                workerPool = null;
-            }
-           
-        }
-        
         if (listener != null) {
             try {
                 listener.stop();
@@ -128,7 +111,6 @@
         Properties props = getProperties();
         int socketPort = PropertiesUtils.getIntProperty(props, SERVER_PORT, DEFAULT_SERVER_PORT);
         int maxThreads = PropertiesUtils.getIntProperty(props, MAX_THREADS, DEFAULT_MAX_THREADS);
-        long timeToLive = PropertiesUtils.getLongProperty(props, TIMETOLIVE, DEFAULT_TIMETOLIVE);
         int inputBufferSize = PropertiesUtils.getIntProperty(props, INPUT_BUFFER_SIZE, DEFAULT_INPUT_BUFFER_SIZE);
         int outputBufferSize = PropertiesUtils.getIntProperty(props, OUTPUT_BUFFER_SIZE, DEFAULT_OUTPUT_BUFFER_SIZE);
         String bindaddress =  VMNaming.getBindAddress();
@@ -138,11 +120,10 @@
         };
         
         logMessage(PlatformPlugin.Util.getString("SocketVMController.1", param)); //$NON-NLS-1$
-        workerPool = WorkerPoolFactory.newWorkerPool(SOCKET_WORKER_POOL_NAME, maxThreads, timeToLive);
         ServerSocketConfiguration helper = new ServerSocketConfiguration();
         try {
 	        helper.init();
-	        listener = new SocketListener(socketPort, bindaddress, this.clientServices, inputBufferSize, outputBufferSize, workerPool, helper.getServerSSLEngine(), helper.isClientEncryptionEnabled(), PlatformProxyHelper.getSessionServiceProxy(PlatformProxyHelper.ROUND_ROBIN_LOCAL));
+	        listener = new SocketListener(socketPort, bindaddress, this.clientServices, inputBufferSize, outputBufferSize, maxThreads, helper.getServerSSLEngine(), helper.isClientEncryptionEnabled(), PlatformProxyHelper.getSessionServiceProxy(PlatformProxyHelper.ROUND_ROBIN_LOCAL));
         } catch (Exception e) {
         	LogManager.logCritical(LogCommonConstants.CTX_CONTROLLER, e, PlatformPlugin.Util.getString("SocketVMController.2",param)); //$NON-NLS-1$
             System.exit(1); 
@@ -182,10 +163,7 @@
     }    
 
     protected WorkerPoolStats getProcessPoolStats() {
-        if (workerPool == null) {
-            return null;
-        }
-        return workerPool.getStats();
+    	return this.listener == null ? null : this.listener.getProcessPoolStats(); // keep the old "null when unavailable" contract; shutdown() also treats listener as nullable
     }
     
     @Deprecated

Modified: trunk/server/src/main/java/com/metamatrix/common/comm/platform/socket/server/ServerWorkItem.java
===================================================================
--- trunk/server/src/main/java/com/metamatrix/common/comm/platform/socket/server/ServerWorkItem.java	2009-03-30 20:04:22 UTC (rev 663)
+++ trunk/server/src/main/java/com/metamatrix/common/comm/platform/socket/server/ServerWorkItem.java	2009-03-30 20:55:34 UTC (rev 664)
@@ -34,7 +34,6 @@
 
 import org.teiid.dqp.internal.process.DQPWorkContext;
 
-import com.metamatrix.admin.api.exception.AdminException;
 import com.metamatrix.admin.api.exception.AdminProcessingException;
 import com.metamatrix.api.exception.ComponentNotFoundException;
 import com.metamatrix.api.exception.MetaMatrixProcessingException;
@@ -77,9 +76,6 @@
 	 */
 	public void run() {
 		DQPWorkContext.setWorkContext(this.socketClientInstance.getWorkContext());
-		if (LogManager.isMessageToBeRecorded(SocketVMController.SOCKET_CONTEXT, MessageLevel.DETAIL)) {
-			LogManager.logDetail(SocketVMController.SOCKET_CONTEXT, "forwarding message to listener:" + message); //$NON-NLS-1$
-		}
 		Message result = null;
 		String service = null;
 		final boolean encrypt = message.getContents() instanceof SealedObject;
@@ -162,20 +158,25 @@
 		// Case 5558: Differentiate between system level errors and
 		// processing errors. Only log system level errors as errors,
 		// log the processing errors as warnings only
-		String msg = PlatformPlugin.Util.getString("ServerWorkItem.Received_exception_processing_request"); //$NON-NLS-1$
 		if (e instanceof MetaMatrixProcessingException) {
-			LogManager.logWarning(context, e, msg);
+        	logProcessingException(e, context);
 		} else if (e instanceof AdminProcessingException) {
-			LogManager.logWarning(context, e, msg);
+			logProcessingException(e, context);
 		} else {
-			LogManager.logError(context, e, msg);
+			LogManager.logError(context, e, PlatformPlugin.Util.getString("ServerWorkItem.Received_exception_processing_request", this.socketClientInstance.getWorkContext().getConnectionID())); //$NON-NLS-1$
 		}
-		
-		if (e instanceof AdminException) {
-			return e;
-		}
 
 		return new ExceptionHolder(e);
 	}
+	
+	private void logProcessingException(Throwable e, String context) { // logs the throwable at detail level and a one-line summary at warning level
+		Throwable cause = e;
+		while (cause.getCause() != null) {
+			cause = cause.getCause(); // step down the chain; re-reading e.getCause() never advanced and looped forever on chains deeper than one
+		}
+		StackTraceElement elem = cause.getStackTrace().length > 0 ? cause.getStackTrace()[0] : null; // a throwable may carry no frames; do not fail while reporting it
+		LogManager.logDetail(context, e, "Processing exception for session", this.socketClientInstance.getWorkContext().getConnectionID()); //$NON-NLS-1$ 
+		LogManager.logWarning(context, PlatformPlugin.Util.getString("ServerWorkItem.processing_error", e.getMessage(), this.socketClientInstance.getWorkContext().getConnectionID(), e.getClass().getName(), elem)); //$NON-NLS-1$
+	}
 
 }
\ No newline at end of file

Modified: trunk/server/src/main/java/com/metamatrix/common/comm/platform/socket/server/SocketClientInstance.java
===================================================================
--- trunk/server/src/main/java/com/metamatrix/common/comm/platform/socket/server/SocketClientInstance.java	2009-03-30 20:04:22 UTC (rev 663)
+++ trunk/server/src/main/java/com/metamatrix/common/comm/platform/socket/server/SocketClientInstance.java	2009-03-30 20:55:34 UTC (rev 664)
@@ -37,7 +37,6 @@
 import com.metamatrix.common.comm.platform.socket.ObjectChannel;
 import com.metamatrix.common.comm.platform.socket.SocketVMController;
 import com.metamatrix.common.log.LogManager;
-import com.metamatrix.common.queue.WorkerPool;
 import com.metamatrix.common.util.crypto.CryptoException;
 import com.metamatrix.common.util.crypto.Cryptor;
 import com.metamatrix.common.util.crypto.DhKeyGenerator;
@@ -56,7 +55,6 @@
 public class SocketClientInstance implements ChannelListener, ClientInstance {
 	
 	private final ObjectChannel objectSocket;
-    private final WorkerPool workerPool;
     private final ClientServiceRegistry server;
     private Cryptor cryptor;
     private boolean usingEncryption; 
@@ -64,9 +62,8 @@
     private DQPWorkContext workContext = new DQPWorkContext();
     private SessionServiceInterface sessionService;
         
-    public SocketClientInstance(ObjectChannel objectSocket, WorkerPool workerPool, ClientServiceRegistry server, boolean isClientEncryptionEnabled, SessionServiceInterface sessionService) {
+    public SocketClientInstance(ObjectChannel objectSocket, ClientServiceRegistry server, boolean isClientEncryptionEnabled, SessionServiceInterface sessionService) {
         this.objectSocket = objectSocket;
-        this.workerPool = workerPool;
         this.server = server;
         this.usingEncryption = isClientEncryptionEnabled;
         this.sessionService = sessionService;
@@ -146,7 +143,7 @@
 		if (LogManager.isMessageToBeRecorded(SocketVMController.SOCKET_CONTEXT, MessageLevel.DETAIL)) { 
 			LogManager.logDetail(SocketVMController.SOCKET_CONTEXT, "processing message:" + packet); //$NON-NLS-1$
         }
-		workerPool.execute(new ServerWorkItem(this, packet.getMessageKey(), packet, this.server, this.sessionService));
+		new ServerWorkItem(this, packet.getMessageKey(), packet, this.server, this.sessionService).run();
 	}
 
 	public void shutdown() throws CommunicationException {

Modified: trunk/server/src/main/java/com/metamatrix/common/comm/platform/socket/server/SocketListener.java
===================================================================
--- trunk/server/src/main/java/com/metamatrix/common/comm/platform/socket/server/SocketListener.java	2009-03-30 20:04:22 UTC (rev 663)
+++ trunk/server/src/main/java/com/metamatrix/common/comm/platform/socket/server/SocketListener.java	2009-03-30 20:55:34 UTC (rev 664)
@@ -23,9 +23,8 @@
 package com.metamatrix.common.comm.platform.socket.server;
 
 import java.net.InetSocketAddress;
-import java.util.concurrent.SynchronousQueue;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 
 import javax.net.ssl.SSLEngine;
 
@@ -43,6 +42,7 @@
 import com.metamatrix.common.log.LogManager;
 import com.metamatrix.common.queue.WorkerPool;
 import com.metamatrix.common.queue.WorkerPoolFactory;
+import com.metamatrix.common.queue.WorkerPoolStats;
 import com.metamatrix.core.log.MessageLevel;
 import com.metamatrix.core.util.MetaMatrixProductVersion;
 import com.metamatrix.platform.security.api.service.SessionServiceInterface;
@@ -53,11 +53,12 @@
  */
 public class SocketListener implements ChannelListenerFactory {
     private ClientServiceRegistry server;
-    private WorkerPool workerPool;
     private SSLAwareChannelHandler channelHandler;
     private Channel serverChanel;
     private boolean isClientEncryptionEnabled;
     private SessionServiceInterface sessionService;
+    private WorkerPool workerPool;
+    private ExecutorService bossService;
     
     /**
      * 
@@ -71,7 +72,7 @@
      */
     public SocketListener(int port, String bindAddress,
 			ClientServiceRegistry server, int inputBufferSize,
-			int outputBufferSize, WorkerPool workerPool, SSLEngine engine, boolean isClientEncryptionEnabled, SessionServiceInterface sessionService) {
+			int outputBufferSize, int maxWorkers, SSLEngine engine, boolean isClientEncryptionEnabled, SessionServiceInterface sessionService) {
     	this.isClientEncryptionEnabled = isClientEncryptionEnabled;
     	this.sessionService = sessionService;
     	if (port < 0 || port > 0xFFFF) {
@@ -79,18 +80,14 @@
         }
 
        	this.server = server;
-        this.workerPool = workerPool;
+        this.workerPool = WorkerPoolFactory.newWorkerPool("SocketWorker", maxWorkers, 120000); //$NON-NLS-1$
+        this.bossService = Executors.newCachedThreadPool();
         if (LogManager.isMessageToBeRecorded(SocketVMController.SOCKET_CONTEXT, MessageLevel.DETAIL)) { 
             LogManager.logDetail(SocketVMController.SOCKET_CONTEXT, "server = " + this.server + "binding to port:" + port); //$NON-NLS-1$ //$NON-NLS-2$
 		}
 		
-		ThreadPoolExecutor executor = new ThreadPoolExecutor(0,
-				Integer.MAX_VALUE, 2, TimeUnit.MINUTES,
-				new SynchronousQueue<Runnable>(),
-				new WorkerPoolFactory.DefaultThreadFactory("ServerNio")); //$NON-NLS-1$
+        ChannelFactory factory = new NioServerSocketChannelFactory(bossService, workerPool, workerPool.getPoolSize());
         
-        ChannelFactory factory = new NioServerSocketChannelFactory(executor, executor);
-        
         ServerBootstrap bootstrap = new ServerBootstrap(factory);
         this.channelHandler = new SSLAwareChannelHandler(this, engine, Thread.currentThread().getContextClassLoader());
         bootstrap.setPipelineFactory(channelHandler);
@@ -105,6 +102,10 @@
         this.serverChanel = bootstrap.bind(new InetSocketAddress(bindAddress, port));
     }
     
+    public WorkerPoolStats getProcessPoolStats() {
+    	return this.workerPool.getStats();
+    }
+    
     public int getPort() {
     	return ((InetSocketAddress)this.serverChanel.getLocalAddress()).getPort();
     }
@@ -115,6 +116,8 @@
     
     public void stop() {
     	this.serverChanel.close();
+    	this.workerPool.shutdownNow();
+    	this.bossService.shutdownNow();
     }
    
     public SocketListenerStats getStats() {
@@ -127,7 +130,7 @@
     }
 
 	public ChannelListener createChannelListener(ObjectChannel channel) {
-		return new SocketClientInstance(channel, this.workerPool, this.server, this.isClientEncryptionEnabled, this.sessionService);
+		return new SocketClientInstance(channel, this.server, this.isClientEncryptionEnabled, this.sessionService);
 	}
 
 }
\ No newline at end of file

Modified: trunk/server/src/main/resources/com/metamatrix/platform/i18n.properties
===================================================================
--- trunk/server/src/main/resources/com/metamatrix/platform/i18n.properties	2009-03-30 20:04:22 UTC (rev 663)
+++ trunk/server/src/main/resources/com/metamatrix/platform/i18n.properties	2009-03-30 20:55:34 UTC (rev 664)
@@ -1273,7 +1273,8 @@
 LDAPMembershipDomain.Admin_credentials=Admin DN and/or password supplied for LDAP Membership Domain {0} are invalid.
 LDAPMembershipDomain.Require_memberof_property=Users in LDAP Membership Domain {0} will not appear as members of any group since user's memberOf and group's memberOf attributes are both unspecified.
 
-ServerWorkItem.Received_exception_processing_request=Received exception processing request
+ServerWorkItem.Received_exception_processing_request=Unexpected exception for session {0}
+ServerWorkItem.processing_error=Processing exception ''{0}'' for session {1}.  Exception type {2} thrown from {3}. Enable detail logging to see the entire stacktrace.
 ServerWorkItem.Component_Not_Found=Component not found: {0}
 
 SocketVMController.0=Register Admin API

Modified: trunk/server/src/test/java/com/metamatrix/common/comm/platform/socket/server/TestCommSockets.java
===================================================================
--- trunk/server/src/test/java/com/metamatrix/common/comm/platform/socket/server/TestCommSockets.java	2009-03-30 20:04:22 UTC (rev 663)
+++ trunk/server/src/test/java/com/metamatrix/common/comm/platform/socket/server/TestCommSockets.java	2009-03-30 20:55:34 UTC (rev 664)
@@ -46,7 +46,6 @@
 import com.metamatrix.common.comm.platform.socket.client.SocketServerConnection;
 import com.metamatrix.common.comm.platform.socket.client.SocketServerConnectionFactory;
 import com.metamatrix.common.comm.platform.socket.client.UrlServerDiscovery;
-import com.metamatrix.common.queue.WorkerPoolFactory;
 import com.metamatrix.common.util.crypto.NullCryptor;
 import com.metamatrix.platform.security.api.ILogon;
 import com.metamatrix.platform.security.api.LogonResult;
@@ -74,8 +73,7 @@
 		SessionServiceInterface sessionService = mock(SessionServiceInterface.class);
 		csr.registerClientService(ILogon.class, new LogonImpl(sessionService, "fakeCluster"), "foo"); //$NON-NLS-1$ //$NON-NLS-2$
 		listener = new SocketListener(addr.getPort(), addr.getAddress().getHostAddress(),
-				csr, 1024, 1024, WorkerPoolFactory.newWorkerPool(
-						"testIO", 1, 120000), null, true, sessionService); //$NON-NLS-1$
+				csr, 1024, 1024, 1, null, true, sessionService);
 
 		try {
 			Properties p = new Properties();
@@ -152,8 +150,7 @@
 				}
 			}, "foo"); //$NON-NLS-1$
 			listener = new SocketListener(addr.getPort(), addr.getAddress().getHostAddress(),
-					csr, 1024, 1024, WorkerPoolFactory.newWorkerPool(
-							"testIO", 1, 120000), serverSSL, isClientEncryptionEnabled, sessionService); //$NON-NLS-1$
+					csr, 1024, 1024, 1, serverSSL, isClientEncryptionEnabled, sessionService);
 			SocketListenerStats stats = listener.getStats();
 			assertEquals(0, stats.maxSockets);
 			assertEquals(0, stats.objectsRead);




More information about the teiid-commits mailing list