[jboss-cvs] JBoss Messaging SVN: r4567 - in trunk: src/main/org/jboss/messaging/core/message/impl and 10 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Tue Jun 24 04:50:05 EDT 2008


Author: ataylor
Date: 2008-06-24 04:50:05 -0400 (Tue, 24 Jun 2008)
New Revision: 4567

Added:
   trunk/src/main/org/jboss/messaging/util/MessagingBufferFactory.java
Removed:
   trunk/src/main/org/jboss/messaging/util/MessagingBufferFactory.java
Modified:
   trunk/src/main/org/jboss/messaging/core/client/impl/ClientConnectionImpl.java
   trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerImpl.java
   trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java
   trunk/src/main/org/jboss/messaging/core/message/impl/MessageImpl.java
   trunk/src/main/org/jboss/messaging/core/remoting/RemotingConnection.java
   trunk/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java
   trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/IoBufferWrapper.java
   trunk/src/main/org/jboss/messaging/jms/client/JBossConnection.java
   trunk/src/main/org/jboss/messaging/util/ByteBufferWrapper.java
   trunk/src/main/org/jboss/messaging/util/MessagingBuffer.java
   trunk/tests/src/org/jboss/messaging/tests/unit/core/client/impl/ClientConnectionFactoryImplTest.java
   trunk/tests/src/org/jboss/messaging/tests/unit/core/client/impl/ClientConnectionImplTest.java
   trunk/tests/src/org/jboss/messaging/tests/unit/core/remoting/impl/RemotingConnectionTest.java
   trunk/tests/src/org/jboss/messaging/tests/unit/core/remoting/network/ClientNetworkFailureTest.java
   trunk/tests/src/org/jboss/messaging/tests/unit/core/util/MessagingBufferFactoryTest.java
   trunk/tests/src/org/jboss/messaging/tests/unit/jms/client/JBossConnectionTest.java
Log:
Added a second failure listener to handle cleanup at the core level, not in JMS. Also made MessagingBufferFactory an interface.

Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ClientConnectionImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ClientConnectionImpl.java	2008-06-24 08:01:35 UTC (rev 4566)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ClientConnectionImpl.java	2008-06-24 08:50:05 UTC (rev 4567)
@@ -21,9 +21,6 @@
  */ 
 package org.jboss.messaging.core.client.impl;
 
-import java.util.HashSet;
-import java.util.Set;
-
 import org.jboss.messaging.core.client.ClientConnectionFactory;
 import org.jboss.messaging.core.client.ClientSession;
 import org.jboss.messaging.core.client.RemotingSessionListener;
@@ -37,6 +34,9 @@
 import org.jboss.messaging.core.version.Version;
 import org.jboss.messaging.util.ConcurrentHashSet;
 
+import java.util.HashSet;
+import java.util.Set;
+
 /**
  * The client-side Connection connectionFactory class.
  *
@@ -89,6 +89,8 @@
       this.serverTargetID = serverTargetID;
       
       this.remotingConnection = connection;
+
+      this.remotingConnection.addRemotingSessionListener(new JBMFailureListener());
       
       this.serverVersion = serverVersion;
       
@@ -144,7 +146,7 @@
    {
       checkClosed();
       
-      remotingConnection.setRemotingSessionListener(listener);
+      remotingConnection.addRemotingSessionListener(listener);
    }
    
    public synchronized void close() throws MessagingException
@@ -258,6 +260,21 @@
       }
    }
 
+   private class JBMFailureListener implements RemotingSessionListener
+   {
+      public void sessionDestroyed(long sessionID, MessagingException me)
+      {
+         try
+         {
+            cleanUp();
+         }
+         catch (Exception e)
+         {
+            log.error("Failed to cleanup connection", e);
+         }
+      }
+
+   }
    // Inner Classes --------------------------------------------------------------------------------
 
 }

Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerImpl.java	2008-06-24 08:01:35 UTC (rev 4566)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerImpl.java	2008-06-24 08:50:05 UTC (rev 4567)
@@ -22,10 +22,6 @@
 
 package org.jboss.messaging.core.client.impl;
 
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-
 import org.jboss.messaging.core.client.ClientMessage;
 import org.jboss.messaging.core.client.MessageHandler;
 import org.jboss.messaging.core.exception.MessagingException;
@@ -37,6 +33,10 @@
 import org.jboss.messaging.core.remoting.impl.wireformat.ConsumerFlowCreditMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl;
 
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
 /**
  * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
  * @author <a href="mailto:ovidiu at feodorov.com">Ovidiu Feodorov</a>
@@ -258,80 +258,21 @@
          }
       }
    }
-
-   //TODO - should combine close() and cleanup() functionality in one method - there is currently duplication
    
    public void close() throws MessagingException
    {
-      if (closed)
-      {
-         return;
-      }
-      
-      try
-      {
-         // Now we wait for any current handler runners to run.
-         waitForOnMessageToComplete();
-
-         closed = true;
-                           
-         if (receiverThread != null)
-         {
-            synchronized (this)
-            {   
-               // Wake up any receive() thread that might be waiting
-               notify();
-            }
-         }
-                          
-         handler = null;
-         
-         receiverThread = null;
-
-         remotingConnection.sendBlocking(targetID, sessionTargetID, new PacketImpl(PacketImpl.CLOSE));
-
-         dispatcher.unregister(clientTargetID);
-      }
-      finally
-      {
-         session.removeConsumer(this);
-      }   	
+      doCleanUp(true);
    }
 
    public synchronized void cleanUp()
    {
-     try
+      try
       {
-         // Now we wait for any current handler runners to run.
-         waitForOnMessageToComplete();
-
-         closed = true;
-
-         if (receiverThread != null)
-         {
-            synchronized (this)
-            {
-               // Wake up any receive() thread that might be waiting
-               notify();
-            }
-         }
-
-         handler = null;
-
-         receiverThread = null;
-
-         dispatcher.unregister(clientTargetID);
+         doCleanUp(false);
       }
-      finally
+      catch (MessagingException e)
       {
-         try
-         {
-            session.removeConsumer(this);
-         }
-         catch (MessagingException e)
-         {
-            log.warn("Unable to clean up consumer:" + this);
-         }
+         log.warn("problem cleaning up: " + this);
       }
    }
 
@@ -568,7 +509,47 @@
 			log.error("RuntimeException thrown from handler", e);
 		}
    }
-   
+
+   public void doCleanUp(boolean sendCloseMessage) throws MessagingException
+   {
+      if (closed)
+      {
+         return;
+      }
+
+      try
+      {
+         // Now we wait for any current handler runners to run.
+         waitForOnMessageToComplete();
+
+         closed = true;
+
+         if (receiverThread != null)
+         {
+            synchronized (this)
+            {
+               // Wake up any receive() thread that might be waiting
+               notify();
+            }
+         }
+
+         handler = null;
+
+         receiverThread = null;
+
+         if(sendCloseMessage)
+         {
+            remotingConnection.sendBlocking(targetID, sessionTargetID, new PacketImpl(PacketImpl.CLOSE));
+         }
+
+         dispatcher.unregister(clientTargetID);
+      }
+      finally
+      {
+         session.removeConsumer(this);
+      }
+   }
+
    // Inner classes
    // --------------------------------------------------------------------------------
    

Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java	2008-06-24 08:01:35 UTC (rev 4566)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java	2008-06-24 08:50:05 UTC (rev 4567)
@@ -21,64 +21,22 @@
  */ 
 package org.jboss.messaging.core.client.impl;
 
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-
-import javax.transaction.xa.XAException;
-import javax.transaction.xa.XAResource;
-import javax.transaction.xa.Xid;
-
-import org.jboss.messaging.core.client.ClientBrowser;
-import org.jboss.messaging.core.client.ClientConnectionFactory;
-import org.jboss.messaging.core.client.ClientConsumer;
-import org.jboss.messaging.core.client.ClientMessage;
-import org.jboss.messaging.core.client.ClientProducer;
+import org.jboss.messaging.core.client.*;
 import org.jboss.messaging.core.exception.MessagingException;
 import org.jboss.messaging.core.logging.Logger;
 import org.jboss.messaging.core.remoting.Packet;
 import org.jboss.messaging.core.remoting.PacketDispatcher;
 import org.jboss.messaging.core.remoting.RemotingConnection;
