[jboss-cvs] JBoss Messaging SVN: r4453 - in trunk: src/main/org/jboss/messaging/core/client/impl and 4 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Thu Jun 12 16:15:59 EDT 2008


Author: timfox
Date: 2008-06-12 16:15:59 -0400 (Thu, 12 Jun 2008)
New Revision: 4453

Added:
   trunk/src/main/org/jboss/messaging/util/TokenBucketLimiterImpl.java
   trunk/tests/src/org/jboss/messaging/tests/unit/core/client/impl/ClientProducerImplTest.java
   trunk/tests/src/org/jboss/messaging/tests/unit/core/util/TokenBucketLimiterImplTest.java
Removed:
   trunk/src/main/org/jboss/messaging/util/TokenBucketLimiter.java
   trunk/tests/src/org/jboss/messaging/tests/unit/core/util/TokenBucketLimiterTest.java
Modified:
   trunk/examples/jms/src/org/jboss/jms/example/PerfExample.java
   trunk/src/main/org/jboss/messaging/core/client/impl/ClientProducerImpl.java
   trunk/src/main/org/jboss/messaging/core/client/impl/ClientProducerInternal.java
   trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java
   trunk/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java
   trunk/tests/src/org/jboss/messaging/tests/unit/core/client/impl/ClientSessionImplTest.java
Log:
More tests


Modified: trunk/examples/jms/src/org/jboss/jms/example/PerfExample.java
===================================================================
--- trunk/examples/jms/src/org/jboss/jms/example/PerfExample.java	2008-06-12 18:21:54 UTC (rev 4452)
+++ trunk/examples/jms/src/org/jboss/jms/example/PerfExample.java	2008-06-12 20:15:59 UTC (rev 4453)
@@ -40,6 +40,7 @@
 import org.jboss.jms.util.PerfParams;
 import org.jboss.messaging.core.logging.Logger;
 import org.jboss.messaging.util.TokenBucketLimiter;
