[jboss-cvs] JBoss Messaging SVN: r3609 - in trunk: src/main/org/jboss/jms/client/impl and 1 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Mon Jan 21 20:32:51 EST 2008


Author: clebert.suconic at jboss.com
Date: 2008-01-21 20:32:51 -0500 (Mon, 21 Jan 2008)
New Revision: 3609

Modified:
   trunk/src/main/org/jboss/jms/client/api/ClientConnection.java
   trunk/src/main/org/jboss/jms/client/api/ClientSession.java
   trunk/src/main/org/jboss/jms/client/impl/ClientBrowserImpl.java
   trunk/src/main/org/jboss/jms/client/impl/ClientConnectionFactoryImpl.java
   trunk/src/main/org/jboss/jms/client/impl/ClientConnectionImpl.java
   trunk/src/main/org/jboss/jms/client/impl/ClientConsumerImpl.java
   trunk/src/main/org/jboss/jms/client/impl/ClientProducerImpl.java
   trunk/src/main/org/jboss/jms/client/impl/ClientSessionImpl.java
   trunk/src/main/org/jboss/jms/client/impl/ClosedInterceptor.java
   trunk/src/main/org/jboss/jms/client/impl/CommunicationSupport.java
   trunk/tests/src/org/jboss/test/messaging/jms/MessageConsumerTest.java
Log:
Fixing closeChildren logic

Modified: trunk/src/main/org/jboss/jms/client/api/ClientConnection.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/api/ClientConnection.java	2008-01-21 22:22:08 UTC (rev 3608)
+++ trunk/src/main/org/jboss/jms/client/api/ClientConnection.java	2008-01-22 01:32:51 UTC (rev 3609)
@@ -58,17 +58,22 @@
                                                     ServerSessionPool sessionPool,
                                                     int maxMessages) throws JMSException;
 
-   public void setRemotingConnection(JMSRemotingConnection conn);
+   void setRemotingConnection(JMSRemotingConnection conn);
    
-   public Client getClient();
+   Client getClient();
 
-   public JMSRemotingConnection getRemotingConnection();
+   JMSRemotingConnection getRemotingConnection();
 
-   public ResourceManager getResourceManager();
+   ResourceManager getResourceManager();
 
-   public void setResourceManager(ResourceManager resourceManager);
+   void setResourceManager(ResourceManager resourceManager);
    
-   public String getID();
+   String getID();
    
-   public byte getVersion();
+   byte getVersion();
+   
+   
+   /** Called by a child Session while it is closing, so that this connection drops its reference to that session. */
+   void removeChild(String key);
+
 }

Modified: trunk/src/main/org/jboss/jms/client/api/ClientSession.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/api/ClientSession.java	2008-01-21 22:22:08 UTC (rev 3608)
+++ trunk/src/main/org/jboss/jms/client/api/ClientSession.java	2008-01-22 01:32:51 UTC (rev 3609)
@@ -173,9 +173,12 @@
    
    boolean isTreatAsNonTransactedWhenNotEnlisted();
    
-   public void setTreatAsNonTransactedWhenNotEnlisted(boolean treatAsNonTransactedWhenNotEnlisted);
+   void setTreatAsNonTransactedWhenNotEnlisted(boolean treatAsNonTransactedWhenNotEnlisted);
 
-   public Object getCurrentTxId();
+   Object getCurrentTxId();
 
-   public void setCurrentTxId(Object currentTxId);
+   void setCurrentTxId(Object currentTxId);
+   
+   /** This is a method used by children (Producer, Consumer and Browser) during close operations */
+   void removeChild(String key);
 }

Modified: trunk/src/main/org/jboss/jms/client/impl/ClientBrowserImpl.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/impl/ClientBrowserImpl.java	2008-01-21 22:22:08 UTC (rev 3608)
+++ trunk/src/main/org/jboss/jms/client/impl/ClientBrowserImpl.java	2008-01-22 01:32:51 UTC (rev 3609)
@@ -52,7 +52,7 @@
  *
  * $Id: ClientBrowserImpl.java 3602 2008-01-21 17:48:32Z timfox $
  */
