[jboss-cvs] JBoss Messaging SVN: r4441 - in trunk: src/main/org/jboss/messaging/core/client/impl and 2 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Thu Jun 12 05:42:54 EDT 2008


Author: timfox
Date: 2008-06-12 05:42:54 -0400 (Thu, 12 Jun 2008)
New Revision: 4441

Modified:
   trunk/src/main/org/jboss/messaging/core/client/ClientProducer.java
   trunk/src/main/org/jboss/messaging/core/client/ClientSession.java
   trunk/src/main/org/jboss/messaging/core/client/impl/ClientConnectionImpl.java
   trunk/src/main/org/jboss/messaging/core/client/impl/ClientConnectionInternal.java
   trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerImpl.java
   trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerInternal.java
   trunk/src/main/org/jboss/messaging/core/client/impl/ClientProducerImpl.java
   trunk/src/main/org/jboss/messaging/core/client/impl/ClientProducerPacketHandler.java
   trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java
   trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionInternal.java
   trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionAcknowledgeMessage.java
   trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionCancelMessage.java
   trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionCreateConsumerMessage.java
   trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionCreateProducerMessage.java
   trunk/tests/src/org/jboss/messaging/tests/unit/core/client/impl/ClientConnectionFactoryImplTest.java
   trunk/tests/src/org/jboss/messaging/tests/unit/core/client/impl/ClientConnectionImplTest.java
   trunk/tests/src/org/jboss/messaging/tests/unit/core/client/impl/ClientSessionImplTest.java
Log:
More test stuff


Modified: trunk/src/main/org/jboss/messaging/core/client/ClientProducer.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/ClientProducer.java	2008-06-12 09:39:28 UTC (rev 4440)
+++ trunk/src/main/org/jboss/messaging/core/client/ClientProducer.java	2008-06-12 09:42:54 UTC (rev 4441)
@@ -28,4 +28,12 @@
    void close() throws MessagingException;
    
    boolean isClosed();   
+   
+   boolean isBlockOnPersistentSend();
+   
+   boolean isBlockOnNonPersistentSend();
+   
+   int getMaxRate();
+   
+   int getInitialWindowSize();
 }

Modified: trunk/src/main/org/jboss/messaging/core/client/ClientSession.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/ClientSession.java	2008-06-12 09:39:28 UTC (rev 4440)
+++ trunk/src/main/org/jboss/messaging/core/client/ClientSession.java	2008-06-12 09:42:54 UTC (rev 4441)
@@ -49,6 +49,9 @@
    
    ClientProducer createProducer(SimpleString address) throws MessagingException;
    
+   ClientProducer createProducer(SimpleString address, int windowSize, int maxRate,
+                                 boolean blockOnNonPersistentSend, boolean blockOnPersistentSend) throws MessagingException;
+   
    ClientProducer createRateLimitedProducer(SimpleString address, int rate) throws MessagingException;
    
    ClientProducer createProducerWithWindowSize(SimpleString address, int windowSize) throws MessagingException;

Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ClientConnectionImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ClientConnectionImpl.java	2008-06-12 09:39:28 UTC (rev 4440)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ClientConnectionImpl.java	2008-06-12 09:42:54 UTC (rev 4441)
@@ -62,7 +62,7 @@
    
    private final RemotingConnection remotingConnection;
 
-   private final Set<ClientSession> sessions = new ConcurrentHashSet<ClientSession>();
+   private final Set<ClientSessionInternal> sessions = new ConcurrentHashSet<ClientSessionInternal>();
 
    private final Version serverVersion;
    
@@ -101,11 +101,11 @@
       ConnectionCreateSessionResponseMessage response =
          (ConnectionCreateSessionResponseMessage)remotingConnection.sendBlocking(serverTargetID, serverTargetID, request);   
 
-      ClientSession session =
+      ClientSessionInternal session =
       	new ClientSessionImpl(this, response.getSessionID(), xa, ackBatchSize, cacheProducers,
       			                autoCommitSends, autoCommitAcks, blockOnAcknowledge);
 
-      sessions.add(session);
+      addSession(session);
 
       return session;
    }
@@ -173,8 +173,13 @@
       return remotingConnection;
    }
    
-   public void removeSession(final ClientSession session)
+   public void addSession(final ClientSessionInternal session)
    {
+      sessions.add(session);
+   }
+   
+   public void removeSession(final ClientSessionInternal session)
+   {
       sessions.remove(session);
    }
    
@@ -217,6 +222,7 @@
        
       for (ClientSession session: childrenClone)
       {
+         log.info("closing session");
          session.close(); 
       }
    }

Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ClientConnectionInternal.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ClientConnectionInternal.java	2008-06-12 09:39:28 UTC (rev 4440)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ClientConnectionInternal.java	2008-06-12 09:42:54 UTC (rev 4441)
@@ -9,7 +9,6 @@
 import java.util.Set;
 
 import org.jboss.messaging.core.client.ClientConnection;
-import org.jboss.messaging.core.client.ClientConnectionFactory;
 import org.jboss.messaging.core.client.ClientSession;
 import org.jboss.messaging.core.remoting.RemotingConnection;
 
@@ -24,7 +23,9 @@
 {
    RemotingConnection getRemotingConnection();
 
-   void removeSession(ClientSession session);
+   void addSession(ClientSessionInternal session);
    
+   void removeSession(ClientSessionInternal session);
+   
    Set<ClientSession> getSessions();
 }

Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerImpl.java	2008-06-12 09:39:28 UTC (rev 4440)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerImpl.java	2008-06-12 09:42:54 UTC (rev 4441)
@@ -367,6 +367,8 @@
 
    public void recover(final long lastDeliveryID)
    {
+      log.info("Calling recover with " + lastDeliveryID);
+      
       ignoreDeliveryMark = lastDeliveryID;
 
       buffer.clear();      
@@ -376,6 +378,16 @@
    {
       return clientWindowSize;
    }
+   
+   public long getIgnoreDeliveryMark()
+   {
+      return ignoreDeliveryMark;
+   }
+   
+   public int getBufferSize()
+   {
+      return buffer.size();
+   }
 
    // Public
    // ---------------------------------------------------------------------------------------

Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerInternal.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerInternal.java	2008-06-12 09:39:28 UTC (rev 4440)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerInternal.java	2008-06-12 09:42:54 UTC (rev 4441)
@@ -26,4 +26,8 @@
    void recover(long lastDeliveryID) throws MessagingException;
    
    int getClientWindowSize();
+   
+   long getIgnoreDeliveryMark();
+   
+   int getBufferSize();
 }

Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ClientProducerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ClientProducerImpl.java	2008-06-12 09:39:28 UTC (rev 4440)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ClientProducerImpl.java	2008-06-12 09:42:54 UTC (rev 4441)
@@ -72,12 +72,16 @@
      
    private final TokenBucketLimiter rateLimiter;
    
-   private final boolean sendNonPersistentMessagesSynchronously;
+   private final boolean blockOnNonPersistentSend;
    
-   private final boolean sendPersistentMessagesSynchronously;
+   private final boolean blockOnPersistentSend;
    
    private final boolean creditFlowControl;
-     
+   
+   private final int initialWindowSize;
+   
+   private final int maxRate;
+   
    // Static ---------------------------------------------------------------------------------------
 
    // Constructors ---------------------------------------------------------------------------------
@@ -87,8 +91,8 @@
    		                    final SimpleString address,
    		                    final RemotingConnection remotingConnection,
    		                    final int maxRate,
-   		                    final boolean sendNonPersistentMessagesSynchronously,
-   		                    final boolean sendPersistentMessagesSynchronously,
+   		                    final boolean blockOnNonPersistentSend,
+   		                    final boolean blockOnPersistentSend,
    		                    final int initialCredits)
    {   	
       this.session = session;
@@ -110,13 +114,17 @@
       	this.rateLimiter = null;
       }
       
-      this.sendNonPersistentMessagesSynchronously = sendNonPersistentMessagesSynchronously; 
+      this.blockOnNonPersistentSend = blockOnNonPersistentSend; 
       
-      this.sendPersistentMessagesSynchronously = sendPersistentMessagesSynchronously;
+      this.blockOnPersistentSend = blockOnPersistentSend;
       
       this.availableCredits = new Semaphore(initialCredits);
       
       this.creditFlowControl = initialCredits != -1;
+      
+      this.initialWindowSize = initialCredits;
+      
+      this.maxRate = maxRate;
    }
    
    // ClientProducer implementation ----------------------------------------------------------------
@@ -158,7 +166,7 @@
          rateLimiter.limit();
       }
    	
-   	boolean sendBlocking = msg.isDurable() ? sendPersistentMessagesSynchronously : sendNonPersistentMessagesSynchronously;
+   	boolean sendBlocking = msg.isDurable() ? blockOnPersistentSend : blockOnNonPersistentSend;
    	
       ProducerSendMessage message = new ProducerSendMessage(msg);
          		
@@ -213,6 +221,26 @@
       return closed;
    }
    
+   public boolean isBlockOnPersistentSend()
+   {
+      return blockOnPersistentSend;
+   }
+   
+   public boolean isBlockOnNonPersistentSend()
+   {
+      return blockOnNonPersistentSend;
+   }
+   
+   public int getInitialWindowSize()
+   {
+      return initialWindowSize;
+   }
+   
+   public int getMaxRate()
+   {
+      return maxRate;
+   }
+   
    // ClientProducerInternal implementation --------------------------------------------------------
    
    public void receiveCredits(final int credits)

Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ClientProducerPacketHandler.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ClientProducerPacketHandler.java	2008-06-12 09:39:28 UTC (rev 4440)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ClientProducerPacketHandler.java	2008-06-12 09:42:54 UTC (rev 4441)
@@ -62,4 +62,16 @@
    {
       return "ClientProducerPacketHandler[id=" + producerID + "]";
    }
+   
+   public boolean equals(Object other)
+   {
+      if (!(other instanceof ClientProducerPacketHandler))
+      {
+         return false;
+      }
+      // Same handler type: equality is decided by the producerID field alone.
+      final ClientProducerPacketHandler that = (ClientProducerPacketHandler)other;
+
+      return this.producerID == that.producerID;
+   }
 }
\ No newline at end of file

Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java	2008-06-12 09:39:28 UTC (rev 4440)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java	2008-06-12 09:42:54 UTC (rev 4441)
@@ -109,9 +109,9 @@
    
    private final Set<ClientBrowser> browsers = new HashSet<ClientBrowser>();
    
-   private final Set<ClientProducer> producers = new HashSet<ClientProducer>();
+   private final Set<ClientProducerInternal> producers = new HashSet<ClientProducerInternal>();
    