+import org.jboss.messaging.util.TokenBucketLimiterImpl;
 
 /**
  * A simple example that can be used to gather basic performance measurements.
@@ -172,7 +173,7 @@
       
       final int modulo = 2000;
       
-      TokenBucketLimiter tbl = throttleRate != -1 ? new TokenBucketLimiter(throttleRate, false): null;
+      TokenBucketLimiter tbl = throttleRate != -1 ? new TokenBucketLimiterImpl(throttleRate, false): null;
 
       boolean committed = false;
       for (int i = 1; i <= numberOfMessages; i++)

Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ClientProducerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ClientProducerImpl.java	2008-06-12 18:21:54 UTC (rev 4452)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ClientProducerImpl.java	2008-06-12 20:15:59 UTC (rev 4453)
@@ -79,18 +79,16 @@
    private final boolean creditFlowControl;
    
    private final int initialWindowSize;
-   
-   private final int maxRate;
-   
+    
    // Static ---------------------------------------------------------------------------------------
 
    // Constructors ---------------------------------------------------------------------------------
       
-   public ClientProducerImpl(final ClientSessionInternal session, final long serverTargetID,
+   public ClientProducerImpl(final ClientSessionInternal session,
+                             final long serverTargetID,
                              final long clientTargetID,
-   		                    final SimpleString address,
-   		                    final RemotingConnection remotingConnection,
-   		                    final int maxRate,
+   		                    final SimpleString address,   		                   
+   		                    final TokenBucketLimiter rateLimiter,
    		                    final boolean blockOnNonPersistentSend,
    		                    final boolean blockOnPersistentSend,
    		                    final int initialCredits)
@@ -103,17 +101,10 @@
       
       this.address = address;
       
-      this.remotingConnection = remotingConnection;
+      this.remotingConnection = session.getConnection().getRemotingConnection();
       
-      if (maxRate != -1)
-      {
-      	this.rateLimiter = new TokenBucketLimiter(maxRate, false);
-      }
-      else
-      {
-      	this.rateLimiter = null;
-      }
-      
+      this.rateLimiter = rateLimiter;
+            
       this.blockOnNonPersistentSend = blockOnNonPersistentSend; 
       
       this.blockOnPersistentSend = blockOnPersistentSend;
@@ -123,8 +114,6 @@
       this.creditFlowControl = initialCredits != -1;
       
       this.initialWindowSize = initialCredits;
-      
-      this.maxRate = maxRate;
    }
    
    // ClientProducer implementation ----------------------------------------------------------------
@@ -147,51 +136,7 @@
       
       doSend(address, msg);
    }
-   
-   private void doSend(final SimpleString address, final ClientMessage msg) throws MessagingException
-   {
-      if (address != null)
-      {
-         msg.setDestination(address);
-      }
-      else
-      {
-         msg.setDestination(this.address);
-      }
-         	   	
-   	if (rateLimiter != null)
-      {
-         // Rate flow control
-                  
-         rateLimiter.limit();
-      }
-   	
-   	boolean sendBlocking = msg.isDurable() ? blockOnPersistentSend : blockOnNonPersistentSend;
-   	
-      ProducerSendMessage message = new ProducerSendMessage(msg);
-         		
-   	if (sendBlocking)
-   	{   	   
-   	   remotingConnection.sendBlocking(serverTargetID, session.getServerTargetID(), message);
-   	}
-   	else
-   	{
-   	   remotingConnection.sendOneWay(serverTargetID, session.getServerTargetID(), message);
-   	}   	 
-   	
-      //We only flow control with non-anonymous producers
-      if (address == null && creditFlowControl)
-      {
-         try
-         {
-            availableCredits.acquire(message.getClientMessage().encodeSize());
-         }
-         catch (InterruptedException e)
-         {           
-         }         
-      }
-   }
-            
+          
    public void registerAcknowledgementHandler(final AcknowledgementHandler handler)
    {
       // TODO      
@@ -238,7 +183,7 @@
    
    public int getMaxRate()
    {
-      return maxRate;
+      return rateLimiter == null ? -1 : rateLimiter.getRate();
    }
    
    // ClientProducerInternal implementation --------------------------------------------------------
@@ -248,6 +193,11 @@
       availableCredits.release(credits);
    }
    
+   public int getAvailableCredits()
+   {
+      return availableCredits.availablePermits();
+   }
+   
    // Public ---------------------------------------------------------------------------------------
 
    // Protected ------------------------------------------------------------------------------------
@@ -255,6 +205,50 @@
    // Package Private ------------------------------------------------------------------------------
 
    // Private --------------------------------------------------------------------------------------
+   
+   private void doSend(final SimpleString address, final ClientMessage msg) throws MessagingException
+   {
+      if (address != null)
+      {
+         msg.setDestination(address);
+      }
+      else
+      {
+         msg.setDestination(this.address);
+      }
+                  
+      if (rateLimiter != null)
+      {
+         // Rate flow control
+                  
+         rateLimiter.limit();
+      }
+      
+      boolean sendBlocking = msg.isDurable() ? blockOnPersistentSend : blockOnNonPersistentSend;
+      
+      ProducerSendMessage message = new ProducerSendMessage(msg);
+               
+      if (sendBlocking)
+      {        
+         remotingConnection.sendBlocking(serverTargetID, serverTargetID, message);
+      }
+      else
+      {
+         remotingConnection.sendOneWay(serverTargetID, serverTargetID, message);
+      }      
+      
+      //We only flow control with non-anonymous producers
+      if (address == null && creditFlowControl)
+      {
+         try
+         {
+            availableCredits.acquire(message.getClientMessage().encodeSize());
+         }
+         catch (InterruptedException e)
+         {           
+         }         
+      }
+   }
 
    private void checkClosed() throws MessagingException
    {

Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ClientProducerInternal.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ClientProducerInternal.java	2008-06-12 18:21:54 UTC (rev 4452)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ClientProducerInternal.java	2008-06-12 20:15:59 UTC (rev 4453)
@@ -18,4 +18,6 @@
 public interface ClientProducerInternal extends ClientProducer
 {
 	void receiveCredits(int credits) throws Exception;
+	
+	int getAvailableCredits();
 }

Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java	2008-06-12 18:21:54 UTC (rev 4452)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java	2008-06-12 20:15:59 UTC (rev 4453)
@@ -72,6 +72,7 @@
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionXASetTimeoutResponseMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionXAStartMessage;
 import org.jboss.messaging.util.SimpleString;
+import org.jboss.messaging.util.TokenBucketLimiterImpl;
 
 /**
  * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
@@ -392,9 +393,8 @@
          // If the producer is not auto-commit sends then messages are never sent blocking - there is no point
          // since commit, prepare or rollback will flush any messages sent.
          
-         producer = new ClientProducerImpl(this, response.getProducerTargetID(), clientTargetID, address,
-               remotingConnection,
-               response.getMaxRate(),
+         producer = new ClientProducerImpl(this, response.getProducerTargetID(), clientTargetID, address, 
+               response.getMaxRate() == -1 ? null : new TokenBucketLimiterImpl(response.getMaxRate(), false),
                autoCommitSends && blockOnNonPersistentSend,                                                      
                autoCommitSends && blockOnPersistentSend,
                response.getInitialCredits());  

Modified: trunk/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java	2008-06-12 18:21:54 UTC (rev 4452)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java	2008-06-12 20:15:59 UTC (rev 4453)
@@ -38,6 +38,7 @@
 import org.jboss.messaging.core.transaction.Transaction;
 import org.jboss.messaging.core.transaction.impl.TransactionImpl;
 import org.jboss.messaging.util.TokenBucketLimiter;
+import org.jboss.messaging.util.TokenBucketLimiterImpl;
 
 /**
  * Concrete implementation of a ClientConsumer. 
@@ -118,7 +119,7 @@
       
       if (maxRate != -1)
       {
-      	limiter = new TokenBucketLimiter(maxRate, false);
+      	limiter = new TokenBucketLimiterImpl(maxRate, false);
       }
       else
       {

Deleted: trunk/src/main/org/jboss/messaging/util/TokenBucketLimiter.java
===================================================================
--- trunk/src/main/org/jboss/messaging/util/TokenBucketLimiter.java	2008-06-12 18:21:54 UTC (rev 4452)
+++ trunk/src/main/org/jboss/messaging/util/TokenBucketLimiter.java	2008-06-12 20:15:59 UTC (rev 4453)
@@ -1,115 +0,0 @@
-/*
-  * JBoss, Home of Professional Open Source
-  * Copyright 2005, JBoss Inc., and individual contributors as indicated
-  * by the @authors tag. See the copyright.txt in the distribution for a
-  * full listing of individual contributors.
-  *
-  * This is free software; you can redistribute it and/or modify it
-  * under the terms of the GNU Lesser General Public License as
-  * published by the Free Software Foundation; either version 2.1 of
-  * the License, or (at your option) any later version.
-  *
-  * This software is distributed in the hope that it will be useful,
-  * but WITHOUT ANY WARRANTY; without even the implied warranty of
-  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
-  * Lesser General Public License for more details.
-  *
-  * You should have received a copy of the GNU Lesser General Public
-  * License along with this software; if not, write to the Free
-  * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
-  * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
-  */
-package org.jboss.messaging.util;
-
-/**
- * 
- * A TokenBucketLimiter
- * 
- * This class can throttle to a specfic rate, using an algorithm based on the Token Bucket metaphor
- * http://en.wikipedia.org/wiki/Token_bucket
- * 
- * The rate is specified in Hertz
- * 
- * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
- *
- */
-public class TokenBucketLimiter
-{
-	private final int rate;
-	
-	private final boolean spin;
-		
-	private volatile long last;
-	
-	private volatile int tokens;
-	
-	private volatile int tokensAdded;
-		
-	public TokenBucketLimiter(final int rate, final boolean spin)
-	{
-		this.rate = rate;
-		
-		this.spin = spin;
-	}
-		
-	public void limit()
-	{			
-		while (!check())
-		{
-			if (!spin)
-			{
-   			try
-   			{
-   				Thread.sleep(1);
-   			}
-   			catch (Exception e)
-   			{			
-   				//Ignore
-   			}
-			}
-		}
-	}
-	
-	private boolean check()
-	{					
-		long now = System.currentTimeMillis();
-		
-		if (last == 0)
-		{
-			last = now;
-		}
-		
-		long diff = now - last;
-		
-		if (diff >= 1000)
-		{
-			last = last + 1000;
-			
-			tokens = 0;
-			
-			tokensAdded = 0;
-		}
-														
-		int tokensDue = (int)(rate * diff  / 1000);
-		
-		int tokensToAdd = tokensDue - tokensAdded;
-		
-		if (tokensToAdd > 0)
-		{
-			tokens += tokensToAdd;
-			
-			tokensAdded += tokensToAdd;
-		}
-							
-		if (tokens > 0)
-		{
-			tokens--;
-			
-			return true;
-		}
-		else
-		{
-			return false;
-		}
-	}	
-}

