[jboss-svn-commits] JBL Code SVN: r20373 - labs/jbossesb/branches/JBESB_4_2_1_GA_CP/product/rosetta/src/org/jboss/internal/soa/esb/rosetta/pooling.
jboss-svn-commits at lists.jboss.org
jboss-svn-commits at lists.jboss.org
Mon Jun 9 10:37:42 EDT 2008
Author: kevin.conner at jboss.com
Date: 2008-06-09 10:37:42 -0400 (Mon, 09 Jun 2008)
New Revision: 20373
Modified:
labs/jbossesb/branches/JBESB_4_2_1_GA_CP/product/rosetta/src/org/jboss/internal/soa/esb/rosetta/pooling/JmsConnectionPool.java
Log:
Create JBM sessions from a daemon thread: JBESB-1799
Modified: labs/jbossesb/branches/JBESB_4_2_1_GA_CP/product/rosetta/src/org/jboss/internal/soa/esb/rosetta/pooling/JmsConnectionPool.java
===================================================================
--- labs/jbossesb/branches/JBESB_4_2_1_GA_CP/product/rosetta/src/org/jboss/internal/soa/esb/rosetta/pooling/JmsConnectionPool.java 2008-06-09 14:37:22 UTC (rev 20372)
+++ labs/jbossesb/branches/JBESB_4_2_1_GA_CP/product/rosetta/src/org/jboss/internal/soa/esb/rosetta/pooling/JmsConnectionPool.java 2008-06-09 14:37:42 UTC (rev 20373)
@@ -26,6 +26,14 @@
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CompletionService;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorCompletionService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.ThreadFactory;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
@@ -64,6 +72,15 @@
private static int CONFIGURED_POOL_SIZE = DEFAULT_POOL_SIZE;
private static int CONFIGURED_SLEEP = DEFAULT_SLEEP;
+ /**
+ * The executor used to create sessions.
+ */
+ private static final Executor SESSION_EXECUTOR = Executors.newSingleThreadExecutor(new DaemonThreadFactory()) ;
+ /**
+ * The completion service.
+ */
+ private static final CompletionService<JmsSession> COMPLETION_SERVICE = new ExecutorCompletionService<JmsSession>(SESSION_EXECUTOR) ;
+
/** Maximum number of Sessions that will be created in this pool */
private int MAX_SESSIONS = DEFAULT_POOL_SIZE; //TODO Make this manageable
@@ -138,16 +155,44 @@
private synchronized void addAnotherSession(Map<String, String> poolKey, final boolean transacted, final int acknowledgeMode)
throws JMSException
{
+ final Future<JmsSession> future = COMPLETION_SERVICE.submit(new Callable<JmsSession>() {
+ public JmsSession call()
+ throws JMSException
+ {
+ final JmsSession session ;
+ if (transacted) {
+ session = new JmsXASession(JmsConnectionPool.this, ((XAConnection)jmsConnection).createXASession());
+ } else {
+ session = new JmsSession(jmsConnection.createSession(transacted, acknowledgeMode));
+ }
+ return session ;
+ }
+ }) ;
+
//Create a new Session
ArrayList<JmsSession> freeSessions = freeSessionsMap.get(acknowledgeMode);
// For now we only support JTA transacted sessions
- final JmsSession session ;
- if (transacted) {
- session = new JmsXASession(this, ((XAConnection)jmsConnection).createXASession());
- } else {
- session = new JmsSession(jmsConnection.createSession(transacted, acknowledgeMode));
+ try
+ {
+ freeSessions.add(future.get());
}
- freeSessions.add(session);
+ catch (final InterruptedException ie) {} // ignore
+ catch (final ExecutionException ee)
+ {
+ final Throwable th = ee.getCause() ;
+ if (th instanceof JMSException)
+ {
+ throw (JMSException)th ;
+ }
+ if (th instanceof Error)
+ {
+ throw (Error)th ;
+ }
+ if (th instanceof RuntimeException)
+ {
+ throw (RuntimeException)th ;
+ }
+ }
logger.debug("Number of Sessions in the pool with acknowledgeMode: " + acknowledgeMode + " is now " + getSessionsInPool(acknowledgeMode));
}
@@ -531,4 +576,27 @@
}
}
}
+
+ /**
+ * Thread factory returning daemon threads.
+ * @author kevin
+ */
+ private static final class DaemonThreadFactory implements ThreadFactory
+ {
+ /**
+ * The default executor factory.
+ */
+ private final ThreadFactory defaultFactory = Executors.defaultThreadFactory() ;
+
+ /**
+ * Return a new daemon thread.
+ * @param runnable The runnable associated with the thread.
+ */
+ public Thread newThread(final Runnable runnable)
+ {
+ final Thread thread = defaultFactory.newThread(runnable) ;
+ thread.setDaemon(true) ;
+ return thread ;
+ }
+ }
}
More information about the jboss-svn-commits
mailing list