-public class ClientBrowserImpl extends CommunicationSupport<ClientBrowserImpl> implements ClientBrowser
+public class ClientBrowserImpl extends CommunicationSupport implements ClientBrowser
 {
    // Constants ------------------------------------------------------------------------------------
 
@@ -92,7 +92,14 @@
 
    public void close() throws JMSException
    {
-      sendBlocking(new CloseMessage());
+      try
+      {
+         sendBlocking(new CloseMessage());
+      }
+      finally
+      {
+         session.removeChild(this.getID());
+      }
    }
 
    public long closing(long sequence) throws JMSException

Modified: trunk/src/main/org/jboss/jms/client/impl/ClientConnectionFactoryImpl.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/impl/ClientConnectionFactoryImpl.java	2008-01-21 22:22:08 UTC (rev 3608)
+++ trunk/src/main/org/jboss/jms/client/impl/ClientConnectionFactoryImpl.java	2008-01-22 01:32:51 UTC (rev 3609)
@@ -55,7 +55,7 @@
  * $Id: ClientConnectionFactoryImpl.java 3602 2008-01-21 17:48:32Z timfox $
  */
 public class ClientConnectionFactoryImpl
-   extends CommunicationSupport<ClientConnectionFactoryImpl> implements Serializable
+   extends CommunicationSupport implements Serializable
 {
    // Constants ------------------------------------------------------------------------------------
 

Modified: trunk/src/main/org/jboss/jms/client/impl/ClientConnectionImpl.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/impl/ClientConnectionImpl.java	2008-01-21 22:22:08 UTC (rev 3608)
+++ trunk/src/main/org/jboss/jms/client/impl/ClientConnectionImpl.java	2008-01-22 01:32:51 UTC (rev 3609)
@@ -23,7 +23,12 @@
 
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
 import javax.jms.ConnectionMetaData;
 import javax.jms.Destination;
 import javax.jms.ExceptionListener;
@@ -73,7 +78,7 @@
  *
  * $Id: ClientConnectionImpl.java 3602 2008-01-21 17:48:32Z timfox $
  */
-public class ClientConnectionImpl extends CommunicationSupport<ClientConnectionImpl> implements ClientConnection
+public class ClientConnectionImpl extends CommunicationSupport implements ClientConnection
 {
    // Constants ------------------------------------------------------------------------------------
 
@@ -98,7 +103,7 @@
    
    // Attributes that used to be on ConnectionState
    
-   protected Set<ClientSession> children = new ConcurrentHashSet<ClientSession>();
+   protected Map<String, ClientSession> children = new ConcurrentHashMap<String, ClientSession>();
 
    protected boolean started;
 
@@ -207,7 +212,7 @@
       ClientSessionImpl delegate = new ClientSessionImpl(this, response.getSessionID(), response.getDupsOKBatchSize(), isStrictTck(), 
             transacted, acknowledgmentMode, isXA);
       ClientSession proxy =(ClientSession) ProxyFactory.proxy(delegate, ClientSession.class);
-      children.add(proxy);
+      children.put(proxy.getID(), proxy);
       return proxy;
    }
 
@@ -360,13 +365,21 @@
       return "ConnectionDelegate[" + System.identityHashCode(this) + ", ID=" + id +
          ", SID=" + serverID + "]";
    }
+   
+   // Public because it is declared on ClientConnection; called by child sessions when they close
+   
+   public void removeChild(String key)
+   {
+      children.remove(key);
+   }
 
    // Protected ------------------------------------------------------------------------------------
    
-   
    protected void closeChildren() throws JMSException
    {
-      for (ClientSession session: children)
+      Set<ClientSession> childrenClone = new HashSet<ClientSession>(children.values());
+      
+      for (ClientSession session: childrenClone)
       {
          try
          {
@@ -383,8 +396,6 @@
             }
          }         
       }
-      
-      children.clear();
    }
 
    

