[jboss-cvs] JBoss Messaging SVN: r4560 - in trunk: src/main/org/jboss/messaging/core/client/impl and 3 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Mon Jun 23 11:56:26 EDT 2008


Author: timfox
Date: 2008-06-23 11:56:26 -0400 (Mon, 23 Jun 2008)
New Revision: 4560

Modified:
   trunk/src/main/org/jboss/messaging/core/client/ClientConnection.java
   trunk/src/main/org/jboss/messaging/core/client/ClientConsumer.java
   trunk/src/main/org/jboss/messaging/core/client/ClientProducer.java
   trunk/src/main/org/jboss/messaging/core/client/ClientSession.java
   trunk/src/main/org/jboss/messaging/core/client/impl/ClientConnectionImpl.java
   trunk/src/main/org/jboss/messaging/core/client/impl/ClientConnectionInternal.java
   trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerImpl.java
   trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerInternal.java
   trunk/src/main/org/jboss/messaging/core/client/impl/ClientProducerImpl.java
   trunk/src/main/org/jboss/messaging/core/client/impl/ClientProducerInternal.java
   trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java
   trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionInternal.java
   trunk/src/main/org/jboss/messaging/jms/client/JBossConnection.java
   trunk/src/main/org/jboss/messaging/util/MessagingBufferFactory.java
   trunk/tests/src/org/jboss/messaging/tests/unit/jms/client/JBossConnectionTest.java
Log:
Some changes to cleanup and close


Modified: trunk/src/main/org/jboss/messaging/core/client/ClientConnection.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/ClientConnection.java	2008-06-23 14:50:51 UTC (rev 4559)
+++ trunk/src/main/org/jboss/messaging/core/client/ClientConnection.java	2008-06-23 15:56:26 UTC (rev 4560)
@@ -50,6 +50,4 @@
    boolean isClosed();
 
    Version getServerVersion();
-      
-   void cleanUp();
 }

Modified: trunk/src/main/org/jboss/messaging/core/client/ClientConsumer.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/ClientConsumer.java	2008-06-23 14:50:51 UTC (rev 4559)
+++ trunk/src/main/org/jboss/messaging/core/client/ClientConsumer.java	2008-06-23 15:56:26 UTC (rev 4560)
@@ -45,7 +45,5 @@
    
    boolean isClosed();   
    
-   boolean isDirect();
-
-   void cleanUp();
+   boolean isDirect();   
 }

Modified: trunk/src/main/org/jboss/messaging/core/client/ClientProducer.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/ClientProducer.java	2008-06-23 14:50:51 UTC (rev 4559)
+++ trunk/src/main/org/jboss/messaging/core/client/ClientProducer.java	2008-06-23 15:56:26 UTC (rev 4560)
@@ -52,6 +52,4 @@
    int getMaxRate();
    
    int getInitialWindowSize();
-
-   void cleanUp();
 }

Modified: trunk/src/main/org/jboss/messaging/core/client/ClientSession.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/ClientSession.java	2008-06-23 14:50:51 UTC (rev 4559)
+++ trunk/src/main/org/jboss/messaging/core/client/ClientSession.java	2008-06-23 15:56:26 UTC (rev 4560)
@@ -96,8 +96,6 @@
    
    boolean isXA();
 
-   void cleanUp();
-
    ClientMessage createClientMessage(final byte type, final boolean durable, final long expiration,
                             final long timestamp, final byte priority);
 

Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ClientConnectionImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ClientConnectionImpl.java	2008-06-23 14:50:51 UTC (rev 4559)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ClientConnectionImpl.java	2008-06-23 15:56:26 UTC (rev 4560)
@@ -169,11 +169,21 @@
       }
    }
 
-   public synchronized void cleanUp()
+   public synchronized void cleanUp() throws Exception
    {
-      cleanUpChildren();
+      if (closed)
+      {
+         return;
+      }
       
-      closed = true;
+      try
+      {
+         cleanUpChildren();
+      }
+      finally
+      {            
+         closed = true;
+      }
    }
 
    public boolean isClosed()
@@ -208,11 +218,6 @@
       return serverVersion;
    }
    
-//   public ClientConnectionFactory getConnectionFactory()
-//   {
-//      return connectionFactory;
-//   }
-
    // Public ---------------------------------------------------------------------------------------
 
    // Protected ------------------------------------------------------------------------------------
@@ -241,13 +246,13 @@
       }
    }
 