-   private final Map<Long, ClientConsumerInternal> consumers = new HashMap<Long, ClientConsumerInternal>();
+   private final Set<ClientConsumerInternal> consumers = new HashSet<ClientConsumerInternal>();
    
    private final Map<SimpleString, ClientProducerInternal> producerCache;
    
@@ -300,7 +300,7 @@
       ClientConsumerInternal consumer =
          new ClientConsumerImpl(this, response.getConsumerTargetID(), clientTargetID, executor, remotingConnection, clientWindowSize, direct);
 
-      consumers.put(response.getConsumerTargetID(), consumer);
+      addConsumer(consumer);
       
       remotingConnection.getPacketDispatcher().register(new ClientConsumerPacketHandler(consumer, clientTargetID));
       
@@ -328,7 +328,7 @@
 
       ClientBrowser browser = new ClientBrowserImpl(response.getBrowserTargetID(), this, remotingConnection);  
 
-      browsers.add(browser);
+      addBrowser(browser);
 
       return browser;
    }
@@ -339,53 +339,65 @@
                             connection.getConnectionFactory().getDefaultProducerMaxRate());
    }
       
-   public ClientProducer createProducer(final SimpleString address, final int windowSize, final int maxRate) throws MessagingException
+   public ClientProducer createRateLimitedProducer(SimpleString address, int rate) throws MessagingException
    {
+   	return createProducer(address, -1, rate);
+   }
+   
+   public ClientProducer createProducerWithWindowSize(SimpleString address, int windowSize) throws MessagingException
+   {
+   	return createProducer(address, windowSize, -1);
+   }
+   
+   private ClientProducer createProducer(final SimpleString address, final int windowSize, final int maxRate) throws MessagingException
+   {
+      return createProducer(address, windowSize, maxRate,
+                            connection.getConnectionFactory().isDefaultBlockOnNonPersistentSend(),
+                            connection.getConnectionFactory().isDefaultBlockOnPersistentSend());
+   }
+   
+   public ClientProducer createProducer(final SimpleString address, final int windowSize, final int maxRate,
+                                        final boolean blockOnNonPersistentSend,
+                                        final boolean blockOnPersistentSend) throws MessagingException
+   {
       checkClosed();
-      
+
       ClientProducerInternal producer = null;
-      
+
       if (cacheProducers)
       {
-      	producer = producerCache.remove(address);
+         producer = producerCache.remove(address);
       }
 
       if (producer == null)
       {
          long clientTargetID = remotingConnection.getPacketDispatcher().generateID();
+
+         SessionCreateProducerMessage request = new SessionCreateProducerMessage(clientTargetID, address, windowSize, maxRate);
+
+         SessionCreateProducerResponseMessage response =
+            (SessionCreateProducerResponseMessage)remotingConnection.sendBlocking(serverTargetID, serverTargetID, request);
+
+         // maxRate and windowSize can be overridden by the server
+                  
+         // If the producer is not auto-commit sends then messages are never sent blocking - there is no point
+         // since commit, prepare or rollback will flush any messages sent.
          
-      	SessionCreateProducerMessage request = new SessionCreateProducerMessage(clientTargetID, address, windowSize, maxRate);
-      	
-      	SessionCreateProducerResponseMessage response =
-      		(SessionCreateProducerResponseMessage)remotingConnection.sendBlocking(serverTargetID, serverTargetID, request);
-      	
-      	//maxRate and windowSize can be overridden by the server
-      	
-      	producer = new ClientProducerImpl(this, response.getProducerTargetID(), clientTargetID, address,
-      			                            remotingConnection,
-      			                            response.getMaxRate(),
-      			                            connection.getConnectionFactory().isDefaultBlockOnNonPersistentSend(),      			      			                          
-      			                            autoCommitSends && connection.getConnectionFactory().isDefaultBlockOnPersistentSend(),
-      			                            response.getInitialCredits());  
-      	
-      	remotingConnection.getPacketDispatcher().register(new ClientProducerPacketHandler(producer, clientTargetID));      	
+         producer = new ClientProducerImpl(this, response.getProducerTargetID(), clientTargetID, address,
+               remotingConnection,
+               response.getMaxRate(),
+               autoCommitSends && blockOnNonPersistentSend,                                                      
+               autoCommitSends && blockOnPersistentSend,
+               response.getInitialCredits());  
+
+         remotingConnection.getPacketDispatcher().register(new ClientProducerPacketHandler(producer, clientTargetID));        
       }
 
-      producers.add(producer);
+      addProducer(producer);
 
       return producer;
    }
    
-   public ClientProducer createRateLimitedProducer(SimpleString address, int rate) throws MessagingException
-   {
-   	return createProducer(address, -1, rate);
-   }
-   
-   public ClientProducer createProducerWithWindowSize(SimpleString address, int windowSize) throws MessagingException
-   {
-   	return createProducer(address, windowSize, 0);
-   }
-   
    public XAResource getXAResource()
    {
       return this;
@@ -414,7 +426,7 @@
       	lastCommittedID = lastID;
       }
       
-      for (ClientConsumerInternal consumer: consumers.values())
+      for (ClientConsumerInternal consumer: consumers)
       {
          consumer.recover(lastCommittedID + 1);
       }
