Author: shawkins
Date: 2009-03-30 16:55:34 -0400 (Mon, 30 Mar 2009)
New Revision: 664
Modified:
trunk/common-core/src/main/java/com/metamatrix/core/BundleUtil.java
trunk/common-internal/src/main/java/com/metamatrix/common/queue/WorkerPool.java
trunk/common-internal/src/main/java/com/metamatrix/common/queue/WorkerPoolFactory.java
trunk/common-internal/src/main/resources/com/metamatrix/common/i18n.properties
trunk/common-internal/src/test/java/com/metamatrix/common/queue/TestQueueWorkerPool.java
trunk/connector-api/src/main/java/org/teiid/connector/api/ConnectorEnvironment.java
trunk/connectors/connector-salesforce/src/test/java/com/metamatrix/connector/salesforce/TestConnector.java
trunk/engine/src/main/java/org/teiid/dqp/internal/datamgr/impl/ConnectorEnvironmentImpl.java
trunk/engine/src/main/java/org/teiid/dqp/internal/datamgr/impl/ConnectorManager.java
trunk/engine/src/main/java/org/teiid/dqp/internal/pooling/connector/ConnectionPool.java
trunk/engine/src/main/java/org/teiid/dqp/internal/pooling/connector/PooledConnector.java
trunk/engine/src/main/java/org/teiid/dqp/internal/process/RequestWorkItem.java
trunk/engine/src/main/resources/com/metamatrix/dqp/i18n.properties
trunk/engine/src/test/java/org/teiid/dqp/internal/pooling/connector/TestConnectionPool.java
trunk/server/src/main/java/com/metamatrix/common/comm/platform/socket/SocketVMController.java
trunk/server/src/main/java/com/metamatrix/common/comm/platform/socket/server/ServerWorkItem.java
trunk/server/src/main/java/com/metamatrix/common/comm/platform/socket/server/SocketClientInstance.java
trunk/server/src/main/java/com/metamatrix/common/comm/platform/socket/server/SocketListener.java
trunk/server/src/main/resources/com/metamatrix/platform/i18n.properties
trunk/server/src/test/java/com/metamatrix/common/comm/platform/socket/server/TestCommSockets.java
Log:
TEIID-445, TEIID-386 cleanup of workerpool and logging of expected user exceptions.
Modified: trunk/common-core/src/main/java/com/metamatrix/core/BundleUtil.java
===================================================================
--- trunk/common-core/src/main/java/com/metamatrix/core/BundleUtil.java 2009-03-30
20:04:22 UTC (rev 663)
+++ trunk/common-core/src/main/java/com/metamatrix/core/BundleUtil.java 2009-03-30
20:55:34 UTC (rev 664)
@@ -26,6 +26,7 @@
import java.util.List;
import java.util.MissingResourceException;
import java.util.ResourceBundle;
+
import com.metamatrix.core.util.ArgCheck;
import com.metamatrix.core.util.StringUtil;
@@ -186,12 +187,8 @@
*/
public String getString(final String key,
final Object... parameters) {
- String text = getProductValue(key);
+ String text = getString(key);
- if (text == null) {
- text = getString(key);
- }
-
// Check the trivial cases ...
if (text == null) {
return '<' + key + '>';
@@ -223,5 +220,4 @@
return value;
}
-
}
Modified: trunk/common-internal/src/main/java/com/metamatrix/common/queue/WorkerPool.java
===================================================================
---
trunk/common-internal/src/main/java/com/metamatrix/common/queue/WorkerPool.java 2009-03-30
20:04:22 UTC (rev 663)
+++
trunk/common-internal/src/main/java/com/metamatrix/common/queue/WorkerPool.java 2009-03-30
20:55:34 UTC (rev 664)
@@ -24,6 +24,7 @@
import java.util.List;
import java.util.concurrent.Executor;
+import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
public interface WorkerPool extends Executor {
@@ -32,7 +33,7 @@
List<Runnable> shutdownNow();
- void awaitTermination(long timeout, TimeUnit unit)
+ boolean awaitTermination(long timeout, TimeUnit unit)
throws InterruptedException;
boolean isTerminated();
@@ -40,5 +41,16 @@
WorkerPoolStats getStats();
boolean hasWork();
+
+ int getPoolSize();
+
+ ScheduledFuture<?> schedule(Runnable command,
+ long delay,
+ TimeUnit unit);
+
+ ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
+ long initialDelay,
+ long period,
+ TimeUnit unit);
}
\ No newline at end of file
Modified:
trunk/common-internal/src/main/java/com/metamatrix/common/queue/WorkerPoolFactory.java
===================================================================
---
trunk/common-internal/src/main/java/com/metamatrix/common/queue/WorkerPoolFactory.java 2009-03-30
20:04:22 UTC (rev 663)
+++
trunk/common-internal/src/main/java/com/metamatrix/common/queue/WorkerPoolFactory.java 2009-03-30
20:55:34 UTC (rev 664)
@@ -22,12 +22,19 @@
package com.metamatrix.common.queue;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.IdentityHashMap;
+import java.util.LinkedList;
import java.util.List;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingQueue;
+import java.util.Queue;
+import java.util.Set;
+import java.util.concurrent.Delayed;
+import java.util.concurrent.FutureTask;
import java.util.concurrent.RejectedExecutionException;
-import java.util.concurrent.RejectedExecutionHandler;
-import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
@@ -39,208 +46,320 @@
import com.metamatrix.core.util.NamedThreadFactory;
/**
- * Creates WorkPools based upon {@link ThreadPoolExecutor}
+ * Creates named, queued, daemon Thread pools.
+ * <br/>
+ * The requirements are:
+ * <ol>
+ * <li>minimize thread creation</li>
+ * <li>allow for proper timeout of idle threads</li>
+ * <li>allow for queuing</li>
+ * </ol>
+ * <br/>
+ * A non-fifo (lifo) {@link SynchronousQueue} based {@link ThreadPoolExecutor} satisfies 1 and 2, but not 3.
+ * A bounded or unbound queue based {@link ThreadPoolExecutor} allows for 3, but will tend to create
+ * up to the maximum number of threads and makes no guarantee on thread scheduling.
+ * <br/>
+ * So the approach here is to use virtual thread pools off of single shared {@link SynchronousQueue}
+ * backed {@link ThreadPoolExecutor}.
+ * <br/>
+ * There is also only a single master scheduling thread with actual executions deferred to the calling
+ * WorkerPool.
+ *
+ * TODO: this probably needs to be re-thought, especially since the lifo ordering of a {@link SynchronousQueue}
+ * is not guaranteed behavior. also there's a race condition between previously retiring threads and new work -
+ * prior to being returned to the shared pool we can create extra threads if the shared pool is exhausted.
+ * TODO: bounded queuing - we never bothered bounding in the past with our worker pools, but reasonable
+ * defaults would be a good idea.
*/
public class WorkerPoolFactory {
+ private static ThreadPoolExecutor tpe = new ThreadPoolExecutor(0,
+ Integer.MAX_VALUE, 2, TimeUnit.MINUTES,
+ new SynchronousQueue<Runnable>(), new NamedThreadFactory("Worker")) {
//$NON-NLS-1$
+ @Override
+ protected void afterExecute(Runnable r, Throwable t) {
+ if (t != null) {
+ LogManager.logError(LogCommonConstants.CTX_POOLING, t,
CommonPlugin.Util.getString("WorkerPool.uncaughtException")); //$NON-NLS-1$
+ }
+ }
+ };
+
+ private static ScheduledThreadPoolExecutor stpe = new ScheduledThreadPoolExecutor(1, new
NamedThreadFactory("Scheduler")); //$NON-NLS-1$
+
/**
- * Attempts to detect when new threads should be created without blocking.
- *
- * IMPORTANT NOTE: actual execution ordering is not guaranteed.
+ * TODO: purge user canceled scheduled tasks.
*/
- static final class ThreadReuseLinkedBlockingQueue extends
- LinkedBlockingQueue<Runnable> {
- private StatsCapturingThreadPoolExecutor executor;
-
- void setExecutor(StatsCapturingThreadPoolExecutor executor) {
- this.executor = executor;
- }
+ static class StatsCapturingSharedThreadPoolExecutor implements WorkerPool {
- @Override
- public boolean offer(Runnable o) {
- if (executor.getPoolSize() + executor.getCompletedCount() >=
executor.getSubmittedCount()) {
- /*
- * TODO: this strategy suffers from the same possible flaws as the previous
implementation of
- * WorkerPool. If the available threads are in the process of dying, we run the risk
of
- * queuing without starting another thread (fortunately the ThreadPoolExecutor itself
will
- * try to mitigate this case as well).
- *
- * Since this was never observed to be a problem before, we'll not initially
worry about it with
- * this implementation.
- */
- return super.offer(o);
+ class ScheduledFutureTask extends FutureTask<Void> implements
ScheduledFuture<Void> {
+ private ScheduledFuture<?> scheduledFuture;
+ private boolean periodic;
+ private volatile boolean running;
+
+ public ScheduledFutureTask(Runnable runnable, boolean periodic) {
+ super(runnable, null);
+ this.periodic = periodic;
}
- return false; //trigger thread creation if possible
- }
+
+ public void setScheduledFuture(ScheduledFuture<?> scheduledFuture) {
+ scheduledTasks.add(this);
+ this.scheduledFuture = scheduledFuture;
+ }
+
+ @Override
+ public long getDelay(TimeUnit unit) {
+ return this.scheduledFuture.getDelay(unit);
+ }
- @Override
- public boolean add(Runnable arg0) {
- if (super.offer(arg0)) {
- return true;
+ @Override
+ public int compareTo(Delayed o) {
+ return this.scheduledFuture.compareTo(o);
}
- throw new IllegalStateException("Queue full"); //$NON-NLS-1$
+
+ @Override
+ public boolean cancel(boolean mayInterruptIfRunning) {
+ this.scheduledFuture.cancel(false);
+ return super.cancel(mayInterruptIfRunning);
+ }
+
+ public Runnable getParent() {
+ return new Runnable() {
+ @Override
+ public void run() {
+ if (running || terminated) {
+ return;
+ }
+ running = periodic;
+ execute(ScheduledFutureTask.this);
+ }
+ };
+ }
+
+ @Override
+ public void run() {
+ if (periodic) {
+ if (!this.runAndReset()) {
+ this.scheduledFuture.cancel(false);
+ scheduledTasks.remove(this);
+ }
+ running = false;
+ } else {
+ scheduledTasks.remove(this);
+ super.run();
+ }
+ }
}
- }
-
- static class StatsCapturingThreadPoolExecutor extends ThreadPoolExecutor {
- private AtomicInteger activeCount = new AtomicInteger(0);
- private AtomicInteger submittedCount = new AtomicInteger(0);
- private AtomicInteger completedCount = new AtomicInteger(0);
+ private volatile int activeCount;
+ private volatile int maxActiveCount;
+ private volatile int maxQueueSize;
+ private volatile boolean terminated;
+ private volatile int submittedCount;
+ private volatile int completedCount;
+ private Object poolLock = new Object();
+ private AtomicInteger threadCounter = new AtomicInteger();
+ private Set<Thread> threads =
Collections.synchronizedSet(Collections.newSetFromMap(new IdentityHashMap<Thread,
Boolean>()));
+ private Set<ScheduledFutureTask> scheduledTasks =
Collections.synchronizedSet(Collections.newSetFromMap(new
IdentityHashMap<ScheduledFutureTask, Boolean>()));
- public StatsCapturingThreadPoolExecutor(int corePoolSize,
- int maximumPoolSize,
- long keepAliveTime,
- TimeUnit unit,
- BlockingQueue<Runnable> workQueue,
- ThreadFactory threadFactory) {
- super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);
- }
+ private String poolName;
+ private int maximumPoolSize;
+ private Queue<Runnable> queue = new LinkedList<Runnable>();
- @Override
- protected void beforeExecute(Thread t, Runnable r) {
- activeCount.getAndIncrement();
+ public StatsCapturingSharedThreadPoolExecutor(String name, int maximumPoolSize) {
+ this.maximumPoolSize = maximumPoolSize;
+ this.poolName = name;
}
@Override
- protected void afterExecute(Runnable r, Throwable t) {
- if (t != null) {
- LogManager.logError(LogCommonConstants.CTX_POOLING, t,
CommonPlugin.Util.getString("WorkerPool.uncaughtException")); //$NON-NLS-1$
+ public void execute(final Runnable command) {
+ boolean atMaxThreads = false;
+ boolean newMaxQueueSize = false;
+ synchronized (poolLock) {
+ checkForTermination();
+ submittedCount++;
+ atMaxThreads = activeCount == maximumPoolSize;
+ if (atMaxThreads) {
+ queue.add(command);
+ int queueSize = queue.size();
+ if (queueSize > maxQueueSize) {
+ atMaxThreads = true;
+ maxQueueSize = queueSize;
+ }
+ } else {
+ activeCount++;
+ maxActiveCount = Math.max(activeCount, maxActiveCount);
+ }
}
- activeCount.getAndDecrement();
- completedCount.getAndIncrement();
+ if (atMaxThreads) {
+ if (newMaxQueueSize && maximumPoolSize > 1) {
+ LogManager.logWarning(LogCommonConstants.CTX_POOLING,
CommonPlugin.Util.getString("WorkerPool.Max_thread", maximumPoolSize, poolName,
maxQueueSize)); //$NON-NLS-1$
+ }
+ return;
+ }
+ tpe.execute(new Runnable() {
+ @Override
+ public void run() {
+ Thread t = Thread.currentThread();
+ threads.add(t);
+ String name = t.getName();
+ t.setName(name + "_" + poolName + threadCounter.getAndIncrement());
//$NON-NLS-1$
+ if (LogManager.isMessageToBeRecorded(LogCommonConstants.CTX_POOLING,
MessageLevel.TRACE)) {
+ LogManager.logTrace(LogCommonConstants.CTX_POOLING, "Beginning work with
virtual worker", t.getName()); //$NON-NLS-1$
+ }
+ Runnable r = command;
+ while (r != null) {
+ boolean success = false;
+ try {
+ r.run();
+ success = true;
+ } finally {
+ synchronized (poolLock) {
+ if (success) {
+ completedCount++;
+ r = queue.poll();
+ }
+ if (r != null) {
+ continue;
+ }
+ threads.remove(t);
+ activeCount--;
+ if (activeCount == 0 && terminated) {
+ poolLock.notifyAll();
+ }
+ }
+ t.setName(name);
+ }
+ }
+ };
+ });
}
-
- @Override
- public void execute(Runnable command) {
- submittedCount.getAndIncrement();
- super.execute(command);
+
+ private void checkForTermination() {
+ if (terminated) {
+ throw new RejectedExecutionException();
+ }
}
- @Override
public int getActiveCount() {
- return activeCount.get();
+ return activeCount;
}
public int getSubmittedCount() {
- return submittedCount.get();
+ return submittedCount;
}
public int getCompletedCount() {
- return completedCount.get();
+ return completedCount;
}
- }
-
- public static class DefaultThreadFactory extends NamedThreadFactory {
+ public int getPoolSize() {
+ return maximumPoolSize;
+ }
- public DefaultThreadFactory(String name) {
- super(name);
+ public boolean isTerminated() {
+ return terminated;
}
-
- public Thread newThread(Runnable r) {
- Thread result = super.newThread(r);
- if (LogManager.isMessageToBeRecorded(LogCommonConstants.CTX_POOLING,
MessageLevel.TRACE)) {
- LogManager.logTrace(LogCommonConstants.CTX_POOLING,
CommonPlugin.Util.getString("WorkerPool.New_thread", result.getName()));
//$NON-NLS-1$
+
+ public void shutdown() {
+ this.terminated = true;
+ synchronized (scheduledTasks) {
+ for (ScheduledFuture<?> future : scheduledTasks) {
+ future.cancel(false);
+ }
+ scheduledTasks.clear();
}
- return result;
}
- }
-
- static class ExecutorWorkerPool implements WorkerPool {
- private String name;
- private StatsCapturingThreadPoolExecutor executor;
-
- public ExecutorWorkerPool(final String name, StatsCapturingThreadPoolExecutor executor)
{
- this.name = name;
- this.executor = executor;
+ public int getLargestPoolSize() {
+ return this.maxActiveCount;
}
- public void execute(Runnable r) {
- this.executor.execute(r);
- }
-
- public void awaitTermination(long timeout, TimeUnit unit)
- throws InterruptedException {
- this.executor.awaitTermination(timeout, unit);
- }
-
+ @Override
public WorkerPoolStats getStats() {
WorkerPoolStats stats = new WorkerPoolStats();
- stats.name = name;
- stats.queued = executor.getQueue().size();
- stats.threads = executor.getPoolSize();
- stats.activeThreads = executor.getActiveCount();
- stats.totalSubmitted = executor.getSubmittedCount();
- //TODO: highestActiveThreads is misleading for pools that prefer to use new threads
- stats.highestActiveThreads = executor.getLargestPoolSize();
- stats.totalCompleted = executor.getCompletedCount();
+ stats.name = poolName;
+ stats.queued = queue.size();
+ stats.threads = getPoolSize();
+ stats.activeThreads = getActiveCount();
+ stats.totalSubmitted = getSubmittedCount();
+ stats.highestActiveThreads = getLargestPoolSize();
+ stats.totalCompleted = getCompletedCount();
return stats;
}
-
- public boolean isTerminated() {
- return this.executor.isTerminated();
+
+ @Override
+ public boolean hasWork() {
+ synchronized (poolLock) {
+ return this.getSubmittedCount() - this.getCompletedCount() > 0 &&
!this.isTerminated();
+ }
}
- public void shutdown() {
- this.executor.shutdown();
+ @Override
+ public List<Runnable> shutdownNow() {
+ this.shutdown();
+ synchronized (poolLock) {
+ synchronized (threads) {
+ for (Thread t : threads) {
+ t.interrupt();
+ }
+ }
+ List<Runnable> result = new ArrayList<Runnable>(queue);
+ queue.clear();
+ return result;
+ }
}
+
+ @Override
+ public boolean awaitTermination(long timeout, TimeUnit unit)
+ throws InterruptedException {
+ long timeoutMillis = unit.toMillis(timeout);
+ long finalMillis = System.currentTimeMillis() + timeoutMillis;
+ synchronized (poolLock) {
+ while (this.activeCount > 0 || !terminated) {
+ if (timeoutMillis < 1) {
+ return false;
+ }
+ poolLock.wait(timeoutMillis);
+ timeoutMillis = finalMillis - System.currentTimeMillis();
+ }
+ }
+ return true;
+ }
- public boolean hasWork() {
- return this.executor.getSubmittedCount() - this.executor.getCompletedCount() > 0
&& !this.executor.isTerminated();
+ @Override
+ public ScheduledFuture<?> schedule(final Runnable command, long delay,
+ TimeUnit unit) {
+ checkForTermination();
+ ScheduledFutureTask sft = new ScheduledFutureTask(command, false);
+ synchronized (scheduledTasks) {
+ ScheduledFuture<?> future = stpe.schedule(sft.getParent(), delay, unit);
+ sft.setScheduledFuture(future);
+ return sft;
+ }
}
@Override
- public List<Runnable> shutdownNow() {
- return this.executor.shutdownNow();
+ public ScheduledFuture<?> scheduleAtFixedRate(final Runnable command,
+ long initialDelay, long period, TimeUnit unit) {
+ checkForTermination();
+ ScheduledFutureTask sft = new ScheduledFutureTask(command, true);
+ synchronized (scheduledTasks) {
+ ScheduledFuture<?> future = stpe.scheduleAtFixedRate(sft.getParent(),
initialDelay, period, unit);
+ sft.setScheduledFuture(future);
+ return sft;
+ }
}
}
-
+
/**
* Creates a WorkerPool that prefers thread reuse over thread creation based upon the
given parameters
*
* @param name
* @param numThreads the maximum number of worker threads allowed
- * @param keepAlive keepAlive time in milliseconds - NOT supported until JDK 1.6 for
pools that don't prefer existing threads
* @return
*/
public static WorkerPool newWorkerPool(String name, int numThreads, long keepAlive) {
- return newWorkerPool(name, numThreads, keepAlive, true);
+ return new StatsCapturingSharedThreadPoolExecutor(name, numThreads);
}
-
- public static WorkerPool newWorkerPool(String name, final int numThreads, long
keepAlive, boolean preferExistingThreads) {
- if (preferExistingThreads && numThreads > 1) {
- final ThreadReuseLinkedBlockingQueue queue = new ThreadReuseLinkedBlockingQueue();
- StatsCapturingThreadPoolExecutor executor =
- new StatsCapturingThreadPoolExecutor(0, numThreads, keepAlive, TimeUnit.MILLISECONDS,
queue, new DefaultThreadFactory(name)) {
-
- @Override
- public void execute(Runnable arg0) {
- if (this.isShutdown()) {
- //bypass the rejection handler
- throw new RejectedExecutionException();
- }
- super.execute(arg0);
- }
-
- };
- queue.setExecutor(executor);
- executor.setRejectedExecutionHandler(new RejectedExecutionHandler() {
- public void rejectedExecution(Runnable arg0,
- ThreadPoolExecutor arg1) {
- try {
- queue.add(arg0);
- } catch (IllegalStateException e) {
- throw new RejectedExecutionException(e);
- }
- }
- });
- return new ExecutorWorkerPool(name, executor);
- }
- StatsCapturingThreadPoolExecutor executor = new
StatsCapturingThreadPoolExecutor(numThreads, numThreads, keepAlive, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(), new DefaultThreadFactory(name));
- executor.allowCoreThreadTimeOut(true);
- return new ExecutorWorkerPool(name, executor);
- }
-
+
}
Modified: trunk/common-internal/src/main/resources/com/metamatrix/common/i18n.properties
===================================================================
---
trunk/common-internal/src/main/resources/com/metamatrix/common/i18n.properties 2009-03-30
20:04:22 UTC (rev 663)
+++
trunk/common-internal/src/main/resources/com/metamatrix/common/i18n.properties 2009-03-30
20:55:34 UTC (rev 664)
@@ -3100,4 +3100,5 @@
VDBDefnXMLHelper.Unable_to_read_defn_file=Unable to read DEF file.
WorkerPool.uncaughtException=Uncaught exception in worker pool
-WorkerPool.New_thread=New work thread created with name {0}
\ No newline at end of file
+WorkerPool.New_thread=New work thread created with name {0}
+WorkerPool.Max_thread=Reached maximum thread count "{0}" for worker pool "{1}" with a queue size of "{2}".
\ No newline at end of file
Modified:
trunk/common-internal/src/test/java/com/metamatrix/common/queue/TestQueueWorkerPool.java
===================================================================
---
trunk/common-internal/src/test/java/com/metamatrix/common/queue/TestQueueWorkerPool.java 2009-03-30
20:04:22 UTC (rev 663)
+++
trunk/common-internal/src/test/java/com/metamatrix/common/queue/TestQueueWorkerPool.java 2009-03-30
20:55:34 UTC (rev 664)
@@ -22,24 +22,21 @@
package com.metamatrix.common.queue;
+import static org.junit.Assert.*;
+
+import java.util.ArrayList;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
-import junit.framework.*;
+import org.junit.Test;
/**
*/
-public class TestQueueWorkerPool extends TestCase {
+public class TestQueueWorkerPool {
- /**
- * Constructor for TestQueueWorkerPool.
- * @param arg0
- */
- public TestQueueWorkerPool(String arg0) {
- super(arg0);
- }
-
- public void testQueuing() throws Exception {
+ @Test public void testQueuing() throws Exception {
final long SINGLE_WAIT = 50;
final int WORK_ITEMS = 10;
final int MAX_THREADS = 5;
@@ -58,7 +55,7 @@
assertEquals("Expected threads to be maxed out", MAX_THREADS,
stats.highestActiveThreads); //$NON-NLS-1$
}
- public void testThreadReuse() throws Exception {
+ @Test public void testThreadReuse() throws Exception {
final long SINGLE_WAIT = 50;
final long NUM_THREADS = 5;
@@ -81,15 +78,78 @@
pool.awaitTermination(1000, TimeUnit.MILLISECONDS);
}
- public void testShutdown() throws Exception {
+ @Test(expected=RejectedExecutionException.class) public void testShutdown() throws
Exception {
final WorkerPool pool = WorkerPoolFactory.newWorkerPool("test", 5,
120000); //$NON-NLS-1$
pool.shutdown();
- try {
- pool.execute(new FakeWorkItem(1));
- fail("expected exception"); //$NON-NLS-1$
- } catch (RejectedExecutionException e) {
-
- }
+ pool.execute(new FakeWorkItem(1));
}
+
+ @Test public void testScheduleCancel() throws Exception {
+ final WorkerPool pool = WorkerPoolFactory.newWorkerPool("test", 5,
120000); //$NON-NLS-1$
+ ScheduledFuture<?> future = pool.scheduleAtFixedRate(new Runnable() {
+ @Override
+ public void run() {
+ }
+ }, 0, 5, TimeUnit.MILLISECONDS);
+ future.cancel(true);
+ assertFalse(future.cancel(true));
+ }
+
+ @Test public void testSchedule() throws Exception {
+ final WorkerPool pool = WorkerPoolFactory.newWorkerPool("test", 5,
120000); //$NON-NLS-1$
+ final ArrayList<String> result = new ArrayList<String>();
+ ScheduledFuture<?> future = pool.schedule(new Runnable() {
+ @Override
+ public void run() {
+ result.add("hello"); //$NON-NLS-1$
+ }
+ }, 5, TimeUnit.MILLISECONDS);
+ future.cancel(true);
+ Thread.sleep(10);
+ assertEquals(0, result.size());
+ future = pool.schedule(new Runnable() {
+ @Override
+ public void run() {
+ result.add("hello"); //$NON-NLS-1$
+ }
+ }, 5, TimeUnit.MILLISECONDS);
+ Thread.sleep(10);
+ pool.shutdown();
+ pool.awaitTermination(1000, TimeUnit.MILLISECONDS);
+ assertEquals(1, result.size());
+ }
+
+ @Test(expected=ExecutionException.class) public void testScheduleException() throws
Exception {
+ final WorkerPool pool = WorkerPoolFactory.newWorkerPool("test", 5,
120000); //$NON-NLS-1$
+ ScheduledFuture<?> future = pool.schedule(new Runnable() {
+ @Override
+ public void run() {
+ throw new RuntimeException();
+ }
+ }, 0, TimeUnit.MILLISECONDS);
+ future.get();
+ }
+
+ /**
+ * Here each execution exceeds the period, so only half the number of executions are expected.
+ */
+ @Test public void testScheduleRepeated() throws Exception {
+ final WorkerPool pool = WorkerPoolFactory.newWorkerPool("test", 5,
120000); //$NON-NLS-1$
+ final ArrayList<String> result = new ArrayList<String>();
+ ScheduledFuture<?> future = pool.scheduleAtFixedRate(new Runnable() {
+ @Override
+ public void run() {
+ result.add("hello"); //$NON-NLS-1$
+ try {
+ Thread.sleep(7);
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ }, 0, 5, TimeUnit.MILLISECONDS);
+ Thread.sleep(99);
+ future.cancel(true);
+ assertEquals(10, result.size());
+ }
}
Modified:
trunk/connector-api/src/main/java/org/teiid/connector/api/ConnectorEnvironment.java
===================================================================
---
trunk/connector-api/src/main/java/org/teiid/connector/api/ConnectorEnvironment.java 2009-03-30
20:04:22 UTC (rev 663)
+++
trunk/connector-api/src/main/java/org/teiid/connector/api/ConnectorEnvironment.java 2009-03-30
20:55:34 UTC (rev 664)
@@ -24,6 +24,9 @@
import java.util.Properties;
import java.util.concurrent.Executor;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
import org.teiid.connector.language.ILanguageFactory;
@@ -67,4 +70,20 @@
* conversions supplied by the Connector API.
*/
TypeFacility getTypeFacility();
+
+ /**
+ * Schedule a command for repeated execution with the same contract as
+ * {@link ScheduledThreadPoolExecutor#scheduleAtFixedRate(Runnable, long, long, TimeUnit)}
+ * Executions will not happen concurrently. If an execution takes longer than a period,
+ * the next execution will take place on the first period interval after completion.
+ * @param command
+ * @param initialDelay
+ * @param period
+ * @param unit
+ * @return
+ */
+ ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
+ long initialDelay,
+ long period,
+ TimeUnit unit);
}
Modified:
trunk/connectors/connector-salesforce/src/test/java/com/metamatrix/connector/salesforce/TestConnector.java
===================================================================
---
trunk/connectors/connector-salesforce/src/test/java/com/metamatrix/connector/salesforce/TestConnector.java 2009-03-30
20:04:22 UTC (rev 663)
+++
trunk/connectors/connector-salesforce/src/test/java/com/metamatrix/connector/salesforce/TestConnector.java 2009-03-30
20:55:34 UTC (rev 664)
@@ -21,13 +21,12 @@
*/
package com.metamatrix.connector.salesforce;
+import junit.framework.TestCase;
+
import org.teiid.connector.api.ConnectorEnvironment;
import org.teiid.connector.api.ConnectorException;
import org.teiid.connector.api.ExecutionContext;
-import junit.framework.TestCase;
-
-import com.metamatrix.connector.salesforce.connection.SalesforceConnection;
import com.metamatrix.connector.salesforce.test.util.ObjectFactory;
public class TestConnector extends TestCase {
@@ -53,12 +52,6 @@
noCredConnector.start(env2);
}
- public void testGetConnection() throws Exception {
- ExecutionContext secContext = ObjectFactory.getDefaultSecurityContext();
- SalesforceConnection connection = (SalesforceConnection)
connector.getConnection(secContext);
- assertNotNull("the connection is null", connection);
- }
-
/*
public void testGetConnectionTrustedToken() {
ExecutionContext secContext = TestObjectFactory.getTokenExecutionContext();
@@ -136,7 +129,6 @@
public void testGetState() {
assertNotNull(connector.getState());
- assertTrue(connector.getState() instanceof ConnectorState);
}
public void testStopNoInit() {
Modified:
trunk/engine/src/main/java/org/teiid/dqp/internal/datamgr/impl/ConnectorEnvironmentImpl.java
===================================================================
---
trunk/engine/src/main/java/org/teiid/dqp/internal/datamgr/impl/ConnectorEnvironmentImpl.java 2009-03-30
20:04:22 UTC (rev 663)
+++
trunk/engine/src/main/java/org/teiid/dqp/internal/datamgr/impl/ConnectorEnvironmentImpl.java 2009-03-30
20:55:34 UTC (rev 664)
@@ -27,6 +27,8 @@
package org.teiid.dqp.internal.datamgr.impl;
import java.util.Properties;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
import org.teiid.connector.api.ConnectorEnvironment;
import org.teiid.connector.api.ConnectorLogger;
@@ -43,7 +45,28 @@
*/
public class ConnectorEnvironmentImpl implements ConnectorEnvironment {
- private static final TypeFacility TYPE_FACILITY = new TypeFacilityImpl();
+ private final class ContextClassLoaderPreservingRunnable implements
+ Runnable {
+ private final Runnable arg0;
+ private final ClassLoader cl = Thread.currentThread().getContextClassLoader();
+
+ private ContextClassLoaderPreservingRunnable(Runnable arg0) {
+ this.arg0 = arg0;
+ }
+
+ @Override
+ public void run() {
+ ClassLoader current = Thread.currentThread().getContextClassLoader();
+ Thread.currentThread().setContextClassLoader(cl);
+ try {
+ arg0.run();
+ } finally {
+ Thread.currentThread().setContextClassLoader(current);
+ }
+ }
+ }
+
+ private static final TypeFacility TYPE_FACILITY = new TypeFacilityImpl();
private ConnectorLogger logger;
private Properties properties;
@@ -117,12 +140,20 @@
}
@Override
- public void execute(Runnable arg0) {
+ public void execute(Runnable command) {
if (this.workerPool != null) {
- this.workerPool.execute(arg0);
+ this.workerPool.execute(new ContextClassLoaderPreservingRunnable(command));
} else {
- arg0.run();
+ command.run();
}
-
}
+
+ @Override
+ public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
+ long initialDelay, long period, TimeUnit unit) {
+ if (this.workerPool != null) {
+ return this.workerPool.scheduleAtFixedRate(new
ContextClassLoaderPreservingRunnable(command), initialDelay, period, unit);
+ }
+ return null;
+ }
}
Modified:
trunk/engine/src/main/java/org/teiid/dqp/internal/datamgr/impl/ConnectorManager.java
===================================================================
---
trunk/engine/src/main/java/org/teiid/dqp/internal/datamgr/impl/ConnectorManager.java 2009-03-30
20:04:22 UTC (rev 663)
+++
trunk/engine/src/main/java/org/teiid/dqp/internal/datamgr/impl/ConnectorManager.java 2009-03-30
20:55:34 UTC (rev 664)
@@ -35,9 +35,8 @@
import java.util.Collection;
import java.util.Collections;
import java.util.Properties;
-import java.util.Timer;
-import java.util.TimerTask;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
import javax.transaction.xa.XAResource;
@@ -123,9 +122,6 @@
// known requests
private ConcurrentHashMap<AtomicRequestID, ConnectorWorkItem> requestStates =
new ConcurrentHashMap<AtomicRequestID, ConnectorWorkItem>();
- // Lazily created and used for asynch query execution
- private Timer timer;
-
private Properties props;
private ClassLoader classloader;
@@ -239,7 +235,7 @@
}
workItem.requestClose();
}
-
+
/**
* Schedule a task to be executed after the specified delay (in milliseconds)
* @param task The task to execute
@@ -247,17 +243,12 @@
* @since 4.3.3
*/
public void scheduleTask(final AsynchConnectorWorkItem state, long delay) {
- synchronized(this) {
- if(this.timer == null) {
- this.timer = new Timer("AsynchRequestThread", true);
//$NON-NLS-1$
- }
- }
-
- this.timer.schedule(new TimerTask() {
- @Override
- public void run() {
- state.requestMore();
- }}, delay);
+ this.connectorWorkerPool.schedule(new Runnable() {
+ @Override
+ public void run() {
+ state.requestMore();
+ }
+ }, delay, TimeUnit.MILLISECONDS);
}
/**
@@ -294,7 +285,6 @@
throw new
ApplicationLifecycleException("ConnectorManager.cannot_restart"); //$NON-NLS-1$
}
connectorName = props.getProperty(ConnectorPropertyNames.CONNECTOR_BINDING_NAME,
"Unknown_Binding_Name"); //$NON-NLS-1$
-
String connIDStr = props.getProperty(ConnectorPropertyNames.CONNECTOR_ID);
connectorID = new ConnectorID(connIDStr);
@@ -527,11 +517,6 @@
}
- if(this.timer != null) {
- this.timer.cancel();
- this.timer = null;
- }
-
if (this.rsCache != null) {
this.rsCache.shutDown();
this.rsCache = null;
Modified:
trunk/engine/src/main/java/org/teiid/dqp/internal/pooling/connector/ConnectionPool.java
===================================================================
---
trunk/engine/src/main/java/org/teiid/dqp/internal/pooling/connector/ConnectionPool.java 2009-03-30
20:04:22 UTC (rev 663)
+++
trunk/engine/src/main/java/org/teiid/dqp/internal/pooling/connector/ConnectionPool.java 2009-03-30
20:55:34 UTC (rev 664)
@@ -32,13 +32,12 @@
import java.util.List;
import java.util.Map;
import java.util.Properties;
-import java.util.Timer;
-import java.util.TimerTask;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import org.teiid.connector.DataPlugin;
import org.teiid.connector.api.Connection;
+import org.teiid.connector.api.ConnectorEnvironment;
import org.teiid.connector.api.ConnectorException;
import org.teiid.connector.api.ConnectorIdentity;
import org.teiid.connector.api.ExecutionContext;
@@ -122,8 +121,6 @@
private Map<ConnectorIdentity, ConnectionsForId> idConnections = new
HashMap<ConnectorIdentity, ConnectionsForId>();
private Map<ConnectionWrapper, ConnectorIdentity> reverseIdConnections = new
IdentityHashMap<ConnectionWrapper, ConnectorIdentity>();
- private Timer cleaningThread;
-
private Object lock = new Object();
private Semaphore poolSemaphore;
@@ -148,8 +145,9 @@
* @param poolProperties Properties of the pool as defined in this class. May be
empty, but may not be null.
* @throws ConnectionPoolException if the connection pool fails to initialize.
*/
- public void initialize(Properties poolProperties) throws ConnectionPoolException {
- ArgCheck.isNotNull(poolProperties);
+ public void initialize(ConnectorEnvironment env) throws ConnectionPoolException {
+ ArgCheck.isNotNull(env);
+ Properties poolProperties = env.getProperties();
maxConnections = PropertiesUtils.getIntProperty(poolProperties, MAX_CONNECTIONS,
DEFAULT_MAX_CONNECTION);
if (maxConnections < 1) {
@@ -170,13 +168,11 @@
testConnectInterval = PropertiesUtils.getIntProperty(poolProperties,
SOURCE_CONNECTION_TEST_INTERVAL, DEFAULT_SOURCE_CONNECTION_TEST_INTERVAL);
if (enableShrinking && !this.shuttingDownPool) {
- this.cleaningThread = new Timer("ConnectionPoolCleaningThread", true);
//$NON-NLS-1$
- cleaningThread.schedule(new TimerTask() {
- @Override
- public void run() {
- ConnectionPool.this.cleanUp(false);
- }
- }, cleaningInterval, cleaningInterval);
+ env.scheduleAtFixedRate(new Runnable() {
+ public void run() {
+ cleanUp(false);
+ };
+ }, cleaningInterval, cleaningInterval, TimeUnit.MILLISECONDS);
}
LogManager.logInfo(CTX_CONNECTOR,
DataPlugin.Util.getString("ConnectionPool.Connection_pool_created_1"));
//$NON-NLS-1$
@@ -216,7 +212,7 @@
ConnectionsForId connLists = null;
synchronized (this.lock) {
- connLists = (ConnectionsForId) this.idConnections.get(id);
+ connLists = this.idConnections.get(id);
if ( connLists == null ) {
connLists = new ConnectionsForId();
if (this.maxConnectionsForEachID < this.maxConnections) {
@@ -325,7 +321,7 @@
if (connsForId.unused.isEmpty()) {
continue;
}
- ConnectionWrapper conn = (ConnectionWrapper)
connsForId.unused.removeFirst();
+ ConnectionWrapper conn = connsForId.unused.removeFirst();
closeSourceConnection(conn, id);
break;
}
@@ -354,8 +350,8 @@
ConnectionsForId connLists = null;
ConnectorIdentity id = null;
synchronized (this.lock) {
- id = (ConnectorIdentity) this.reverseIdConnections.get(connection);
- connLists = (ConnectionsForId) this.idConnections.get(id);
+ id = this.reverseIdConnections.get(connection);
+ connLists = this.idConnections.get(id);
}
if (connLists == null) {
@@ -398,11 +394,6 @@
shuttingDownPool = true;
- //close cleaning thread
- if (this.cleaningThread != null) {
- this.cleaningThread.cancel();
- }
-
this.cleanUp(true);
}
@@ -459,8 +450,8 @@
ConnectorIdentity id = null;
ConnectionsForId connLists = null;
synchronized (this.lock) {
- id = (ConnectorIdentity) this.reverseIdConnections.get(connection);
- connLists = (ConnectionsForId) this.idConnections.get(id);
+ id = this.reverseIdConnections.get(connection);
+ connLists = this.idConnections.get(id);
}
if ( connLists != null ) {
synchronized (connLists) {
@@ -475,8 +466,8 @@
ConnectorIdentity id = null;
ConnectionsForId connLists = null;
synchronized (this.lock) {
- id = (ConnectorIdentity) this.reverseIdConnections.get(connection);
- connLists = (ConnectionsForId) this.idConnections.get(id);
+ id = this.reverseIdConnections.get(connection);
+ connLists = this.idConnections.get(id);
}
if ( connLists != null ) {
synchronized (connLists) {
Modified:
trunk/engine/src/main/java/org/teiid/dqp/internal/pooling/connector/PooledConnector.java
===================================================================
---
trunk/engine/src/main/java/org/teiid/dqp/internal/pooling/connector/PooledConnector.java 2009-03-30
20:04:22 UTC (rev 663)
+++
trunk/engine/src/main/java/org/teiid/dqp/internal/pooling/connector/PooledConnector.java 2009-03-30
20:55:34 UTC (rev 664)
@@ -25,7 +25,6 @@
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
-import java.util.Properties;
import javax.transaction.RollbackException;
import javax.transaction.Synchronization;
@@ -101,10 +100,9 @@
@Override
public void start(ConnectorEnvironment environment)
throws ConnectorException {
- Properties p = environment.getProperties();
- pool.initialize(p);
+ pool.initialize(environment);
if (xaPool != null) {
- xaPool.initialize(p);
+ xaPool.initialize(environment);
}
super.start(environment);
}
Modified: trunk/engine/src/main/java/org/teiid/dqp/internal/process/RequestWorkItem.java
===================================================================
---
trunk/engine/src/main/java/org/teiid/dqp/internal/process/RequestWorkItem.java 2009-03-30
20:04:22 UTC (rev 663)
+++
trunk/engine/src/main/java/org/teiid/dqp/internal/process/RequestWorkItem.java 2009-03-30
20:55:34 UTC (rev 664)
@@ -196,9 +196,7 @@
protected void process() {
DQPWorkContext.setWorkContext(this.dqpWorkContext);
- if (LogManager.isMessageToBeRecorded(LogConstants.CTX_DQP, MessageLevel.DETAIL))
{
- LogManager.logDetail(LogConstants.CTX_DQP, "############# PW PROCESSING
on " + requestID + " with state "+ state +" ###########");
//$NON-NLS-1$ //$NON-NLS-2$ //$NON-NLS-3$
- }
+ LogManager.logDetail(LogConstants.CTX_DQP, "############# PW PROCESSING
on", requestID, "with state", state, "###########");
//$NON-NLS-1$ //$NON-NLS-2$ //$NON-NLS-3$
try {
if (this.state == ProcessingState.NEW) {
@@ -217,32 +215,30 @@
}
} catch (BlockedOnMemoryException e) {
moreWork(false);
- if (LogManager.isMessageToBeRecorded(LogConstants.CTX_DQP,
MessageLevel.DETAIL)) {
- LogManager.logDetail(LogConstants.CTX_DQP, "############# PW EXITING on
" + requestID + " - reenqueueing for more processing due to lack of available
memory ###########"); //$NON-NLS-1$ //$NON-NLS-2$
- }
+ LogManager.logDetail(LogConstants.CTX_DQP, "############# PW EXITING
on", requestID, "- reenqueueing for more processing due to lack of available
memory ###########"); //$NON-NLS-1$ //$NON-NLS-2$
} catch (BlockedException e) {
- if (LogManager.isMessageToBeRecorded(LogConstants.CTX_DQP,
MessageLevel.DETAIL)) {
- LogManager.logDetail(LogConstants.CTX_DQP, "############# PW EXITING
on " + requestID + " - processor blocked ###########"); //$NON-NLS-1$
//$NON-NLS-2$
- }
+ LogManager.logDetail(LogConstants.CTX_DQP, "############# PW EXITING
on", requestID, "- processor blocked ###########"); //$NON-NLS-1$
//$NON-NLS-2$
} catch (Throwable e) {
+ LogManager.logDetail(LogConstants.CTX_DQP, e, "############# PW EXITING
on", requestID, "- error occurred ###########"); //$NON-NLS-1$
//$NON-NLS-2$
//if there is a cache, remove temp results if there is any
if(this.rsCache != null){
rsCache.removeTempResults(cid);
}
- if (isCanceled()) {
- if (LogManager.isMessageToBeRecorded(LogConstants.CTX_DQP,
MessageLevel.DETAIL)) {
- LogManager.logDetail(LogConstants.CTX_DQP, e, "############# PW
EXITING on " + requestID + " - error occurred ###########"); //$NON-NLS-1$
//$NON-NLS-2$
- }
- } else {
+ if (!isCanceled()) {
logCommandError();
//Case 5558: Differentiate between system level errors and
//processing errors. Only log system level errors as errors,
//log the processing errors as warnings only
if(e instanceof MetaMatrixProcessingException) {
- LogManager.logWarning(LogConstants.CTX_DQP, e, "############# PW
EXITING on " + requestID + " - error occurred ###########"); //$NON-NLS-1$
//$NON-NLS-2$
+ Throwable cause = e;
+ while (cause.getCause() != null) {
+ cause = e.getCause();
+ }
+ StackTraceElement elem = cause.getStackTrace()[0];
+ LogManager.logWarning(LogConstants.CTX_DQP,
DQPPlugin.Util.getString("ProcessWorker.processing_error", e.getMessage(),
requestID, e.getClass().getName(), elem)); //$NON-NLS-1$
}else {
- LogManager.logError(LogConstants.CTX_DQP, e, "############# PW
EXITING on " + requestID + " - error occurred ###########"); //$NON-NLS-1$
//$NON-NLS-2$
+ LogManager.logError(LogConstants.CTX_DQP, e,
DQPPlugin.Util.getString("ProcessWorker.error", requestID)); //$NON-NLS-1$
}
}
@@ -536,10 +532,7 @@
}
private void sendError() {
- if (LogManager.isMessageToBeRecorded(LogConstants.CTX_DQP, MessageLevel.DETAIL)) {
- LogManager.logDetail(LogConstants.CTX_DQP, processingException,
DQPPlugin.Util.getString("ProcessWorker.send_error", requestID)); //$NON-NLS-1$
- }
-
+ LogManager.logDetail(LogConstants.CTX_DQP, processingException, "Sedning error to
client", requestID); //$NON-NLS-1$
ResultsMessage response = new ResultsMessage(requestMsg);
response.setException(processingException);
setAnalysisRecords(response, analysisRecord);
Modified: trunk/engine/src/main/resources/com/metamatrix/dqp/i18n.properties
===================================================================
--- trunk/engine/src/main/resources/com/metamatrix/dqp/i18n.properties 2009-03-30 20:04:22
UTC (rev 663)
+++ trunk/engine/src/main/resources/com/metamatrix/dqp/i18n.properties 2009-03-30 20:55:34
UTC (rev 664)
@@ -151,6 +151,8 @@
ProcessWorker.Failed_to_deliver_response_for_{0}=Failed to deliver response for {0}.
Client connection may have been closed.
ProcessWorker.ProcessWorker.Failed_starting_processing._1=ProcessWorker.Failed_starting_processing.
ProcessWorker.failed_rollback=Failed to properly rollback autowrap transaction properly
+ProcessWorker.error=Unexpected exception for request {0}
+ProcessWorker.processing_error=Processing exception ''{0}'' for request
{1}. Exception type {2} throw from {3}. Enable detail logging to see the entire
stacktrace.
SharedCacheFinder.Didnt_find_caps=Unable to find capabilities for {0}
Modified:
trunk/engine/src/test/java/org/teiid/dqp/internal/pooling/connector/TestConnectionPool.java
===================================================================
---
trunk/engine/src/test/java/org/teiid/dqp/internal/pooling/connector/TestConnectionPool.java 2009-03-30
20:04:22 UTC (rev 663)
+++
trunk/engine/src/test/java/org/teiid/dqp/internal/pooling/connector/TestConnectionPool.java 2009-03-30
20:55:34 UTC (rev 664)
@@ -22,31 +22,42 @@
package org.teiid.dqp.internal.pooling.connector;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+import org.teiid.connector.api.ConnectorEnvironment;
+import org.teiid.connector.api.ConnectorLogger;
import org.teiid.connector.api.ExecutionContext;
import org.teiid.connector.api.MappedUserIdentity;
+import org.teiid.dqp.internal.datamgr.impl.ConnectorEnvironmentImpl;
import org.teiid.dqp.internal.datamgr.impl.ConnectorWrapper;
import org.teiid.dqp.internal.datamgr.impl.ExecutionContextImpl;
-import org.teiid.dqp.internal.pooling.connector.ConnectionPool;
-import org.teiid.dqp.internal.pooling.connector.ConnectionPoolException;
-import org.teiid.dqp.internal.pooling.connector.ConnectionWrapper;
-import junit.framework.TestCase;
+import com.metamatrix.common.application.ApplicationEnvironment;
+import com.metamatrix.common.queue.WorkerPool;
+import com.metamatrix.common.queue.WorkerPoolFactory;
-public class TestConnectionPool extends TestCase{
+public class TestConnectionPool {
private ConnectionPool userIDPool;
private ConnectionPool singleIDPool;
private static ArrayList<Exception> EXCEPTIONS = new
ArrayList<Exception>();
-
- public TestConnectionPool(String name) {
- super(name);
+ private static WorkerPool pool =
WorkerPoolFactory.newWorkerPool(TestConnectionPool.class.getSimpleName(), 1, 1000);
+
+ @AfterClass public static void tearDownOnce() {
+ pool.shutdownNow();
}
- public void setUp() throws Exception{
+ @Before public void setUp() throws Exception{
FakeSourceConnectionFactory.alive = true;
singleIDPool = new ConnectionPool(new ConnectorWrapper(new
FakeSourceConnectionFactory()));
@@ -58,12 +69,17 @@
poolProperties.put(ConnectionPool.LIVE_AND_UNUSED_TIME, "1");
//$NON-NLS-1$
poolProperties.put(ConnectionPool.WAIT_FOR_SOURCE_TIME, "500");
//$NON-NLS-1$
poolProperties.put(ConnectionPool.CLEANING_INTERVAL, "1");
//$NON-NLS-1$
- poolProperties.put(ConnectionPool.ENABLE_SHRINKING, "true");
//$NON-NLS-1$
poolProperties.put(ConnectionPool.SOURCE_CONNECTION_TEST_INTERVAL,
"-1"); //$NON-NLS-1$
-
- singleIDPool.initialize(poolProperties);
- userIDPool.initialize(poolProperties);
+ ConnectorEnvironment env = createConnectorEnvironment(poolProperties);
+ singleIDPool.initialize(env);
+ userIDPool.initialize(env);
}
+
+ private ConnectorEnvironment createConnectorEnvironment(
+ Properties poolProperties) {
+ ConnectorEnvironment env = new ConnectorEnvironmentImpl(poolProperties,
Mockito.mock(ConnectorLogger.class), new ApplicationEnvironment(), pool);
+ return env;
+ }
public void tearDown() throws Exception{
singleIDPool.shutDown();
@@ -79,7 +95,7 @@
}
//=== tests ===//
- public void testPoolUsingSingleIdentity() throws Exception {
+ @Test public void testPoolUsingSingleIdentity() throws Exception {
ExecutionContext context = createContext("x", false);//$NON-NLS-1$
ConnectionWrapper conn1 = singleIDPool.obtain(context);
@@ -99,7 +115,7 @@
assertEquals(3, singleIDPool.getTotalConnectionCount());
}
- public void testMaxConnectionTest() throws Exception {
+ @Test public void testMaxConnectionTest() throws Exception {
ConnectionPool pool = new ConnectionPool(new ConnectorWrapper(new
FakeSourceConnectionFactory()));
Properties poolProperties = new Properties();
@@ -108,18 +124,17 @@
poolProperties.put(ConnectionPool.LIVE_AND_UNUSED_TIME, "1");
//$NON-NLS-1$
poolProperties.put(ConnectionPool.WAIT_FOR_SOURCE_TIME, "500");
//$NON-NLS-1$
poolProperties.put(ConnectionPool.CLEANING_INTERVAL, "1");
//$NON-NLS-1$
- poolProperties.put(ConnectionPool.ENABLE_SHRINKING, "true");
//$NON-NLS-1$
poolProperties.put(ConnectionPool.SOURCE_CONNECTION_TEST_INTERVAL,
"-1"); //$NON-NLS-1$
try {
- pool.initialize(poolProperties);
+ pool.initialize(createConnectorEnvironment(poolProperties));
fail("should have failed to use 0 as max connections");
//$NON-NLS-1$
}catch (ConnectionPoolException e) {
// pass
}
}
- public void testMaxConnectionTest1() throws Exception {
+ @Test public void testMaxConnectionTest1() throws Exception {
ConnectionPool pool = new ConnectionPool(new ConnectorWrapper(new
FakeSourceConnectionFactory()));
Properties poolProperties = new Properties();
@@ -128,18 +143,17 @@
poolProperties.put(ConnectionPool.LIVE_AND_UNUSED_TIME, "1");
//$NON-NLS-1$
poolProperties.put(ConnectionPool.WAIT_FOR_SOURCE_TIME, "500");
//$NON-NLS-1$
poolProperties.put(ConnectionPool.CLEANING_INTERVAL, "1");
//$NON-NLS-1$
- poolProperties.put(ConnectionPool.ENABLE_SHRINKING, "true");
//$NON-NLS-1$
poolProperties.put(ConnectionPool.SOURCE_CONNECTION_TEST_INTERVAL,
"-1"); //$NON-NLS-1$
try {
- pool.initialize(poolProperties);
+ pool.initialize(createConnectorEnvironment(poolProperties));
fail("should have failed to use 0 as max connections");
//$NON-NLS-1$
}catch (ConnectionPoolException e) {
// pass
}
}
- public void testMaxConnectionTest2() throws Exception {
+ @Test public void testMaxConnectionTest2() throws Exception {
ConnectionPool pool = new ConnectionPool(new ConnectorWrapper(new
FakeSourceConnectionFactory()));
Properties poolProperties = new Properties();
@@ -148,18 +162,17 @@
poolProperties.put(ConnectionPool.LIVE_AND_UNUSED_TIME, "1");
//$NON-NLS-1$
poolProperties.put(ConnectionPool.WAIT_FOR_SOURCE_TIME, "500");
//$NON-NLS-1$
poolProperties.put(ConnectionPool.CLEANING_INTERVAL, "1");
//$NON-NLS-1$
- poolProperties.put(ConnectionPool.ENABLE_SHRINKING, "true");
//$NON-NLS-1$
poolProperties.put(ConnectionPool.SOURCE_CONNECTION_TEST_INTERVAL,
"-1"); //$NON-NLS-1$
try {
- pool.initialize(poolProperties);
+ pool.initialize(createConnectorEnvironment(poolProperties));
fail("should have failed to use 0 as max connections");
//$NON-NLS-1$
}catch (ConnectionPoolException e) {
// pass
}
}
- public void testMessageWhenPoolMaxedOutPerIdentity() throws Exception {
+ @Test public void testMessageWhenPoolMaxedOutPerIdentity() throws Exception {
ExecutionContext context = createContext("x", false);//$NON-NLS-1$
// Max out the pool - 5 connections for same ID
@@ -176,7 +189,7 @@
}
}
- public void testMessageWhenPoolTimedOut() throws Exception {
+ @Test public void testMessageWhenPoolTimedOut() throws Exception {
FakeSourceConnectionFactory.alive = true;
singleIDPool = new ConnectionPool(new ConnectorWrapper(new
FakeSourceConnectionFactory()));
@@ -186,9 +199,9 @@
poolProperties.put(ConnectionPool.LIVE_AND_UNUSED_TIME, "1");
//$NON-NLS-1$
poolProperties.put(ConnectionPool.WAIT_FOR_SOURCE_TIME, "1");
//$NON-NLS-1$
poolProperties.put(ConnectionPool.CLEANING_INTERVAL, "1");
//$NON-NLS-1$
- poolProperties.put(ConnectionPool.ENABLE_SHRINKING, "true");
//$NON-NLS-1$
+ poolProperties.put(ConnectionPool.ENABLE_SHRINKING, Boolean.FALSE.toString());
poolProperties.put(ConnectionPool.SOURCE_CONNECTION_TEST_INTERVAL,
"-1"); //$NON-NLS-1$
- singleIDPool.initialize(poolProperties);
+ singleIDPool.initialize(createConnectorEnvironment(poolProperties));
ExecutionContext context = createContext("x", false);//$NON-NLS-1$
@@ -204,7 +217,7 @@
}
}
- public void testPoolUsingUserIdentity() throws Exception {
+ @Test public void testPoolUsingUserIdentity() throws Exception {
ExecutionContext context1 = createContext("Jack", true); //$NON-NLS-1$
ExecutionContext context2 = createContext("Tom", true); //$NON-NLS-1$
userIDPool.obtain(context1);
@@ -223,7 +236,7 @@
assertEquals(3, userIDPool.getTotalConnectionCount());
}
- public void testPoolCleanUp() throws Exception {
+ @Test public void testPoolCleanUp() throws Exception {
ExecutionContext context = createContext("x", false);
//$NON-NLS-1$
ConnectionWrapper conn1 = singleIDPool.obtain(context);
@@ -247,7 +260,7 @@
assertEquals(2, singleIDPool.getTotalConnectionCount());
}
- public void testMultiThreading() throws Exception {
+ @Test public void testMultiThreading() throws Exception {
EXCEPTIONS.clear();
int runnerNumber = 20;
int connPerRunner = 20;
@@ -316,7 +329,7 @@
}
}
- public void testMaxWithUserPool() throws Exception {
+ @Test public void testMaxWithUserPool() throws Exception {
userIDPool = new ConnectionPool(new ConnectorWrapper(new
FakeUserIdentityConnectionFactory()));
Properties poolProperties = new Properties();
poolProperties.put(ConnectionPool.MAX_CONNECTIONS, "1"); //$NON-NLS-1$
@@ -326,7 +339,7 @@
poolProperties.put(ConnectionPool.CLEANING_INTERVAL, "1000");
//$NON-NLS-1$
poolProperties.put(ConnectionPool.ENABLE_SHRINKING, "false");
//$NON-NLS-1$
poolProperties.put(ConnectionPool.SOURCE_CONNECTION_TEST_INTERVAL,
"-1"); //$NON-NLS-1$
- userIDPool.initialize(poolProperties);
+ userIDPool.initialize(createConnectorEnvironment(poolProperties));
ConnectionWrapper conn = userIDPool.obtain(createContext("x", true));
//$NON-NLS-1$
userIDPool.release(conn,false);
Modified:
trunk/server/src/main/java/com/metamatrix/common/comm/platform/socket/SocketVMController.java
===================================================================
---
trunk/server/src/main/java/com/metamatrix/common/comm/platform/socket/SocketVMController.java 2009-03-30
20:04:22 UTC (rev 663)
+++
trunk/server/src/main/java/com/metamatrix/common/comm/platform/socket/SocketVMController.java 2009-03-30
20:55:34 UTC (rev 664)
@@ -34,8 +34,6 @@
import com.metamatrix.common.log.LogManager;
import com.metamatrix.common.messaging.MessageBus;
import com.metamatrix.common.net.ServerSocketConfiguration;
-import com.metamatrix.common.queue.WorkerPool;
-import com.metamatrix.common.queue.WorkerPoolFactory;
import com.metamatrix.common.queue.WorkerPoolStats;
import com.metamatrix.common.util.LogCommonConstants;
import com.metamatrix.common.util.PropertiesUtils;
@@ -43,9 +41,9 @@
import com.metamatrix.platform.PlatformPlugin;
import com.metamatrix.platform.registry.ClusteredRegistryState;
import com.metamatrix.platform.util.PlatformProxyHelper;
+import com.metamatrix.platform.vm.controller.ProcessController;
import com.metamatrix.platform.vm.controller.ServerEvents;
import com.metamatrix.platform.vm.controller.SocketListenerStats;
-import com.metamatrix.platform.vm.controller.ProcessController;
import com.metamatrix.server.Configuration;
import com.metamatrix.server.HostManagement;
import com.metamatrix.server.Main;
@@ -60,20 +58,16 @@
private static final String SERVER_PORT = VMComponentDefnType.SERVER_PORT;
private static final String MAX_THREADS = VMComponentDefnType.MAX_THREADS;
- private static final String TIMETOLIVE = VMComponentDefnType.TIMETOLIVE;
private static final String INPUT_BUFFER_SIZE =
VMComponentDefnType.INPUT_BUFFER_SIZE;
private static final String OUTPUT_BUFFER_SIZE =
VMComponentDefnType.OUTPUT_BUFFER_SIZE;
private static final int DEFAULT_SERVER_PORT = 31000;
private static final int DEFAULT_MAX_THREADS = 15;
- private static final long DEFAULT_TIMETOLIVE = 15000;
private static final long DEFAULT_WAITFORSERVICES = 500;
private static final int DEFAULT_INPUT_BUFFER_SIZE = 0;
private static final int DEFAULT_OUTPUT_BUFFER_SIZE = 0;
- private static final String SOCKET_WORKER_POOL_NAME = "SocketWorkerQueue";
//$NON-NLS-1$
private SocketListener listener;
- private WorkerPool workerPool;
@Inject
public SocketVMController(@Named(Configuration.HOST) Host host,
@Named(Configuration.PROCESSNAME) String processName, ClusteredRegistryState registry,
ServerEvents serverEvents, MessageBus bus, HostManagement hostManagement) throws Exception
{
@@ -94,17 +88,6 @@
// so that the port can be made available sooner on bounces.
@Override
public void shutdown(boolean now) {
- if (workerPool != null) {
- try {
- workerPool.shutdownNow();
- } catch (Exception e) {
- //ignore
- } finally {
- workerPool = null;
- }
-
- }
-
if (listener != null) {
try {
listener.stop();
@@ -128,7 +111,6 @@
Properties props = getProperties();
int socketPort = PropertiesUtils.getIntProperty(props, SERVER_PORT,
DEFAULT_SERVER_PORT);
int maxThreads = PropertiesUtils.getIntProperty(props, MAX_THREADS,
DEFAULT_MAX_THREADS);
- long timeToLive = PropertiesUtils.getLongProperty(props, TIMETOLIVE,
DEFAULT_TIMETOLIVE);
int inputBufferSize = PropertiesUtils.getIntProperty(props, INPUT_BUFFER_SIZE,
DEFAULT_INPUT_BUFFER_SIZE);
int outputBufferSize = PropertiesUtils.getIntProperty(props, OUTPUT_BUFFER_SIZE,
DEFAULT_OUTPUT_BUFFER_SIZE);
String bindaddress = VMNaming.getBindAddress();
@@ -138,11 +120,10 @@
};
logMessage(PlatformPlugin.Util.getString("SocketVMController.1",
param)); //$NON-NLS-1$
- workerPool = WorkerPoolFactory.newWorkerPool(SOCKET_WORKER_POOL_NAME, maxThreads,
timeToLive);
ServerSocketConfiguration helper = new ServerSocketConfiguration();
try {
helper.init();
- listener = new SocketListener(socketPort, bindaddress, this.clientServices,
inputBufferSize, outputBufferSize, workerPool, helper.getServerSSLEngine(),
helper.isClientEncryptionEnabled(),
PlatformProxyHelper.getSessionServiceProxy(PlatformProxyHelper.ROUND_ROBIN_LOCAL));
+ listener = new SocketListener(socketPort, bindaddress, this.clientServices,
inputBufferSize, outputBufferSize, maxThreads, helper.getServerSSLEngine(),
helper.isClientEncryptionEnabled(),
PlatformProxyHelper.getSessionServiceProxy(PlatformProxyHelper.ROUND_ROBIN_LOCAL));
} catch (Exception e) {
LogManager.logCritical(LogCommonConstants.CTX_CONTROLLER, e,
PlatformPlugin.Util.getString("SocketVMController.2",param)); //$NON-NLS-1$
System.exit(1);
@@ -182,10 +163,7 @@
}
protected WorkerPoolStats getProcessPoolStats() {
- if (workerPool == null) {
- return null;
- }
- return workerPool.getStats();
+ return this.listener.getProcessPoolStats();
}
@Deprecated
Modified:
trunk/server/src/main/java/com/metamatrix/common/comm/platform/socket/server/ServerWorkItem.java
===================================================================
---
trunk/server/src/main/java/com/metamatrix/common/comm/platform/socket/server/ServerWorkItem.java 2009-03-30
20:04:22 UTC (rev 663)
+++
trunk/server/src/main/java/com/metamatrix/common/comm/platform/socket/server/ServerWorkItem.java 2009-03-30
20:55:34 UTC (rev 664)
@@ -34,7 +34,6 @@
import org.teiid.dqp.internal.process.DQPWorkContext;
-import com.metamatrix.admin.api.exception.AdminException;
import com.metamatrix.admin.api.exception.AdminProcessingException;
import com.metamatrix.api.exception.ComponentNotFoundException;
import com.metamatrix.api.exception.MetaMatrixProcessingException;
@@ -77,9 +76,6 @@
*/
public void run() {
DQPWorkContext.setWorkContext(this.socketClientInstance.getWorkContext());
- if (LogManager.isMessageToBeRecorded(SocketVMController.SOCKET_CONTEXT,
MessageLevel.DETAIL)) {
- LogManager.logDetail(SocketVMController.SOCKET_CONTEXT, "forwarding message to
listener:" + message); //$NON-NLS-1$
- }
Message result = null;
String service = null;
final boolean encrypt = message.getContents() instanceof SealedObject;
@@ -162,20 +158,25 @@
// Case 5558: Differentiate between system level errors and
// processing errors. Only log system level errors as errors,
// log the processing errors as warnings only
- String msg =
PlatformPlugin.Util.getString("ServerWorkItem.Received_exception_processing_request");
//$NON-NLS-1$
if (e instanceof MetaMatrixProcessingException) {
- LogManager.logWarning(context, e, msg);
+ logProcessingException(e, context);
} else if (e instanceof AdminProcessingException) {
- LogManager.logWarning(context, e, msg);
+ logProcessingException(e, context);
} else {
- LogManager.logError(context, e, msg);
+ LogManager.logError(context, e,
PlatformPlugin.Util.getString("ServerWorkItem.Received_exception_processing_request",
this.socketClientInstance.getWorkContext().getConnectionID())); //$NON-NLS-1$
}
-
- if (e instanceof AdminException) {
- return e;
- }
return new ExceptionHolder(e);
}
+
+ private void logProcessingException(Throwable e, String context) {
+ Throwable cause = e;
+ while (cause.getCause() != null) {
+ cause = e.getCause();
+ }
+ StackTraceElement elem = cause.getStackTrace()[0];
+ LogManager.logDetail(context, e, "Processing exception for session",
this.socketClientInstance.getWorkContext().getConnectionID()); //$NON-NLS-1$
+ LogManager.logWarning(context,
PlatformPlugin.Util.getString("ServerWorkItem.processing_error", e.getMessage(),
this.socketClientInstance.getWorkContext().getConnectionID(), e.getClass().getName(),
elem)); //$NON-NLS-1$
+ }
}
\ No newline at end of file
Modified:
trunk/server/src/main/java/com/metamatrix/common/comm/platform/socket/server/SocketClientInstance.java
===================================================================
---
trunk/server/src/main/java/com/metamatrix/common/comm/platform/socket/server/SocketClientInstance.java 2009-03-30
20:04:22 UTC (rev 663)
+++
trunk/server/src/main/java/com/metamatrix/common/comm/platform/socket/server/SocketClientInstance.java 2009-03-30
20:55:34 UTC (rev 664)
@@ -37,7 +37,6 @@
import com.metamatrix.common.comm.platform.socket.ObjectChannel;
import com.metamatrix.common.comm.platform.socket.SocketVMController;
import com.metamatrix.common.log.LogManager;
-import com.metamatrix.common.queue.WorkerPool;
import com.metamatrix.common.util.crypto.CryptoException;
import com.metamatrix.common.util.crypto.Cryptor;
import com.metamatrix.common.util.crypto.DhKeyGenerator;
@@ -56,7 +55,6 @@
public class SocketClientInstance implements ChannelListener, ClientInstance {
private final ObjectChannel objectSocket;
- private final WorkerPool workerPool;
private final ClientServiceRegistry server;
private Cryptor cryptor;
private boolean usingEncryption;
@@ -64,9 +62,8 @@
private DQPWorkContext workContext = new DQPWorkContext();
private SessionServiceInterface sessionService;
- public SocketClientInstance(ObjectChannel objectSocket, WorkerPool workerPool,
ClientServiceRegistry server, boolean isClientEncryptionEnabled, SessionServiceInterface
sessionService) {
+ public SocketClientInstance(ObjectChannel objectSocket, ClientServiceRegistry server,
boolean isClientEncryptionEnabled, SessionServiceInterface sessionService) {
this.objectSocket = objectSocket;
- this.workerPool = workerPool;
this.server = server;
this.usingEncryption = isClientEncryptionEnabled;
this.sessionService = sessionService;
@@ -146,7 +143,7 @@
if (LogManager.isMessageToBeRecorded(SocketVMController.SOCKET_CONTEXT,
MessageLevel.DETAIL)) {
LogManager.logDetail(SocketVMController.SOCKET_CONTEXT, "processing
message:" + packet); //$NON-NLS-1$
}
- workerPool.execute(new ServerWorkItem(this, packet.getMessageKey(), packet,
this.server, this.sessionService));
+ new ServerWorkItem(this, packet.getMessageKey(), packet, this.server,
this.sessionService).run();
}
public void shutdown() throws CommunicationException {
Modified:
trunk/server/src/main/java/com/metamatrix/common/comm/platform/socket/server/SocketListener.java
===================================================================
---
trunk/server/src/main/java/com/metamatrix/common/comm/platform/socket/server/SocketListener.java 2009-03-30
20:04:22 UTC (rev 663)
+++
trunk/server/src/main/java/com/metamatrix/common/comm/platform/socket/server/SocketListener.java 2009-03-30
20:55:34 UTC (rev 664)
@@ -23,9 +23,8 @@
package com.metamatrix.common.comm.platform.socket.server;
import java.net.InetSocketAddress;
-import java.util.concurrent.SynchronousQueue;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
import javax.net.ssl.SSLEngine;
@@ -43,6 +42,7 @@
import com.metamatrix.common.log.LogManager;
import com.metamatrix.common.queue.WorkerPool;
import com.metamatrix.common.queue.WorkerPoolFactory;
+import com.metamatrix.common.queue.WorkerPoolStats;
import com.metamatrix.core.log.MessageLevel;
import com.metamatrix.core.util.MetaMatrixProductVersion;
import com.metamatrix.platform.security.api.service.SessionServiceInterface;
@@ -53,11 +53,12 @@
*/
public class SocketListener implements ChannelListenerFactory {
private ClientServiceRegistry server;
- private WorkerPool workerPool;
private SSLAwareChannelHandler channelHandler;
private Channel serverChanel;
private boolean isClientEncryptionEnabled;
private SessionServiceInterface sessionService;
+ private WorkerPool workerPool;
+ private ExecutorService bossService;
/**
*
@@ -71,7 +72,7 @@
*/
public SocketListener(int port, String bindAddress,
ClientServiceRegistry server, int inputBufferSize,
- int outputBufferSize, WorkerPool workerPool, SSLEngine engine, boolean
isClientEncryptionEnabled, SessionServiceInterface sessionService) {
+ int outputBufferSize, int maxWorkers, SSLEngine engine, boolean
isClientEncryptionEnabled, SessionServiceInterface sessionService) {
this.isClientEncryptionEnabled = isClientEncryptionEnabled;
this.sessionService = sessionService;
if (port < 0 || port > 0xFFFF) {
@@ -79,18 +80,14 @@
}
this.server = server;
- this.workerPool = workerPool;
+ this.workerPool = WorkerPoolFactory.newWorkerPool("SocketWorker",
maxWorkers, 120000); //$NON-NLS-1$
+ this.bossService = Executors.newCachedThreadPool();
if (LogManager.isMessageToBeRecorded(SocketVMController.SOCKET_CONTEXT,
MessageLevel.DETAIL)) {
LogManager.logDetail(SocketVMController.SOCKET_CONTEXT, "server = "
+ this.server + "binding to port:" + port); //$NON-NLS-1$ //$NON-NLS-2$
}
- ThreadPoolExecutor executor = new ThreadPoolExecutor(0,
- Integer.MAX_VALUE, 2, TimeUnit.MINUTES,
- new SynchronousQueue<Runnable>(),
- new WorkerPoolFactory.DefaultThreadFactory("ServerNio")); //$NON-NLS-1$
+ ChannelFactory factory = new NioServerSocketChannelFactory(bossService,
workerPool, workerPool.getPoolSize());
- ChannelFactory factory = new NioServerSocketChannelFactory(executor, executor);
-
ServerBootstrap bootstrap = new ServerBootstrap(factory);
this.channelHandler = new SSLAwareChannelHandler(this, engine,
Thread.currentThread().getContextClassLoader());
bootstrap.setPipelineFactory(channelHandler);
@@ -105,6 +102,10 @@
this.serverChanel = bootstrap.bind(new InetSocketAddress(bindAddress, port));
}
+ public WorkerPoolStats getProcessPoolStats() {
+ return this.workerPool.getStats();
+ }
+
public int getPort() {
return ((InetSocketAddress)this.serverChanel.getLocalAddress()).getPort();
}
@@ -115,6 +116,8 @@
public void stop() {
this.serverChanel.close();
+ this.workerPool.shutdownNow();
+ this.bossService.shutdownNow();
}
public SocketListenerStats getStats() {
@@ -127,7 +130,7 @@
}
public ChannelListener createChannelListener(ObjectChannel channel) {
- return new SocketClientInstance(channel, this.workerPool, this.server,
this.isClientEncryptionEnabled, this.sessionService);
+ return new SocketClientInstance(channel, this.server, this.isClientEncryptionEnabled,
this.sessionService);
}
}
\ No newline at end of file
Modified: trunk/server/src/main/resources/com/metamatrix/platform/i18n.properties
===================================================================
--- trunk/server/src/main/resources/com/metamatrix/platform/i18n.properties 2009-03-30 20:04:22 UTC (rev 663)
+++ trunk/server/src/main/resources/com/metamatrix/platform/i18n.properties 2009-03-30 20:55:34 UTC (rev 664)
@@ -1273,7 +1273,8 @@
LDAPMembershipDomain.Admin_credentials=Admin DN and/or password supplied for LDAP
Membership Domain {0} are invalid.
LDAPMembershipDomain.Require_memberof_property=Users in LDAP Membership Domain {0} will
not appear as members of any group since user's memberOf and group's memberOf
attributes are both unspecified.
-ServerWorkItem.Received_exception_processing_request=Received exception processing
request
+ServerWorkItem.Received_exception_processing_request=Unexpected exception for session
{0}
+ServerWorkItem.processing_error=Processing exception ''{0}'' for session
{1}. Exception type {2} throw from {3}. Enable detail logging to see the entire
stacktrace.
ServerWorkItem.Component_Not_Found=Component not found: {0}
SocketVMController.0=Register Admin API
Modified: trunk/server/src/test/java/com/metamatrix/common/comm/platform/socket/server/TestCommSockets.java
===================================================================
--- trunk/server/src/test/java/com/metamatrix/common/comm/platform/socket/server/TestCommSockets.java 2009-03-30 20:04:22 UTC (rev 663)
+++ trunk/server/src/test/java/com/metamatrix/common/comm/platform/socket/server/TestCommSockets.java 2009-03-30 20:55:34 UTC (rev 664)
@@ -46,7 +46,6 @@
import com.metamatrix.common.comm.platform.socket.client.SocketServerConnection;
import com.metamatrix.common.comm.platform.socket.client.SocketServerConnectionFactory;
import com.metamatrix.common.comm.platform.socket.client.UrlServerDiscovery;
-import com.metamatrix.common.queue.WorkerPoolFactory;
import com.metamatrix.common.util.crypto.NullCryptor;
import com.metamatrix.platform.security.api.ILogon;
import com.metamatrix.platform.security.api.LogonResult;
@@ -74,8 +73,7 @@
SessionServiceInterface sessionService = mock(SessionServiceInterface.class);
csr.registerClientService(ILogon.class, new LogonImpl(sessionService,
"fakeCluster"), "foo"); //$NON-NLS-1$ //$NON-NLS-2$
listener = new SocketListener(addr.getPort(), addr.getAddress().getHostAddress(),
- csr, 1024, 1024, WorkerPoolFactory.newWorkerPool(
- "testIO", 1, 120000), null, true, sessionService); //$NON-NLS-1$
+ csr, 1024, 1024, 1, null, true, sessionService);
try {
Properties p = new Properties();
@@ -152,8 +150,7 @@
}
}, "foo"); //$NON-NLS-1$
listener = new SocketListener(addr.getPort(), addr.getAddress().getHostAddress(),
- csr, 1024, 1024, WorkerPoolFactory.newWorkerPool(
- "testIO", 1, 120000), serverSSL, isClientEncryptionEnabled,
sessionService); //$NON-NLS-1$
+ csr, 1024, 1024, 1, serverSSL, isClientEncryptionEnabled, sessionService);
SocketListenerStats stats = listener.getStats();
assertEquals(0, stats.maxSockets);
assertEquals(0, stats.objectsRead);