[jboss-svn-commits] JBL Code SVN: r20698 - in labs/jbossesb/workspace/platform/JBESB_4_2_1_SOA_4_2/product: rosetta/src/org/jboss/internal/soa/esb/dependencies and 3 other directories.

jboss-svn-commits at lists.jboss.org jboss-svn-commits at lists.jboss.org
Mon Jun 23 07:52:17 EDT 2008


Author: kevin.conner at jboss.com
Date: 2008-06-23 07:52:16 -0400 (Mon, 23 Jun 2008)
New Revision: 20698

Modified:
   labs/jbossesb/workspace/platform/JBESB_4_2_1_SOA_4_2/product/rosetta/src/org/jboss/internal/soa/esb/couriers/JmsCourier.java
   labs/jbossesb/workspace/platform/JBESB_4_2_1_SOA_4_2/product/rosetta/src/org/jboss/internal/soa/esb/dependencies/H2Database.java
   labs/jbossesb/workspace/platform/JBESB_4_2_1_SOA_4_2/product/rosetta/src/org/jboss/internal/soa/esb/remoting/HttpUnmarshaller.java
   labs/jbossesb/workspace/platform/JBESB_4_2_1_SOA_4_2/product/rosetta/src/org/jboss/internal/soa/esb/rosetta/pooling/JmsConnectionPool.java
   labs/jbossesb/workspace/platform/JBESB_4_2_1_SOA_4_2/product/rosetta/src/org/jboss/internal/soa/esb/rosetta/pooling/JmsSession.java
   labs/jbossesb/workspace/platform/JBESB_4_2_1_SOA_4_2/product/rosetta/src/org/jboss/internal/soa/esb/rosetta/pooling/JmsXASession.java
   labs/jbossesb/workspace/platform/JBESB_4_2_1_SOA_4_2/product/services/jbossesb/src/main/resources/META-INF/deployment.xml
Log:
Merged third spin of CP3

Modified: labs/jbossesb/workspace/platform/JBESB_4_2_1_SOA_4_2/product/rosetta/src/org/jboss/internal/soa/esb/couriers/JmsCourier.java
===================================================================
--- labs/jbossesb/workspace/platform/JBESB_4_2_1_SOA_4_2/product/rosetta/src/org/jboss/internal/soa/esb/couriers/JmsCourier.java	2008-06-23 11:44:58 UTC (rev 20697)
+++ labs/jbossesb/workspace/platform/JBESB_4_2_1_SOA_4_2/product/rosetta/src/org/jboss/internal/soa/esb/couriers/JmsCourier.java	2008-06-23 11:52:16 UTC (rev 20698)
@@ -28,12 +28,12 @@
 import java.util.Properties;
 
 import javax.jms.DeliveryMode;
+import javax.jms.Destination;
 import javax.jms.JMSException;
 import javax.jms.MessageConsumer;
 import javax.jms.MessageProducer;
 import javax.jms.ObjectMessage;
 import javax.jms.Session;
-import javax.jms.Topic;
 import javax.naming.Context;
 import javax.naming.NamingException;
 import javax.xml.parsers.ParserConfigurationException;