-   private void cleanUpChildren()
+   private void cleanUpChildren() throws Exception
    {
       //We copy the set of sessions to prevent ConcurrentModificationException which would occur
       //when the child trues to remove itself from its parent
-      Set<ClientSession> childrenClone = new HashSet<ClientSession>(sessions);
+      Set<ClientSessionInternal> childrenClone = new HashSet<ClientSessionInternal>(sessions);
 
-      for (ClientSession session: childrenClone)
+      for (ClientSessionInternal session: childrenClone)
       {
          session.cleanUp();
       }

Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ClientConnectionInternal.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ClientConnectionInternal.java	2008-06-23 14:50:51 UTC (rev 4559)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ClientConnectionInternal.java	2008-06-23 15:56:26 UTC (rev 4560)
@@ -42,6 +42,8 @@
    void removeSession(ClientSessionInternal session);
    
    Set<ClientSession> getSessions();
+      
+   void cleanUp() throws Exception;
    
    //For testing only
    RemotingConnection getRemotingConnection();

Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerImpl.java	2008-06-23 14:50:51 UTC (rev 4559)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerImpl.java	2008-06-23 15:56:26 UTC (rev 4560)
@@ -259,6 +259,8 @@
       }
    }
 
+   //TODO - should combine close() and cleanup() functionality in one method - there is currently duplication
+   
    public void close() throws MessagingException
    {
       if (closed)

Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerInternal.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerInternal.java	2008-06-23 14:50:51 UTC (rev 4559)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerInternal.java	2008-06-23 15:56:26 UTC (rev 4560)
@@ -48,4 +48,6 @@
    int getBufferSize();
    
    int getCreditsToSend();
+   
+   void cleanUp() throws Exception;
 }

Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ClientProducerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ClientProducerImpl.java	2008-06-23 14:50:51 UTC (rev 4559)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ClientProducerImpl.java	2008-06-23 15:56:26 UTC (rev 4560)
@@ -168,22 +168,19 @@
          return;         
       }
       
-      session.removeProducer(this);
-      
-      dispatcher.unregister(clientTargetID);
-      
-      closed = true;
+      doCleanup();
    }
 
    public void cleanUp()
    {
-      session.removeProducer(this);
-
-      dispatcher.unregister(clientTargetID);
-
-      closed = true;
+      if (closed)
+      {
+         return;         
+      }
+      
+      doCleanup();
    }