@@ -430,6 +442,8 @@
    
    public void acknowledge() throws MessagingException
    {                        
+      checkClosed();
+      
       if (lastID + 1 != deliverID)
       {
          broken = true;
@@ -442,13 +456,15 @@
       acked = false;
       
       if (deliveryExpired)
-      {
+      {         
          remotingConnection.sendOneWay(serverTargetID, serverTargetID, new SessionCancelMessage(lastID, true));
          
          toAckCount = 0;
+         
+         acked = true;
       }
       else if (broken || toAckCount == lazyAckBatchSize)
-      {
+      {         
          acknowledgeInternal(blockOnAcknowledge);
          
          toAckCount = 0;
@@ -551,6 +567,21 @@
       this.deliveryExpired = expired;
    }
    
+   public void addConsumer(final ClientConsumerInternal consumer)
+   {
+      consumers.add(consumer);
+   }
+   
+   public void addProducer(final ClientProducerInternal producer)
+   {
+      producers.add(producer);
+   }
+   
+   public void addBrowser(final ClientBrowser browser)
+   {
+      browsers.add(browser);
+   }
+   
    public void removeConsumer(final ClientConsumerInternal consumer) throws MessagingException
    {
       consumers.remove(consumer.getClientTargetID());
@@ -561,7 +592,9 @@
 
       //2. cancel all deliveries on server but not in tx
             
-      remotingConnection.sendBlocking(serverTargetID, serverTargetID, new SessionCancelMessage(-1, false));
+      log.info("Removing consumer");
+      
+      remotingConnection.sendOneWay(serverTargetID, serverTargetID, new SessionCancelMessage(-1, false));
    }
    
    public void removeProducer(final ClientProducerInternal producer)
@@ -836,7 +869,7 @@
    // Package Private ------------------------------------------------------------------------------
 
    // Private --------------------------------------------------------------------------------------
-
+   
    private void acknowledgeInternal(final boolean block) throws MessagingException
    {
       if (acked)
@@ -868,7 +901,7 @@
         
    private void closeChildren() throws MessagingException
    {
-      Set<ClientConsumer> consumersClone = new HashSet<ClientConsumer>(consumers.values());
+      Set<ClientConsumer> consumersClone = new HashSet<ClientConsumer>(consumers);
       
       for (ClientConsumer consumer: consumersClone)
       {

Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionInternal.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionInternal.java	2008-06-12 09:39:28 UTC (rev 4440)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionInternal.java	2008-06-12 09:42:54 UTC (rev 4441)
@@ -25,6 +25,12 @@
       
    void delivered(long deliveryID, boolean expired);
    
+   void addConsumer(ClientConsumerInternal consumer);
+   
+   void addProducer(ClientProducerInternal producer);
+   
+   void addBrowser(ClientBrowser browser);
+   
    void removeConsumer(ClientConsumerInternal consumer) throws MessagingException;
    
    void removeProducer(ClientProducerInternal producer);

Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionAcknowledgeMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionAcknowledgeMessage.java	2008-06-12 09:39:28 UTC (rev 4440)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionAcknowledgeMessage.java	2008-06-12 09:42:54 UTC (rev 4441)
@@ -71,6 +71,19 @@
    {
       return getParentString() + ", deliveryID=" + deliveryID + ", allUpTo=" + allUpTo + "]";
    }
+   
+   public boolean equals(Object other)
+   {
+      if (!(other instanceof SessionAcknowledgeMessage))
+      {
+         return false;
+      }
+      // Same message type: equal when both deliveryID and allUpTo match.
+      final SessionAcknowledgeMessage that = (SessionAcknowledgeMessage)other;
+
+      return deliveryID == that.deliveryID &&
+             allUpTo == that.allUpTo;
+   }
 
    // Package protected ---------------------------------------------
 

Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionCancelMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionCancelMessage.java	2008-06-12 09:39:28 UTC (rev 4440)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionCancelMessage.java	2008-06-12 09:42:54 UTC (rev 4441)
@@ -72,6 +72,19 @@
       return getParentString() + ", deliveryID=" + deliveryID + ", expired=" + expired + "]";
    }
    
+   public boolean equals(Object other)
+   {
+      if (!(other instanceof SessionCancelMessage))
+      {
+         return false;
+      }
+      // Same message type: equal when both deliveryID and expired match.
+      final SessionCancelMessage that = (SessionCancelMessage)other;
+
+      return deliveryID == that.deliveryID &&
+             expired == that.expired;
+   }
+   
    // Package protected ---------------------------------------------
 
    // Protected -----------------------------------------------------

Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionCreateConsumerMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionCreateConsumerMessage.java	2008-06-12 09:39:28 UTC (rev 4440)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionCreateConsumerMessage.java	2008-06-12 09:42:54 UTC (rev 4441)
@@ -132,6 +132,24 @@
       maxRate = buffer.getInt();
    }
 
+   public boolean equals(Object other)
+   {
+      if (other instanceof SessionCreateConsumerMessage == false)
+      {
+         return false;
+      }
+      // Compares clientTargetID, queueName, filterString (null-tolerant), noLocal, autoDeleteQueue, windowSize, maxRate.
+      SessionCreateConsumerMessage r = (SessionCreateConsumerMessage)other;
+      // The ?: must be parenthesised: it binds looser than &&, so unparenthesised it would split the whole chain in two.
+      return this.clientTargetID == r.clientTargetID &&
+             this.queueName.equals(r.queueName) &&
+             (this.filterString == null ? r.filterString == null : this.filterString.equals(r.filterString)) &&
+             this.noLocal == r.noLocal &&
+             this.autoDeleteQueue == r.autoDeleteQueue &&
+             this.windowSize == r.windowSize &&
+             this.maxRate == r.maxRate;
+   }
+   
    // Package protected ---------------------------------------------
 
    // Protected -----------------------------------------------------

Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionCreateProducerMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionCreateProducerMessage.java	2008-06-12 09:39:28 UTC (rev 4440)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionCreateProducerMessage.java	2008-06-12 09:42:54 UTC (rev 4441)
@@ -99,6 +99,21 @@
       windowSize = buffer.getInt();      
       maxRate = buffer.getInt();
    }
+   
+   public boolean equals(Object other)
+   {
+      if (other instanceof SessionCreateProducerMessage == false)
+      {
+         return false;
+      }
+      // Compares clientTargetID, address (null-tolerant), windowSize and maxRate.
+      SessionCreateProducerMessage r = (SessionCreateProducerMessage)other;
+      // The ?: must be parenthesised: it binds looser than &&, so unparenthesised it would split the whole chain in two.
+      return this.clientTargetID == r.clientTargetID &&
+             (this.address == null ? r.address == null : this.address.equals(r.address)) &&
+             this.windowSize == r.windowSize &&
+             this.maxRate == r.maxRate;
+   }
 
    // Package protected ---------------------------------------------
 

Modified: trunk/tests/src/org/jboss/messaging/tests/unit/core/client/impl/ClientConnectionFactoryImplTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/client/impl/ClientConnectionFactoryImplTest.java	2008-06-12 09:39:28 UTC (rev 4440)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/core/client/impl/ClientConnectionFactoryImplTest.java	2008-06-12 09:42:54 UTC (rev 4441)
@@ -144,10 +144,8 @@
       
       rc.stop();
       
-      EasyMock.replay(rcf);
+      EasyMock.replay(rcf, rc);
       
-      EasyMock.replay(rc);
-      
       try
       {
          cf.createConnection();
@@ -159,9 +157,7 @@
          assertEquals(me.getMessage(), e.getMessage());
       }
       
-      EasyMock.verify(rcf);
-      
-      EasyMock.verify(rc);
+      EasyMock.verify(rcf, rc);
    }
    
    public void testThrowableOnStart() throws Throwable
@@ -193,10 +189,8 @@
       
       rc.stop();
       
-      EasyMock.replay(rcf);
+      EasyMock.replay(rcf, rc);
       
-      EasyMock.replay(rc);
-      
       try
       {
          cf.createConnection();
@@ -210,9 +204,7 @@
          assertEquals(MessagingException.INTERNAL_ERROR, e.getCode());
       }
       
-      EasyMock.verify(rcf);
-      
-      EasyMock.verify(rc);
+      EasyMock.verify(rcf, rc);
    }
    
    // Private -----------------------------------------------------------------------------------------------------------
@@ -255,10 +247,8 @@
       
       EasyMock.expect(rc.sendBlocking(0, 0, request)).andReturn(response);
       
-      EasyMock.replay(rcf);
+      EasyMock.replay(rcf, rc);
       
-      EasyMock.replay(rc);
-          
       ClientConnection conn;
       
       if (username == null)         
@@ -270,10 +260,8 @@
          conn = cf.createConnection(username, password);
       }
          
-      EasyMock.verify(rcf);
+      EasyMock.verify(rcf, rc);
       
-      EasyMock.verify(rc);
-      
       assertTrue(conn instanceof ClientConnectionImpl);
       
       assertEquals(serverVersion.getFullVersion(), conn.getServerVersion().getFullVersion());

Modified: trunk/tests/src/org/jboss/messaging/tests/unit/core/client/impl/ClientConnectionImplTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/client/impl/ClientConnectionImplTest.java	2008-06-12 09:39:28 UTC (rev 4440)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/core/client/impl/ClientConnectionImplTest.java	2008-06-12 09:42:54 UTC (rev 4441)
@@ -32,6 +32,7 @@
 import org.jboss.messaging.core.client.impl.ClientConnectionFactoryImpl;
 import org.jboss.messaging.core.client.impl.ClientConnectionImpl;
 import org.jboss.messaging.core.client.impl.ClientConnectionInternal;
+import org.jboss.messaging.core.client.impl.ClientSessionInternal;
 import org.jboss.messaging.core.client.impl.LocationImpl;
 import org.jboss.messaging.core.exception.MessagingException;
 import org.jboss.messaging.core.logging.Logger;
@@ -155,71 +156,40 @@
     
       //Create some sessions
       
-      ConnectionCreateSessionMessage request = new ConnectionCreateSessionMessage(false, false, false);
-
-      ConnectionCreateSessionResponseMessage response = new ConnectionCreateSessionResponseMessage(1);
-
-      EasyMock.expect(rc.sendBlocking(serverTargetID, serverTargetID, request)).andReturn(response);
+      ClientSessionInternal sess1 = EasyMock.createStrictMock(ClientSessionInternal.class);
       
-      request = new ConnectionCreateSessionMessage(false, false, false);
-
-      response = new ConnectionCreateSessionResponseMessage(2);
-
-      EasyMock.expect(rc.sendBlocking(serverTargetID, serverTargetID, request)).andReturn(response);
+      ClientSessionInternal sess2 = EasyMock.createStrictMock(ClientSessionInternal.class);
       
-      request = new ConnectionCreateSessionMessage(false, false, false);
-
-      response = new ConnectionCreateSessionResponseMessage(3);
-
-      EasyMock.expect(rc.sendBlocking(serverTargetID, serverTargetID, request)).andReturn(response);
+      ClientSessionInternal sess3 = EasyMock.createStrictMock(ClientSessionInternal.class);
       
-      EasyMock.replay(rc);
+      conn.addSession(sess1);
+      conn.addSession(sess2);
+      conn.addSession(sess3);
       
-      ClientSession sess1 = conn.createClientSession(false, false, false, 23234);
+      sess1.close();
+      sess2.close();
+      sess3.close();
       
-      ClientSession sess2 = conn.createClientSession(false, false, false, 23234);
-      
-      ClientSession sess3 = conn.createClientSession(false, false, false, 23234);
-      
-      assertFalse(sess1.isClosed());
-      assertFalse(sess2.isClosed());
-      assertFalse(sess3.isClosed());
-      
-      EasyMock.verify(rc);
-      
-      EasyMock.reset(rc);
-            
-      //And the closes of the sessions - this can be in a different order
-      EasyMock.checkOrder(rc, false);
-      EasyMock.expect(rc.sendBlocking(1, 1, new EmptyPacket(EmptyPacket.CLOSE))).andReturn(null);
-      EasyMock.expect(rc.sendBlocking(2, 2, new EmptyPacket(EmptyPacket.CLOSE))).andReturn(null);
-      EasyMock.expect(rc.sendBlocking(3, 3, new EmptyPacket(EmptyPacket.CLOSE))).andReturn(null);
-      EasyMock.checkOrder(rc, true);
-      
       EasyMock.expect(rc.sendBlocking(serverTargetID, serverTargetID, new EmptyPacket(EmptyPacket.CLOSE))).andReturn(null);
             
       rc.stop();      
       
-      EasyMock.replay(rc);
+      EasyMock.replay(rc, sess1, sess2, sess3);
       
       conn.close();
       
-      EasyMock.verify(rc);
+      EasyMock.verify(rc, sess1, sess2, sess3);
       
-      assertTrue(sess1.isClosed());
-      assertTrue(sess2.isClosed());
-      assertTrue(sess3.isClosed());
-      
       assertTrue(conn.isClosed());
       
       //Close again should do nothing
-      EasyMock.reset(rc);
+      EasyMock.reset(rc, sess1, sess2, sess3);
       
-      EasyMock.replay(rc);
+      EasyMock.replay(rc, sess1, sess2, sess3);
       
       conn.close();
       
-      EasyMock.verify(rc);
+      EasyMock.verify(rc, sess1, sess2, sess3);
       
       try
       {
@@ -291,39 +261,17 @@
       
       ClientConnectionInternal conn = new ClientConnectionImpl(cf, serverTargetID, rc, version);
       
-      assertFalse(conn.isClosed());
-    
       //Create some sessions
       
-      ConnectionCreateSessionMessage request = new ConnectionCreateSessionMessage(false, false, false);
-
-      ConnectionCreateSessionResponseMessage response = new ConnectionCreateSessionResponseMessage(1);
-
-      EasyMock.expect(rc.sendBlocking(serverTargetID, serverTargetID, request)).andReturn(response);
+      ClientSessionInternal sess1 = EasyMock.createStrictMock(ClientSessionInternal.class);
       
-      request = new ConnectionCreateSessionMessage(false, false, false);
-
-      response = new ConnectionCreateSessionResponseMessage(2);
-
-      EasyMock.expect(rc.sendBlocking(serverTargetID, serverTargetID, request)).andReturn(response);
+      ClientSessionInternal sess2 = EasyMock.createStrictMock(ClientSessionInternal.class);
       
-      request = new ConnectionCreateSessionMessage(false, false, false);
-
-      response = new ConnectionCreateSessionResponseMessage(3);
-
-      EasyMock.expect(rc.sendBlocking(serverTargetID, serverTargetID, request)).andReturn(response);
+      ClientSessionInternal sess3 = EasyMock.createStrictMock(ClientSessionInternal.class);
       
-      EasyMock.replay(rc);
-      
-      ClientSession sess1 = conn.createClientSession(false, false, false, 23234);
-      
-      ClientSession sess2 = conn.createClientSession(false, false, false, 23234);
-      
-      ClientSession sess3 = conn.createClientSession(false, false, false, 23234);
-      
-      assertFalse(sess1.isClosed());
-      assertFalse(sess2.isClosed());
-      assertFalse(sess3.isClosed());
+      conn.addSession(sess1);
+      conn.addSession(sess2);
+      conn.addSession(sess3);
             
       Set<ClientSession> sessions = conn.getSessions();
       assertEquals(3, sessions.size());
@@ -331,10 +279,6 @@
       assertTrue(sessions.contains(sess2));
       assertTrue(sessions.contains(sess3));
       
-      EasyMock.verify(rc);
-      
-      EasyMock.reset(rc);
-      
       conn.removeSession(sess2);
       
       sessions = conn.getSessions();
@@ -347,6 +291,11 @@
       sessions = conn.getSessions();
       assertEquals(1, sessions.size());   
       assertTrue(sessions.contains(sess3));
+      
+      conn.removeSession(sess3);
+      
+      sessions = conn.getSessions();
+      assertEquals(0, sessions.size());   
    }
                
    // Private -----------------------------------------------------------------------------------------------------------
@@ -399,7 +348,6 @@
                cacheProducers);
       }
 
-
       assertEquals(ackBatchSize, session.getLazyAckBatchSize());
       assertEquals(xa, session.isXA());
       assertEquals(autoCommitSends, session.isAutoCommitSends());