-import org.jboss.messaging.core.remoting.impl.wireformat.ConsumerFlowCreditMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionAcknowledgeMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionAddDestinationMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionBindingQueryMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionBindingQueryResponseMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionCancelMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionCreateBrowserMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionCreateBrowserResponseMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionCreateConsumerMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionCreateConsumerResponseMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionCreateProducerMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionCreateProducerResponseMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionCreateQueueMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionDeleteQueueMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionQueueQueryMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionQueueQueryResponseMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionRemoveDestinationMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionXACommitMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionXAEndMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionXAForgetMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionXAGetInDoubtXidsResponseMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionXAGetTimeoutResponseMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionXAJoinMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionXAPrepareMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionXAResponseMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionXAResumeMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionXARollbackMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionXASetTimeoutMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionXASetTimeoutResponseMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionXAStartMessage;
-import org.jboss.messaging.util.MessagingBuffer;
-import org.jboss.messaging.util.MessagingBufferFactory;
-import org.jboss.messaging.util.SimpleString;
-import org.jboss.messaging.util.TokenBucketLimiterImpl;
+import org.jboss.messaging.core.remoting.impl.wireformat.*;
+import org.jboss.messaging.util.*;
 
+import javax.transaction.xa.XAException;
+import javax.transaction.xa.XAResource;
+import javax.transaction.xa.Xid;
+import java.util.*;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
 /**
  * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
  * @author <a href="mailto:ovidiu at feodorov.com">Ovidiu Feodorov</a>
@@ -148,6 +106,8 @@
 
    private final boolean blockOnAcknowledge;
 
+   private MessagingBufferFactory messagingBufferFactory;
+
    //For testing only
    private boolean forceNotSameRM;
 
@@ -200,6 +160,8 @@
       this.autoCommitSends = autoCommitSends;
 
       this.blockOnAcknowledge = blockOnAcknowledge;
+
+      messagingBufferFactory = new MessagingBufferFactoryImpl();
    }
 
    // ClientSession implementation -----------------------------------------------------------------
@@ -530,19 +492,19 @@
 
    public ClientMessage createClientMessage(byte type, boolean durable, long expiration, long timestamp, byte priority)
    {
-      MessagingBuffer body = MessagingBufferFactory.createMessagingBuffer(remotingConnection.getLocation().getTransport(), 1024);
+      MessagingBuffer body = messagingBufferFactory.createMessagingBuffer(remotingConnection.getLocation().getTransport(), 1024);
       return new ClientMessageImpl(type, durable, expiration, timestamp, priority, body);
    }
 
    public ClientMessage createClientMessage(byte type, boolean durable)
    {
-      MessagingBuffer body = MessagingBufferFactory.createMessagingBuffer(remotingConnection.getLocation().getTransport(), 1024);
+      MessagingBuffer body = messagingBufferFactory.createMessagingBuffer(remotingConnection.getLocation().getTransport(), 1024);
       return new ClientMessageImpl(type, durable, body);
    }
 
    public ClientMessage createClientMessage(boolean durable)
    {
-      MessagingBuffer body = MessagingBufferFactory.createMessagingBuffer(remotingConnection.getLocation().getTransport(), 1024);
+      MessagingBuffer body = messagingBufferFactory.createMessagingBuffer(remotingConnection.getLocation().getTransport(), 1024);
       return new ClientMessageImpl(durable, body);
    }
 

Modified: trunk/src/main/org/jboss/messaging/core/message/impl/MessageImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/message/impl/MessageImpl.java	2008-06-24 08:01:35 UTC (rev 4566)
+++ trunk/src/main/org/jboss/messaging/core/message/impl/MessageImpl.java	2008-06-24 08:50:05 UTC (rev 4567)
@@ -26,7 +26,6 @@
 import org.jboss.messaging.core.message.Message;
 import static org.jboss.messaging.util.DataConstants.*;
 import org.jboss.messaging.util.MessagingBuffer;
-import org.jboss.messaging.util.MessagingBufferFactory;
 import org.jboss.messaging.util.SimpleString;
 import org.jboss.messaging.util.TypedProperties;
 
@@ -160,7 +159,7 @@
       //TODO - this can be optimised
       byte[] bytes = new byte[len];
       buffer.getBytes(bytes);
-      body = MessagingBufferFactory.createMessagingBuffer(buffer, len);
+      body = buffer.createNewBuffer(len);
       body.putBytes(bytes);      
    }
    

Modified: trunk/src/main/org/jboss/messaging/core/remoting/RemotingConnection.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/RemotingConnection.java	2008-06-24 08:01:35 UTC (rev 4566)
+++ trunk/src/main/org/jboss/messaging/core/remoting/RemotingConnection.java	2008-06-24 08:50:05 UTC (rev 4567)
@@ -45,7 +45,7 @@
    
    void sendOneWay(long targetID, long executorID, Packet packet) throws MessagingException;
    
-   void setRemotingSessionListener(RemotingSessionListener newListener);
+   void addRemotingSessionListener(RemotingSessionListener newListener);
    
    PacketDispatcher getPacketDispatcher();
    

Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java	2008-06-24 08:01:35 UTC (rev 4566)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java	2008-06-24 08:50:05 UTC (rev 4567)
@@ -18,7 +18,7 @@
  * License along with this software; if not, write to the Free
  * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
  * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */ 
+ */
 
 package org.jboss.messaging.core.remoting.impl;
 
@@ -30,6 +30,9 @@
 import org.jboss.messaging.core.remoting.*;
 import org.jboss.messaging.core.remoting.impl.wireformat.MessagingExceptionMessage;
 