Copied: trunk/src/main/org/jboss/messaging/util/TokenBucketLimiterImpl.java (from rev 4451, trunk/src/main/org/jboss/messaging/util/TokenBucketLimiter.java)
===================================================================
--- trunk/src/main/org/jboss/messaging/util/TokenBucketLimiterImpl.java	                        (rev 0)
+++ trunk/src/main/org/jboss/messaging/util/TokenBucketLimiterImpl.java	2008-06-12 20:15:59 UTC (rev 4453)
@@ -0,0 +1,120 @@
+/*
+  * JBoss, Home of Professional Open Source
+  * Copyright 2005, JBoss Inc., and individual contributors as indicated
+  * by the @authors tag. See the copyright.txt in the distribution for a
+  * full listing of individual contributors.
+  *
+  * This is free software; you can redistribute it and/or modify it
+  * under the terms of the GNU Lesser General Public License as
+  * published by the Free Software Foundation; either version 2.1 of
+  * the License, or (at your option) any later version.
+  *
+  * This software is distributed in the hope that it will be useful,
+  * but WITHOUT ANY WARRANTY; without even the implied warranty of
+  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+  * Lesser General Public License for more details.
+  *
+  * You should have received a copy of the GNU Lesser General Public
+  * License along with this software; if not, write to the Free
+  * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+  * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+  */
+package org.jboss.messaging.util;
+
+/**
+ * 
+ * A TokenBucketLimiterImpl
+ * 
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ *
+ */
+public class TokenBucketLimiterImpl implements TokenBucketLimiter
+{
+	private final int rate;
+	
+	private final boolean spin;
+		
+	private volatile long last;
+	
+	private volatile int tokens;
+	
+	private volatile int tokensAdded;
+		
+	public TokenBucketLimiterImpl(final int rate, final boolean spin)
+	{
+		this.rate = rate;
+		
+		this.spin = spin;
+	}
+	
+	public int getRate()
+	{
+	   return rate;
+	}
+	
+	public boolean isSpin()
+	{
+	   return spin;
+	}
+		
+	public void limit()
+	{			
+		while (!check())
+		{
+			if (!spin)
+			{
+   			try
+   			{
+   				Thread.sleep(1);
+   			}
+   			catch (Exception e)
+   			{			
+   				//Ignore
+   			}
+			}
+		}
+	}
+	
+	private boolean check()
+	{					
+		long now = System.currentTimeMillis();
+		
+		if (last == 0)
+		{
+			last = now;
+		}
+		
+		long diff = now - last;
+		
+		if (diff >= 1000)
+		{
+			last = last + 1000;
+			
+			tokens = 0;
+			
+			tokensAdded = 0;
+		}
+														
+		int tokensDue = (int)(rate * diff  / 1000);
+		
+		int tokensToAdd = tokensDue - tokensAdded;
+		
+		if (tokensToAdd > 0)
+		{
+			tokens += tokensToAdd;
+			
+			tokensAdded += tokensToAdd;
+		}
+							
+		if (tokens > 0)
+		{
+			tokens--;
+			
+			return true;
+		}
+		else
+		{
+			return false;
+		}
+	}	
+}