Modified: trunk/src/main/org/jboss/jms/client/impl/ClientConsumerImpl.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/impl/ClientConsumerImpl.java	2008-01-21 22:22:08 UTC (rev 3608)
+++ trunk/src/main/org/jboss/jms/client/impl/ClientConsumerImpl.java	2008-01-22 01:32:51 UTC (rev 3609)
@@ -64,7 +64,7 @@
  *
  * $Id: ClientConsumerImpl.java 3603 2008-01-21 18:49:20Z timfox $
  */
-public class ClientConsumerImpl extends CommunicationSupport<ClientConsumerImpl> implements ClientConsumer
+public class ClientConsumerImpl extends CommunicationSupport implements ClientConsumer
 {
    // Constants ------------------------------------------------------------------------------------
 
@@ -256,7 +256,14 @@
 
    public void close() throws JMSException
    {
-      sendBlocking(new CloseMessage());
+      try
+      {
+         sendBlocking(new CloseMessage());
+      }
+      finally
+      {
+         session.removeChild(this.getID());
+      }
    }
 
 
@@ -513,41 +520,48 @@
       }
    }
    
-   public void close(long lastDeliveryId) throws JMSException
+   private void close(long lastDeliveryId) throws JMSException
    {     
-      log.trace(this + " close");
+      try
+      {
+         log.trace(this + " close");
             
-      //Wait for the last delivery to arrive
-      waitForLastDelivery(lastDeliveryId);
-      
-      //Important! We set the listener to null so the next ListenerRunner won't run
-      if (listener != null)
-      {
-         setMessageListener(null);
-      }
-      
-      //Now we wait for any current listener runners to run.
-      waitForOnMessageToComplete();   
-      
-      synchronized (mainLock)
-      {         
-         if (closed)
+         //Wait for the last delivery to arrive
+         waitForLastDelivery(lastDeliveryId);
+         
+         //Important! We set the listener to null so the next ListenerRunner won't run
+         if (listener != null)
          {
-            return;
+            setMessageListener(null);
          }
          
-         closed = true;   
+         //Now we wait for any current listener runners to run.
+         waitForOnMessageToComplete();   
          
-         if (receiverThread != null)
-         {            
-            // Wake up any receive() thread that might be waiting
-            mainLock.notify();
-         }   
-         
-         this.listener = null;
+         synchronized (mainLock)
+         {         
+            if (closed)
+            {
+               return;
+            }
+            
+            closed = true;   
+            
+            if (receiverThread != null)
+            {            
+               // Wake up any receive() thread that might be waiting
+               mainLock.notify();
+            }   
+            
+            this.listener = null;
+         }
+                              
+         if (trace) { log.trace(this + " closed"); }
       }
-                           
-      if (trace) { log.trace(this + " closed"); }
+      finally
+      {
+         session.removeChild(this.getID());
+      }
    }
      
    /**

Modified: trunk/src/main/org/jboss/jms/client/impl/ClientProducerImpl.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/impl/ClientProducerImpl.java	2008-01-21 22:22:08 UTC (rev 3608)
+++ trunk/src/main/org/jboss/jms/client/impl/ClientProducerImpl.java	2008-01-22 01:32:51 UTC (rev 3609)
@@ -58,7 +58,7 @@
  *
  * $Id: ClientProducerImpl.java 3602 2008-01-21 17:48:32Z timfox $
  */
-public class ClientProducerImpl extends CommunicationSupport<ClientProducerImpl> implements ClientProducer
+public class ClientProducerImpl extends CommunicationSupport implements ClientProducer
 {
    // Constants ------------------------------------------------------------------------------------
 
@@ -92,7 +92,7 @@
    public ClientProducerImpl(ClientConnection connection,
          ClientSession session, JBossDestination destination)
    {
-      super();
+      super(UUID.randomUUID().toString());
       this.connection = connection;
       this.session = session;
       this.destination = destination;
@@ -106,6 +106,7 @@
     */
    public void close() throws JMSException
    {
+      session.removeChild(this.getID());
       return;
    }
 

Modified: trunk/src/main/org/jboss/jms/client/impl/ClientSessionImpl.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/impl/ClientSessionImpl.java	2008-01-21 22:22:08 UTC (rev 3608)
+++ trunk/src/main/org/jboss/jms/client/impl/ClientSessionImpl.java	2008-01-22 01:32:51 UTC (rev 3609)
@@ -25,10 +25,12 @@
 import java.io.DataOutputStream;
 import java.io.Serializable;
 import java.util.ArrayList;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 
 import javax.jms.IllegalStateException;
@@ -97,7 +99,7 @@
  *
  * $Id: ClientSessionImpl.java 3603 2008-01-21 18:49:20Z timfox $
  */
-public class ClientSessionImpl extends CommunicationSupport<ClientSessionImpl> implements ClientSession
+public class ClientSessionImpl extends CommunicationSupport implements ClientSession
 {
    // Constants ------------------------------------------------------------------------------------
 
@@ -195,19 +197,26 @@
 
    public void close() throws JMSException
    {
-      sendBlocking(new CloseMessage());
-
-      Object xid = getCurrentTxId();
-
-      if (xid != null)
+      try
       {
-         //Remove transaction from the resource manager
-         connection.getResourceManager().removeTx(xid);
+         sendBlocking(new CloseMessage());
+   
+         Object xid = getCurrentTxId();
+   
+         if (xid != null)
+         {
+            //Remove transaction from the resource manager
+            connection.getResourceManager().removeTx(xid);
+         }
+   
+         // We must explicitly shutdown the executor
+   
+         getExecutor().shutdownNow();
       }
-
-      // We must explicitly shutdown the executor
-
-      getExecutor().shutdownNow();
+      finally
+      {
+         connection.removeChild(this.getID());
+      }
    }
 
    private long invokeClosing(long sequence) throws JMSException
@@ -220,13 +229,12 @@
    
    private void closeChildren() throws JMSException
    {
-      for (Closeable child: children.values())
+      Set<Closeable> chilrenValues = new HashSet<Closeable>(children.values());
+      for (Closeable child: chilrenValues)
       {
          child.closing(-1);
          child.close();
       }
-      
-      children.clear();
    }
 
    public long closing(long sequence) throws JMSException
@@ -980,6 +988,11 @@
 
    }
    
+   public void removeChild(String key)
+   {
+      children.remove(key);
+   }
+   
    private void invokeSend(Message m) throws JMSException
    {   	
    	long seq;

Modified: trunk/src/main/org/jboss/jms/client/impl/ClosedInterceptor.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/impl/ClosedInterceptor.java	2008-01-21 22:22:08 UTC (rev 3608)
+++ trunk/src/main/org/jboss/jms/client/impl/ClosedInterceptor.java	2008-01-22 01:32:51 UTC (rev 3609)
@@ -116,7 +116,7 @@
       synchronized(this)
       {
          // object "in use", increment inUseCount
-         if (state == IN_CLOSE || state == CLOSED)
+         if (state == CLOSED)
          {
             if (isClosing || isClose)
             {
@@ -140,6 +140,7 @@
 
          if (isClosing || isClose)
       	{
+            log.warn(t.getMessage(), t);
 	      	//We swallow exceptions in close/closing, this is because if the connection fails, it is naturally for code to then close
 	      	//in a finally block, it would not then be appropriate to throw an exception. This is a common technique
 	      	//Close should ALWAYS (well apart from Errors) succeed irrespective of whether the actual connection to the server is alive.

Modified: trunk/src/main/org/jboss/jms/client/impl/CommunicationSupport.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/impl/CommunicationSupport.java	2008-01-21 22:22:08 UTC (rev 3608)
+++ trunk/src/main/org/jboss/jms/client/impl/CommunicationSupport.java	2008-01-22 01:32:51 UTC (rev 3609)
@@ -30,7 +30,7 @@
  * @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
  * // TODO find a better name for this class
  */
-public abstract class CommunicationSupport <T extends CommunicationSupport<?>> implements Streamable, Serializable 
+public abstract class CommunicationSupport implements Streamable, Serializable 
 {
    private static final Logger log = Logger.getLogger(CommunicationSupport.class);
 

Modified: trunk/tests/src/org/jboss/test/messaging/jms/MessageConsumerTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/jms/MessageConsumerTest.java	2008-01-21 22:22:08 UTC (rev 3608)
+++ trunk/tests/src/org/jboss/test/messaging/jms/MessageConsumerTest.java	2008-01-22 01:32:51 UTC (rev 3609)
@@ -3726,7 +3726,7 @@
 	
 	      listener.waitForMessages();
 	
-	      assertFalse(listener.failed);
+	      assertFalse(listener.message, listener.failed);
 	
 	      conn.close();
 	      
@@ -3772,7 +3772,8 @@
 	
 	      listener.waitForMessages();
 	
-	      assertFalse(listener.failed);
+	      assertFalse(listener.message, listener.failed);
+	  	
 	
 	      conn.close();
 	      
@@ -3902,6 +3903,15 @@
       private Session sess;
 
       private boolean failed;
+      
+      String message = "ok";
+      
+      private void failed(String msg)
+      {
+    	  log.warn(msg);
+    	  failed = true;
+    	  this.message = msg;
+      }
 
       public void waitForMessages() throws InterruptedException
       {
@@ -3927,8 +3937,7 @@
             {
                if (!("a".equals(tm.getText())))
                {
-                  log.info("Should be a but was " + tm.getText());
-                  failed = true;
+                  failed("Should be a but was " + tm.getText());
                   latch.release();
                }
                log.info("Throwing exception");
@@ -3942,13 +3951,12 @@
                   //Message should be immediately redelivered
                   if (!("a".equals(tm.getText())))
                   {
-                     log.info("Should be a but was " + tm.getText());
-                     failed = true;
+                	 failed("Should be a but was " + tm.getText());
                      latch.release();
                   }
                   if (!tm.getJMSRedelivered())
                   {
-                     failed = true;
+                	 failed("Message was supposed to be a redelivery");
                      latch.release();
                   }
                }
@@ -3957,8 +3965,7 @@
                   //Transacted or CLIENT_ACKNOWLEDGE - next message should be delivered
                   if (!("b".equals(tm.getText())))
                   {
-                     log.info("Should be b but was " + tm.getText());
-                     failed = true;
+                     failed("Should be b but was " + tm.getText());
                      latch.release();
                   }
                }
@@ -3969,8 +3976,7 @@
                {
                   if (!("b".equals(tm.getText())))
                   {
-                     log.info("Should be b but was " + tm.getText());
-                     failed = true;
+                     failed("Should be b but was " + tm.getText());
                      latch.release();
                   }
                }
@@ -3978,8 +3984,7 @@
                {
                   if (!("c".equals(tm.getText())))
                   {
-                     log.info("Should be c but was " + tm.getText());
-                     failed = true;
+                     failed("Should be c but was " + tm.getText());
                      latch.release();
                   }
                   latch.release();
@@ -3992,8 +3997,7 @@
                {
                   if (!("c".equals(tm.getText())))
                   {
-                     log.info("Should be c but was " + tm.getText());
-                     failed = true;
+                     failed("Should be c but was " + tm.getText());
                      latch.release();
                   }
                   latch.release();
@@ -4001,14 +4005,15 @@
                else
                {
                   //Shouldn't get a 4th messge
-                  failed = true;
+            	  failed("Shouldn't get a 4th message");
                   latch.release();
                }
             }
          }
          catch (JMSException e)
          {
-            failed = true;
+         	log.error(e.getMessage(), e);
+        	failed("Got a JMSException " + e.toString());
             latch.release();
          }
       }




More information about the jboss-cvs-commits mailing list