+import java.util.ArrayList;
+import java.util.List;
+
 /**
  * @author <a href="tim.fox at jboss.com">Tim Fox</a>
  * @author <a href="ovidiu at feodorov.com">Ovidiu Feodorov</a>
@@ -55,17 +58,17 @@
 
    private RemotingSession session;
 
-   private RemotingSessionListener listener;
+   private List<RemotingSessionListener> sessionListeners = new ArrayList<RemotingSessionListener>();
 
    // Constructors ---------------------------------------------------------------------------------
 
    public RemotingConnectionImpl(final Location location, ConnectionParams connectionParams) throws IllegalArgumentException
    {
-      if(location == null)
+      if (location == null)
       {
          throw new IllegalArgumentException("location must not be null");
       }
-      if(connectionParams == null)
+      if (connectionParams == null)
       {
          throw new IllegalArgumentException("connection params must not be null");
       }
@@ -88,9 +91,9 @@
       }
 
       connector = ConnectorRegistryFactory.getRegistry().getConnector(location, connectionParams);
-      
+
       session = connector.connect();
-      
+
       if (log.isDebugEnabled())
          log.debug("Using " + connector + " to connect to " + location);
 
@@ -106,13 +109,13 @@
       {
          if (connector != null)
          {
-            if (listener != null)
+            for (RemotingSessionListener sessionListener : sessionListeners)
             {
-               connector.removeSessionListener(listener);
+               connector.removeSessionListener(sessionListener);
             }
-            
+
             RemotingConnector connectorFromRegistry = ConnectorRegistryFactory.getRegistry().removeConnector(location);
-            
+
             if (connectorFromRegistry != null)
             {
                connectorFromRegistry.disconnect();
@@ -210,24 +213,32 @@
       }
    }
 
-   public synchronized void setRemotingSessionListener(final RemotingSessionListener newListener)
+   public synchronized void addRemotingSessionListener(final RemotingSessionListener newListener)
    {
-      if (listener != null && newListener != null)
+      if (newListener == null)
       {
-         throw new IllegalStateException("FailureListener already set to " + listener);
+         throw new IllegalStateException("FailureListener cannot be null");
       }
 
+      if (sessionListeners.contains(newListener))
+      {
+         throw new IllegalStateException("FailureListener already set");
+      }
+
       if (newListener != null)
       {
+         sessionListeners.add(newListener);
          connector.addSessionListener(newListener);
       }
-      else
-      {
-         connector.removeSessionListener(listener);
-      }
-      this.listener = newListener;
    }
 
+   public synchronized void removeRemotingSessionListener(final RemotingSessionListener listener)
+   {
+      sessionListeners.remove(listener);
+      connector.removeSessionListener(listener);
+   }
+
+
    public PacketDispatcher getPacketDispatcher()
    {
       return connector.getDispatcher();

Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/IoBufferWrapper.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/IoBufferWrapper.java	2008-06-24 08:01:35 UTC (rev 4566)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/IoBufferWrapper.java	2008-06-24 08:50:05 UTC (rev 4567)
@@ -22,18 +22,14 @@
 
 package org.jboss.messaging.core.remoting.impl.mina;
 
-import static org.jboss.messaging.util.DataConstants.FALSE;
-import static org.jboss.messaging.util.DataConstants.NOT_NULL;
-import static org.jboss.messaging.util.DataConstants.NULL;
-import static org.jboss.messaging.util.DataConstants.TRUE;
-
-import java.nio.charset.Charset;
-
 import org.apache.mina.common.IoBuffer;
 import org.jboss.messaging.core.logging.Logger;
+import static org.jboss.messaging.util.DataConstants.*;
 import org.jboss.messaging.util.MessagingBuffer;
 import org.jboss.messaging.util.SimpleString;
 
+import java.nio.charset.Charset;
+
 /**
  * 
  * A BufferWrapper
@@ -113,7 +109,12 @@
    {
       return new IoBufferWrapper(buffer.slice());
    }
-   
+
+   public MessagingBuffer createNewBuffer(int len)
+   {
+      return new IoBufferWrapper(len);
+   }
+
    public int remaining()
    {
       return buffer.remaining();

Modified: trunk/src/main/org/jboss/messaging/jms/client/JBossConnection.java
===================================================================
--- trunk/src/main/org/jboss/messaging/jms/client/JBossConnection.java	2008-06-24 08:01:35 UTC (rev 4566)
+++ trunk/src/main/org/jboss/messaging/jms/client/JBossConnection.java	2008-06-24 08:50:05 UTC (rev 4567)
@@ -22,35 +22,15 @@
 
 package org.jboss.messaging.jms.client;
 
-import javax.jms.Connection;
-import javax.jms.ConnectionConsumer;
-import javax.jms.ConnectionMetaData;
-import javax.jms.Destination;
-import javax.jms.ExceptionListener;
-import javax.jms.IllegalStateException;
-import javax.jms.JMSException;
-import javax.jms.Queue;
-import javax.jms.QueueConnection;
-import javax.jms.QueueSession;
-import javax.jms.ServerSessionPool;
-import javax.jms.Session;
-import javax.jms.Topic;
-import javax.jms.TopicConnection;
-import javax.jms.TopicSession;
-import javax.jms.XAConnection;
-import javax.jms.XAQueueConnection;
-import javax.jms.XAQueueSession;
-import javax.jms.XASession;
-import javax.jms.XATopicConnection;
-import javax.jms.XATopicSession;
-
 import org.jboss.messaging.core.client.ClientConnection;
 import org.jboss.messaging.core.client.ClientSession;
 import org.jboss.messaging.core.client.RemotingSessionListener;
-import org.jboss.messaging.core.client.impl.ClientConnectionInternal;
 import org.jboss.messaging.core.exception.MessagingException;
 import org.jboss.messaging.core.logging.Logger;
 
+import javax.jms.*;
+import javax.jms.IllegalStateException;
+
 /**
  * @author <a href="mailto:ovidiu at feodorov.com">Ovidiu Feodorov</a>
  * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
@@ -424,17 +404,6 @@
 
             exceptionListener.onException(je);
          }
-
-         try
-         {
-            //FIXME - this should not be called from the jms layer
-            //We need cleanup to also occur when core connections are dead, and the user may not be using the jms api
-            ((ClientConnectionInternal)connection).cleanUp();
-         }
-         catch (Exception e)
-         {
-            log.error("Failed to cleanup connection", e);            
-         }
       }
       
    }

Modified: trunk/src/main/org/jboss/messaging/util/ByteBufferWrapper.java
===================================================================
--- trunk/src/main/org/jboss/messaging/util/ByteBufferWrapper.java	2008-06-24 08:01:35 UTC (rev 4566)
+++ trunk/src/main/org/jboss/messaging/util/ByteBufferWrapper.java	2008-06-24 08:50:05 UTC (rev 4567)
@@ -90,8 +90,13 @@
    {
    	return new ByteBufferWrapper(buffer.slice());
    }
-	
-	public void rewind()
+
+   public MessagingBuffer createNewBuffer(int len)
+   {
+      return new ByteBufferWrapper(ByteBuffer.allocate(len));
+   }
+
+   public void rewind()
 	{
 		buffer.rewind();
 	}

Modified: trunk/src/main/org/jboss/messaging/util/MessagingBuffer.java
===================================================================
--- trunk/src/main/org/jboss/messaging/util/MessagingBuffer.java	2008-06-24 08:01:35 UTC (rev 4566)
+++ trunk/src/main/org/jboss/messaging/util/MessagingBuffer.java	2008-06-24 08:50:05 UTC (rev 4567)
@@ -115,5 +115,7 @@
 	
 	void rewind();
 	
-	MessagingBuffer slice();	
+	MessagingBuffer slice();
+
+   MessagingBuffer createNewBuffer(int len);
 }

Deleted: trunk/src/main/org/jboss/messaging/util/MessagingBufferFactory.java
===================================================================
--- trunk/src/main/org/jboss/messaging/util/MessagingBufferFactory.java	2008-06-24 08:01:35 UTC (rev 4566)
+++ trunk/src/main/org/jboss/messaging/util/MessagingBufferFactory.java	2008-06-24 08:50:05 UTC (rev 4567)
@@ -1,67 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
-package org.jboss.messaging.util;
-
-import org.jboss.messaging.core.remoting.TransportType;
-import org.jboss.messaging.core.remoting.impl.mina.IoBufferWrapper;
-
-import java.nio.ByteBuffer;
-
-/**
- * a factory class for creating an appropriate type of MessagingBuffer.
- * 
- * @author <a href="ataylor at redhat.com">Andy Taylor</a>
- */
-public class MessagingBufferFactory
-{
-   public static MessagingBuffer createMessagingBuffer(TransportType transportType, int len)
-   {
-      if (transportType == TransportType.TCP)
-      {
-         return new IoBufferWrapper(len);
-      }
-      else if(transportType == TransportType.INVM)
-      {
-         return new ByteBufferWrapper(ByteBuffer.allocate(len));
-      }
-      else
-      {
-         throw new IllegalArgumentException("No Messaging Buffer for transport");
-      }
-   }
-
-   public static MessagingBuffer createMessagingBuffer(MessagingBuffer buffer, int len)
-   {
-      if (buffer instanceof IoBufferWrapper)
-      {
-         return new IoBufferWrapper(len);
-      }
-      else if(buffer instanceof ByteBufferWrapper)
-      {
-         return new ByteBufferWrapper(ByteBuffer.allocate(len));
-      }
-      else
-      {
-         throw new IllegalArgumentException("No Messaging Buffer for transport");
-      }
-   }
-}