Added: trunk/tests/src/org/jboss/messaging/tests/unit/core/client/impl/ClientProducerImplTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/client/impl/ClientProducerImplTest.java	                        (rev 0)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/core/client/impl/ClientProducerImplTest.java	2008-06-12 20:15:59 UTC (rev 4453)
@@ -0,0 +1,323 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.messaging.tests.unit.core.client.impl;
+
+import org.easymock.EasyMock;
+import org.jboss.messaging.core.client.ClientMessage;
+import org.jboss.messaging.core.client.impl.ClientConnectionInternal;
+import org.jboss.messaging.core.client.impl.ClientProducerImpl;
+import org.jboss.messaging.core.client.impl.ClientProducerInternal;
+import org.jboss.messaging.core.client.impl.ClientSessionInternal;
+import org.jboss.messaging.core.logging.Logger;
+import org.jboss.messaging.core.remoting.PacketDispatcher;
+import org.jboss.messaging.core.remoting.RemotingConnection;
+import org.jboss.messaging.core.remoting.impl.wireformat.ProducerSendMessage;
+import org.jboss.messaging.tests.util.UnitTestCase;
+import org.jboss.messaging.util.SimpleString;
+import org.jboss.messaging.util.TokenBucketLimiter;
+import org.jboss.messaging.util.TokenBucketLimiterImpl;
+
+/**
+ * 
+ * A ClientProducerImplTest
+ * 
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ *
+ */
+public class ClientProducerImplTest extends UnitTestCase
+{
+   private static final Logger log = Logger.getLogger(ClientProducerImplTest.class);
+
+   // Public -----------------------------------------------------------------------------------------------------------
+
+   public void testConstructor() throws Exception
+   {
+      testConstructor(16521652, false, false);
+      testConstructor(16521652, false, true);
+      testConstructor(16521652, true, false);
+      testConstructor(16521652, true, true);
+      testConstructor(-1, false, false);
+      testConstructor(-1, false, true);
+      testConstructor(-1, true, false);
+      testConstructor(-1, true, true);
+   }
+   
+   private void testConstructor(final int maxRate, final boolean blockOnNP, final boolean blockOnP) throws Exception
+   {
+      ClientSessionInternal session = EasyMock.createStrictMock(ClientSessionInternal.class);
+      ClientConnectionInternal connection = EasyMock.createStrictMock(ClientConnectionInternal.class);
+      RemotingConnection rc = EasyMock.createStrictMock(RemotingConnection.class);
+      
+      EasyMock.expect(session.getConnection()).andReturn(connection);
+      EasyMock.expect(connection.getRemotingConnection()).andReturn(rc);
+      
+      SimpleString address = new SimpleString("uhasuuhs");
+      
+      final int initialCredits = 7612672;
+
+      EasyMock.replay(session, connection, rc);
+      
+      TokenBucketLimiter limiter = maxRate != -1 ? new TokenBucketLimiterImpl(maxRate, false) : null;
+      
+      ClientProducerInternal producer =
+         new ClientProducerImpl(session, 7876L, 76767L, address, limiter,
+                                blockOnNP, blockOnP, initialCredits);
+      
+      EasyMock.verify(session, connection, rc);
+      
+      assertEquals(address, producer.getAddress());
+      assertEquals(initialCredits, producer.getInitialWindowSize());
+      assertEquals(maxRate, producer.getMaxRate());
+      assertEquals(blockOnNP, producer.isBlockOnNonPersistentSend());
+      assertEquals(blockOnP, producer.isBlockOnPersistentSend());
+      assertFalse(producer.isClosed());
+      
+   }
+   
+   public void testSend() throws Exception
+   {
+      testSend(-1, 652652, new SimpleString("uyuyyu"), null, false, false, false);
+      testSend(-1, 652652, new SimpleString("uyuyyu"), null, false, false, true);
+      testSend(-1, 652652, new SimpleString("uyuyyu"), null, false, true, false);
+      testSend(-1, 652652, new SimpleString("uyuyyu"), null, false, true, true);
+      testSend(-1, 652652, new SimpleString("uyuyyu"), null, true, false, false);
+      testSend(-1, 652652, new SimpleString("uyuyyu"), null, true, false, true);
+      testSend(-1, 652652, new SimpleString("uyuyyu"), null, true, true, false);
+      testSend(-1, 652652, new SimpleString("uyuyyu"), null, true, true, true);
+      
+      testSend(-1, 652652, null, new SimpleString("uyuyyu"), false, false, false);
+      testSend(-1, 652652, null, new SimpleString("uyuyyu"), false, false, true);
+      testSend(-1, 652652, null, new SimpleString("uyuyyu"), false, true, false);
+      testSend(-1, 652652, null, new SimpleString("uyuyyu"), false, true, true);
+      testSend(-1, 652652, null, new SimpleString("uyuyyu"), true, false, false);
+      testSend(-1, 652652, null, new SimpleString("uyuyyu"), true, false, true);
+      testSend(-1, 652652, null, new SimpleString("uyuyyu"), true, true, false);
+      testSend(-1, 652652, null, new SimpleString("uyuyyu"), true, true, true);
+      
+      testSend(652652, -1, new SimpleString("uyuyyu"), null, false, false, false);
+      testSend(652652, -1, new SimpleString("uyuyyu"), null, false, false, true);
+      testSend(652652, -1, new SimpleString("uyuyyu"), null, false, true, false);
+      testSend(652652, -1, new SimpleString("uyuyyu"), null, false, true, true);
+      testSend(652652, -1, new SimpleString("uyuyyu"), null, true, false, false);
+      testSend(652652, -1, new SimpleString("uyuyyu"), null, true, false, true);
+      testSend(652652, -1, new SimpleString("uyuyyu"), null, true, true, false);
+      testSend(652652, -1, new SimpleString("uyuyyu"), null, true, true, true);
+      
+      testSend(652652, -1, null, new SimpleString("uyuyyu"), false, false, false);
+      testSend(652652, -1, null, new SimpleString("uyuyyu"), false, false, true);
+      testSend(652652, -1, null, new SimpleString("uyuyyu"), false, true, false);
+      testSend(652652, -1, null, new SimpleString("uyuyyu"), false, true, true);
+      testSend(652652, -1, null, new SimpleString("uyuyyu"), true, false, false);
+      testSend(652652, -1, null, new SimpleString("uyuyyu"), true, false, true);
+      testSend(652652, -1, null, new SimpleString("uyuyyu"), true, true, false);
+      testSend(652652, -1, null, new SimpleString("uyuyyu"), true, true, true);
+      
+      testSend(652652, 476476, new SimpleString("uyuyyu"), null, false, false, false);
+      testSend(652652, 476476, new SimpleString("uyuyyu"), null, false, false, true);
+      testSend(652652, 476476, new SimpleString("uyuyyu"), null, false, true, false);
+      testSend(652652, 476476, new SimpleString("uyuyyu"), null, false, true, true);
+      testSend(652652, 476476, new SimpleString("uyuyyu"), null, true, false, false);
+      testSend(652652, 476476, new SimpleString("uyuyyu"), null, true, false, true);
+      testSend(652652, 476476, new SimpleString("uyuyyu"), null, true, true, false);
+      testSend(652652, 476476, new SimpleString("uyuyyu"), null, true, true, true);
+      
+      testSend(652652, 476476, null, new SimpleString("uyuyyu"), false, false, false);
+      testSend(652652, 476476, null, new SimpleString("uyuyyu"), false, false, true);
+      testSend(652652, 476476, null, new SimpleString("uyuyyu"), false, true, false);
+      testSend(652652, 476476, null, new SimpleString("uyuyyu"), false, true, true);
+      testSend(652652, 476476, null, new SimpleString("uyuyyu"), true, false, false);
+      testSend(652652, 476476, null, new SimpleString("uyuyyu"), true, false, true);
+      testSend(652652, 476476, null, new SimpleString("uyuyyu"), true, true, false);
+      testSend(652652, 476476, null, new SimpleString("uyuyyu"), true, true, true);
+   }
+   
+   public void testReceiveCredits() throws Exception
+   {
+      ClientSessionInternal session = EasyMock.createStrictMock(ClientSessionInternal.class);
+      ClientConnectionInternal connection = EasyMock.createStrictMock(ClientConnectionInternal.class);
+      RemotingConnection rc = EasyMock.createStrictMock(RemotingConnection.class);
+      
+      EasyMock.expect(session.getConnection()).andReturn(connection);
+      EasyMock.expect(connection.getRemotingConnection()).andReturn(rc);
+      
+      final int initialCredits = 7612672;
+
+      EasyMock.replay(session, connection, rc);
+      
+      ClientProducerInternal producer =
+         new ClientProducerImpl(session, 7876L, 76767L, new SimpleString("uhasuuhs"), null,
+                                false, false, initialCredits);
+      
+      EasyMock.verify(session, connection, rc);
+      
+      assertEquals(initialCredits, producer.getAvailableCredits());
+      
+      final int credits1 = 1928;
+      final int credits2 = 18272;
+      final int credits3 = 309;
+      producer.receiveCredits(credits1);
+      assertEquals(initialCredits + credits1, producer.getAvailableCredits());
+      producer.receiveCredits(credits2);
+      assertEquals(initialCredits + credits1 + credits2, producer.getAvailableCredits());
+      producer.receiveCredits(credits3);
+      assertEquals(initialCredits + credits1 + credits2 + credits3, producer.getAvailableCredits());
+   }
+   
+   public void testClose() throws Exception
+   {
+      ClientSessionInternal session = EasyMock.createStrictMock(ClientSessionInternal.class);
+      ClientConnectionInternal connection = EasyMock.createStrictMock(ClientConnectionInternal.class);
+      RemotingConnection rc = EasyMock.createStrictMock(RemotingConnection.class);
+      PacketDispatcher pd = EasyMock.createStrictMock(PacketDispatcher.class);
+      
+      EasyMock.expect(session.getConnection()).andReturn(connection);
+      EasyMock.expect(connection.getRemotingConnection()).andReturn(rc);
+      
+      EasyMock.replay(session, connection, rc);
+      
+      final int initialCredits = 7612672;
+      
+      final long clientTargetID = 121212;      
+      
+      ClientProducerInternal producer =
+         new ClientProducerImpl(session, 7876L, clientTargetID, new SimpleString("uhasuuhs"), null,
+                                false, false, initialCredits);
+      
+      assertFalse(producer.isClosed());
+      
+      EasyMock.verify(session, connection, rc);
+      EasyMock.reset(session, connection, rc);
+      
+      
+      session.removeProducer(producer);
+      EasyMock.expect(rc.getPacketDispatcher()).andReturn(pd);
+      pd.unregister(clientTargetID);
+      
+      EasyMock.replay(session, connection, rc, pd);
+      
+      producer.close();
+      
+      EasyMock.verify(session, connection, rc, pd);
+      
+      assertTrue(producer.isClosed());   
+      
+      EasyMock.reset(session, connection, rc, pd);
+      
+      EasyMock.replay(session, connection, rc, pd);
+      
+      //close again should do nothing
+      
+      producer.close();
+      
+      EasyMock.verify(session, connection, rc, pd);      
+   }
+   
+   // Private ----------------------------------------------------------------------------------------
+   
+   private void testSend(final int maxRate, final int windowSize,
+                         final SimpleString prodAddress, final SimpleString sendAddress,
+                         final boolean blockOnNonPersistentSend,
+                         final boolean blockOnPersistentSend,
+                         boolean durable) throws Exception
+   {
+      ClientSessionInternal session = EasyMock.createStrictMock(ClientSessionInternal.class);
+      ClientConnectionInternal connection = EasyMock.createStrictMock(ClientConnectionInternal.class);
+      RemotingConnection rc = EasyMock.createStrictMock(RemotingConnection.class);
+      ClientMessage message = EasyMock.createStrictMock(ClientMessage.class);
+            
+      EasyMock.expect(session.getConnection()).andReturn(connection);
+      EasyMock.expect(connection.getRemotingConnection()).andReturn(rc);
+      
+      if (sendAddress != null)
+      {
+         message.setDestination(sendAddress);
+      }
+      else
+      {
+         message.setDestination(prodAddress);
+      }
+      
+      EasyMock.expect(message.isDurable()).andReturn(durable);
+            
+      TokenBucketLimiter limiter = maxRate != -1 ? EasyMock.createStrictMock(TokenBucketLimiter.class) : null;
+      
+      if (limiter != null)
+      {
+         limiter.limit();
+      }
+      
+      final int targetID = 91821982;
+      
+      boolean sendBlocking = durable ? blockOnPersistentSend : blockOnNonPersistentSend;
+            
+      if (sendBlocking)
+      {
+         EasyMock.expect(rc.sendBlocking(targetID, targetID, new ProducerSendMessage(message))).andReturn(null);
+      }
+      else
+      {
+         rc.sendOneWay(targetID, targetID, new ProducerSendMessage(message));
+      }
+      
+      final int messageSize = 123;
+      
+      log.info("prod address is " + prodAddress);
+      
+      if (sendAddress == null && windowSize != -1)
+      {
+         EasyMock.expect(message.encodeSize()).andReturn(messageSize);
+      }
+      
+      EasyMock.replay(session, connection, rc, message);
+            
+      ClientProducerInternal producer =
+         new ClientProducerImpl(session, targetID, 76767L, prodAddress, limiter, blockOnNonPersistentSend,
+               blockOnPersistentSend, windowSize);
+      
+      log.info("Send address is " + sendAddress);
+      if (sendAddress != null)
+      {
+         producer.send(sendAddress, message);
+      }
+      else
+      {
+         producer.send(message);
+      }
+      
+      log.info("Sent");
+       
+      EasyMock.verify(session, connection, rc, message);
+      
+      if (sendAddress == null && windowSize != -1)
+      {
+         //Credits should have been depleted
+         
+         assertEquals(windowSize - messageSize, producer.getAvailableCredits());
+      }
+      else
+      {
+         assertEquals(windowSize, producer.getAvailableCredits());
+      }
+   }
+   
+}
+