Modified: trunk/tests/src/org/jboss/messaging/tests/unit/core/client/impl/ClientSessionImplTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/client/impl/ClientSessionImplTest.java	2008-06-12 09:39:28 UTC (rev 4440)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/core/client/impl/ClientSessionImplTest.java	2008-06-12 09:42:54 UTC (rev 4441)
@@ -21,23 +21,37 @@
  */
 package org.jboss.messaging.tests.unit.core.client.impl;
 
+import javax.transaction.xa.XAResource;
+
 import org.easymock.EasyMock;
+import org.jboss.messaging.core.client.ClientBrowser;
 import org.jboss.messaging.core.client.ClientConnectionFactory;
+import org.jboss.messaging.core.client.ClientConsumer;
+import org.jboss.messaging.core.client.ClientProducer;
 import org.jboss.messaging.core.client.ClientSession;
-import org.jboss.messaging.core.client.impl.ClientConnectionFactoryImpl;
 import org.jboss.messaging.core.client.impl.ClientConnectionInternal;
 import org.jboss.messaging.core.client.impl.ClientConsumerInternal;
 import org.jboss.messaging.core.client.impl.ClientConsumerPacketHandler;
+import org.jboss.messaging.core.client.impl.ClientProducerInternal;
+import org.jboss.messaging.core.client.impl.ClientProducerPacketHandler;
 import org.jboss.messaging.core.client.impl.ClientSessionImpl;
+import org.jboss.messaging.core.client.impl.ClientSessionInternal;
 import org.jboss.messaging.core.logging.Logger;
 import org.jboss.messaging.core.remoting.PacketDispatcher;
 import org.jboss.messaging.core.remoting.RemotingConnection;
 import org.jboss.messaging.core.remoting.impl.wireformat.ConsumerFlowCreditMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.EmptyPacket;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionAcknowledgeMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionAddDestinationMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionBindingQueryMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionBindingQueryResponseMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionCancelMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionCreateBrowserMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionCreateBrowserResponseMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionCreateConsumerMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionCreateConsumerResponseMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionCreateProducerMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionCreateProducerResponseMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionCreateQueueMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionDeleteQueueMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionQueueQueryMessage;
@@ -115,15 +129,13 @@
       
       EasyMock.expect(rc.sendBlocking(targetID, targetID, request)).andReturn(null);
       
-      EasyMock.replay(conn);
-      EasyMock.replay(rc);
-                  
+      EasyMock.replay(conn, rc);
+
       ClientSession session = new ClientSessionImpl(conn, targetID, false, -1, false, false, false, false);
                   
       session.createQueue(request.getAddress(), request.getQueueName(), request.getFilterString(), request.isDurable(), request.isTemporary());
       
-      EasyMock.verify(conn);
-      EasyMock.verify(rc);      
+      EasyMock.verify(conn, rc);     
    }
    
    public void testDeleteQueue() throws Exception
@@ -140,15 +152,13 @@
       
       EasyMock.expect(rc.sendBlocking(targetID, targetID, request)).andReturn(null);
       
-      EasyMock.replay(conn);
-      EasyMock.replay(rc);
+      EasyMock.replay(conn, rc);
                   
       ClientSession session = new ClientSessionImpl(conn, targetID, false, -1, false, false, false, false);
                   
       session.deleteQueue(request.getQueueName());
       
-      EasyMock.verify(conn);
-      EasyMock.verify(rc);      
+      EasyMock.verify(conn, rc);     
    }
    
    public void testQueueQuery() throws Exception
@@ -167,15 +177,13 @@
       
       EasyMock.expect(rc.sendBlocking(targetID, targetID, request)).andReturn(resp);
       
-      EasyMock.replay(conn);
-      EasyMock.replay(rc);
+      EasyMock.replay(conn, rc);
                   
       ClientSession session = new ClientSessionImpl(conn, targetID, false, -1, false, false, false, false);
                   
       SessionQueueQueryResponseMessage resp2 = session.queueQuery(request.getQueueName());
       
-      EasyMock.verify(conn);
-      EasyMock.verify(rc);  
+      EasyMock.verify(conn, rc);
       
       assertTrue(resp == resp2);
    }
@@ -196,15 +204,13 @@
       
       EasyMock.expect(rc.sendBlocking(targetID, targetID, request)).andReturn(resp);
       
-      EasyMock.replay(conn);
-      EasyMock.replay(rc);
+      EasyMock.replay(conn, rc);
                   
       ClientSession session = new ClientSessionImpl(conn, targetID, false, -1, false, false, false, false);
                   
       SessionBindingQueryResponseMessage resp2 = session.bindingQuery(request.getAddress());
       
-      EasyMock.verify(conn);
-      EasyMock.verify(rc);  
+      EasyMock.verify(conn, rc); 
       
       assertTrue(resp == resp2);
    }
@@ -223,15 +229,13 @@
       
       EasyMock.expect(rc.sendBlocking(targetID, targetID, request)).andReturn(null);
       
-      EasyMock.replay(conn);
-      EasyMock.replay(rc);
-                  
+      EasyMock.replay(conn, rc);
+      
       ClientSession session = new ClientSessionImpl(conn, targetID, false, -1, false, false, false, false);
                   
       session.addDestination(request.getAddress(), request.isTemporary());
       
-      EasyMock.verify(conn);
-      EasyMock.verify(rc);  
+      EasyMock.verify(conn, rc); 
    }
    
    public void testRemoveDestination() throws Exception
@@ -248,15 +252,13 @@
       
       EasyMock.expect(rc.sendBlocking(targetID, targetID, request)).andReturn(null);
       
-      EasyMock.replay(conn);
-      EasyMock.replay(rc);
+      EasyMock.replay(conn, rc);
                   
       ClientSession session = new ClientSessionImpl(conn, targetID, false, -1, false, false, false, false);
                   
       session.removeDestination(request.getAddress(), true);
       
-      EasyMock.verify(conn);
-      EasyMock.verify(rc);  
+      EasyMock.verify(conn, rc); 
    }
    
    public void testCreateConsumer() throws Exception
@@ -302,8 +304,863 @@
       testCreateConsumerBasicMethod(new SimpleString("usahduiahs"), 121455, 76556, -1);
    }
    