Added: trunk/src/main/org/jboss/messaging/util/MessagingBufferFactory.java
===================================================================
--- trunk/src/main/org/jboss/messaging/util/MessagingBufferFactory.java	                        (rev 0)
+++ trunk/src/main/org/jboss/messaging/util/MessagingBufferFactory.java	2008-06-24 08:50:05 UTC (rev 4567)
@@ -0,0 +1,11 @@
+package org.jboss.messaging.util;
+
+import org.jboss.messaging.core.remoting.TransportType;
+
+/**
+ * @author <a href="ataylor at redhat.com">Andy Taylor</a>
+ */
+public interface MessagingBufferFactory
+{
+   MessagingBuffer createMessagingBuffer(TransportType transportType, int len);
+}

Modified: trunk/tests/src/org/jboss/messaging/tests/unit/core/client/impl/ClientConnectionFactoryImplTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/client/impl/ClientConnectionFactoryImplTest.java	2008-06-24 08:01:35 UTC (rev 4566)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/core/client/impl/ClientConnectionFactoryImplTest.java	2008-06-24 08:50:05 UTC (rev 4567)
@@ -23,10 +23,7 @@
 package org.jboss.messaging.tests.unit.core.client.impl;
 
 import org.easymock.EasyMock;
-import org.jboss.messaging.core.client.ClientConnection;
-import org.jboss.messaging.core.client.ClientConnectionFactory;
-import org.jboss.messaging.core.client.ConnectionParams;
-import org.jboss.messaging.core.client.Location;
+import org.jboss.messaging.core.client.*;
 import org.jboss.messaging.core.client.impl.ClientConnectionFactoryImpl;
 import org.jboss.messaging.core.client.impl.ClientConnectionImpl;
 import org.jboss.messaging.core.client.impl.ConnectionParamsImpl;
@@ -220,7 +217,6 @@
       RemotingConnectionFactory rcf = EasyMock.createStrictMock(RemotingConnectionFactory.class);
       
       RemotingConnection rc = EasyMock.createStrictMock(RemotingConnection.class);
-      
       PacketDispatcher dispatcher = EasyMock.createStrictMock(PacketDispatcher.class);
       
       ClientConnectionFactory cf =
@@ -232,7 +228,7 @@
       EasyMock.expect(rcf.createRemotingConnection(location, params)).andReturn(rc);
       
       rc.start();
-      
+
       Version clientVersion = VersionLoader.load();
       
       CreateConnectionRequest request =
@@ -246,9 +242,9 @@
          new CreateConnectionResponse(connTargetID, serverVersion);
       
       EasyMock.expect(rc.sendBlocking(0, 0, request)).andReturn(response);
-      
+
       EasyMock.expect(rc.getPacketDispatcher()).andReturn(dispatcher);
-      
+      rc.addRemotingSessionListener((RemotingSessionListener) EasyMock.anyObject());
       EasyMock.replay(rcf, rc, dispatcher);
       
       ClientConnection conn;

Modified: trunk/tests/src/org/jboss/messaging/tests/unit/core/client/impl/ClientConnectionImplTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/client/impl/ClientConnectionImplTest.java	2008-06-24 08:01:35 UTC (rev 4566)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/core/client/impl/ClientConnectionImplTest.java	2008-06-24 08:50:05 UTC (rev 4567)
@@ -18,12 +18,16 @@
  * License along with this software; if not, write to the Free
  * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
  * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */ 
+ */
 
 package org.jboss.messaging.tests.unit.core.client.impl;
 
 import org.easymock.EasyMock;
-import org.jboss.messaging.core.client.*;
+import org.easymock.IAnswer;
+import org.jboss.messaging.core.client.ClientConnectionFactory;
+import org.jboss.messaging.core.client.ClientSession;
+import org.jboss.messaging.core.client.Location;
+import org.jboss.messaging.core.client.RemotingSessionListener;
 import org.jboss.messaging.core.client.impl.*;
 import org.jboss.messaging.core.exception.MessagingException;
 import org.jboss.messaging.core.logging.Logger;