Modified: trunk/tests/src/org/jboss/messaging/tests/unit/core/client/impl/ClientSessionImplTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/client/impl/ClientSessionImplTest.java	2008-06-12 18:21:54 UTC (rev 4452)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/core/client/impl/ClientSessionImplTest.java	2008-06-12 20:15:59 UTC (rev 4453)
@@ -93,7 +93,7 @@
 {
    private static final Logger log = Logger.getLogger(ClientSessionImplTest.class);
 
-   // Private -----------------------------------------------------------------------------------------------------------
+   // Public -----------------------------------------------------------------------------------------------------------
 
    public void testConstructor() throws Exception
    {            
@@ -446,6 +446,8 @@
          
          EasyMock.expect(rc.getPacketDispatcher()).andReturn(pd);
          
+         EasyMock.expect(conn.getRemotingConnection()).andReturn(rc);
+                  
          pd.register(new ClientProducerPacketHandler(null, clientTargetID));
       }
       
@@ -466,6 +468,8 @@
          
          EasyMock.expect(rc.getPacketDispatcher()).andReturn(pd);
          
+         EasyMock.expect(conn.getRemotingConnection()).andReturn(rc);
+                  
          pd.register(new ClientProducerPacketHandler(null, clientTargetID));
       }
       
@@ -561,8 +565,9 @@
          
          EasyMock.expect(rc.getPacketDispatcher()).andReturn(pd);
          
-         pd.register(new ClientProducerPacketHandler(null, clientTargetID));
-      
+         EasyMock.expect(conn.getRemotingConnection()).andReturn(rc);
+                  
+         pd.register(new ClientProducerPacketHandler(null, clientTargetID));      
       }
 
       EasyMock.replay(conn, rc, pd);
@@ -785,264 +790,7 @@
       testClose(false);
    }
    