+   public void testCreateProducer() throws Exception
+   {
+      //test with the wide method
+      
+      testCreateProducerWideMethod(new SimpleString("yugygugy"), 545454, 5454, 545454, 5454, false, false, false);
+      testCreateProducerWideMethod(new SimpleString("yugygugy"), 545454, 5454, 545454, 5454, false, false, true);
+      testCreateProducerWideMethod(new SimpleString("yugygugy"), 545454, 5454, 545454, 5454, false, true, false);
+      testCreateProducerWideMethod(new SimpleString("yugygugy"), 545454, 5454, 545454, 5454, false, true, true);
+      testCreateProducerWideMethod(new SimpleString("yugygugy"), 545454, 5454, 545454, 5454, true, false, false);
+      testCreateProducerWideMethod(new SimpleString("yugygugy"), 545454, 5454, 545454, 5454, true, false, true);
+      testCreateProducerWideMethod(new SimpleString("yugygugy"), 545454, 5454, 545454, 5454, true, true, false);
+      testCreateProducerWideMethod(new SimpleString("yugygugy"), 545454, 5454, 545454, 5454, true, true, true);
+      
+      testCreateProducerWideMethod(new SimpleString("yugygugy"), 545454, 5454, 675765, 3232, false, false, false);
+      testCreateProducerWideMethod(new SimpleString("yugygugy"), 545454, 5454, 675765, 3232, false, false, true);
+      testCreateProducerWideMethod(new SimpleString("yugygugy"), 545454, 5454, 675765, 3232, false, true, false);
+      testCreateProducerWideMethod(new SimpleString("yugygugy"), 545454, 5454, 675765, 3232, false, true, true);
+      testCreateProducerWideMethod(new SimpleString("yugygugy"), 545454, 5454, 675765, 3232, true, false, false);
+      testCreateProducerWideMethod(new SimpleString("yugygugy"), 545454, 5454, 675765, 3232, true, false, true);
+      testCreateProducerWideMethod(new SimpleString("yugygugy"), 545454, 5454, 675765, 3232, true, true, false);
+      testCreateProducerWideMethod(new SimpleString("yugygugy"), 545454, 5454, 675765, 3232, true, true, true);
+      
+      //Test with the basic method
+      
+      testCreateProducerBasicMethod(new SimpleString("yugygugy"), 545454, 5454, 545454, 5454, false, false, false);
+      testCreateProducerBasicMethod(new SimpleString("yugygugy"), 545454, 5454, 545454, 5454, false, false, true);
+      testCreateProducerBasicMethod(new SimpleString("yugygugy"), 545454, 5454, 545454, 5454, false, true, false);
+      testCreateProducerBasicMethod(new SimpleString("yugygugy"), 545454, 5454, 545454, 5454, false, true, true);
+      testCreateProducerBasicMethod(new SimpleString("yugygugy"), 545454, 5454, 545454, 5454, true, false, false);
+      testCreateProducerBasicMethod(new SimpleString("yugygugy"), 545454, 5454, 545454, 5454, true, false, true);
+      testCreateProducerBasicMethod(new SimpleString("yugygugy"), 545454, 5454, 545454, 5454, true, true, false);
+      testCreateProducerBasicMethod(new SimpleString("yugygugy"), 545454, 5454, 545454, 5454, true, true, true);
+      
+      testCreateProducerBasicMethod(new SimpleString("yugygugy"), 545454, 5454, 675765, 3232, false, false, false);
+      testCreateProducerBasicMethod(new SimpleString("yugygugy"), 545454, 5454, 675765, 3232, false, false, true);
+      testCreateProducerBasicMethod(new SimpleString("yugygugy"), 545454, 5454, 675765, 3232, false, true, false);
+      testCreateProducerBasicMethod(new SimpleString("yugygugy"), 545454, 5454, 675765, 3232, false, true, true);
+      testCreateProducerBasicMethod(new SimpleString("yugygugy"), 545454, 5454, 675765, 3232, true, false, false);
+      testCreateProducerBasicMethod(new SimpleString("yugygugy"), 545454, 5454, 675765, 3232, true, false, true);
+      testCreateProducerBasicMethod(new SimpleString("yugygugy"), 545454, 5454, 675765, 3232, true, true, false);
+      testCreateProducerBasicMethod(new SimpleString("yugygugy"), 545454, 5454, 675765, 3232, true, true, true);
+
+      //Test with the rate limited method
+      
+      testCreateProducerRateLimitedMethod(new SimpleString("yugygugy"), 5454, -1, 5454, false, false, false);
+      testCreateProducerRateLimitedMethod(new SimpleString("yugygugy"), 5454, -1, 5454, false, false, true);
+      testCreateProducerRateLimitedMethod(new SimpleString("yugygugy"), 5454, -1, 5454, false, true, false);
+      testCreateProducerRateLimitedMethod(new SimpleString("yugygugy"), 5454, -1, 5454, false, true, true);
+      testCreateProducerRateLimitedMethod(new SimpleString("yugygugy"), 5454, -1, 5454, true, false, false);
+      testCreateProducerRateLimitedMethod(new SimpleString("yugygugy"), 5454, -1, 5454, true, false, true);
+      testCreateProducerRateLimitedMethod(new SimpleString("yugygugy"), 5454, -1, 5454, true, true, false);
+      testCreateProducerRateLimitedMethod(new SimpleString("yugygugy"), 5454, -1, 5454, true, true, true);
+      
+      testCreateProducerRateLimitedMethod(new SimpleString("yugygugy"), 5454, 675765, 3232, false, false, false);
+      testCreateProducerRateLimitedMethod(new SimpleString("yugygugy"), 5454, 675765, 3232, false, false, true);
+      testCreateProducerRateLimitedMethod(new SimpleString("yugygugy"), 5454, 675765, 3232, false, true, false);
+      testCreateProducerRateLimitedMethod(new SimpleString("yugygugy"), 5454, 675765, 3232, false, true, true);
+      testCreateProducerRateLimitedMethod(new SimpleString("yugygugy"), 5454, 675765, 3232, true, false, false);
+      testCreateProducerRateLimitedMethod(new SimpleString("yugygugy"), 5454, 675765, 3232, true, false, true);
+      testCreateProducerRateLimitedMethod(new SimpleString("yugygugy"), 5454, 675765, 3232, true, true, false);
+      testCreateProducerRateLimitedMethod(new SimpleString("yugygugy"), 5454, 675765, 3232, true, true, true);
+      
+      //Test with the create producer with window size method
+      
+      testCreateProducerWithWindowSizeMethod(new SimpleString("yugygugy"), 5454, 545454, -1, false, false, false);
+      testCreateProducerWithWindowSizeMethod(new SimpleString("yugygugy"), 5454, 545454, -1, false, false, true);
+      testCreateProducerWithWindowSizeMethod(new SimpleString("yugygugy"), 5454, 545454, -1, false, true, false);
+      testCreateProducerWithWindowSizeMethod(new SimpleString("yugygugy"), 5454, 545454, -1, false, true, true);
+      testCreateProducerWithWindowSizeMethod(new SimpleString("yugygugy"), 5454, 545454, -1, true, false, false);
+      testCreateProducerWithWindowSizeMethod(new SimpleString("yugygugy"), 5454, 545454, -1, true, false, true);
+      testCreateProducerWithWindowSizeMethod(new SimpleString("yugygugy"), 5454, 545454, -1, true, true, false);
+      testCreateProducerWithWindowSizeMethod(new SimpleString("yugygugy"), 5454, 545454, -1, true, true, true);
+      
+      testCreateProducerWithWindowSizeMethod(new SimpleString("yugygugy"), 5454, 675765, 3232, false, false, false);
+      testCreateProducerWithWindowSizeMethod(new SimpleString("yugygugy"), 5454, 675765, 3232, false, false, true);
+      testCreateProducerWithWindowSizeMethod(new SimpleString("yugygugy"), 5454, 675765, 3232, false, true, false);
+      testCreateProducerWithWindowSizeMethod(new SimpleString("yugygugy"), 5454, 675765, 3232, false, true, true);
+      testCreateProducerWithWindowSizeMethod(new SimpleString("yugygugy"), 5454, 675765, 3232, true, false, false);
+      testCreateProducerWithWindowSizeMethod(new SimpleString("yugygugy"), 5454, 675765, 3232, true, false, true);
+      testCreateProducerWithWindowSizeMethod(new SimpleString("yugygugy"), 5454, 675765, 3232, true, true, false);
+      testCreateProducerWithWindowSizeMethod(new SimpleString("yugygugy"), 5454, 675765, 3232, true, true, true);      
+   }
+   
+   public void testProducerCaching() throws Exception
+   {
+      ClientConnectionInternal conn = EasyMock.createStrictMock(ClientConnectionInternal.class);
+           
+      RemotingConnection rc = EasyMock.createStrictMock(RemotingConnection.class);
+      
+      PacketDispatcher pd = EasyMock.createStrictMock(PacketDispatcher.class);
+           
+      //In ClientSessionImpl constructor
+      EasyMock.expect(conn.getRemotingConnection()).andReturn(rc);
+        
+      final long sessionTargetID = 9121892;
+      
+      final SimpleString address1 = new SimpleString("gyugg");
+      final SimpleString address2 = new SimpleString("g237429834");
+      final int windowSize = 72887827;
+      final int maxRate = -1;
+      
+      //In create producer method
+            
+      {
+         EasyMock.expect(rc.getPacketDispatcher()).andReturn(pd);
+         
+         final long clientTargetID = 7676876;
+         
+         EasyMock.expect(pd.generateID()).andReturn(clientTargetID);
+                  
+         SessionCreateProducerMessage request =
+            new SessionCreateProducerMessage(clientTargetID, address1, windowSize, maxRate);             
+         
+         SessionCreateProducerResponseMessage resp = 
+            new SessionCreateProducerResponseMessage(67765765, windowSize, maxRate);
+         
+         EasyMock.expect(rc.sendBlocking(sessionTargetID, sessionTargetID, request)).andReturn(resp);
+         
+         EasyMock.expect(rc.getPacketDispatcher()).andReturn(pd);
+         
+         pd.register(new ClientProducerPacketHandler(null, clientTargetID));
+      }
+      
+      {
+         EasyMock.expect(rc.getPacketDispatcher()).andReturn(pd);
+         
+         final long clientTargetID = 54654654;
+         
+         EasyMock.expect(pd.generateID()).andReturn(clientTargetID);
+         
+         SessionCreateProducerMessage request =
+            new SessionCreateProducerMessage(clientTargetID, address2, windowSize, maxRate);             
+         
+         SessionCreateProducerResponseMessage resp = 
+            new SessionCreateProducerResponseMessage(7676876, windowSize, maxRate);
+         
+         EasyMock.expect(rc.sendBlocking(sessionTargetID, sessionTargetID, request)).andReturn(resp);
+         
+         EasyMock.expect(rc.getPacketDispatcher()).andReturn(pd);
+         
+         pd.register(new ClientProducerPacketHandler(null, clientTargetID));
+      }
+      
+      EasyMock.replay(conn, rc, pd);
+      
+      //Create three with address1 - only one should actually be created
+      
+      ClientSessionInternal session = new ClientSessionImpl(conn, sessionTargetID, false, -1, true, false, false, false);
+
+      ClientProducerInternal producer1 = (ClientProducerInternal)session.createProducer(address1, windowSize, maxRate,
+                                                                                        false, false);
+      session.removeProducer(producer1);  
+      
+      ClientProducerInternal producer2 = (ClientProducerInternal)session.createProducer(address1, windowSize, maxRate,
+                                                                                       false, false);      
+      session.removeProducer(producer2);
+      
+      ClientProducerInternal producer3 = (ClientProducerInternal)session.createProducer(address1, windowSize, maxRate,
+                                                                                        false, false);
+      session.removeProducer(producer3);
+      
+      //Create another with a different address
+      
+      ClientProducerInternal producer4 = (ClientProducerInternal)session.createProducer(address2, windowSize, maxRate,
+                                                                                        false, false);
+      session.removeProducer(producer4); 
+            
+      EasyMock.verify(conn, rc, pd);     
+      
+      assertTrue(producer1 == producer2);
+      assertTrue(producer2 == producer3);
+      assertFalse(producer1 == producer4);
+      assertFalse(producer2 == producer4);
+      assertFalse(producer3 == producer4);
+   }
+   
+   public void testProducerNoCaching() throws Exception
+   { 
+      ClientConnectionInternal conn = EasyMock.createStrictMock(ClientConnectionInternal.class);
+           
+      RemotingConnection rc = EasyMock.createStrictMock(RemotingConnection.class);
+      
+      PacketDispatcher pd = EasyMock.createStrictMock(PacketDispatcher.class);
+           
+      //In ClientSessionImpl constructor
+      EasyMock.expect(conn.getRemotingConnection()).andReturn(rc);
+          
+      final long sessionTargetID = 7617622;      
+      final SimpleString address = new SimpleString("gyugg");
+      final int windowSize = 72887827;
+      final int maxRate = -1;
+
+      for (int i = 0; i < 3; i++)
+      {
+         //In create producer method
+         
+         EasyMock.expect(rc.getPacketDispatcher()).andReturn(pd);
+         
+         final long clientTargetID = i + 65655;
+         
+         EasyMock.expect(pd.generateID()).andReturn(clientTargetID);
+         
+         
+         SessionCreateProducerMessage request =
+            new SessionCreateProducerMessage(clientTargetID, address, windowSize, maxRate);             
+         
+         SessionCreateProducerResponseMessage resp = 
+            new SessionCreateProducerResponseMessage(i + 273263, windowSize, maxRate);
+         
+         EasyMock.expect(rc.sendBlocking(sessionTargetID, sessionTargetID, request)).andReturn(resp);
+         
+         EasyMock.expect(rc.getPacketDispatcher()).andReturn(pd);
+         
+         pd.register(new ClientProducerPacketHandler(null, clientTargetID));
+      
+      }
+
+      EasyMock.replay(conn, rc, pd);
+      
+      ClientSessionInternal session = new ClientSessionImpl(conn, sessionTargetID, false, -1, false, false, false, false);
+
+      ClientProducerInternal producer1 = (ClientProducerInternal)session.createProducer(address, windowSize, maxRate,
+                                                                                        false, false);
+      session.removeProducer(producer1);  
+      
+      ClientProducerInternal producer2 = (ClientProducerInternal)session.createProducer(address, windowSize, maxRate,
+                                                                                       false, false);      
+      session.removeProducer(producer2);
+      
+      ClientProducerInternal producer3 = (ClientProducerInternal)session.createProducer(address, windowSize, maxRate,
+                                                                                        false, false);
+      session.removeProducer(producer3);
+      
+      EasyMock.verify(conn, rc, pd);
+      
+      assertFalse(producer1 == producer2);
+      assertFalse(producer2 == producer3);
+      assertFalse(producer1 == producer3);
+   }
+   
+   public void testGetXAResource() throws Exception
+   {
+      ClientConnectionInternal conn = EasyMock.createStrictMock(ClientConnectionInternal.class);
+      
+      ClientSession session = new ClientSessionImpl(conn, 5465, false, -1, false, false, false, false);
+      
+      XAResource res = session.getXAResource();
+      
+      assertTrue(res == session);
+   }
+   
+   public void testTransactedSessionAcknowledgeNotBroken() throws Exception
+   {
+      ClientConnectionInternal conn = EasyMock.createStrictMock(ClientConnectionInternal.class);
+           
+      RemotingConnection rc = EasyMock.createStrictMock(RemotingConnection.class);
+      
+      PacketDispatcher pd = EasyMock.createStrictMock(PacketDispatcher.class);
+           
+      EasyMock.expect(conn.getRemotingConnection()).andReturn(rc);
+      
+      final int numMessages = 100;
+      
+      final int sessionTargetID = 71267162;
+            
+      SessionAcknowledgeMessage message = new SessionAcknowledgeMessage(numMessages - 1, true);
+            
+      rc.sendOneWay(sessionTargetID, sessionTargetID, message);
+
+      EasyMock.expect(rc.sendBlocking(sessionTargetID, sessionTargetID, new EmptyPacket(EmptyPacket.SESS_COMMIT))).andReturn(null);
+      
+      SessionAcknowledgeMessage message2 = new SessionAcknowledgeMessage(numMessages * 2 - 1, true);
+      
+      rc.sendOneWay(sessionTargetID, sessionTargetID, message2);
+
+      EasyMock.expect(rc.sendBlocking(sessionTargetID, sessionTargetID, new EmptyPacket(EmptyPacket.SESS_ROLLBACK))).andReturn(null);
+                  
+      EasyMock.replay(conn, rc, pd);
+      
+      ClientSessionInternal session = new ClientSessionImpl(conn, sessionTargetID, false, -1, false, false, false, false);
+      
+      //Simulate some messages being delivered in a non broken sequence (i.e. what would happen with a single consumer
+      //on the session)
+            
+      for (int i = 0; i < numMessages; i++)
+      {
+         session.delivered(i, false);
+         
+         session.acknowledge();
+      }
+      
+      //Then commit
+      session.commit();
+      
+      for (int i = numMessages; i < numMessages * 2; i++)
+      {
+         session.delivered(i, false);
+         
+         session.acknowledge();
+      }
+      
+      session.rollback();
+      
+      EasyMock.verify(conn, rc, pd);
+   }
+   
+   public void testAutoCommitSessionAcknowledge() throws Exception
+   {
+      testAutoCommitSessionAcknowledge(true);
+      testAutoCommitSessionAcknowledge(false);
+   }
+            
+   public void testTransactedSessionAcknowledgeBroken() throws Exception
+   {
+      testTransactedSessionAcknowledgeBroken(true);
+      testTransactedSessionAcknowledgeBroken(false);
+   }
+         
+   public void testTransactedSessionAcknowledgeNotBrokenExpired() throws Exception
+   {
+      ClientConnectionInternal conn = EasyMock.createStrictMock(ClientConnectionInternal.class);
+      
+      RemotingConnection rc = EasyMock.createStrictMock(RemotingConnection.class);
+      
+      PacketDispatcher pd = EasyMock.createStrictMock(PacketDispatcher.class);
+           
+      EasyMock.expect(conn.getRemotingConnection()).andReturn(rc);
+      
+      final int[] messages = new int[] { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 };
+      
+      final int sessionTargetID = 71267162;
+         
+      for (int i = 0; i < messages.length; i++)
+      {
+         SessionCancelMessage message = new SessionCancelMessage(messages[i], true);
+         
+         rc.sendOneWay(sessionTargetID, sessionTargetID, message);
+      }
+      
+      EasyMock.expect(rc.sendBlocking(sessionTargetID, sessionTargetID, new EmptyPacket(EmptyPacket.SESS_COMMIT))).andReturn(null);
+                  
+      EasyMock.replay(conn);
+      EasyMock.replay(rc);
+      EasyMock.replay(pd);
+      
+      ClientSessionInternal session = new ClientSessionImpl(conn, sessionTargetID, false, -1, false, false, false, false);
+      
+      //Simulate some messages being delivered in a non broken sequence (i.e. what would happen with a single consumer
+      //on the session)
+            
+      for (int i = 0; i < messages.length; i++)
+      {
+         session.delivered(messages[i], true);
+         
+         session.acknowledge();
+      }
+      
+      //Then commit
+      session.commit();
+      
+      EasyMock.verify(conn);
+      EasyMock.verify(rc);
+      EasyMock.verify(pd); 
+   }
+   
+   public void testTransactedSessionAcknowledgeBrokenExpired() throws Exception
+   {
+      ClientConnectionInternal conn = EasyMock.createStrictMock(ClientConnectionInternal.class);
+           
+      RemotingConnection rc = EasyMock.createStrictMock(RemotingConnection.class);
+      
+      PacketDispatcher pd = EasyMock.createStrictMock(PacketDispatcher.class);
+           
+      EasyMock.expect(conn.getRemotingConnection()).andReturn(rc);
+      
+      final int[] messages = new int[] { 1, 3, 5, 7, 9, 2, 4, 10, 20, 21, 22, 23, 19, 18, 15, 30, 31, 32, 40, 35 };
+      
+      final int sessionTargetID = 71267162;
+         
+      for (int i = 0; i < messages.length; i++)
+      {
+         SessionCancelMessage message = new SessionCancelMessage(messages[i], true);
+         
+         rc.sendOneWay(sessionTargetID, sessionTargetID, message);
+      }
+      
+      EasyMock.expect(rc.sendBlocking(sessionTargetID, sessionTargetID, new EmptyPacket(EmptyPacket.SESS_COMMIT))).andReturn(null);
+                  
+      EasyMock.replay(conn);
+      EasyMock.replay(rc);
+      EasyMock.replay(pd);
+      
+      ClientSessionInternal session = new ClientSessionImpl(conn, sessionTargetID, false, -1, false, false, false, false);
+      
+      //Simulate some messages being delivered in a broken sequence (i.e. not the contiguous sequence that a single
+      //consumer on the session would produce)
+            
+      for (int i = 0; i < messages.length; i++)
+      {
+         session.delivered(messages[i], true);
+         
+         session.acknowledge();
+      }
+      
+      //Then commit
+      session.commit();
+      
+      EasyMock.verify(conn);
+      EasyMock.verify(rc);
+      EasyMock.verify(pd); 
+   }
+   
+   /**
+    * Checks that ClientSessionImpl.close() closes every producer, consumer and browser that was
+    * added to the session, sends a SessionAcknowledgeMessage(0, true) one-way followed by a
+    * blocking CLOSE packet on the remoting connection, removes the session from its connection
+    * and finally reports isClosed() == true.
+    *
+    * The conn.removeSession(session) expectation needs the session instance, so the connection
+    * mock is used in two phases: it is replayed with only the getRemotingConnection() expectation
+    * while the session is constructed, then reset and re-recorded for the close() call. The
+    * session must not be constructed while conn is still recording: a call made on a recording
+    * mock is captured as a new expectation and returns null instead of the configured value.
+    */
+   public void testClose() throws Exception
+   {
+      ClientConnectionInternal conn = EasyMock.createStrictMock(ClientConnectionInternal.class);
+
+      RemotingConnection rc = EasyMock.createStrictMock(RemotingConnection.class);
+
+      final long sessionTargetID = 9121892;
+
+      ClientProducerInternal prod1 = EasyMock.createStrictMock(ClientProducerInternal.class);
+      ClientProducerInternal prod2 = EasyMock.createStrictMock(ClientProducerInternal.class);
+
+      ClientConsumerInternal cons1 = EasyMock.createStrictMock(ClientConsumerInternal.class);
+      ClientConsumerInternal cons2 = EasyMock.createStrictMock(ClientConsumerInternal.class);
+
+      ClientBrowser browser1 = EasyMock.createStrictMock(ClientBrowser.class);
+      ClientBrowser browser2 = EasyMock.createStrictMock(ClientBrowser.class);
+
+      //Expectations for close() that do not need the session instance; these mocks can be
+      //recorded and replayed before the session exists
+      prod1.close();
+      prod2.close();
+      cons1.close();
+      cons2.close();
+      browser1.close();
+      browser2.close();
+
+      SessionAcknowledgeMessage message = new SessionAcknowledgeMessage(0, true);
+
+      rc.sendOneWay(sessionTargetID, sessionTargetID, message);
+
+      EasyMock.expect(rc.sendBlocking(sessionTargetID, sessionTargetID, new EmptyPacket(EmptyPacket.CLOSE))).andReturn(null);
+
+      EasyMock.replay(rc, prod1, prod2, cons1, cons2, browser1, browser2);
+
+      //In ClientSessionImpl constructor
+      EasyMock.expect(conn.getRemotingConnection()).andReturn(rc);
+
+      EasyMock.replay(conn);
+
+      ClientSessionInternal session = new ClientSessionImpl(conn, sessionTargetID, false, -1, false, false, false, false);
+
+      EasyMock.verify(conn);
+
+      //Put conn back into record state now that the session exists and can be used as an argument
+      EasyMock.reset(conn);
+
+      conn.removeSession(session);
+
+      EasyMock.replay(conn);
+
+      session.addProducer(prod1);
+      session.addProducer(prod2);
+
+      session.addConsumer(cons1);
+      session.addConsumer(cons2);
+
+      session.addBrowser(browser1);
+      session.addBrowser(browser2);
+
+      assertFalse(session.isClosed());
+
+      session.close();
+
+      EasyMock.verify(conn, rc, prod1, prod2, cons1, cons2, browser1, browser2);
+
+      assertTrue(session.isClosed());
+   }
+   
    // Private -------------------------------------------------------------------------------------------
 