@@ -40,11 +44,9 @@
 import java.util.Set;
 
 /**
- * 
  * A ClientConnectionImplTest
- * 
+ *
  * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
- *
  */
 public class ClientConnectionImplTest extends UnitTestCase
 {
@@ -52,22 +54,22 @@
 
    public void testConstructor() throws Exception
    {
-      RemotingConnection rc = EasyMock.createStrictMock(RemotingConnection.class);   
+      RemotingConnection rc = EasyMock.createStrictMock(RemotingConnection.class);
 
       Version version = new VersionImpl("blah132", 1, 1, 1, 12, "blah1652");
 
       Location location = new LocationImpl(TransportType.TCP, "sausages");
 
       ClientConnectionFactory cf = new ClientConnectionFactoryImpl(location);
-      
+
       final long serverTargetID = 12091092;
-      
+
       PacketDispatcher pd = EasyMock.createStrictMock(PacketDispatcher.class);
-      
+      rc.addRemotingSessionListener((RemotingSessionListener) EasyMock.anyObject());
       EasyMock.replay(rc, pd);
-      
+
       ClientConnectionInternal conn = new ClientConnectionImpl(cf, serverTargetID, rc, version, pd);
-      
+
       EasyMock.verify(rc, pd);
 
       assertTrue(conn.getServerVersion() == version);
@@ -81,11 +83,11 @@
       testCreateSession(false, false, false, 14526512, false, false, false);
       testCreateSession(true, true, true, 14526512, true, true, false);
    }
-   
+
    public void testStartStop() throws Exception
    {
-      RemotingConnection rc = EasyMock.createStrictMock(RemotingConnection.class);   
-      
+      RemotingConnection rc = EasyMock.createStrictMock(RemotingConnection.class);
+      rc.addRemotingSessionListener((RemotingSessionListener) EasyMock.anyObject());
       Version version = new VersionImpl("tyfytfytf", 1, 1, 1, 12, "yttyft");
 
       Location location = new LocationImpl(TransportType.TCP, "ftftf");
@@ -93,28 +95,28 @@
       ClientConnectionFactory cf = new ClientConnectionFactoryImpl(location);
 
       final int serverTargetID = 23;
-      
+
       PacketDispatcher pd = EasyMock.createStrictMock(PacketDispatcher.class);
-      
-      ClientConnectionInternal conn = new ClientConnectionImpl(cf, serverTargetID, rc, version, pd);
-      
+
+
+
       rc.sendOneWay(serverTargetID, serverTargetID, new PacketImpl(PacketImpl.CONN_START));
-      
+
       EasyMock.expect(rc.sendBlocking(serverTargetID, serverTargetID, new PacketImpl(PacketImpl.CONN_STOP))).andReturn(null);
-      
+
       EasyMock.replay(rc, pd);
-      
+      ClientConnectionInternal conn = new ClientConnectionImpl(cf, serverTargetID, rc, version, pd);
       conn.start();
-      
+
       conn.stop();
-      
-      EasyMock.verify(rc, pd);            
+
+      EasyMock.verify(rc, pd);
    }
-   
+
    public void testSetRemotingSessionListener() throws Exception
    {
-      RemotingConnection rc = EasyMock.createStrictMock(RemotingConnection.class);   
-      
+      RemotingConnection rc = EasyMock.createStrictMock(RemotingConnection.class);
+      rc.addRemotingSessionListener((RemotingSessionListener) EasyMock.anyObject());
       Version version = new VersionImpl("tyfytfytf", 1, 1, 1, 12, "yttyft");
 
       Location location = new LocationImpl(TransportType.TCP, "ftftf");
@@ -122,84 +124,84 @@
       ClientConnectionFactory cf = new ClientConnectionFactoryImpl(location);
 
       final int serverTargetID = 23;
-      
+
       PacketDispatcher pd = EasyMock.createStrictMock(PacketDispatcher.class);
-      
-      ClientConnectionInternal conn = new ClientConnectionImpl(cf, serverTargetID, rc, version, pd);
-      
+
+
+
       RemotingSessionListener listener = new RemotingSessionListener()
       {
          public void sessionDestroyed(long sessionID, MessagingException me)
-         {            
+         {
          }
       };
-      
-      rc.setRemotingSessionListener(listener);
-            
+
+      rc.addRemotingSessionListener(listener);
+
       EasyMock.replay(rc, pd);
-      
+      ClientConnectionInternal conn = new ClientConnectionImpl(cf, serverTargetID, rc, version, pd);
       conn.setRemotingSessionListener(listener);
-      
-      EasyMock.verify(rc, pd);            
+
+      EasyMock.verify(rc, pd);
    }
-   
+
    public void testClose() throws Exception
    {
-      RemotingConnection rc = EasyMock.createStrictMock(RemotingConnection.class);   
-      
+      RemotingConnection rc = EasyMock.createStrictMock(RemotingConnection.class);
+      rc.addRemotingSessionListener((RemotingSessionListener) EasyMock.anyObject());
       Version version = new VersionImpl("tyfytfytf", 1, 1, 1, 12, "yttyft");
 
       Location location = new LocationImpl(TransportType.TCP, "ftftf");
 
       ClientConnectionFactory cf = new ClientConnectionFactoryImpl(location);
-      
+
       final int serverTargetID = 23;
-      
+
       PacketDispatcher pd = EasyMock.createStrictMock(PacketDispatcher.class);
-      
+      EasyMock.replay(rc);
       ClientConnectionInternal conn = new ClientConnectionImpl(cf, serverTargetID, rc, version, pd);
-      
+      EasyMock.reset(rc);
       assertFalse(conn.isClosed());
-    
+
       //Create some sessions
-      
+
       ClientSessionInternal sess1 = EasyMock.createStrictMock(ClientSessionInternal.class);
-      
+
       ClientSessionInternal sess2 = EasyMock.createStrictMock(ClientSessionInternal.class);
-      
+
       ClientSessionInternal sess3 = EasyMock.createStrictMock(ClientSessionInternal.class);
-      
+
       conn.addSession(sess1);
       conn.addSession(sess2);
       conn.addSession(sess3);
-      
+
       sess1.close();
       sess2.close();
       sess3.close();
-      
+
       EasyMock.expect(rc.sendBlocking(serverTargetID, serverTargetID, new PacketImpl(PacketImpl.CLOSE))).andReturn(null);
-            
-      rc.stop();      
-      
+
+      rc.stop();
+
       EasyMock.replay(rc, pd, sess1, sess2, sess3);
-      
+
       conn.close();
-      
+
       EasyMock.verify(rc, pd, sess1, sess2, sess3);
-      
+
       assertTrue(conn.isClosed());
-      
+
       assertSame(conn.getRemotingConnection(), rc);
-      
+
       //Close again should do nothing
       EasyMock.reset(rc, pd, sess1, sess2, sess3);
-      
+
       EasyMock.replay(rc, pd, sess1, sess2, sess3);
-      
+
       conn.close();
-      
+
       EasyMock.verify(rc, pd, sess1, sess2, sess3);
-      
+
       try
       {
          conn.createClientSession(false, false, false, 65655);
@@ -209,7 +211,7 @@
       {
          assertEquals(MessagingException.OBJECT_CLOSED, e.getCode());
       }
-      
+
       try
       {
          conn.createClientSession(false, false, false, 545, false, false);
@@ -219,7 +221,7 @@
       {
          assertEquals(MessagingException.OBJECT_CLOSED, e.getCode());
       }
-      
+
       try
       {
          conn.start();
@@ -229,7 +231,7 @@
       {
          assertEquals(MessagingException.OBJECT_CLOSED, e.getCode());
       }
-      
+
       try
       {
          conn.stop();
@@ -239,13 +241,13 @@
       {
          assertEquals(MessagingException.OBJECT_CLOSED, e.getCode());
       }
-      
+
       try
       {
          conn.setRemotingSessionListener(new RemotingSessionListener()
          {
             public void sessionDestroyed(long sessionID, MessagingException me)
-            {            
+            {
             }
          });
          fail("Should throw exception");
@@ -255,64 +257,64 @@
          assertEquals(MessagingException.OBJECT_CLOSED, e.getCode());
       }
    }
-   
+
    public void testRemoveSession() throws Exception
    {
-      RemotingConnection rc = EasyMock.createStrictMock(RemotingConnection.class);   
-      
+      RemotingConnection rc = EasyMock.createStrictMock(RemotingConnection.class);
+
       Version version = new VersionImpl("tyfytfytf", 1, 1, 1, 12, "yttyft");
 
       Location location = new LocationImpl(TransportType.TCP, "ftftf");
 
       ClientConnectionFactory cf = new ClientConnectionFactoryImpl(location);
-      
+
       final int serverTargetID = 23;
-      
+
       PacketDispatcher pd = EasyMock.createStrictMock(PacketDispatcher.class);
-      
+
       ClientConnectionInternal conn = new ClientConnectionImpl(cf, serverTargetID, rc, version, pd);
-      
+
       //Create some sessions
-      
+
       ClientSessionInternal sess1 = EasyMock.createStrictMock(ClientSessionInternal.class);
-      
+
       ClientSessionInternal sess2 = EasyMock.createStrictMock(ClientSessionInternal.class);
-      
+
       ClientSessionInternal sess3 = EasyMock.createStrictMock(ClientSessionInternal.class);
-      
+
       conn.addSession(sess1);
       conn.addSession(sess2);
       conn.addSession(sess3);
-            
+
       Set<ClientSession> sessions = conn.getSessions();
       assertEquals(3, sessions.size());
       assertTrue(sessions.contains(sess1));
       assertTrue(sessions.contains(sess2));
       assertTrue(sessions.contains(sess3));
-      
+
       conn.removeSession(sess2);
-      
+
       sessions = conn.getSessions();
       assertEquals(2, sessions.size());
-      assertTrue(sessions.contains(sess1));     
+      assertTrue(sessions.contains(sess1));
       assertTrue(sessions.contains(sess3));
-      
+
       conn.removeSession(sess1);
-      
+
       sessions = conn.getSessions();
-      assertEquals(1, sessions.size());   
+      assertEquals(1, sessions.size());
       assertTrue(sessions.contains(sess3));
-      
+
       conn.removeSession(sess3);
-      
+
       sessions = conn.getSessions();
-      assertEquals(0, sessions.size());   
+      assertEquals(0, sessions.size());
    }
-   
+
    public void testSessionCleanedUp() throws Exception
    {
       RemotingConnection rc = EasyMock.createStrictMock(RemotingConnection.class);
-
+      rc.addRemotingSessionListener((RemotingSessionListener) EasyMock.anyObject());
       Location location = new LocationImpl(TransportType.TCP, "oranges");
 
       ClientConnectionFactory cf = new ClientConnectionFactoryImpl(location);
@@ -322,9 +324,9 @@
       Version version = new VersionImpl("uqysuyqs", 1, 1, 1, 12, "uqysuays");
 
       PacketDispatcher pd = EasyMock.createStrictMock(PacketDispatcher.class);
-      
-      ClientConnectionInternal conn = new ClientConnectionImpl(cf, serverTargetID, rc, version, pd);
 
+
+
       ConnectionCreateSessionMessage request = new ConnectionCreateSessionMessage(true, true, true);
 
       final int sessionTargetID = 12127162;
@@ -334,7 +336,7 @@
       EasyMock.expect(rc.sendBlocking(serverTargetID, serverTargetID, request)).andReturn(response);
 
       EasyMock.replay(rc, pd);
-      
+      ClientConnectionInternal conn = new ClientConnectionImpl(cf, serverTargetID, rc, version, pd);
       ClientSession session = conn.createClientSession(true, true, true, 1);
       conn.cleanUp();
       assertTrue(session.isClosed());
@@ -346,7 +348,7 @@
    public void testSessionsCleanedUp() throws Exception
    {
       RemotingConnection rc = EasyMock.createStrictMock(RemotingConnection.class);
-
+      rc.addRemotingSessionListener((RemotingSessionListener) EasyMock.anyObject());
       Location location = new LocationImpl(TransportType.TCP, "oranges");
 
       ClientConnectionFactory cf = new ClientConnectionFactoryImpl(location);
@@ -356,9 +358,9 @@
       Version version = new VersionImpl("uqysuyqs", 1, 1, 1, 12, "uqysuays");
 
       PacketDispatcher pd = EasyMock.createStrictMock(PacketDispatcher.class);
-      
-      ClientConnectionInternal conn = new ClientConnectionImpl(cf, serverTargetID, rc, version, pd);
 
+
+
       ConnectionCreateSessionMessage request = new ConnectionCreateSessionMessage(true, true, true);
 
       final int sessionTargetID = 12127162;
@@ -368,6 +370,7 @@
       EasyMock.expect(rc.sendBlocking(serverTargetID, serverTargetID, request)).andReturn(response).anyTimes();
 
       EasyMock.replay(rc, pd);
+      ClientConnectionInternal conn = new ClientConnectionImpl(cf, serverTargetID, rc, version, pd);
       ClientSession session1 = conn.createClientSession(true, true, true, 1);
       ClientSession session2 = conn.createClientSession(true, true, true, 2);
       ClientSession session3 = conn.createClientSession(true, true, true, 3);
@@ -379,23 +382,23 @@
       EasyMock.verify(rc, pd);
 
    }
-               
+
    // Private -----------------------------------------------------------------------------------------------------------
 
 
    private void testCreateSession(final boolean xa, final boolean autoCommitSends, final boolean autoCommitAcks,
-         final int ackBatchSize, final boolean blockOnAcknowledge,
-         final boolean cacheProducers, final boolean useDefaults) throws Exception
-   {       
-      RemotingConnection rc = EasyMock.createStrictMock(RemotingConnection.class); 
-
+                                  final int ackBatchSize, final boolean blockOnAcknowledge,
+                                  final boolean cacheProducers, final boolean useDefaults) throws Exception
+   {
+      RemotingConnection rc = EasyMock.createStrictMock(RemotingConnection.class);
+      rc.addRemotingSessionListener((RemotingSessionListener) EasyMock.anyObject());
       Location location = new LocationImpl(TransportType.TCP, "oranges");
 
       ClientConnectionFactory cf = new ClientConnectionFactoryImpl(location);
 
       if (useDefaults)
       {
-         cf.setDefaultBlockOnAcknowledge(blockOnAcknowledge); 
+         cf.setDefaultBlockOnAcknowledge(blockOnAcknowledge);
       }
       else
       {
@@ -405,11 +408,11 @@
       final int serverTargetID = 17267162;
 
       Version version = new VersionImpl("uqysuyqs", 1, 1, 1, 12, "uqysuays");
-      
+
       PacketDispatcher pd = EasyMock.createStrictMock(PacketDispatcher.class);
-      
-      ClientConnectionInternal conn = new ClientConnectionImpl(cf, serverTargetID, rc, version, pd);
 
+
+
       ConnectionCreateSessionMessage request = new ConnectionCreateSessionMessage(xa, autoCommitSends, autoCommitAcks);
 
       final int sessionTargetID = 12127162;
@@ -419,7 +422,7 @@
       EasyMock.expect(rc.sendBlocking(serverTargetID, serverTargetID, request)).andReturn(response);
 
       EasyMock.replay(rc, pd);
-
+      ClientConnectionInternal conn = new ClientConnectionImpl(cf, serverTargetID, rc, version, pd);
       ClientSession session;
 
       if (useDefaults)
@@ -429,7 +432,7 @@
       else
       {
          session = conn.createClientSession(xa, autoCommitSends, autoCommitAcks, ackBatchSize, blockOnAcknowledge,
-               cacheProducers);
+                 cacheProducers);
       }
 
       assertEquals(ackBatchSize, session.getLazyAckBatchSize());
@@ -440,4 +443,42 @@
 
       EasyMock.verify(rc, pd);
    }
+
+
+   public void testResourcesCleanedUp() throws Exception
+   {
+      RemotingConnection rc = EasyMock.createStrictMock(RemotingConnection.class);
+      Location location = new LocationImpl(TransportType.TCP, "oranges");
+      ClientConnectionFactory cf = new ClientConnectionFactoryImpl(location);
+      Version version = new VersionImpl("uqysuyqs", 1, 1, 1, 12, "uqysuays");
+      PacketDispatcher pd = EasyMock.createStrictMock(PacketDispatcher.class);
+      final int serverTargetID = 17267162;
+      rc.addRemotingSessionListener((RemotingSessionListener) EasyMock.anyObject());
+      SetRemotingSessionListenerAnswer answer = new SetRemotingSessionListenerAnswer();
+      EasyMock.expectLastCall().andAnswer(answer);
+
+
+
+      final int sessionTargetID = 12127162;
+      ConnectionCreateSessionResponseMessage response = new ConnectionCreateSessionResponseMessage(sessionTargetID);
+      ConnectionCreateSessionMessage request = new ConnectionCreateSessionMessage(false, true, true);
+      EasyMock.expect(rc.sendBlocking(serverTargetID, serverTargetID, request)).andReturn(response);
+
+      EasyMock.replay(rc, pd);
+      ClientConnectionInternal conn = new ClientConnectionImpl(cf, serverTargetID, rc, version, pd);
+      assertNotNull(answer.listener);
+      ClientSession session = conn.createClientSession(false, true, true, 1);
+      answer.listener.sessionDestroyed(serverTargetID, new MessagingException());
+      assertTrue(session.isClosed());
+   }
+
+   class SetRemotingSessionListenerAnswer implements IAnswer
+   {
+      RemotingSessionListener listener = null;
+      public Object answer() throws Throwable
+      {
+         listener = (RemotingSessionListener) EasyMock.getCurrentArguments()[0];
+         return null;
+      }
+   }
 }

Modified: trunk/tests/src/org/jboss/messaging/tests/unit/core/remoting/impl/RemotingConnectionTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/remoting/impl/RemotingConnectionTest.java	2008-06-24 08:01:35 UTC (rev 4566)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/core/remoting/impl/RemotingConnectionTest.java	2008-06-24 08:50:05 UTC (rev 4567)
@@ -156,7 +156,7 @@
 
       RemotingConnectionImpl remotingConnection = new RemotingConnectionImpl(location, connectionParams);
       remotingConnection.start();
-      remotingConnection.setRemotingSessionListener(listener);
+      remotingConnection.addRemotingSessionListener(listener);
       remotingConnection.stop();
       EasyMock.verify(connector);
       EasyMock.verify(connectorRegistry);
@@ -729,7 +729,7 @@
 
       RemotingConnectionImpl remotingConnection = new RemotingConnectionImpl(location, connectionParams);
       remotingConnection.start();
-      remotingConnection.setRemotingSessionListener(listener);
+      remotingConnection.addRemotingSessionListener(listener);
       EasyMock.verify(connector);
       EasyMock.verify(connectorRegistry);
    }
@@ -760,52 +760,14 @@
 
       RemotingConnectionImpl remotingConnection = new RemotingConnectionImpl(location, connectionParams);
       remotingConnection.start();
-      remotingConnection.setRemotingSessionListener(listener);
-      remotingConnection.setRemotingSessionListener(null);
-      remotingConnection.setRemotingSessionListener(listener2);
+      remotingConnection.addRemotingSessionListener(listener);
+      remotingConnection.removeRemotingSessionListener(listener);
+      remotingConnection.addRemotingSessionListener(listener2);
       EasyMock.verify(connector);
       EasyMock.verify(connectorRegistry);
    }
 
-   public void testConnectionSetListenerTwiceThrowsException() throws Throwable
-   {
-      final ConnectorRegistry connectorRegistry = EasyMock.createStrictMock(ConnectorRegistry.class);
-      RemotingConnector connector = EasyMock.createStrictMock(RemotingConnector.class);
-      ConnectorRegistryFactory.setRegisteryLocator(new ConnectorRegistryLocator()
-      {
-         public ConnectorRegistry locate()
-         {
-            return connectorRegistry;
-         }
-      });
-      Location location = EasyMock.createNiceMock(Location.class);
-      ConnectionParams connectionParams = EasyMock.createNiceMock(ConnectionParams.class);
-      RemotingSession remotingSession = EasyMock.createStrictMock(RemotingSession.class);
-      RemotingSessionListener listener = EasyMock.createNiceMock(RemotingSessionListener.class);
-      RemotingSessionListener listener2 = EasyMock.createNiceMock(RemotingSessionListener.class);
 
-      EasyMock.expect(connectorRegistry.getConnector(location, connectionParams)).andReturn(connector);
-      EasyMock.replay(connectorRegistry);
-      EasyMock.expect(connector.connect()).andReturn(remotingSession);
-      connector.addSessionListener(listener);
-      EasyMock.replay(connector);
-
-      RemotingConnectionImpl remotingConnection = new RemotingConnectionImpl(location, connectionParams);
-      remotingConnection.start();
-      remotingConnection.setRemotingSessionListener(listener);
-      try
-      {
-         remotingConnection.setRemotingSessionListener(listener2);
-         fail("should throw exception");
-      }
-      catch (IllegalStateException e)
-      {
-         //pass
-      }
-      EasyMock.verify(connector);
-      EasyMock.verify(connectorRegistry);
-   }
-
    public void testGetDispatcher() throws Throwable
    {
       final ConnectorRegistry connectorRegistry = EasyMock.createStrictMock(ConnectorRegistry.class);

Modified: trunk/tests/src/org/jboss/messaging/tests/unit/core/remoting/network/ClientNetworkFailureTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/remoting/network/ClientNetworkFailureTest.java	2008-06-24 08:01:35 UTC (rev 4566)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/core/remoting/network/ClientNetworkFailureTest.java	2008-06-24 08:50:05 UTC (rev 4567)
@@ -22,17 +22,7 @@
 
 package org.jboss.messaging.tests.unit.core.remoting.network;
 
-import static java.util.concurrent.TimeUnit.MILLISECONDS;
-import static org.jboss.messaging.core.remoting.TransportType.TCP;
-import static org.jboss.messaging.tests.integration.core.remoting.mina.TestSupport.PING_INTERVAL;
-import static org.jboss.messaging.tests.integration.core.remoting.mina.TestSupport.PING_TIMEOUT;
-
-import java.io.IOException;
-import java.util.List;
-import java.util.concurrent.CountDownLatch;
-
 import junit.framework.TestCase;
-
 import org.jboss.messaging.core.client.ClientConnection;
 import org.jboss.messaging.core.client.ClientConnectionFactory;
 import org.jboss.messaging.core.client.RemotingSessionListener;
@@ -43,11 +33,19 @@
 import org.jboss.messaging.core.logging.Logger;
 import org.jboss.messaging.core.remoting.Acceptor;
 import org.jboss.messaging.core.remoting.TransportType;
+import static org.jboss.messaging.core.remoting.TransportType.TCP;
 import org.jboss.messaging.core.remoting.impl.RemotingServiceImpl;
 import org.jboss.messaging.core.remoting.impl.mina.MinaAcceptor;
 import org.jboss.messaging.core.server.MessagingService;
 import org.jboss.messaging.core.server.impl.MessagingServiceImpl;
+import static org.jboss.messaging.tests.integration.core.remoting.mina.TestSupport.PING_INTERVAL;
+import static org.jboss.messaging.tests.integration.core.remoting.mina.TestSupport.PING_TIMEOUT;
 
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+
 /**
  * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>
  * @version <tt>$Revision$</tt>
@@ -137,14 +135,7 @@
       //Thread.sleep((PING_INTERVAL + PING_TIMEOUT) * 1000);
       assertActiveConnectionsOnTheServer(0);
 
-      try
-      {
-         conn.close();
-         fail("close should fail since client resources must have been cleaned up on the server side");
-      }
-      catch (Exception e)
-      {
-      }
+      assertTrue(conn.isClosed());
 
       minaService.removeRemotingSessionListener(listener);
    }

Modified: trunk/tests/src/org/jboss/messaging/tests/unit/core/util/MessagingBufferFactoryTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/util/MessagingBufferFactoryTest.java	2008-06-24 08:01:35 UTC (rev 4566)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/core/util/MessagingBufferFactoryTest.java	2008-06-24 08:50:05 UTC (rev 4567)
@@ -22,16 +22,12 @@
 
 package org.jboss.messaging.tests.unit.core.util;
 
-import java.nio.ByteBuffer;
-
+import junit.framework.TestCase;
 import org.jboss.messaging.core.remoting.TransportType;
-import org.jboss.messaging.core.remoting.impl.mina.IoBufferWrapper;
-import org.jboss.messaging.util.ByteBufferWrapper;
 import org.jboss.messaging.util.MessagingBuffer;
 import org.jboss.messaging.util.MessagingBufferFactory;
+import org.jboss.messaging.util.MessagingBufferFactoryImpl;
 
-import junit.framework.TestCase;
-
 /**
  * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>
  *
@@ -40,6 +36,7 @@
  */
 public class MessagingBufferFactoryTest extends TestCase
 {
+   private MessagingBufferFactory messagingBufferFactory;
    // Constants -----------------------------------------------------
 
    // Attributes ----------------------------------------------------
@@ -50,10 +47,22 @@
 
    // Public --------------------------------------------------------
 
+   protected void setUp() throws Exception
+   {
+      super.setUp();
+      messagingBufferFactory = new MessagingBufferFactoryImpl();
+   }
+
+   protected void tearDown() throws Exception
+   {
+      super.tearDown();
+      messagingBufferFactory = null;
+   }
+
    public void testCreateMessagingBufferForTCP() throws Exception
    {
       int length = 512;
-      MessagingBuffer buffer = MessagingBufferFactory.createMessagingBuffer(TransportType.TCP, length);
+      MessagingBuffer buffer = messagingBufferFactory.createMessagingBuffer(TransportType.TCP, length);
       assertNotNull(buffer);
       assertEquals(length, buffer.capacity());
    }
@@ -62,30 +71,11 @@
    public void testCreateMessagingBufferForINVM() throws Exception
    {
       int length = 512;
-      MessagingBuffer buffer = MessagingBufferFactory.createMessagingBuffer(TransportType.INVM, length);
+      MessagingBuffer buffer = messagingBufferFactory.createMessagingBuffer(TransportType.INVM, length);
       assertNotNull(buffer);
       assertEquals(length, buffer.capacity());
    }
    
-   public void testCreateMessagingBufferFromByteBufferWrapper() throws Exception
-   {
-      int length = 512;
-      MessagingBuffer buffer = new ByteBufferWrapper(ByteBuffer.allocate(length));
-      
-      MessagingBuffer buff = MessagingBufferFactory.createMessagingBuffer(buffer, length);
-      assertNotNull(buff);
-      assertTrue(buff instanceof ByteBufferWrapper);
-   }
-
-   public void testCreateMessagingBufferFromIoBufferWrapper() throws Exception
-   {
-      int length = 512;
-      MessagingBuffer buffer = new IoBufferWrapper(length);
-      
-      MessagingBuffer buff = MessagingBufferFactory.createMessagingBuffer(buffer, length);
-      assertNotNull(buff);
-      assertTrue(buff instanceof IoBufferWrapper);
-   }
    // Package protected ---------------------------------------------
 
    // Protected -----------------------------------------------------

Modified: trunk/tests/src/org/jboss/messaging/tests/unit/jms/client/JBossConnectionTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/jms/client/JBossConnectionTest.java	2008-06-24 08:01:35 UTC (rev 4566)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/jms/client/JBossConnectionTest.java	2008-06-24 08:50:05 UTC (rev 4567)
@@ -22,41 +22,21 @@
 
 package org.jboss.messaging.tests.unit.jms.client;
 
-import static org.easymock.EasyMock.createStrictMock;
-import static org.easymock.EasyMock.expect;
-import static org.easymock.EasyMock.expectLastCall;
-import static org.easymock.EasyMock.replay;
-import static org.easymock.EasyMock.verify;
-import static org.jboss.messaging.tests.util.RandomUtil.randomString;
-
-import javax.jms.ConnectionConsumer;
-import javax.jms.ConnectionMetaData;
-import javax.jms.Destination;
-import javax.jms.ExceptionListener;
-import javax.jms.JMSException;
-import javax.jms.Queue;
-import javax.jms.QueueSession;
-import javax.jms.ServerSessionPool;
-import javax.jms.Session;
-import javax.jms.Topic;
-import javax.jms.TopicSession;
-import javax.jms.XAQueueSession;
-import javax.jms.XASession;
-import javax.jms.XATopicSession;
-
 import junit.framework.TestCase;
-
 import org.easymock.EasyMock;
+import static org.easymock.EasyMock.*;
 import org.easymock.IArgumentMatcher;
 import org.jboss.messaging.core.client.ClientConnection;
 import org.jboss.messaging.core.client.ClientSession;
 import org.jboss.messaging.core.client.RemotingSessionListener;
-import org.jboss.messaging.core.client.impl.ClientConnectionInternal;
 import org.jboss.messaging.core.exception.MessagingException;
 import org.jboss.messaging.core.version.Version;
 import org.jboss.messaging.jms.client.JBossConnection;
 import org.jboss.messaging.tests.util.RandomUtil;
+import static org.jboss.messaging.tests.util.RandomUtil.randomString;
 
+import javax.jms.*;
+
 /**
  * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>
  * 
@@ -701,21 +681,6 @@
       verify(clientConn, clientSession);
    }
 
-   public void testResourcesCleanedUp() throws Exception
-   {
-      ClientConnectionInternal clientConn = createStrictMock(ClientConnectionInternal.class);
-      FailureListenerMatcher failureListenerMatcher = new FailureListenerMatcher();
-      EasyMock.reportMatcher(failureListenerMatcher);
-      clientConn.setRemotingSessionListener(null);
-      clientConn.cleanUp();
-      replay(clientConn);
-
-      JBossConnection connection = new JBossConnection(clientConn,
-            JBossConnection.TYPE_QUEUE_CONNECTION, null, -1);
-      failureListenerMatcher.listener.sessionDestroyed(0, new MessagingException());
-
-      verify(clientConn);
-   }
    // Package protected ---------------------------------------------
 
    // Protected -----------------------------------------------------




More information about the jboss-cvs-commits mailing list