[jboss-svn-commits] JBL Code SVN: r20373 - labs/jbossesb/branches/JBESB_4_2_1_GA_CP/product/rosetta/src/org/jboss/internal/soa/esb/rosetta/pooling.

jboss-svn-commits at lists.jboss.org jboss-svn-commits at lists.jboss.org
Mon Jun 9 10:37:42 EDT 2008


Author: kevin.conner at jboss.com
Date: 2008-06-09 10:37:42 -0400 (Mon, 09 Jun 2008)
New Revision: 20373

Modified:
   labs/jbossesb/branches/JBESB_4_2_1_GA_CP/product/rosetta/src/org/jboss/internal/soa/esb/rosetta/pooling/JmsConnectionPool.java
Log:
Create JBM sessions from a daemon thread: JBESB-1799

Modified: labs/jbossesb/branches/JBESB_4_2_1_GA_CP/product/rosetta/src/org/jboss/internal/soa/esb/rosetta/pooling/JmsConnectionPool.java
===================================================================
--- labs/jbossesb/branches/JBESB_4_2_1_GA_CP/product/rosetta/src/org/jboss/internal/soa/esb/rosetta/pooling/JmsConnectionPool.java	2008-06-09 14:37:22 UTC (rev 20372)
+++ labs/jbossesb/branches/JBESB_4_2_1_GA_CP/product/rosetta/src/org/jboss/internal/soa/esb/rosetta/pooling/JmsConnectionPool.java	2008-06-09 14:37:42 UTC (rev 20373)
@@ -26,6 +26,14 @@
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Properties;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CompletionService;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorCompletionService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.ThreadFactory;
 
 import javax.jms.Connection;
 import javax.jms.ConnectionFactory;
@@ -64,6 +72,15 @@
 	private static int CONFIGURED_POOL_SIZE = DEFAULT_POOL_SIZE;
 	private static int CONFIGURED_SLEEP = DEFAULT_SLEEP;
 	
+	/**
+	 * The executor used to create sessions.
+	 */
+	private static final Executor SESSION_EXECUTOR = Executors.newSingleThreadExecutor(new DaemonThreadFactory()) ;
+	/**
+	 * The completion service.
+	 */
+	private static final CompletionService<JmsSession> COMPLETION_SERVICE = new ExecutorCompletionService<JmsSession>(SESSION_EXECUTOR) ;
+	
     /** Maximum number of Sessions that will be created in this pool */
     private int MAX_SESSIONS = DEFAULT_POOL_SIZE;    //TODO Make this manageable
     
@@ -138,16 +155,44 @@
     private  synchronized void addAnotherSession(Map<String, String> poolKey, final boolean transacted, final int acknowledgeMode)
         throws JMSException
     {
+        final Future<JmsSession> future = COMPLETION_SERVICE.submit(new Callable<JmsSession>() {
+            public JmsSession call()
+                throws JMSException
+            {
+                final JmsSession session ;
+                if (transacted) {
+                    session = new JmsXASession(JmsConnectionPool.this, ((XAConnection)jmsConnection).createXASession());
+                } else {
+                    session = new JmsSession(jmsConnection.createSession(transacted, acknowledgeMode));
+                }
+                return session ;
+            }
+        }) ;
+        
         //Create a new Session
         ArrayList<JmsSession> freeSessions = freeSessionsMap.get(acknowledgeMode);
         // For now we only support JTA transacted sessions
-        final JmsSession session ;
-        if (transacted) {
-            session = new JmsXASession(this, ((XAConnection)jmsConnection).createXASession());
-        } else {
-            session = new JmsSession(jmsConnection.createSession(transacted, acknowledgeMode));
+        try
+        {
+            freeSessions.add(future.get());
         }
-        freeSessions.add(session);
+        catch (final InterruptedException ie) {} // ignore
+        catch (final ExecutionException ee)
+        {
+            final Throwable th = ee.getCause() ;
+            if (th instanceof JMSException)
+            {
+                throw (JMSException)th ;
+            }
+            if (th instanceof Error)
+            {
+                throw (Error)th ;
+            }
+            if (th instanceof RuntimeException)
+            {
+                throw (RuntimeException)th ;
+            }
+        }
         logger.debug("Number of Sessions in the pool with acknowledgeMode: " + acknowledgeMode + " is now " + getSessionsInPool(acknowledgeMode));
     }
 
@@ -531,4 +576,27 @@
     		}
     	}
     }
+    
+    /**
+     * Thread factory returning daemon threads.
+     * @author kevin
+     */
+    private static final class DaemonThreadFactory implements ThreadFactory
+    {
+        /**
+         * The default executor factory.
+         */
+        private final ThreadFactory defaultFactory = Executors.defaultThreadFactory() ;
+        
+        /**
+         * Return a new daemon thread.
+         * @param runnable The runnable associated with the thread.
+         */
+        public Thread newThread(final Runnable runnable)
+        {
+            final Thread thread = defaultFactory.newThread(runnable) ;
+            thread.setDaemon(true) ;
+            return thread ;
+        }
+    }
 }




More information about the jboss-svn-commits mailing list