-   private void testClose(boolean delivered) throws Exception
-   {
-      ClientConnectionInternal conn = EasyMock.createStrictMock(ClientConnectionInternal.class);
-          
-      RemotingConnection rc = EasyMock.createStrictMock(RemotingConnection.class);
-          
-      //In ClientSessionImpl constructor
-      EasyMock.expect(conn.getRemotingConnection()).andReturn(rc);
-        
-      final long sessionTargetID = 9121892;
-                  
-      EasyMock.replay(conn, rc);
-      
-      ClientSessionInternal session = new ClientSessionImpl(conn, sessionTargetID, false, -1, false, false, false, false);
-      
-      EasyMock.verify(conn, rc);
-      
-      EasyMock.reset(conn, rc);
-      
-      ClientProducerInternal prod1 = EasyMock.createStrictMock(ClientProducerInternal.class);
-      ClientProducerInternal prod2 = EasyMock.createStrictMock(ClientProducerInternal.class);
-      
-      ClientConsumerInternal cons1 = EasyMock.createStrictMock(ClientConsumerInternal.class);
-      ClientConsumerInternal cons2 = EasyMock.createStrictMock(ClientConsumerInternal.class);
-      
-      ClientBrowser browser1 = EasyMock.createStrictMock(ClientBrowser.class);
-      ClientBrowser browser2 = EasyMock.createStrictMock(ClientBrowser.class);
-                    
-      prod1.close();
-      prod2.close();
-      cons1.close();
-      cons2.close();
-      browser1.close();
-      browser2.close();
-      
-      final int numDeliveries = 10;
-      
-      if (delivered)
-      {
-         SessionAcknowledgeMessage message = new SessionAcknowledgeMessage(numDeliveries - 1, true);
-         
-         rc.sendOneWay(sessionTargetID, sessionTargetID, message);
-      }
-            
-      EasyMock.expect(rc.sendBlocking(sessionTargetID, sessionTargetID, new EmptyPacket(EmptyPacket.CLOSE))).andReturn(null);
-      
-      conn.removeSession(session);      
-            
-      EasyMock.replay(conn, rc, prod1, prod2, cons1, cons2, browser1, browser2);
-                 
-      session.addProducer(prod1);
-      session.addProducer(prod2);
-      
-      session.addConsumer(cons1);
-      session.addConsumer(cons2);
-      
-      session.addBrowser(browser1);
-      session.addBrowser(browser2);
-      
-      assertFalse(session.isClosed());
-      
-      if (delivered)
-      {
-         //Simulate there being some undelivered messages
-         for (int i = 0; i < numDeliveries; i++)
-         {
-            session.delivered(i, false);
-            session.acknowledge();
-         }
-      }
-            
-      session.close();
-      
-      EasyMock.verify(conn, rc, prod1, prod2, cons1, cons2, browser1, browser2);
-      
-      assertTrue(session.isClosed());      
-      
-      try
-      {
-         session.createQueue(new SimpleString("trtr"), new SimpleString("iuasij"), null, false, false);
-         fail("Should throw exception");
-      }
-      catch (MessagingException e)
-      {
-         assertEquals(MessagingException.OBJECT_CLOSED, e.getCode());
-      }
-      
-      try
-      {
-         session.deleteQueue(new SimpleString("trtr"));
-         fail("Should throw exception");
-      }
-      catch (MessagingException e)
-      {
-         assertEquals(MessagingException.OBJECT_CLOSED, e.getCode());
-      }
-      
-      try
-      {
-         session.addDestination(new SimpleString("trtr"), false);
-         fail("Should throw exception");
-      }
-      catch (MessagingException e)
-      {
-         assertEquals(MessagingException.OBJECT_CLOSED, e.getCode());
-      }
-      
-      try
-      {
-         session.removeDestination(new SimpleString("trtr"), false);
-         fail("Should throw exception");
-      }
-      catch (MessagingException e)
-      {
-         assertEquals(MessagingException.OBJECT_CLOSED, e.getCode());
-      }
-      
-      try
-      {
-         session.queueQuery(new SimpleString("trtr"));
-         fail("Should throw exception");
-      }
-      catch (MessagingException e)
-      {
-         assertEquals(MessagingException.OBJECT_CLOSED, e.getCode());
-      }
-      
-      try
-      {
-         session.bindingQuery(new SimpleString("trtr"));
-         fail("Should throw exception");
-      }
-      catch (MessagingException e)
-      {
-         assertEquals(MessagingException.OBJECT_CLOSED, e.getCode());
-      }
-      
-      try
-      {
-         session.createConsumer(new SimpleString("trtr"));
-         fail("Should throw exception");
-      }
-      catch (MessagingException e)
-      {
-         assertEquals(MessagingException.OBJECT_CLOSED, e.getCode());
-      }
-      
-      try
-      {
-         session.createConsumer(new SimpleString("iasjq"), null, false, false, false);
-         fail("Should throw exception");
-      }
-      catch (MessagingException e)
-      {
-         assertEquals(MessagingException.OBJECT_CLOSED, e.getCode());
-      }
-      
-      try
-      {
-         session.createConsumer(new SimpleString("husuhsuh"), null, false, false, false, 8787, 7162761);
-         fail("Should throw exception");
-      }
-      catch (MessagingException e)
-      {
-         assertEquals(MessagingException.OBJECT_CLOSED, e.getCode());
-      }
-      
-      try
-      {
-         session.createBrowser(new SimpleString("husuhsuh"));
-         fail("Should throw exception");
-      }
-      catch (MessagingException e)
-      {
-         assertEquals(MessagingException.OBJECT_CLOSED, e.getCode());
-      }
-      
-      try
-      {
-         session.createBrowser(new SimpleString("husuhsuh"), null);
-         fail("Should throw exception");
-      }
-      catch (MessagingException e)
-      {
-         assertEquals(MessagingException.OBJECT_CLOSED, e.getCode());
-      }
-      
-      try
-      {
-         session.createProducer(new SimpleString("husuhsuh"));
-         fail("Should throw exception");
-      }
-      catch (MessagingException e)
-      {
-         assertEquals(MessagingException.OBJECT_CLOSED, e.getCode());
-      }
-      
-      try
-      {
-         session.createProducer(new SimpleString("iashi"), 878778, 8778, false, false);
-         fail("Should throw exception");
-      }
-      catch (MessagingException e)
-      {
-         assertEquals(MessagingException.OBJECT_CLOSED, e.getCode());
-      }
-      
-      try
-      {
-         session.createRateLimitedProducer(new SimpleString("uhsuhs"), 78676);
-         fail("Should throw exception");
-      }
-      catch (MessagingException e)
-      {
-         assertEquals(MessagingException.OBJECT_CLOSED, e.getCode());
-      }
-      
-      try
-      {
-         session.createProducerWithWindowSize(new SimpleString("uhsuhs"), 78676);
-         fail("Should throw exception");
-      }
-      catch (MessagingException e)
-      {
-         assertEquals(MessagingException.OBJECT_CLOSED, e.getCode());
-      }
-      
-      try
-      {
-         session.commit();
-         fail("Should throw exception");
-      }
-      catch (MessagingException e)
-      {
-         assertEquals(MessagingException.OBJECT_CLOSED, e.getCode());
-      }
-      
-      try
-      {
-         session.rollback();
-         fail("Should throw exception");
-      }
-      catch (MessagingException e)
-      {
-         assertEquals(MessagingException.OBJECT_CLOSED, e.getCode());
-      }
-      
-      try
-      {
-         session.acknowledge();
-         fail("Should throw exception");
-      }
-      catch (MessagingException e)
-      {
-         assertEquals(MessagingException.OBJECT_CLOSED, e.getCode());
-      }
-      
-   }
+   
     
    public void testAddRemoveConsumer() throws Exception
    {
@@ -1408,6 +1156,275 @@
    
    // Private -------------------------------------------------------------------------------------------
 
+   private void testClose(boolean delivered) throws Exception
+   {
+      ClientConnectionInternal conn = EasyMock.createStrictMock(ClientConnectionInternal.class);
+          
+      RemotingConnection rc = EasyMock.createStrictMock(RemotingConnection.class);
+          
+      //In ClientSessionImpl constructor
+      EasyMock.expect(conn.getRemotingConnection()).andReturn(rc);
+        
+      final long sessionTargetID = 9121892;
+                  
+      EasyMock.replay(conn, rc);
+      
+      ClientSessionInternal session = new ClientSessionImpl(conn, sessionTargetID, false, -1, false, false, false, false);
+      
+      EasyMock.verify(conn, rc);
+      
+      EasyMock.reset(conn, rc);
+      
+      ClientProducerInternal prod1 = EasyMock.createStrictMock(ClientProducerInternal.class);
+      ClientProducerInternal prod2 = EasyMock.createStrictMock(ClientProducerInternal.class);
+      
+      ClientConsumerInternal cons1 = EasyMock.createStrictMock(ClientConsumerInternal.class);
+      ClientConsumerInternal cons2 = EasyMock.createStrictMock(ClientConsumerInternal.class);
+      
+      ClientBrowser browser1 = EasyMock.createStrictMock(ClientBrowser.class);
+      ClientBrowser browser2 = EasyMock.createStrictMock(ClientBrowser.class);
+                    
+      prod1.close();
+      prod2.close();
+      cons1.close();
+      cons2.close();
+      browser1.close();
+      browser2.close();
+      
+      final int numDeliveries = 10;
+      
+      if (delivered)
+      {
+         SessionAcknowledgeMessage message = new SessionAcknowledgeMessage(numDeliveries - 1, true);
+         
+         rc.sendOneWay(sessionTargetID, sessionTargetID, message);
+      }
+            
+      EasyMock.expect(rc.sendBlocking(sessionTargetID, sessionTargetID, new EmptyPacket(EmptyPacket.CLOSE))).andReturn(null);
+      
+      conn.removeSession(session);      
+            
+      EasyMock.replay(conn, rc, prod1, prod2, cons1, cons2, browser1, browser2);
+                 
+      session.addProducer(prod1);
+      session.addProducer(prod2);
+      
+      session.addConsumer(cons1);
+      session.addConsumer(cons2);
+      
+      session.addBrowser(browser1);
+      session.addBrowser(browser2);
+      
+      assertFalse(session.isClosed());
+      
+      if (delivered)
+      {
+         //Simulate there being some undelivered messages
+         for (int i = 0; i < numDeliveries; i++)
+         {
+            session.delivered(i, false);
+            session.acknowledge();
+         }
+      }
+            
+      session.close();
+      
+      EasyMock.verify(conn, rc, prod1, prod2, cons1, cons2, browser1, browser2);
+      
+      assertTrue(session.isClosed());  
+      
+      EasyMock.reset(conn, rc, prod1, prod2, cons1, cons2, browser1, browser2);
+      
+      EasyMock.replay(conn, rc, prod1, prod2, cons1, cons2, browser1, browser2);
+      
+      //Close again should do nothing
+      
+      session.close();
+      
+      EasyMock.verify(conn, rc, prod1, prod2, cons1, cons2, browser1, browser2);
+      
+      try
+      {
+         session.createQueue(new SimpleString("trtr"), new SimpleString("iuasij"), null, false, false);
+         fail("Should throw exception");
+      }
+      catch (MessagingException e)
+      {
+         assertEquals(MessagingException.OBJECT_CLOSED, e.getCode());
+      }
+      
+      try
+      {
+         session.deleteQueue(new SimpleString("trtr"));
+         fail("Should throw exception");
+      }
+      catch (MessagingException e)
+      {
+         assertEquals(MessagingException.OBJECT_CLOSED, e.getCode());
+      }
+      
+      try
+      {
+         session.addDestination(new SimpleString("trtr"), false);
+         fail("Should throw exception");
+      }
+      catch (MessagingException e)
+      {
+         assertEquals(MessagingException.OBJECT_CLOSED, e.getCode());
+      }
+      
+      try
+      {
+         session.removeDestination(new SimpleString("trtr"), false);
+         fail("Should throw exception");
+      }
+      catch (MessagingException e)
+      {
+         assertEquals(MessagingException.OBJECT_CLOSED, e.getCode());
+      }
+      
+      try
+      {
+         session.queueQuery(new SimpleString("trtr"));
+         fail("Should throw exception");
+      }
+      catch (MessagingException e)
+      {
+         assertEquals(MessagingException.OBJECT_CLOSED, e.getCode());
+      }
+      
+      try
+      {
+         session.bindingQuery(new SimpleString("trtr"));
+         fail("Should throw exception");
+      }
+      catch (MessagingException e)
+      {
+         assertEquals(MessagingException.OBJECT_CLOSED, e.getCode());
+      }
+      
+      try
+      {
+         session.createConsumer(new SimpleString("trtr"));
+         fail("Should throw exception");
+      }
+      catch (MessagingException e)
+      {
+         assertEquals(MessagingException.OBJECT_CLOSED, e.getCode());
+      }
+      
+      try
+      {
+         session.createConsumer(new SimpleString("iasjq"), null, false, false, false);
+         fail("Should throw exception");
+      }
+      catch (MessagingException e)
+      {
+         assertEquals(MessagingException.OBJECT_CLOSED, e.getCode());
+      }
+      
+      try
+      {
+         session.createConsumer(new SimpleString("husuhsuh"), null, false, false, false, 8787, 7162761);
+         fail("Should throw exception");
+      }
+      catch (MessagingException e)
+      {
+         assertEquals(MessagingException.OBJECT_CLOSED, e.getCode());
+      }
+      
+      try
+      {
+         session.createBrowser(new SimpleString("husuhsuh"));
+         fail("Should throw exception");
+      }
+      catch (MessagingException e)
+      {
+         assertEquals(MessagingException.OBJECT_CLOSED, e.getCode());
+      }
+      
+      try
+      {
+         session.createBrowser(new SimpleString("husuhsuh"), null);
+         fail("Should throw exception");
+      }
+      catch (MessagingException e)
+      {
+         assertEquals(MessagingException.OBJECT_CLOSED, e.getCode());
+      }
+      
+      try
+      {
+         session.createProducer(new SimpleString("husuhsuh"));
+         fail("Should throw exception");
+      }
+      catch (MessagingException e)
+      {
+         assertEquals(MessagingException.OBJECT_CLOSED, e.getCode());
+      }
+      
+      try
+      {
+         session.createProducer(new SimpleString("iashi"), 878778, 8778, false, false);
+         fail("Should throw exception");
+      }
+      catch (MessagingException e)
+      {
+         assertEquals(MessagingException.OBJECT_CLOSED, e.getCode());
+      }
+      
+      try
+      {
+         session.createRateLimitedProducer(new SimpleString("uhsuhs"), 78676);
+         fail("Should throw exception");
+      }
+      catch (MessagingException e)
+      {
+         assertEquals(MessagingException.OBJECT_CLOSED, e.getCode());
+      }
+      
+      try
+      {
+         session.createProducerWithWindowSize(new SimpleString("uhsuhs"), 78676);
+         fail("Should throw exception");
+      }
+      catch (MessagingException e)
+      {
+         assertEquals(MessagingException.OBJECT_CLOSED, e.getCode());
+      }
+      
+      try
+      {
+         session.commit();
+         fail("Should throw exception");
+      }
+      catch (MessagingException e)
+      {
+         assertEquals(MessagingException.OBJECT_CLOSED, e.getCode());
+      }
+      
+      try
+      {
+         session.rollback();
+         fail("Should throw exception");
+      }
+      catch (MessagingException e)
+      {
+         assertEquals(MessagingException.OBJECT_CLOSED, e.getCode());
+      }
+      
+      try
+      {
+         session.acknowledge();
+         fail("Should throw exception");
+      }
+      catch (MessagingException e)
+      {
+         assertEquals(MessagingException.OBJECT_CLOSED, e.getCode());
+      }
+      
+   }
+   
    private void testXAStart(int flags, boolean error) throws Exception
    {
       ClientConnectionInternal conn = EasyMock.createStrictMock(ClientConnectionInternal.class);
@@ -2141,6 +2158,8 @@
 
       EasyMock.expect(rc.getPacketDispatcher()).andReturn(pd);
 
+      EasyMock.expect(conn.getRemotingConnection()).andReturn(rc);      
+      
       pd.register(new ClientProducerPacketHandler(null, clientTargetID));
 
       EasyMock.replay(cf);
@@ -2210,6 +2229,8 @@
 
       EasyMock.expect(rc.getPacketDispatcher()).andReturn(pd);
 
+      EasyMock.expect(conn.getRemotingConnection()).andReturn(rc);
+            
       pd.register(new ClientProducerPacketHandler(null, clientTargetID));
 
       EasyMock.replay(cf);
@@ -2286,7 +2307,9 @@
       EasyMock.expect(rc.sendBlocking(sessionTargetID, sessionTargetID, request)).andReturn(resp);
 
       EasyMock.expect(rc.getPacketDispatcher()).andReturn(pd);
-
+      
+      EasyMock.expect(conn.getRemotingConnection()).andReturn(rc);
+      
       pd.register(new ClientProducerPacketHandler(null, clientTargetID));
 
       EasyMock.replay(cf);
@@ -2346,6 +2369,8 @@
       
       EasyMock.expect(rc.getPacketDispatcher()).andReturn(pd);
       
+      EasyMock.expect(conn.getRemotingConnection()).andReturn(rc);
+      
       pd.register(new ClientProducerPacketHandler(null, clientTargetID));
       
       EasyMock.replay(cf);

Copied: trunk/tests/src/org/jboss/messaging/tests/unit/core/util/TokenBucketLimiterImplTest.java (from rev 4451, trunk/tests/src/org/jboss/messaging/tests/unit/core/util/TokenBucketLimiterTest.java)
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/util/TokenBucketLimiterImplTest.java	                        (rev 0)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/core/util/TokenBucketLimiterImplTest.java	2008-06-12 20:15:59 UTC (rev 4453)
@@ -0,0 +1,130 @@
+/*
+  * JBoss, Home of Professional Open Source
+  * Copyright 2005, JBoss Inc., and individual contributors as indicated
+  * by the @authors tag. See the copyright.txt in the distribution for a
+  * full listing of individual contributors.
+  *
+  * This is free software; you can redistribute it and/or modify it
+  * under the terms of the GNU Lesser General Public License as
+  * published by the Free Software Foundation; either version 2.1 of
+  * the License, or (at your option) any later version.
+  *
+  * This software is distributed in the hope that it will be useful,
+  * but WITHOUT ANY WARRANTY; without even the implied warranty of
+  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+  * Lesser General Public License for more details.
+  *
+  * You should have received a copy of the GNU Lesser General Public
+  * License along with this software; if not, write to the Free
+  * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+  * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+  */
+package org.jboss.messaging.tests.unit.core.util;
+
+import junit.framework.TestCase;
+
+import org.jboss.messaging.core.logging.Logger;
+import org.jboss.messaging.util.TokenBucketLimiterImpl;
+
+/**
+ * 
+ * A TokenBucketLimiterImplTest
+ * 
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ *
+ */
+public class TokenBucketLimiterImplTest extends TestCase
+{
+	private static final Logger log = Logger.getLogger(TokenBucketLimiterImplTest.class);
+	
+	public void testRateWithSpin1() throws Exception
+	{
+		testRate(1, true);		
+	}
+	
+	public void testRateWithSpin10() throws Exception
+	{
+		testRate(10, true);		
+	}
+	
+	public void testRateWithSpin100() throws Exception
+	{
+		testRate(100, true);		
+	}
+	
+	public void testRateWithSpin1000() throws Exception
+	{
+		testRate(1000, true);		
+	}
+	
+	public void testRateWithSpin10000() throws Exception
+	{
+		testRate(10000, true);		
+	}
+	
+	public void testRateWithSpin100000() throws Exception
+	{
+		testRate(100000, true);		
+	}
+		
+	public void testRateWithoutSpin1() throws Exception
+	{
+		testRate(1, false);		
+	}
+	
+	public void testRateWithoutSpin10() throws Exception
+	{
+		testRate(10, false);		
+	}
+	
+	public void testRateWithoutSpin100() throws Exception
+	{
+		testRate(100, false);		
+	}
+	
+	public void testRateWithoutSpin1000() throws Exception
+	{
+		testRate(1000, false);		
+	}
+	
+	public void testRateWithoutSpin10000() throws Exception
+	{
+		testRate(10000, false);		
+	}
+	
+	public void testRateWithoutSpin100000() throws Exception
+	{
+		testRate(100000, false);		
+	}
+	
+	private void testRate(int rate, boolean spin) throws Exception
+	{		
+		final double error = 0.05;    //Allow for 5% error
+		
+		TokenBucketLimiterImpl tbl = new TokenBucketLimiterImpl(rate, spin);
+		
+		long start = System.currentTimeMillis();
+		
+		long count = 0;
+		
+		final long measureTime = 5000;
+		
+		while (System.currentTimeMillis() - start < measureTime)
+		{				
+			tbl.limit();
+			
+			count++;
+		}
+				
+		long end  = System.currentTimeMillis();
+		
+		double actualRate = ((double)(1000 * count)) / ( end - start);
+    
+      log.info("Desired rate: " + rate + " Actual rate " + actualRate + " invs/sec");
+      
+      assertTrue(actualRate > rate * (1 - error));
+      
+      assertTrue(actualRate < rate * (1 + error));
+		
+	}
+}

Deleted: trunk/tests/src/org/jboss/messaging/tests/unit/core/util/TokenBucketLimiterTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/util/TokenBucketLimiterTest.java	2008-06-12 18:21:54 UTC (rev 4452)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/core/util/TokenBucketLimiterTest.java	2008-06-12 20:15:59 UTC (rev 4453)
@@ -1,130 +0,0 @@
-/*
-  * JBoss, Home of Professional Open Source
-  * Copyright 2005, JBoss Inc., and individual contributors as indicated
-  * by the @authors tag. See the copyright.txt in the distribution for a
-  * full listing of individual contributors.
-  *
-  * This is free software; you can redistribute it and/or modify it
-  * under the terms of the GNU Lesser General Public License as
-  * published by the Free Software Foundation; either version 2.1 of
-  * the License, or (at your option) any later version.
-  *
-  * This software is distributed in the hope that it will be useful,
-  * but WITHOUT ANY WARRANTY; without even the implied warranty of
-  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
-  * Lesser General Public License for more details.
-  *
-  * You should have received a copy of the GNU Lesser General Public
-  * License along with this software; if not, write to the Free
-  * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
-  * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
-  */
-package org.jboss.messaging.tests.unit.core.util;
-
-import junit.framework.TestCase;
-
-import org.jboss.messaging.core.logging.Logger;
-import org.jboss.messaging.util.TokenBucketLimiter;
-
-/**
- * 
- * A TokenBucketLimiterTest
- * 
- * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
- *
- */
-public class TokenBucketLimiterTest extends TestCase
-{
-	private static final Logger log = Logger.getLogger(TokenBucketLimiterTest.class);
-	
-	public void testRateWithSpin1() throws Exception
-	{
-		testRate(1, true);		
-	}
-	
-	public void testRateWithSpin10() throws Exception
-	{
-		testRate(10, true);		
-	}
-	
-	public void testRateWithSpin100() throws Exception
-	{
-		testRate(100, true);		
-	}
-	
-	public void testRateWithSpin1000() throws Exception
-	{
-		testRate(1000, true);		
-	}
-	
-	public void testRateWithSpin10000() throws Exception
-	{
-		testRate(10000, true);		
-	}
-	
-	public void testRateWithSpin100000() throws Exception
-	{
-		testRate(100000, true);		
-	}
-		
-	public void testRateWithoutSpin1() throws Exception
-	{
-		testRate(1, false);		
-	}
-	
-	public void testRateWithoutSpin10() throws Exception
-	{
-		testRate(10, false);		
-	}
-	
-	public void testRateWithoutSpin100() throws Exception
-	{
-		testRate(100, false);		
-	}
-	
-	public void testRateWithoutSpin1000() throws Exception
-	{
-		testRate(1000, false);		
-	}
-	
-	public void testRateWithoutSpin10000() throws Exception
-	{
-		testRate(10000, false);		
-	}
-	
-	public void testRateWithoutSpin100000() throws Exception
-	{
-		testRate(100000, false);		
-	}
-	
-	private void testRate(int rate, boolean spin) throws Exception
-	{		
-		final double error = 0.05;    //Allow for 5% error
-		
-		TokenBucketLimiter tbl = new TokenBucketLimiter(rate, spin);
-		
-		long start = System.currentTimeMillis();
-		
-		long count = 0;
-		
-		final long measureTime = 5000;
-		
-		while (System.currentTimeMillis() - start < measureTime)
-		{				
-			tbl.limit();
-			
-			count++;
-		}
-				
-		long end  = System.currentTimeMillis();
-		
-		double actualRate = ((double)(1000 * count)) / ( end - start);
-    
-      log.info("Desired rate: " + rate + " Actual rate " + actualRate + " invs/sec");
-      
-      assertTrue(actualRate > rate * (1 - error));
-      
-      assertTrue(actualRate < rate * (1 + error));
-		
-	}
-}




More information about the jboss-cvs-commits mailing list