+   /**
+    * Shared driver for the acknowledge tests that use an ack batch size.
+    * Delivers 100 consecutively numbered messages (0..99) to a session constructed with a batch
+    * size of 10, calling acknowledge() after each delivery, then commits. One
+    * SessionAcknowledgeMessage is expected per complete batch, followed by a blocking SESS_COMMIT.
+    *
+    * @param blockOnAcknowledge passed to the ClientSessionImpl constructor; when true the acks are
+    *                           expected through rc.sendBlocking, otherwise through rc.sendOneWay
+    */
+   private void testAutoCommitSessionAcknowledge(boolean blockOnAcknowledge) throws Exception
+   {
+      final int numMessages = 100;
+      final int batchSize = 10;
+      final int sessionTargetID = 71267162;
+
+      ClientConnectionInternal connection = EasyMock.createStrictMock(ClientConnectionInternal.class);
+      RemotingConnection remoting = EasyMock.createStrictMock(RemotingConnection.class);
+      //No expectations are recorded on the dispatcher; it is only replayed and verified
+      PacketDispatcher dispatcher = EasyMock.createStrictMock(PacketDispatcher.class);
+
+      EasyMock.expect(connection.getRemotingConnection()).andReturn(remoting);
+
+      //One ack per complete batch, carrying the number of the last message in that batch
+      final int numBatches = numMessages / batchSize;
+      for (int batch = 1; batch <= numBatches; batch++)
+      {
+         SessionAcknowledgeMessage ack = new SessionAcknowledgeMessage(batch * batchSize - 1, true);
+
+         if (!blockOnAcknowledge)
+         {
+            remoting.sendOneWay(sessionTargetID, sessionTargetID, ack);
+         }
+         else
+         {
+            EasyMock.expect(remoting.sendBlocking(sessionTargetID, sessionTargetID, ack)).andReturn(null);
+         }
+      }
+
+      EmptyPacket commitPacket = new EmptyPacket(EmptyPacket.SESS_COMMIT);
+      EasyMock.expect(remoting.sendBlocking(sessionTargetID, sessionTargetID, commitPacket)).andReturn(null);
+
+      EasyMock.replay(connection, remoting, dispatcher);
+
+      ClientSessionInternal session =
+         new ClientSessionImpl(connection, sessionTargetID, false, batchSize, false, false, true, blockOnAcknowledge);
+
+      //Simulate messages being delivered in an unbroken sequence, acknowledging each one
+      for (int delivered = 0; delivered < numMessages; delivered++)
+      {
+         session.delivered(delivered, false);
+         session.acknowledge();
+      }
+
+      //Then commit
+      session.commit();
+
+      EasyMock.verify(connection, remoting, dispatcher);
+   }
+   
+   /**
+    * Shared driver for the acknowledge tests that use a non-contiguous delivery sequence.
+    * Delivers a fixed list of out-of-order message numbers to a session constructed with a batch
+    * size of -1, calling acknowledge() after each delivery, then commits. One
+    * SessionAcknowledgeMessage(number, false) is expected per delivered message, in delivery
+    * order, followed by a blocking SESS_COMMIT.
+    *
+    * @param blockOnAcknowledge passed to the ClientSessionImpl constructor; when true the acks are
+    *                           expected through rc.sendBlocking, otherwise through rc.sendOneWay
+    */
+   private void testTransactedSessionAcknowledgeBroken(boolean blockOnAcknowledge) throws Exception
+   {
+      final int sessionTargetID = 71267162;
+      final int[] messages = new int[] { 1, 3, 5, 7, 9, 2, 4, 10, 20, 21, 22, 23, 19, 18, 15, 30, 31, 32, 40, 35 };
+
+      ClientConnectionInternal connection = EasyMock.createStrictMock(ClientConnectionInternal.class);
+      RemotingConnection remoting = EasyMock.createStrictMock(RemotingConnection.class);
+      //No expectations are recorded on the dispatcher; it is only replayed and verified
+      PacketDispatcher dispatcher = EasyMock.createStrictMock(PacketDispatcher.class);
+
+      EasyMock.expect(connection.getRemotingConnection()).andReturn(remoting);
+
+      //Every delivery is expected to produce its own ack
+      for (int messageNumber : messages)
+      {
+         SessionAcknowledgeMessage ack = new SessionAcknowledgeMessage(messageNumber, false);
+
+         if (!blockOnAcknowledge)
+         {
+            remoting.sendOneWay(sessionTargetID, sessionTargetID, ack);
+         }
+         else
+         {
+            EasyMock.expect(remoting.sendBlocking(sessionTargetID, sessionTargetID, ack)).andReturn(null);
+         }
+      }
+
+      EmptyPacket commitPacket = new EmptyPacket(EmptyPacket.SESS_COMMIT);
+      EasyMock.expect(remoting.sendBlocking(sessionTargetID, sessionTargetID, commitPacket)).andReturn(null);
+
+      EasyMock.replay(connection, remoting, dispatcher);
+
+      ClientSessionInternal session =
+         new ClientSessionImpl(connection, sessionTargetID, false, -1, false, false, false, blockOnAcknowledge);
+
+      //Simulate messages being delivered in a broken sequence
+      //NOTE(review): presumably what happens with more than one consumer on the session - confirm
+      for (int messageNumber : messages)
+      {
+         session.delivered(messageNumber, false);
+         session.acknowledge();
+      }
+
+      //Then commit
+      session.commit();
+
+      EasyMock.verify(connection, remoting, dispatcher);
+   }
+   
+   /**
+    * Shared driver for tests of ClientSession.createProducerWithWindowSize(address, windowSize).
+    * Expects the two blocking defaults to be read from the connection factory, a
+    * SessionCreateProducerMessage with the given window size and a max rate of -1 to be sent
+    * blocking, and a ClientProducerPacketHandler to be registered with the dispatcher. The
+    * returned producer is then checked against the mocked response.
+    *
+    * @param address         address passed to createProducerWithWindowSize and expected on the producer
+    * @param windowSize      window size passed to createProducerWithWindowSize and expected in the request
+    * @param initialCredits  second argument of the mocked response; expected as the producer's initial window size
+    * @param serverMaxRate   third argument of the mocked response; expected as the producer's max rate
+    * @param blockOnNPSend   value the mocked factory returns from isDefaultBlockOnNonPersistentSend()
+    * @param blockOnPSend    value the mocked factory returns from isDefaultBlockOnPersistentSend()
+    * @param autoCommitSends passed to the ClientSessionImpl constructor; the producer is expected to
+    *                        block on sends only when this and the corresponding default are both true
+    */
+   private void testCreateProducerWithWindowSizeMethod(final SimpleString address,
+         final int windowSize, final int initialCredits,
+         final int serverMaxRate,
+         final boolean blockOnNPSend,
+         final boolean blockOnPSend,
+         final boolean autoCommitSends) throws Exception
+   {
+      final long clientTargetID = 7676876;
+      final long sessionTargetID = 9121892;
+
+      ClientConnectionFactory factory = EasyMock.createStrictMock(ClientConnectionFactory.class);
+      ClientConnectionInternal connection = EasyMock.createStrictMock(ClientConnectionInternal.class);
+      RemotingConnection remoting = EasyMock.createStrictMock(RemotingConnection.class);
+      PacketDispatcher dispatcher = EasyMock.createStrictMock(PacketDispatcher.class);
+
+      SessionCreateProducerMessage createRequest =
+         new SessionCreateProducerMessage(clientTargetID, address, windowSize, -1);
+      SessionCreateProducerResponseMessage createResponse =
+         new SessionCreateProducerResponseMessage(67765765, initialCredits, serverMaxRate);
+
+      // Connection: first call is in the ClientSessionImpl constructor, then one factory lookup per default
+      EasyMock.expect(connection.getRemotingConnection()).andReturn(remoting);
+      EasyMock.expect(connection.getConnectionFactory()).andReturn(factory);
+      EasyMock.expect(connection.getConnectionFactory()).andReturn(factory);
+
+      // Defaults from the factory
+      EasyMock.expect(factory.isDefaultBlockOnNonPersistentSend()).andReturn(blockOnNPSend);
+      EasyMock.expect(factory.isDefaultBlockOnPersistentSend()).andReturn(blockOnPSend);
+
+      // Remoting connection: dispatcher lookup, create-producer round trip, dispatcher lookup again
+      EasyMock.expect(remoting.getPacketDispatcher()).andReturn(dispatcher);
+      EasyMock.expect(remoting.sendBlocking(sessionTargetID, sessionTargetID, createRequest)).andReturn(createResponse);
+      EasyMock.expect(remoting.getPacketDispatcher()).andReturn(dispatcher);
+
+      // Dispatcher: hands out the client target id, then receives the producer's packet handler
+      EasyMock.expect(dispatcher.generateID()).andReturn(clientTargetID);
+      dispatcher.register(new ClientProducerPacketHandler(null, clientTargetID));
+
+      EasyMock.replay(factory, connection, remoting, dispatcher);
+
+      ClientSession session =
+         new ClientSessionImpl(connection, sessionTargetID, false, -1, false, autoCommitSends, false, false);
+
+      ClientProducerInternal created =
+         (ClientProducerInternal)session.createProducerWithWindowSize(address, windowSize);
+
+      EasyMock.verify(factory, connection, remoting, dispatcher);
+
+      assertEquals(address, created.getAddress());
+      assertEquals(autoCommitSends && blockOnNPSend, created.isBlockOnNonPersistentSend());
+      assertEquals(autoCommitSends && blockOnPSend, created.isBlockOnPersistentSend());
+      assertEquals(initialCredits, created.getInitialWindowSize());
+      assertEquals(serverMaxRate, created.getMaxRate());
+   }
+   
+   /**
+    * Shared driver for tests of ClientSession.createRateLimitedProducer(address, maxRate).
+    * Expects the two blocking defaults to be read from the connection factory, a
+    * SessionCreateProducerMessage with a window size of -1 and the given max rate to be sent
+    * blocking, and a ClientProducerPacketHandler to be registered with the dispatcher. The
+    * returned producer is then checked against the mocked response.
+    *
+    * @param address         address passed to createRateLimitedProducer and expected on the producer
+    * @param maxRate         max rate passed to createRateLimitedProducer and expected in the request
+    * @param initialCredits  second argument of the mocked response; expected as the producer's initial window size
+    * @param serverMaxRate   third argument of the mocked response; expected as the producer's max rate
+    * @param blockOnNPSend   value the mocked factory returns from isDefaultBlockOnNonPersistentSend()
+    * @param blockOnPSend    value the mocked factory returns from isDefaultBlockOnPersistentSend()
+    * @param autoCommitSends passed to the ClientSessionImpl constructor; the producer is expected to
+    *                        block on sends only when this and the corresponding default are both true
+    */
+   private void testCreateProducerRateLimitedMethod(final SimpleString address,
+                                                    final int maxRate, final int initialCredits,
+                                                    final int serverMaxRate,
+                                                    final boolean blockOnNPSend,
+                                                    final boolean blockOnPSend,
+                                                    final boolean autoCommitSends) throws Exception
+   {
+      final long clientTargetID = 7676876;
+      final long sessionTargetID = 9121892;
+
+      ClientConnectionFactory factory = EasyMock.createStrictMock(ClientConnectionFactory.class);
+      ClientConnectionInternal connection = EasyMock.createStrictMock(ClientConnectionInternal.class);
+      RemotingConnection remoting = EasyMock.createStrictMock(RemotingConnection.class);
+      PacketDispatcher dispatcher = EasyMock.createStrictMock(PacketDispatcher.class);
+
+      SessionCreateProducerMessage createRequest =
+         new SessionCreateProducerMessage(clientTargetID, address, -1, maxRate);
+      SessionCreateProducerResponseMessage createResponse =
+         new SessionCreateProducerResponseMessage(67765765, initialCredits, serverMaxRate);
+
+      // Connection: first call is in the ClientSessionImpl constructor, then one factory lookup per default
+      EasyMock.expect(connection.getRemotingConnection()).andReturn(remoting);
+      EasyMock.expect(connection.getConnectionFactory()).andReturn(factory);
+      EasyMock.expect(connection.getConnectionFactory()).andReturn(factory);
+
+      // Defaults from the factory
+      EasyMock.expect(factory.isDefaultBlockOnNonPersistentSend()).andReturn(blockOnNPSend);
+      EasyMock.expect(factory.isDefaultBlockOnPersistentSend()).andReturn(blockOnPSend);
+
+      // Remoting connection: dispatcher lookup, create-producer round trip, dispatcher lookup again
+      EasyMock.expect(remoting.getPacketDispatcher()).andReturn(dispatcher);
+      EasyMock.expect(remoting.sendBlocking(sessionTargetID, sessionTargetID, createRequest)).andReturn(createResponse);
+      EasyMock.expect(remoting.getPacketDispatcher()).andReturn(dispatcher);
+
+      // Dispatcher: hands out the client target id, then receives the producer's packet handler
+      EasyMock.expect(dispatcher.generateID()).andReturn(clientTargetID);
+      dispatcher.register(new ClientProducerPacketHandler(null, clientTargetID));
+
+      EasyMock.replay(factory, connection, remoting, dispatcher);
+
+      ClientSession session =
+         new ClientSessionImpl(connection, sessionTargetID, false, -1, false, autoCommitSends, false, false);
+
+      ClientProducerInternal created =
+         (ClientProducerInternal)session.createRateLimitedProducer(address, maxRate);
+
+      EasyMock.verify(factory, connection, remoting, dispatcher);
+
+      assertEquals(address, created.getAddress());
+      assertEquals(autoCommitSends && blockOnNPSend, created.isBlockOnNonPersistentSend());
+      assertEquals(autoCommitSends && blockOnPSend, created.isBlockOnPersistentSend());
+      assertEquals(initialCredits, created.getInitialWindowSize());
+      assertEquals(serverMaxRate, created.getMaxRate());
+   }
+   
+   /**
+    * Shared driver for tests of ClientSession.createProducer(address).
+    * Expects all four producer defaults (window size, max rate, block on non-persistent send,
+    * block on persistent send) to be read from the connection factory in that order, a
+    * SessionCreateProducerMessage carrying the default window size and max rate to be sent
+    * blocking, and a ClientProducerPacketHandler to be registered with the dispatcher. The
+    * returned producer is then checked against the mocked response.
+    *
+    * @param address         address passed to createProducer and expected on the producer
+    * @param windowSize      value the mocked factory returns from getDefaultProducerWindowSize()
+    * @param maxRate         value the mocked factory returns from getDefaultProducerMaxRate()
+    * @param initialCredits  second argument of the mocked response; expected as the producer's initial window size
+    * @param serverMaxRate   third argument of the mocked response; expected as the producer's max rate
+    * @param blockOnNPSend   value the mocked factory returns from isDefaultBlockOnNonPersistentSend()
+    * @param blockOnPSend    value the mocked factory returns from isDefaultBlockOnPersistentSend()
+    * @param autoCommitSends passed to the ClientSessionImpl constructor; the producer is expected to
+    *                        block on sends only when this and the corresponding default are both true
+    */
+   private void testCreateProducerBasicMethod(final SimpleString address, final int windowSize,
+         final int maxRate, final int initialCredits,
+         final int serverMaxRate,
+         final boolean blockOnNPSend,
+         final boolean blockOnPSend,
+         final boolean autoCommitSends) throws Exception
+   {
+      final long clientTargetID = 7676876;
+      final long sessionTargetID = 9121892;
+
+      ClientConnectionFactory factory = EasyMock.createStrictMock(ClientConnectionFactory.class);
+      ClientConnectionInternal connection = EasyMock.createStrictMock(ClientConnectionInternal.class);
+      RemotingConnection remoting = EasyMock.createStrictMock(RemotingConnection.class);
+      PacketDispatcher dispatcher = EasyMock.createStrictMock(PacketDispatcher.class);
+
+      SessionCreateProducerMessage createRequest =
+         new SessionCreateProducerMessage(clientTargetID, address, windowSize, maxRate);
+      SessionCreateProducerResponseMessage createResponse =
+         new SessionCreateProducerResponseMessage(67765765, initialCredits, serverMaxRate);
+
+      // Connection: first call is in the ClientSessionImpl constructor, then one factory lookup per default
+      EasyMock.expect(connection.getRemotingConnection()).andReturn(remoting);
+      EasyMock.expect(connection.getConnectionFactory()).andReturn(factory);
+      EasyMock.expect(connection.getConnectionFactory()).andReturn(factory);
+      EasyMock.expect(connection.getConnectionFactory()).andReturn(factory);
+      EasyMock.expect(connection.getConnectionFactory()).andReturn(factory);
+
+      // Defaults from the factory, in the order they are expected to be read
+      EasyMock.expect(factory.getDefaultProducerWindowSize()).andReturn(windowSize);
+      EasyMock.expect(factory.getDefaultProducerMaxRate()).andReturn(maxRate);
+      EasyMock.expect(factory.isDefaultBlockOnNonPersistentSend()).andReturn(blockOnNPSend);
+      EasyMock.expect(factory.isDefaultBlockOnPersistentSend()).andReturn(blockOnPSend);
+
+      // Remoting connection: dispatcher lookup, create-producer round trip, dispatcher lookup again
+      EasyMock.expect(remoting.getPacketDispatcher()).andReturn(dispatcher);
+      EasyMock.expect(remoting.sendBlocking(sessionTargetID, sessionTargetID, createRequest)).andReturn(createResponse);
+      EasyMock.expect(remoting.getPacketDispatcher()).andReturn(dispatcher);
+
+      // Dispatcher: hands out the client target id, then receives the producer's packet handler
+      EasyMock.expect(dispatcher.generateID()).andReturn(clientTargetID);
+      dispatcher.register(new ClientProducerPacketHandler(null, clientTargetID));
+
+      EasyMock.replay(factory, connection, remoting, dispatcher);
+
+      ClientSession session =
+         new ClientSessionImpl(connection, sessionTargetID, false, -1, false, autoCommitSends, false, false);
+
+      ClientProducerInternal created = (ClientProducerInternal)session.createProducer(address);
+
+      EasyMock.verify(factory, connection, remoting, dispatcher);
+
+      assertEquals(address, created.getAddress());
+      assertEquals(autoCommitSends && blockOnNPSend, created.isBlockOnNonPersistentSend());
+      assertEquals(autoCommitSends && blockOnPSend, created.isBlockOnPersistentSend());
+      assertEquals(initialCredits, created.getInitialWindowSize());
+      assertEquals(serverMaxRate, created.getMaxRate());
+   }
+   
+   /**
+    * Shared driver for tests of
+    * ClientSession.createProducer(address, windowSize, maxRate, blockOnNPSend, blockOnPSend).
+    * No expectations are recorded on the connection factory mock, so under strict-mock rules any
+    * call to it fails the test. Expects a SessionCreateProducerMessage with the given window size
+    * and max rate to be sent blocking and a ClientProducerPacketHandler to be registered with the
+    * dispatcher. The returned producer is then checked against the mocked response.
+    *
+    * @param address         address passed to createProducer and expected on the producer
+    * @param windowSize      window size passed to createProducer and expected in the request
+    * @param maxRate         max rate passed to createProducer and expected in the request
+    * @param initialCredits  second argument of the mocked response; expected as the producer's initial window size
+    * @param serverMaxRate   third argument of the mocked response; expected as the producer's max rate
+    * @param blockOnNPSend   passed to createProducer as the block-on-non-persistent-send argument
+    * @param blockOnPSend    passed to createProducer as the block-on-persistent-send argument
+    * @param autoCommitSends passed to the ClientSessionImpl constructor; the producer is expected to
+    *                        block on sends only when this and the corresponding argument are both true
+    */
+   private void testCreateProducerWideMethod(final SimpleString address, final int windowSize,
+                                             final int maxRate, final int initialCredits,
+                                             final int serverMaxRate,
+                                             final boolean blockOnNPSend,
+                                             final boolean blockOnPSend,
+                                             final boolean autoCommitSends) throws Exception
+   {
+      final long clientTargetID = 7676876;
+      final long sessionTargetID = 9121892;
+
+      ClientConnectionFactory factory = EasyMock.createStrictMock(ClientConnectionFactory.class);
+      ClientConnectionInternal connection = EasyMock.createStrictMock(ClientConnectionInternal.class);
+      RemotingConnection remoting = EasyMock.createStrictMock(RemotingConnection.class);
+      PacketDispatcher dispatcher = EasyMock.createStrictMock(PacketDispatcher.class);
+
+      SessionCreateProducerMessage createRequest =
+         new SessionCreateProducerMessage(clientTargetID, address, windowSize, maxRate);
+      SessionCreateProducerResponseMessage createResponse =
+         new SessionCreateProducerResponseMessage(67765765, initialCredits, serverMaxRate);
+
+      //In ClientSessionImpl constructor
+      EasyMock.expect(connection.getRemotingConnection()).andReturn(remoting);
+
+      //Remoting connection: dispatcher lookup, create-producer round trip, dispatcher lookup again
+      EasyMock.expect(remoting.getPacketDispatcher()).andReturn(dispatcher);
+      EasyMock.expect(remoting.sendBlocking(sessionTargetID, sessionTargetID, createRequest)).andReturn(createResponse);
+      EasyMock.expect(remoting.getPacketDispatcher()).andReturn(dispatcher);
+
+      //Dispatcher: hands out the client target id, then receives the producer's packet handler
+      EasyMock.expect(dispatcher.generateID()).andReturn(clientTargetID);
+      dispatcher.register(new ClientProducerPacketHandler(null, clientTargetID));
+
+      EasyMock.replay(factory, connection, remoting, dispatcher);
+
+      ClientSession session =
+         new ClientSessionImpl(connection, sessionTargetID, false, -1, false, autoCommitSends, false, false);
+
+      ClientProducerInternal created =
+         (ClientProducerInternal)session.createProducer(address, windowSize, maxRate, blockOnNPSend, blockOnPSend);
+
+      EasyMock.verify(factory, connection, remoting, dispatcher);
+
+      assertEquals(address, created.getAddress());
+      assertEquals(autoCommitSends && blockOnNPSend, created.isBlockOnNonPersistentSend());
+      assertEquals(autoCommitSends && blockOnPSend, created.isBlockOnPersistentSend());
+      assertEquals(initialCredits, created.getInitialWindowSize());
+      assertEquals(serverMaxRate, created.getMaxRate());
+   }
+   
    private void testCreateConsumerDefaultsMethod(final SimpleString queueName, final SimpleString filterString, final boolean noLocal,
          final boolean autoDeleteQueue, final boolean direct,
          final int windowSize, final int maxRate, final int serverWindowSize) throws Exception




More information about the jboss-cvs-commits mailing list