-
+         
    public boolean isClosed()
    {
       return closed;
@@ -229,6 +226,15 @@
 
    // Private --------------------------------------------------------------------------------------
    
+   private void doCleanup()
+   {
+      session.removeProducer(this);
+
+      dispatcher.unregister(clientTargetID);
+
+      closed = true;
+   }
+   
    private void doSend(final SimpleString address, final ClientMessage msg) throws MessagingException
    {
       if (address != null)

Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ClientProducerInternal.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ClientProducerInternal.java	2008-06-23 14:50:51 UTC (rev 4559)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ClientProducerInternal.java	2008-06-23 15:56:26 UTC (rev 4560)
@@ -36,4 +36,6 @@
 	void receiveCredits(int credits) throws Exception;
 	
 	int getAvailableCredits();
+	
+   void cleanUp();
 }

Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java	2008-06-23 14:50:51 UTC (rev 4559)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java	2008-06-23 15:56:26 UTC (rev 4560)
@@ -21,25 +21,64 @@
  */ 
 package org.jboss.messaging.core.client.impl;
 
-import org.jboss.messaging.core.client.*;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+import javax.transaction.xa.XAException;
+import javax.transaction.xa.XAResource;
+import javax.transaction.xa.Xid;
+
+import org.jboss.messaging.core.client.ClientBrowser;
+import org.jboss.messaging.core.client.ClientConnectionFactory;
+import org.jboss.messaging.core.client.ClientConsumer;
+import org.jboss.messaging.core.client.ClientMessage;
+import org.jboss.messaging.core.client.ClientProducer;
 import org.jboss.messaging.core.exception.MessagingException;
 import org.jboss.messaging.core.logging.Logger;
 import org.jboss.messaging.core.remoting.Packet;
 import org.jboss.messaging.core.remoting.PacketDispatcher;
 import org.jboss.messaging.core.remoting.RemotingConnection;
-import org.jboss.messaging.core.remoting.impl.wireformat.*;
+import org.jboss.messaging.core.remoting.impl.wireformat.ConsumerFlowCreditMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionAcknowledgeMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionAddDestinationMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionBindingQueryMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionBindingQueryResponseMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionCancelMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionCreateBrowserMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionCreateBrowserResponseMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionCreateConsumerMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionCreateConsumerResponseMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionCreateProducerMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionCreateProducerResponseMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionCreateQueueMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionDeleteQueueMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionQueueQueryMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionQueueQueryResponseMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionRemoveDestinationMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionXACommitMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionXAEndMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionXAForgetMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionXAGetInDoubtXidsResponseMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionXAGetTimeoutResponseMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionXAJoinMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionXAPrepareMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionXAResponseMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionXAResumeMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionXARollbackMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionXASetTimeoutMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionXASetTimeoutResponseMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionXAStartMessage;
 import org.jboss.messaging.util.MessagingBuffer;
 import org.jboss.messaging.util.MessagingBufferFactory;
 import org.jboss.messaging.util.SimpleString;
 import org.jboss.messaging.util.TokenBucketLimiterImpl;
 
-import javax.transaction.xa.XAException;
-import javax.transaction.xa.XAResource;
-import javax.transaction.xa.Xid;
-import java.util.*;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-
 /**
  * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
  * @author <a href="mailto:ovidiu at feodorov.com">Ovidiu Feodorov</a>
@@ -478,11 +517,6 @@
       {
          closeChildren();
 
-         if (cacheProducers)
-         {
-            producerCache.clear();
-         }
-
          //Flush any acks to the server
          acknowledgeInternal(false);
 
@@ -490,11 +524,7 @@
       }
       finally
       {
-      	executorService.shutdown();
-
-         connection.removeSession(this);
-
-         closed = true;
+      	doCleanup();
       }
    }
 
@@ -516,17 +546,23 @@
       return new ClientMessageImpl(durable, body);
    }
 
-   public synchronized void cleanUp()
+   public synchronized void cleanUp() throws Exception
    {
-      cleanUpChildren();
-
-      executorService.shutdown();
-
-      connection.removeSession(this);
-
-      closed = true;
+      if (closed)
+      {
+         return;
+      }
+      
+      try
+      {
+         cleanUpChildren();
+      }
+      finally
+      {
+         doCleanup();
+      }
    }
-
+   
    public boolean isClosed()
    {
       return closed;
@@ -992,18 +1028,18 @@
       }
    }
 
-   private void cleanUpChildren()
+   private void cleanUpChildren() throws Exception
    {
-      Set<ClientConsumer> consumersClone = new HashSet<ClientConsumer>(consumers);
+      Set<ClientConsumerInternal> consumersClone = new HashSet<ClientConsumerInternal>(consumers);
 
-      for (ClientConsumer consumer: consumersClone)
+      for (ClientConsumerInternal consumer: consumersClone)
       {
          consumer.cleanUp();
       }
 
-      Set<ClientProducer> producersClone = new HashSet<ClientProducer>(producers);
+      Set<ClientProducerInternal> producersClone = new HashSet<ClientProducerInternal>(producers);
 
-      for (ClientProducer producer: producersClone)
+      for (ClientProducerInternal producer: producersClone)
       {
          producer.cleanUp();
       }
@@ -1016,6 +1052,20 @@
       }
    }
    
+   private void doCleanup()
+   {
+      executorService.shutdown();
+      
+      connection.removeSession(this);
+      
+      if (cacheProducers)
+      {
+         producerCache.clear();
+      }
+
+      closed = true;
+   }
+   
    // Inner Classes --------------------------------------------------------------------------------
 
 }

Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionInternal.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionInternal.java	2008-06-23 14:50:51 UTC (rev 4559)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionInternal.java	2008-06-23 15:56:26 UTC (rev 4560)
@@ -64,4 +64,6 @@
    Set<ClientBrowser> getBrowsers();
    
    Map<SimpleString, ClientProducerInternal> getProducerCache();
+   
+   void cleanUp() throws Exception;
 }

Modified: trunk/src/main/org/jboss/messaging/jms/client/JBossConnection.java
===================================================================
--- trunk/src/main/org/jboss/messaging/jms/client/JBossConnection.java	2008-06-23 14:50:51 UTC (rev 4559)
+++ trunk/src/main/org/jboss/messaging/jms/client/JBossConnection.java	2008-06-23 15:56:26 UTC (rev 4560)
@@ -22,15 +22,35 @@
 
 package org.jboss.messaging.jms.client;
 
+import javax.jms.Connection;
+import javax.jms.ConnectionConsumer;
+import javax.jms.ConnectionMetaData;
+import javax.jms.Destination;
+import javax.jms.ExceptionListener;
+import javax.jms.IllegalStateException;
+import javax.jms.JMSException;
+import javax.jms.Queue;
+import javax.jms.QueueConnection;
+import javax.jms.QueueSession;
+import javax.jms.ServerSessionPool;
+import javax.jms.Session;
+import javax.jms.Topic;
+import javax.jms.TopicConnection;
+import javax.jms.TopicSession;
+import javax.jms.XAConnection;
+import javax.jms.XAQueueConnection;
+import javax.jms.XAQueueSession;
+import javax.jms.XASession;
+import javax.jms.XATopicConnection;
+import javax.jms.XATopicSession;
+
 import org.jboss.messaging.core.client.ClientConnection;
 import org.jboss.messaging.core.client.ClientSession;
 import org.jboss.messaging.core.client.RemotingSessionListener;
+import org.jboss.messaging.core.client.impl.ClientConnectionInternal;
 import org.jboss.messaging.core.exception.MessagingException;
 import org.jboss.messaging.core.logging.Logger;
 
-import javax.jms.*;
-import javax.jms.IllegalStateException;
-
 /**
  * @author <a href="mailto:ovidiu at feodorov.com">Ovidiu Feodorov</a>
  * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
@@ -392,7 +412,9 @@
       public void sessionDestroyed(long sessionID, MessagingException me)
       {
          if (me == null)
+         {
             return;
+         }
 
          if (exceptionListener != null)
          {
@@ -403,7 +425,16 @@
             exceptionListener.onException(je);
          }
 
-         connection.cleanUp();
+         try
+         {
+            //FIXME - this should not be called from the jms layer
+            //We need cleanup to also occur when core connections are dead, and the user may not be using the jms api
+            ((ClientConnectionInternal)connection).cleanUp();
+         }
+         catch (Exception e)
+         {
+            log.error("Failed to cleanup connection", e);            
+         }
       }
       
    }

Modified: trunk/src/main/org/jboss/messaging/util/MessagingBufferFactory.java
===================================================================
--- trunk/src/main/org/jboss/messaging/util/MessagingBufferFactory.java	2008-06-23 14:50:51 UTC (rev 4559)
+++ trunk/src/main/org/jboss/messaging/util/MessagingBufferFactory.java	2008-06-23 15:56:26 UTC (rev 4560)
@@ -35,7 +35,7 @@
 {
    public static MessagingBuffer createMessagingBuffer(TransportType transportType, int len)
    {
-      if(transportType == TransportType.TCP)
+      if (transportType == TransportType.TCP)
       {
          return new IoBufferWrapper(len);
       }
@@ -51,7 +51,7 @@
 
    public static MessagingBuffer createMessagingBuffer(MessagingBuffer buffer, int len)
    {
-      if(buffer instanceof IoBufferWrapper)
+      if (buffer instanceof IoBufferWrapper)
       {
          return new IoBufferWrapper(len);
       }

Modified: trunk/tests/src/org/jboss/messaging/tests/unit/jms/client/JBossConnectionTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/jms/client/JBossConnectionTest.java	2008-06-23 14:50:51 UTC (rev 4559)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/jms/client/JBossConnectionTest.java	2008-06-23 15:56:26 UTC (rev 4560)
@@ -22,21 +22,41 @@
 
 package org.jboss.messaging.tests.unit.jms.client;
 
+import static org.easymock.EasyMock.createStrictMock;
+import static org.easymock.EasyMock.expect;
+import static org.easymock.EasyMock.expectLastCall;
+import static org.easymock.EasyMock.replay;
+import static org.easymock.EasyMock.verify;
+import static org.jboss.messaging.tests.util.RandomUtil.randomString;
+
+import javax.jms.ConnectionConsumer;
+import javax.jms.ConnectionMetaData;
+import javax.jms.Destination;
+import javax.jms.ExceptionListener;
+import javax.jms.JMSException;
+import javax.jms.Queue;
+import javax.jms.QueueSession;
+import javax.jms.ServerSessionPool;
+import javax.jms.Session;
+import javax.jms.Topic;
+import javax.jms.TopicSession;
+import javax.jms.XAQueueSession;
+import javax.jms.XASession;
+import javax.jms.XATopicSession;
+
 import junit.framework.TestCase;
+
 import org.easymock.EasyMock;
-import static org.easymock.EasyMock.*;
 import org.easymock.IArgumentMatcher;
 import org.jboss.messaging.core.client.ClientConnection;
 import org.jboss.messaging.core.client.ClientSession;
 import org.jboss.messaging.core.client.RemotingSessionListener;
+import org.jboss.messaging.core.client.impl.ClientConnectionInternal;
 import org.jboss.messaging.core.exception.MessagingException;
 import org.jboss.messaging.core.version.Version;
 import org.jboss.messaging.jms.client.JBossConnection;
 import org.jboss.messaging.tests.util.RandomUtil;
-import static org.jboss.messaging.tests.util.RandomUtil.randomString;
 
-import javax.jms.*;
-
 /**
  * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>
  * 
@@ -683,7 +703,7 @@
 
    public void testResourcesCleanedUp() throws Exception
    {
-      ClientConnection clientConn = createStrictMock(ClientConnection.class);
+      ClientConnectionInternal clientConn = createStrictMock(ClientConnectionInternal.class);
       FailureListenerMatcher failureListenerMatcher = new FailureListenerMatcher();
       EasyMock.reportMatcher(failureListenerMatcher);
       clientConn.setRemotingSessionListener(null);




More information about the jboss-cvs-commits mailing list