@@ -98,26 +98,28 @@
 
     public void cleanup() {
         synchronized(this) {
-            if (_messageProducer != null) {
-                try {
-                    _messageProducer.close();
-                } catch (Exception e) {
-                    _logger.debug(e.getMessage(), e);
-                } finally {
-                    _messageProducer = null;
-                    closeSession();
+            try {
+                if (_messageProducer != null) {
+                    try {
+                        _messageProducer.close();
+                    } catch (Exception e) {
+                        _logger.debug(e.getMessage(), e);
+                    } finally {
+                        _messageProducer = null;
+                    }
                 }
-            }
 
-            if (_messageConsumer != null) {
-                try {
-                   _messageConsumer.close();
-                } catch (JMSException e) {
-                    _logger.debug(e.getMessage(), e);
-                } finally {
-                    _messageConsumer = null;
-                    closeSession();
+                if (_messageConsumer != null) {
+                    try {
+                       _messageConsumer.close();
+                    } catch (JMSException e) {
+                        _logger.debug(e.getMessage(), e);
+                    } finally {
+                        _messageConsumer = null;
+                    }
                 }
+            } finally {
+                closeSession() ;
             }
         }
     } // ________________________________
@@ -311,48 +313,46 @@
     } // ________________________________
 
     private void createMessageProducer() throws CourierException, NamingContextException {
-        Context oJndiCtx = null;
-        
         synchronized(this) {
             if (_messageProducer == null) {
                 try {
-                    oJndiCtx = NamingContextPool.getNamingContext(_epr.getJndiEnvironment());
-
-                    String sType = _epr.getDestinationType();
-                    if (JMSEpr.QUEUE_TYPE.equals(sType)) {
-                        Session qSess = getJmsSession(_epr.getAcknowledgeMode());
-                        javax.jms.Queue queue = null;
-                        try {
-                            queue = (javax.jms.Queue) oJndiCtx.lookup(_epr
-                                    .getDestinationName());
-                        } catch (NamingException ne) {
+                    final Session session = getJmsSession(_epr.getAcknowledgeMode());
+                    Destination destination = null ;
+                    final String destinationName = _epr.getDestinationName() ;
+                    Context oJndiCtx = NamingContextPool.getNamingContext(_epr.getJndiEnvironment());
+                    try {
+                        String sType = _epr.getDestinationType();
+                        if (JMSEpr.QUEUE_TYPE.equals(sType)) {
                             try {
-                                oJndiCtx = NamingContextPool.replaceNamingContext(oJndiCtx, _epr.getJndiEnvironment());
-                                queue = (javax.jms.Queue) oJndiCtx.lookup(_epr
-                                        .getDestinationName());
-                            } catch (NamingException nex) {
-                                //ActiveMQ
-                                queue = qSess.createQueue(_epr.getDestinationName());
+                                destination = (Destination) oJndiCtx.lookup(destinationName);
+                            } catch (NamingException ne) {
+                                try {
+                                    oJndiCtx = NamingContextPool.replaceNamingContext(oJndiCtx, _epr.getJndiEnvironment());
+                                    destination = (Destination) oJndiCtx.lookup(destinationName);
+                                } catch (NamingException nex) {
+                                    //ActiveMQ
+                                    destination = session.createQueue(destinationName);
+                                }
                             }
+                        } else if (JMSEpr.TOPIC_TYPE.equals(sType)) {
+                            try {
+                                destination = (Destination) oJndiCtx.lookup(destinationName);
+                            }
+                            catch (NamingException ne) {
+                                destination = session.createTopic(destinationName);
+                            }
+                        } else {
+                            throw new CourierException("Unknown destination type");
                         }
-                        _messageProducer = qSess.createProducer(queue);
-                    } else if (JMSEpr.TOPIC_TYPE.equals(sType)) {
-                        Session tSess = getJmsSession(_epr.getAcknowledgeMode());
-                        Topic topic = null;
-                        try {
-                            topic = (Topic) oJndiCtx.lookup(_epr
-                                    .getDestinationName());
+                        _messageProducer = session.createProducer(destination);
+                        _messageProducer.setDeliveryMode(_epr.getPersistent()?DeliveryMode.PERSISTENT:DeliveryMode.NON_PERSISTENT);
+                        if ( _logger.isDebugEnabled() )
+                            _logger.debug("JMSCourier deliveryMode: " + _messageProducer.getDeliveryMode() + ", peristent:" + _epr.getPersistent());
+                    } finally {
+                        if (oJndiCtx != null) {
+                            NamingContextPool.releaseNamingContext(oJndiCtx) ;
                         }
-                        catch (NamingException ne) {
-                            topic = tSess.createTopic(_epr.getDestinationName());
-                        }
-                        _messageProducer = tSess.createProducer(topic);
-                    } else {
-                        throw new CourierException("Unknown destination type");
                     }
-                    _messageProducer.setDeliveryMode(_epr.getPersistent()?DeliveryMode.PERSISTENT:DeliveryMode.NON_PERSISTENT);
-                    if ( _logger.isDebugEnabled() )
-                        _logger.debug("JMSCourier deliveryMode: " + _messageProducer.getDeliveryMode() + ", peristent:" + _epr.getPersistent());
                 }
                 catch (JMSException ex) {
                     _logger.debug("Error from JMS system.", ex);
@@ -361,10 +361,6 @@
                 }
                 catch (URISyntaxException ex) {
                     throw new CourierException(ex);
-                } finally {
-                    if (oJndiCtx != null) {
-                        NamingContextPool.releaseNamingContext(oJndiCtx) ;
-                    }
                 }
             }
         }
@@ -491,42 +487,39 @@
                 boolean success = false;
                 try {
                     Properties environment = _epr.getJndiEnvironment();
+                    final Session session = getJmsSession(_epr.getAcknowledgeMode());
+                    Destination destination = null ;
+                    final String destinationName = _epr.getDestinationName() ;
                     oJndiCtx = NamingContextPool.getNamingContext(environment);
                     try
                     {
                         String sType = _epr.getDestinationType();
                         if (JMSEpr.QUEUE_TYPE.equals(sType)) {
-                            Session qSess = getJmsSession(_epr.getAcknowledgeMode());
-                            javax.jms.Queue queue = null;
                             try {
-                                queue = (javax.jms.Queue) oJndiCtx.lookup(_epr
-                                        .getDestinationName());
+                                destination = (Destination) oJndiCtx.lookup(destinationName);
                             } catch (NamingException ne) {
                                 try {
                                     oJndiCtx = NamingContextPool.replaceNamingContext(oJndiCtx, environment);
-                                    queue = (javax.jms.Queue) oJndiCtx.lookup(_epr
-                                            .getDestinationName());
+                                    destination = (Destination) oJndiCtx.lookup(destinationName);
                                 } catch (NamingException nex) {
                                     //ActiveMQ
-                                    queue = qSess.createQueue(_epr.getDestinationName());
+                                    destination = session.createQueue(destinationName);
                                 }
                             }
-                            _messageConsumer = qSess.createConsumer(queue, _epr.getMessageSelector());
                         } else if (JMSEpr.TOPIC_TYPE.equals(sType)) {
-                            Session tSess = getJmsSession(_epr.getAcknowledgeMode());
-                            Topic topic = null;
                              try {
-                                   topic = (Topic) oJndiCtx.lookup(_epr
-                                                            .getDestinationName());
+                                   destination = (Destination) oJndiCtx.lookup(destinationName);
                              }
                              catch (NamingException ne) {
-                                   topic = tSess.createTopic(_epr.getDestinationName());
+                                   destination = session.createTopic(destinationName);
                              }
-                                  _messageConsumer = tSess.createConsumer(topic, _epr
-                                         .getMessageSelector());
                         } else {
                             throw new CourierException("Unknown destination type");
                         }
+                        if (destination == null) {
+                            throw new CourierException("Could not locate destination: " + destinationName);
+                        }
+                        _messageConsumer = session.createConsumer(destination, _epr.getMessageSelector());
                         success = true;
                     } finally {
                         NamingContextPool.releaseNamingContext(oJndiCtx) ;

Modified: labs/jbossesb/workspace/platform/JBESB_4_2_1_SOA_4_2/product/rosetta/src/org/jboss/internal/soa/esb/dependencies/H2Database.java
===================================================================
--- labs/jbossesb/workspace/platform/JBESB_4_2_1_SOA_4_2/product/rosetta/src/org/jboss/internal/soa/esb/dependencies/H2Database.java	2008-06-23 11:44:58 UTC (rev 20697)
+++ labs/jbossesb/workspace/platform/JBESB_4_2_1_SOA_4_2/product/rosetta/src/org/jboss/internal/soa/esb/dependencies/H2Database.java	2008-06-23 11:52:16 UTC (rev 20698)
@@ -427,7 +427,7 @@
                 {
                 	log.debug( "Starting remote h2 db with port : " + port );
                 	final String[] args = new String[] { 
-                			"-baseDir ", dbPath.toURI().toString(), 
+                			"-baseDir", dbPath.getAbsolutePath(), 
                 			"-tcpPort", String.valueOf(port),
                 			"-tcpAllowOthers","" }; //	need the extra empty string or a exception is thrown by H2
                 	final Server server = Server.createTcpServer(args) ;
@@ -507,7 +507,7 @@
        final Server server = getRemoteServer() ;
        if (server != null)
        {
-           server.shutdown() ;
+           server.stop() ;
        }
    }
    

Modified: labs/jbossesb/workspace/platform/JBESB_4_2_1_SOA_4_2/product/rosetta/src/org/jboss/internal/soa/esb/remoting/HttpUnmarshaller.java
===================================================================
--- labs/jbossesb/workspace/platform/JBESB_4_2_1_SOA_4_2/product/rosetta/src/org/jboss/internal/soa/esb/remoting/HttpUnmarshaller.java	2008-06-23 11:44:58 UTC (rev 20697)
+++ labs/jbossesb/workspace/platform/JBESB_4_2_1_SOA_4_2/product/rosetta/src/org/jboss/internal/soa/esb/remoting/HttpUnmarshaller.java	2008-06-23 11:52:16 UTC (rev 20698)
@@ -101,10 +101,6 @@
         int amtRead = inputStream.read(byteBuffer);
         while (amtRead > 0) {
             byteOutputStream.write(byteBuffer, pointer, amtRead);
-            if (amtRead < bufferSize && byteOutputStream.size() >= contentLength) {
-                //done reading, so process
-                break;
-            }
             amtRead = inputStream.read(byteBuffer);
         }
 

Modified: labs/jbossesb/workspace/platform/JBESB_4_2_1_SOA_4_2/product/rosetta/src/org/jboss/internal/soa/esb/rosetta/pooling/JmsConnectionPool.java
===================================================================
--- labs/jbossesb/workspace/platform/JBESB_4_2_1_SOA_4_2/product/rosetta/src/org/jboss/internal/soa/esb/rosetta/pooling/JmsConnectionPool.java	2008-06-23 11:44:58 UTC (rev 20697)
+++ labs/jbossesb/workspace/platform/JBESB_4_2_1_SOA_4_2/product/rosetta/src/org/jboss/internal/soa/esb/rosetta/pooling/JmsConnectionPool.java	2008-06-23 11:52:16 UTC (rev 20698)
@@ -24,6 +24,7 @@
 import java.sql.SQLException;
 import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 import java.util.concurrent.Callable;
@@ -121,6 +122,11 @@
     private boolean terminated ;
     
     /**
+     * The pool instance id.
+     */
+    private long id ;
+    
+    /**
      * Contructor of the pool.
      * 
      */
@@ -161,9 +167,9 @@
             {
                 final JmsSession session ;
                 if (transacted) {
-                    session = new JmsXASession(JmsConnectionPool.this, ((XAConnection)jmsConnection).createXASession());
+                    session = new JmsXASession(JmsConnectionPool.this, ((XAConnection)jmsConnection).createXASession(), id);
                 } else {
-                    session = new JmsSession(jmsConnection.createSession(transacted, acknowledgeMode));
+                    session = new JmsSession(jmsConnection.createSession(transacted, acknowledgeMode), id);
                 }
                 return session ;
             }
@@ -303,19 +309,22 @@
      */
     synchronized void handleCloseSession(final JmsSession session)
     {
-        final int mode ;
-        try {
-            mode = session.getAcknowledgeMode() ;
-        } catch (final JMSException jmse) {
-            logger.warn("JMSException while calling getAcknowledgeMode") ;
-            logger.debug("JMSException while calling getAcknowledgeMode", jmse) ;
-            return ;
+        if (session.getId() == id)
+        {
+            final int mode ;
+            try {
+                mode = session.getAcknowledgeMode() ;
+            } catch (final JMSException jmse) {
+                logger.warn("JMSException while calling getAcknowledgeMode") ;
+                logger.debug("JMSException while calling getAcknowledgeMode", jmse) ;
+                return ;
+            }
+            
+            final ArrayList<JmsSession> sessions = (freeSessionsMap == null ? null : freeSessionsMap.get(mode));
+            if (sessions != null) {
+                sessions.add(session) ;
+            }
         }
-        
-        final ArrayList<JmsSession> sessions = (freeSessionsMap == null ? null : freeSessionsMap.get(mode));
-        if (sessions != null) {
-            sessions.add(session) ;
-        }
         handleReleaseSession(session) ;
     }
     
@@ -366,6 +375,42 @@
     }
 
     /**
+     * This method is called when the pool needs to cleaned. It closes all open sessions
+     * and the connection.
+     */
+    private void cleanSessionPool()
+    {
+        final Connection connection ;
+        synchronized(this)
+       {
+            if (terminated)
+            {
+                return ;
+            }
+            id++ ;
+            for (List<JmsSession> list : freeSessionsMap.values())
+            {
+                list.clear() ;
+            }
+            for (List<JmsSession> list : inUseSessionsMap.values())
+            {
+                list.clear() ;
+            }
+            transactionsToSessions.clear() ;
+            sessionsToTransactions.clear() ;
+            
+            logger.debug("Cleared the session pool now closing the connection to the factory.");
+            connection = jmsConnection ;
+            jmsConnection = null ;
+        }
+        if (connection!=null) {
+            try {
+                connection.close();
+            } catch (final Exception ex) {} // ignore
+        }
+    }
+    
+    /**
      * This method is called when the pool needs to destroyed. It closes all open sessions
      * and the connection and removes it from the container's poolMap.
      */
@@ -481,7 +526,7 @@
                 jmsConnection.setExceptionListener(new ExceptionListener() {
                     public void onException(JMSException arg0)
                     {
-                        removeSessionPool() ;
+                        cleanSessionPool() ;
                     }
                 }) ;
                 jmsConnection.start();

Modified: labs/jbossesb/workspace/platform/JBESB_4_2_1_SOA_4_2/product/rosetta/src/org/jboss/internal/soa/esb/rosetta/pooling/JmsSession.java
===================================================================
--- labs/jbossesb/workspace/platform/JBESB_4_2_1_SOA_4_2/product/rosetta/src/org/jboss/internal/soa/esb/rosetta/pooling/JmsSession.java	2008-06-23 11:44:58 UTC (rev 20697)
+++ labs/jbossesb/workspace/platform/JBESB_4_2_1_SOA_4_2/product/rosetta/src/org/jboss/internal/soa/esb/rosetta/pooling/JmsSession.java	2008-06-23 11:52:16 UTC (rev 20698)
@@ -52,6 +52,10 @@
      * The session delegate.
      */
     private final Session session ;
+    /**
+     * The pool instance id.
+     */
+    private final long id ;
     
     /**
      * The set of active queue browsers.
@@ -69,15 +73,22 @@
     /**
      * Create the session wrapper.
      * @param session The session delegate.
+     * @param id The pool instance id.
      * @param isJTA True if this tales part in a JTA transaction
      * @throws JMSException
      */
-    JmsSession(final Session session)
+    JmsSession(final Session session, final long id)
         throws JMSException
     {
+        this.id = id ;
         this.session = session ;
     }
 
+    public long getId()
+    {
+        return id ;
+    }
+    
     public void close() throws JMSException
     {
         session.close();

Modified: labs/jbossesb/workspace/platform/JBESB_4_2_1_SOA_4_2/product/rosetta/src/org/jboss/internal/soa/esb/rosetta/pooling/JmsXASession.java
===================================================================
--- labs/jbossesb/workspace/platform/JBESB_4_2_1_SOA_4_2/product/rosetta/src/org/jboss/internal/soa/esb/rosetta/pooling/JmsXASession.java	2008-06-23 11:44:58 UTC (rev 20697)
+++ labs/jbossesb/workspace/platform/JBESB_4_2_1_SOA_4_2/product/rosetta/src/org/jboss/internal/soa/esb/rosetta/pooling/JmsXASession.java	2008-06-23 11:52:16 UTC (rev 20698)
@@ -72,13 +72,13 @@
      * Create the session wrapper.
      * @param pool The current connection pool
      * @param session The session delegate.
-     * @param isJTA True if this tales part in a JTA transaction
+     * @param id The pool instance id.
      * @throws JMSException
      */
-    JmsXASession(final JmsConnectionPool pool, final XASession session)
+    JmsXASession(final JmsConnectionPool pool, final XASession session, final long id)
         throws JMSException
     {
-        super(session) ;
+        super(session, id) ;
         this.pool = pool ;
         this.session = session ;
     }

Modified: labs/jbossesb/workspace/platform/JBESB_4_2_1_SOA_4_2/product/services/jbossesb/src/main/resources/META-INF/deployment.xml
===================================================================
--- labs/jbossesb/workspace/platform/JBESB_4_2_1_SOA_4_2/product/services/jbossesb/src/main/resources/META-INF/deployment.xml	2008-06-23 11:44:58 UTC (rev 20697)
+++ labs/jbossesb/workspace/platform/JBESB_4_2_1_SOA_4_2/product/services/jbossesb/src/main/resources/META-INF/deployment.xml	2008-06-23 11:52:16 UTC (rev 20698)
@@ -3,4 +3,7 @@
   <depends>jboss.jca:service=DataSourceBinding,name=JBossESBDS</depends>
   <depends>jboss.esb.destination:service=Queue,name=DeadMessageQueue</depends>
   <depends>jboss.esb:service=MessageStoreDatabaseInitializer</depends>
+  <depends>jboss.esb.destination:service=Queue,name=DataCollectorQueue</depends>
+  <depends>jboss.esb.destination:service=Queue,name=OperationsCollectorQueue</depends>
+  <depends>jboss.esb.destination:service=Queue,name=OperationsResultCollectorQueue</depends> 
 </jbossesb-deployment>




More information about the jboss-svn-commits mailing list