[jboss-cvs] JBoss Messaging SVN: r4642 - in trunk: tests/src/org/jboss/messaging/tests/timing/core/client/impl and 2 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Mon Jul 7 08:19:16 EDT 2008


Author: timfox
Date: 2008-07-07 08:19:15 -0400 (Mon, 07 Jul 2008)
New Revision: 4642

Modified:
   trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerImpl.java
   trunk/tests/src/org/jboss/messaging/tests/timing/core/client/impl/ClientConsumerImplTest.java
   trunk/tests/src/org/jboss/messaging/tests/unit/core/client/impl/ClientConsumerImplTest.java
   trunk/tests/src/org/jboss/messaging/tests/util/UnitTestCase.java
Log:
More tests


Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerImpl.java	2008-07-07 11:43:18 UTC (rev 4641)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerImpl.java	2008-07-07 12:19:15 UTC (rev 4642)
@@ -57,7 +57,7 @@
 
    private static final boolean trace = log.isTraceEnabled();
    
-   private static final long CLOSE_TIMEOUT_MILLISECONDS = 10000;
+   public static final long CLOSE_TIMEOUT_MILLISECONDS = 10000;
 
    // Attributes
    // -----------------------------------------------------------------------------------
@@ -414,7 +414,7 @@
       if (clientWindowSize > 0)
       {
          creditsToSend += messageBytes;
-   
+         
          if (creditsToSend >= clientWindowSize)
          {            
             remotingConnection.sendOneWay(targetID, sessionTargetID, new ConsumerFlowCreditMessage(creditsToSend));
@@ -510,7 +510,7 @@
 		}
    }
 
-   public void doCleanUp(boolean sendCloseMessage) throws MessagingException
+   private void doCleanUp(final boolean sendCloseMessage) throws MessagingException
    {
       if (closed)
       {
@@ -537,7 +537,7 @@
 
          receiverThread = null;
 
-         if(sendCloseMessage)
+         if (sendCloseMessage)
          {
             remotingConnection.sendBlocking(targetID, sessionTargetID, new PacketImpl(PacketImpl.CLOSE));
          }

Modified: trunk/tests/src/org/jboss/messaging/tests/timing/core/client/impl/ClientConsumerImplTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/timing/core/client/impl/ClientConsumerImplTest.java	2008-07-07 11:43:18 UTC (rev 4641)
+++ trunk/tests/src/org/jboss/messaging/tests/timing/core/client/impl/ClientConsumerImplTest.java	2008-07-07 12:19:15 UTC (rev 4642)
@@ -21,11 +21,15 @@
  */
 package org.jboss.messaging.tests.timing.core.client.impl;
 
+import java.util.ArrayList;
+import java.util.List;
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 
 import org.easymock.EasyMock;
 import org.jboss.messaging.core.client.ClientMessage;
 import org.jboss.messaging.core.client.MessageHandler;
+import org.jboss.messaging.core.client.impl.ClientConnectionInternal;
 import org.jboss.messaging.core.client.impl.ClientConsumerImpl;
 import org.jboss.messaging.core.client.impl.ClientConsumerInternal;
 import org.jboss.messaging.core.client.impl.ClientSessionInternal;
@@ -33,6 +37,7 @@
 import org.jboss.messaging.core.logging.Logger;
 import org.jboss.messaging.core.remoting.PacketDispatcher;
 import org.jboss.messaging.core.remoting.RemotingConnection;
+import org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl;
 import org.jboss.messaging.tests.util.UnitTestCase;
 
 /**
@@ -69,7 +74,7 @@
          {
             try
             {
-               consumer.receive(1000);
+               consumer.receive(2000);
             }
             catch (Exception e)
             {
@@ -79,7 +84,7 @@
       
       t.start();
       
-      Thread.sleep(100);
+      Thread.sleep(1000);
       
       try
       {
@@ -96,7 +101,485 @@
          t.interrupt();
       }
    }
+   
+   public void testCloseWhileReceiving() throws Exception
+   {
+      ClientSessionInternal session = EasyMock.createStrictMock(ClientSessionInternal.class);
+      RemotingConnection rc = EasyMock.createStrictMock(RemotingConnection.class);
+      ExecutorService executor = EasyMock.createStrictMock(ExecutorService.class);
+      PacketDispatcher pd = EasyMock.createStrictMock(PacketDispatcher.class);  
+             
+      final long clientTargetID = 283748;
+      final long targetID = 12934;
+      final long sessionTargetID = 23847327;
+      
+      final ClientConsumerInternal consumer =
+         new ClientConsumerImpl(session, targetID, clientTargetID, 787, false, rc, pd, executor, sessionTargetID);
+                   
+      pd.unregister(clientTargetID);
+      session.removeConsumer(consumer);
+      EasyMock.expect(rc.sendBlocking(targetID, sessionTargetID, new PacketImpl(PacketImpl.CLOSE))).andReturn(null);
 
+      EasyMock.replay(session, rc, executor, pd);
+      
+      class ReceiverThread extends Thread
+      {
+         volatile boolean returned;
+         volatile ClientMessage msg;
+         volatile boolean failed;
+         public void run()
+         {
+            try
+            {
+               msg = consumer.receive();
+               returned = true;
+            }
+            catch (Exception e)
+            {
+               failed = true;
+            }
+         }
+      };
+      
+      ReceiverThread t = new ReceiverThread();
+      
+      t.start();
+      
+      Thread.sleep(2000);
+                  
+      consumer.close();
+      
+      Thread.sleep(2000);
+      
+      assertTrue(t.returned);
+      assertNull(t.msg);
+      assertFalse(t.failed);
+      
+      t.join();
+      
+      EasyMock.verify(session, rc, executor, pd);
+   }
+   
+   public void testReceiveHandleMessagesAfterReceiveNoTimeout() throws Exception
+   {
+      testReceiveHandleMessagesAfterReceive(0); // 0 makes the helper call the untimed receive()
+   }
+   
+   public void testReceiveHandleMessagesAfterReceiveTimeout() throws Exception
+   {
+      testReceiveHandleMessagesAfterReceive(4000); // non-zero makes the helper call receive(timeout); 4000ms outlasts its 2000ms pause
+   }
+   
+   private void testReceiveHandleMessagesAfterReceive(final int timeout) throws Exception
+   {
+      ClientSessionInternal session = EasyMock.createStrictMock(ClientSessionInternal.class);
+      ClientConnectionInternal connection = EasyMock.createStrictMock(ClientConnectionInternal.class);
+      RemotingConnection rc = EasyMock.createStrictMock(RemotingConnection.class);
+      ExecutorService executor = EasyMock.createStrictMock(ExecutorService.class);
+      PacketDispatcher pd = EasyMock.createStrictMock(PacketDispatcher.class);  
+       
+      final int numMessages = 10;
+      
+      final List<ClientMessage> msgs = new ArrayList<ClientMessage>();
+      
+      for (int i = 0; i < numMessages; i++)
+      {
+         ClientMessage msg = EasyMock.createStrictMock(ClientMessage.class);
+         
+         msgs.add(msg);
+         
+         EasyMock.expect(msg.getPriority()).andStubReturn((byte)4); //default priority
+         
+         EasyMock.expect(msg.isExpired()).andStubReturn(false);
+         
+         EasyMock.expect(msg.getDeliveryID()).andStubReturn((long)i);
+         
+         session.delivered((long)i, false);
+         
+         EasyMock.expect(msg.getEncodeSize()).andReturn(1);
+      }
+      
+      EasyMock.replay(session, connection, rc, executor, pd);
+      EasyMock.replay(msgs.toArray());
+            
+      final ClientConsumerInternal consumer =
+         new ClientConsumerImpl(session, 675765, 67565, 787, false, rc, pd, executor, 878787);
+            
+      final long pause = 2000;
+      
+      class AdderThread extends Thread      
+      {
+         volatile boolean failed;
+         
+         public void run()
+         {
+            try
+            {
+               Thread.sleep(pause);
+               
+               for (ClientMessage msg: msgs)
+               {
+                  consumer.handleMessage(msg);
+               }
+            }
+            catch (Exception e)
+            {
+               log.error("Failed to add messages", e);
+               failed = true;
+            }
+         }
+      };
+      
+      AdderThread t = new AdderThread();
+      
+      t.start();
+            
+      for (int i = 0; i < numMessages; i++)
+      {      
+         ClientMessage msg;
+         
+         if (timeout == 0)
+         {
+            msg = consumer.receive();
+         }
+         else
+         {
+            msg = consumer.receive(timeout);
+         }
+   
+         assertTrue(msg == msgs.get(i));
+      }
+
+      assertNull(consumer.receiveImmediate());  
+      
+      t.join();
+      
+      assertFalse(t.failed);
+      
+      EasyMock.verify(session, connection, rc, executor, pd);
+      EasyMock.verify(msgs.toArray());
+      
+      assertEquals(0, consumer.getBufferSize());   
+   }
+   
+   public void testReceiveHandleMessagesAfterReceiveWithTimeout() throws Exception
+   {
+      ClientSessionInternal session = EasyMock.createStrictMock(ClientSessionInternal.class);
+      ClientConnectionInternal connection = EasyMock.createStrictMock(ClientConnectionInternal.class);
+      RemotingConnection rc = EasyMock.createStrictMock(RemotingConnection.class);
+      ExecutorService executor = EasyMock.createStrictMock(ExecutorService.class);
+      PacketDispatcher pd = EasyMock.createStrictMock(PacketDispatcher.class);  
+       
+      final int numMessages = 10;
+      
+      final List<ClientMessage> msgs = new ArrayList<ClientMessage>();
+      
+      for (int i = 0; i < numMessages; i++)
+      {
+         ClientMessage msg = EasyMock.createStrictMock(ClientMessage.class);
+         
+         msgs.add(msg);
+         
+         EasyMock.expect(msg.getPriority()).andStubReturn((byte)4); //default priority
+      }
+      
+      EasyMock.replay(session, connection, rc, executor, pd);
+      EasyMock.replay(msgs.toArray());
+            
+      final ClientConsumerInternal consumer =
+         new ClientConsumerImpl(session, 675765, 67565, 787, false, rc, pd, executor, 878787);
+            
+      final long pause = 2000;
+      
+      final long timeout = 1000;
+      
+      class AdderThread extends Thread      
+      {
+         volatile boolean failed;
+         
+         public void run()
+         {
+            try
+            {
+               Thread.sleep(pause);
+               
+               for (ClientMessage msg: msgs)
+               {
+                  consumer.handleMessage(msg);
+               }
+            }
+            catch (Exception e)
+            {
+               log.error("Failed to add messages", e);
+               failed = true;
+            }
+         }
+      };
+      
+      AdderThread t = new AdderThread();
+      
+      t.start();
+            
+      ClientMessage msg = consumer.receive(timeout);         
+        
+      assertNull(msg);  
+      
+      t.join();
+      
+      assertFalse(t.failed);
+      
+      EasyMock.verify(session, connection, rc, executor, pd);
+      EasyMock.verify(msgs.toArray());
+      
+      assertEquals(numMessages, consumer.getBufferSize());   
+   }
+   
+   public void testReceiveExpiredWithTimeout() throws Exception
+   {
+      ClientSessionInternal session = EasyMock.createStrictMock(ClientSessionInternal.class);
+      ClientConnectionInternal connection = EasyMock.createStrictMock(ClientConnectionInternal.class);
+      RemotingConnection rc = EasyMock.createStrictMock(RemotingConnection.class);
+      ExecutorService executor = EasyMock.createStrictMock(ExecutorService.class);
+      PacketDispatcher pd = EasyMock.createStrictMock(PacketDispatcher.class);
+            
+      final int numMessages = 10;
+      
+      List<ClientMessage> msgs = new ArrayList<ClientMessage>();
+      
+      for (int i = 0; i < numMessages; i++)
+      {
+         ClientMessage msg = EasyMock.createStrictMock(ClientMessage.class);
+         
+         msgs.add(msg);
+         
+         EasyMock.expect(msg.getPriority()).andStubReturn((byte)4); //default priority
+         
+         EasyMock.expect(msg.isExpired()).andStubReturn(true);
+         
+         EasyMock.expect(msg.getDeliveryID()).andStubReturn((long)i);
+         
+         session.delivered((long)i, true);
+         
+         EasyMock.expect(msg.getEncodeSize()).andReturn(1);
+      }
+      
+      EasyMock.replay(session, connection, rc, executor, pd);
+      EasyMock.replay(msgs.toArray());
+            
+      ClientConsumerInternal consumer =
+         new ClientConsumerImpl(session, 675765, 67565, 787, false, rc, pd, executor, 878787);
+      
+      for (ClientMessage msg: msgs)
+      {
+         consumer.handleMessage(msg);
+      }
+
+      assertEquals(numMessages, consumer.getBufferSize());         
+
+      for (ClientMessage msg: msgs)
+      {
+         assertNull(consumer.receive(100));
+      }
+      
+      EasyMock.verify(session, connection, rc, executor, pd);
+      EasyMock.verify(msgs.toArray());
+   }
+   
+   public void testWaitForOnMessageToCompleteOnClose() throws Exception
+   {
+      ClientSessionInternal session = EasyMock.createStrictMock(ClientSessionInternal.class);
+      RemotingConnection rc = EasyMock.createStrictMock(RemotingConnection.class);
+      ExecutorService executor = Executors.newSingleThreadExecutor(); // real executor; shut down in the finally block below
+      PacketDispatcher pd = EasyMock.createStrictMock(PacketDispatcher.class);  
+             
+      final long clientTargetID = 283748;
+      final long targetID = 12934;
+      final long sessionTargetID = 23847327;
+      
+      final ClientConsumerInternal consumer =
+         new ClientConsumerImpl(session, targetID, clientTargetID, 787, false, rc, pd, executor, sessionTargetID);
+      // Handler that needs 1s to finish; 'complete' is asserted right after close() returns
+      class MyHandler implements MessageHandler
+      {
+         volatile boolean failed;
+         volatile boolean complete;
+         public void onMessage(ClientMessage msg)
+         {            
+            try
+            {
+               Thread.sleep(1000);
+               complete = true;
+            }
+            catch (Exception e)
+            {         
+               failed = true;
+            }            
+         }         
+      };
+      
+      MyHandler handler = new MyHandler();
+      
+      consumer.setMessageHandler(handler);
+     
+      ClientMessage msg = EasyMock.createStrictMock(ClientMessage.class);
+        
+      EasyMock.expect(msg.getPriority()).andStubReturn((byte)4); //default priority
+      
+      EasyMock.expect(msg.isExpired()).andStubReturn(false);
+      
+      EasyMock.expect(msg.getDeliveryID()).andStubReturn(0L);
+      
+      session.delivered(0L, false);
+      EasyMock.expect(msg.getEncodeSize()).andReturn(1);
+      pd.unregister(clientTargetID);
+      session.removeConsumer(consumer);
+      EasyMock.expect(rc.sendBlocking(targetID, sessionTargetID, new PacketImpl(PacketImpl.CLOSE))).andReturn(null);
+      EasyMock.replay(session, rc, pd, msg);
+      try
+      {
+         consumer.handleMessage(msg);
+         consumer.close();
+         assertTrue(handler.complete);
+         assertFalse(handler.failed);
+         EasyMock.verify(session, rc, pd, msg);
+      }
+      finally
+      {
+         executor.shutdown(); // release the worker thread even when an assertion above fails
+      }
+   }
+
+   public void testWaitForOnMessageToCompleteOnCloseTimeout() throws Exception
+   {
+      ClientSessionInternal session = EasyMock.createStrictMock(ClientSessionInternal.class);
+      RemotingConnection rc = EasyMock.createStrictMock(RemotingConnection.class);
+      ExecutorService executor = Executors.newSingleThreadExecutor(); // real executor; shut down in the finally block below
+      PacketDispatcher pd = EasyMock.createStrictMock(PacketDispatcher.class);  
+             
+      final long clientTargetID = 283748;
+      final long targetID = 12934;
+      final long sessionTargetID = 23847327;
+      
+      final ClientConsumerInternal consumer =
+         new ClientConsumerImpl(session, targetID, clientTargetID, 787, false, rc, pd, executor, sessionTargetID);
+      // Handler that sleeps 2s longer than CLOSE_TIMEOUT_MILLISECONDS, so 'complete' must still be false after close()
+      class MyHandler implements MessageHandler
+      {
+         volatile boolean failed;
+         volatile boolean complete;
+         public void onMessage(ClientMessage msg)
+         {            
+            try
+            {
+               Thread.sleep(ClientConsumerImpl.CLOSE_TIMEOUT_MILLISECONDS + 2000);
+               complete = true;
+            }
+            catch (Exception e)
+            {         
+               failed = true;
+            }            
+         }         
+      };
+      
+      MyHandler handler = new MyHandler();
+      
+      consumer.setMessageHandler(handler);
+     
+      ClientMessage msg = EasyMock.createStrictMock(ClientMessage.class);
+        
+      EasyMock.expect(msg.getPriority()).andStubReturn((byte)4); //default priority
+      
+      EasyMock.expect(msg.isExpired()).andStubReturn(false);
+      
+      EasyMock.expect(msg.getDeliveryID()).andStubReturn(0L);
+      
+      session.delivered(0L, false);
+      EasyMock.expect(msg.getEncodeSize()).andReturn(1);
+      pd.unregister(clientTargetID);
+      session.removeConsumer(consumer);
+      EasyMock.expect(rc.sendBlocking(targetID, sessionTargetID, new PacketImpl(PacketImpl.CLOSE))).andReturn(null);
+      EasyMock.replay(session, rc, pd, msg);
+      try
+      {
+         consumer.handleMessage(msg);
+         long start = System.currentTimeMillis();
+         consumer.close();
+         long end = System.currentTimeMillis();
+         assertTrue((end - start) >= ClientConsumerImpl.CLOSE_TIMEOUT_MILLISECONDS);
+         assertFalse(handler.complete);
+         assertFalse(handler.failed);
+         EasyMock.verify(session, rc, pd, msg);
+      }
+      finally
+      {
+         executor.shutdown(); // orderly shutdown: the still-sleeping handler finishes, then the worker thread exits
+      }
+   }
+   
+   public void testWaitForOnMessageToCompleteOnCloseSameThread() throws Exception
+   {
+      ClientSessionInternal session = EasyMock.createStrictMock(ClientSessionInternal.class);
+      RemotingConnection rc = EasyMock.createStrictMock(RemotingConnection.class);
+      ExecutorService executor = new DirectExecutorService();
+      PacketDispatcher pd = EasyMock.createStrictMock(PacketDispatcher.class);  
+             
+      final long clientTargetID = 283748;
+      final long targetID = 12934;
+      final long sessionTargetID = 23847327;
+      
+      final ClientConsumerInternal consumer =
+         new ClientConsumerImpl(session, targetID, clientTargetID, 787, false, rc, pd, executor, sessionTargetID);
+      
+      class MyHandler implements MessageHandler
+      {
+         volatile boolean failed;
+         volatile boolean complete;
+         public void onMessage(ClientMessage msg)
+         {            
+            try
+            {
+               Thread.sleep(1000);
+               complete = true;
+            }
+            catch (Exception e)
+            {         
+               failed = true;
+            }            
+         }         
+      };
+      
+      MyHandler handler = new MyHandler();
+      
+      consumer.setMessageHandler(handler);
+     
+      ClientMessage msg = EasyMock.createStrictMock(ClientMessage.class);
+        
+      EasyMock.expect(msg.getPriority()).andStubReturn((byte)4); //default priority
+      
+      EasyMock.expect(msg.isExpired()).andStubReturn(false);
+      
+      EasyMock.expect(msg.getDeliveryID()).andStubReturn(0L);
+      
+      session.delivered(0L, false);
+      
+      EasyMock.expect(msg.getEncodeSize()).andReturn(1);
+      
+      pd.unregister(clientTargetID);
+      session.removeConsumer(consumer);
+      EasyMock.expect(rc.sendBlocking(targetID, sessionTargetID, new PacketImpl(PacketImpl.CLOSE))).andReturn(null);
+      EasyMock.replay(session, rc, pd, msg);
+             
+      consumer.handleMessage(msg);
+      
+      consumer.close();
+      
+      assertTrue(handler.complete);
+      
+      assertFalse(handler.failed);
+      
+      EasyMock.verify(session, rc, pd, msg);           
+   }
+   
    // Private -----------------------------------------------------------------------------------------------------------
 
 }

Modified: trunk/tests/src/org/jboss/messaging/tests/unit/core/client/impl/ClientConsumerImplTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/client/impl/ClientConsumerImplTest.java	2008-07-07 11:43:18 UTC (rev 4641)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/core/client/impl/ClientConsumerImplTest.java	2008-07-07 12:19:15 UTC (rev 4642)
@@ -24,9 +24,7 @@
 
 import java.util.ArrayList;
 import java.util.List;
-import java.util.concurrent.AbstractExecutorService;
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.TimeUnit;
 
 import org.easymock.EasyMock;
 import org.jboss.messaging.core.client.ClientMessage;
@@ -39,6 +37,7 @@
 import org.jboss.messaging.core.logging.Logger;
 import org.jboss.messaging.core.remoting.PacketDispatcher;
 import org.jboss.messaging.core.remoting.RemotingConnection;
+import org.jboss.messaging.core.remoting.impl.wireformat.ConsumerFlowCreditMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl;
 import org.jboss.messaging.tests.util.UnitTestCase;
 
@@ -46,12 +45,6 @@
  * 
  * A ClientConsumerImplTest
  * 
- * TODO - still need to test:
- * priority
- * flow control
- * closing
- * waiting for message listener to complete etc
- * 
  * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
  *
  */
@@ -104,40 +97,7 @@
       
       assertEquals(numMessages, consumer.getBufferSize());         
    }
-   
-   private class DirectExecutorService extends AbstractExecutorService
-   {
-      public boolean awaitTermination(long timeout, TimeUnit unit)
-            throws InterruptedException
-      {
-         return false;
-      }
-
-      public boolean isShutdown()
-      {
-         return false;
-      }
-
-      public void shutdown()
-      { 
-      }
-
-      public boolean isTerminated()
-      {
-         return false;
-      }
-
-      public List<Runnable> shutdownNow()
-      {
-         return null;
-      }
-
-      public void execute(Runnable command)
-      {
-         command.run();
-      }
-   }
-   
+         
    public void testHandleMessageWithNonDirectHandler() throws Exception
    {
       ClientSessionInternal session = EasyMock.createStrictMock(ClientSessionInternal.class);
@@ -459,7 +419,7 @@
       EasyMock.verify(msgs.toArray());
    }
    
-   public void testReceiveExpired() throws Exception
+   public void testReceiveExpiredImmediate() throws Exception
    {
       ClientSessionInternal session = EasyMock.createStrictMock(ClientSessionInternal.class);
       ClientConnectionInternal connection = EasyMock.createStrictMock(ClientConnectionInternal.class);
@@ -510,6 +470,8 @@
       EasyMock.verify(msgs.toArray());
    }
    
+
+   
    public void testReceiveWithHandler() throws Exception
    {
       ClientSessionInternal session = EasyMock.createStrictMock(ClientSessionInternal.class);
@@ -712,6 +674,186 @@
       assertEquals(0, consumer.getBufferSize());
    }
    
+   public void testFlowControlExact() throws Exception
+   {      
+      ClientSessionInternal session = EasyMock.createStrictMock(ClientSessionInternal.class);
+      ClientConnectionInternal connection = EasyMock.createStrictMock(ClientConnectionInternal.class);
+      RemotingConnection rc = EasyMock.createStrictMock(RemotingConnection.class);
+      ExecutorService executor = EasyMock.createStrictMock(ExecutorService.class);
+      PacketDispatcher pd = EasyMock.createStrictMock(PacketDispatcher.class);
+            
+      final int clientWindowSize = 500;
+      
+      final int numMessages = 10;
+      
+      final int msgSize = 100;
+      
+      List<ClientMessage> msgs = new ArrayList<ClientMessage>();
+      
+      for (int i = 0; i < numMessages; i++)
+      {
+         ClientMessage msg = EasyMock.createStrictMock(ClientMessage.class);
+         
+         msgs.add(msg);
+         
+         EasyMock.expect(msg.getPriority()).andStubReturn((byte)4); //default priority
+         
+         EasyMock.expect(msg.isExpired()).andStubReturn(true);
+         
+         EasyMock.expect(msg.getDeliveryID()).andStubReturn((long)i);
+         
+         session.delivered((long)i, true);
+         
+         EasyMock.expect(msg.getEncodeSize()).andReturn(msgSize);
+      }
+      
+      final long targetID = 120912;
+      final long sessionTargetID = 12348;
+      
+      rc.sendOneWay(targetID, sessionTargetID, new ConsumerFlowCreditMessage(clientWindowSize));
+      EasyMock.expectLastCall().times(2);
+      
+      EasyMock.replay(session, connection, rc, executor, pd);
+      EasyMock.replay(msgs.toArray());
+            
+      ClientConsumerInternal consumer =
+         new ClientConsumerImpl(session, targetID, 67565, clientWindowSize, false, rc, pd, executor, sessionTargetID);
+      
+      for (ClientMessage msg: msgs)
+      {
+         consumer.handleMessage(msg);
+      }
+
+      assertEquals(numMessages, consumer.getBufferSize());         
+
+      for (ClientMessage msg: msgs)
+      {
+         assertNull(consumer.receiveImmediate());
+      }
+      
+      EasyMock.verify(session, connection, rc, executor, pd);
+      EasyMock.verify(msgs.toArray());
+   }
+   
+   public void testFlowControlInExact() throws Exception
+   {      
+      ClientSessionInternal session = EasyMock.createStrictMock(ClientSessionInternal.class);
+      ClientConnectionInternal connection = EasyMock.createStrictMock(ClientConnectionInternal.class);
+      RemotingConnection rc = EasyMock.createStrictMock(RemotingConnection.class);
+      ExecutorService executor = EasyMock.createStrictMock(ExecutorService.class);
+      PacketDispatcher pd = EasyMock.createStrictMock(PacketDispatcher.class);
+            
+      final int clientWindowSize = 500;
+      
+      final int numMessages = 10;
+      
+      final int msgSize = 101;
+      
+      List<ClientMessage> msgs = new ArrayList<ClientMessage>();
+      
+      for (int i = 0; i < numMessages; i++)
+      {
+         ClientMessage msg = EasyMock.createStrictMock(ClientMessage.class);
+         
+         msgs.add(msg);
+         
+         EasyMock.expect(msg.getPriority()).andStubReturn((byte)4); //default priority
+         
+         EasyMock.expect(msg.isExpired()).andStubReturn(true);
+         
+         EasyMock.expect(msg.getDeliveryID()).andStubReturn((long)i);
+         
+         session.delivered((long)i, true);
+         
+         EasyMock.expect(msg.getEncodeSize()).andReturn(msgSize);
+      }
+      
+      final long targetID = 120912;
+      final long sessionTargetID = 12348;
+      
+      rc.sendOneWay(targetID, sessionTargetID, new ConsumerFlowCreditMessage(505));
+      EasyMock.expectLastCall().times(2);
+      
+      EasyMock.replay(session, connection, rc, executor, pd);
+      EasyMock.replay(msgs.toArray());
+            
+      ClientConsumerInternal consumer =
+         new ClientConsumerImpl(session, targetID, 67565, clientWindowSize, false, rc, pd, executor, sessionTargetID);
+      
+      for (ClientMessage msg: msgs)
+      {
+         consumer.handleMessage(msg);
+      }
+
+      assertEquals(numMessages, consumer.getBufferSize());         
+
+      for (ClientMessage msg: msgs)
+      {
+         assertNull(consumer.receiveImmediate());
+      }
+      
+      EasyMock.verify(session, connection, rc, executor, pd);
+      EasyMock.verify(msgs.toArray());
+   }
+   
+   public void testFlowControlDisabled() throws Exception
+   {      
+      ClientSessionInternal session = EasyMock.createStrictMock(ClientSessionInternal.class);
+      ClientConnectionInternal connection = EasyMock.createStrictMock(ClientConnectionInternal.class);
+      RemotingConnection rc = EasyMock.createStrictMock(RemotingConnection.class);
+      ExecutorService executor = EasyMock.createStrictMock(ExecutorService.class);
+      PacketDispatcher pd = EasyMock.createStrictMock(PacketDispatcher.class);
+            
+      final int clientWindowSize = -1;
+      
+      final int numMessages = 10;
+      
+      final int msgSize = 100;
+      
+      List<ClientMessage> msgs = new ArrayList<ClientMessage>();
+      
+      for (int i = 0; i < numMessages; i++)
+      {
+         ClientMessage msg = EasyMock.createStrictMock(ClientMessage.class);
+         
+         msgs.add(msg);
+         
+         EasyMock.expect(msg.getPriority()).andStubReturn((byte)4); //default priority
+         
+         EasyMock.expect(msg.isExpired()).andStubReturn(true);
+         
+         EasyMock.expect(msg.getDeliveryID()).andStubReturn((long)i);
+         
+         session.delivered((long)i, true);
+         
+         EasyMock.expect(msg.getEncodeSize()).andReturn(msgSize);
+      }
+      
+      final long targetID = 120912;
+      final long sessionTargetID = 12348;
+       
+      EasyMock.replay(session, connection, rc, executor, pd);
+      EasyMock.replay(msgs.toArray());
+            
+      ClientConsumerInternal consumer =
+         new ClientConsumerImpl(session, targetID, 67565, clientWindowSize, false, rc, pd, executor, sessionTargetID);
+      
+      for (ClientMessage msg: msgs)
+      {
+         consumer.handleMessage(msg);
+      }
+
+      assertEquals(numMessages, consumer.getBufferSize());         
+
+      for (ClientMessage msg: msgs)
+      {
+         assertNull(consumer.receiveImmediate());
+      }
+      
+      EasyMock.verify(session, connection, rc, executor, pd);
+      EasyMock.verify(msgs.toArray());
+   }
+   
    // Private -----------------------------------------------------------------------------------------------------------
    
    private void testConstructor(final long targetID, final long clientTargetID,

Modified: trunk/tests/src/org/jboss/messaging/tests/util/UnitTestCase.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/util/UnitTestCase.java	2008-07-07 11:43:18 UTC (rev 4641)
+++ trunk/tests/src/org/jboss/messaging/tests/util/UnitTestCase.java	2008-07-07 12:19:15 UTC (rev 4642)
@@ -33,6 +33,8 @@
 import java.util.Iterator;
 import java.util.List;
 import java.util.UUID;
+import java.util.concurrent.AbstractExecutorService;
+import java.util.concurrent.TimeUnit;
 
 import javax.transaction.xa.Xid;
 
@@ -258,6 +260,39 @@
       
       return null;
    }
+   
+   public static class DirectExecutorService extends AbstractExecutorService // test helper: execute() runs the task inline on the caller's thread
+   {
+      public boolean awaitTermination(long timeout, TimeUnit unit)
+            throws InterruptedException
+      {
+         return false; // lifecycle is not tracked: never reports termination
+      }
 
+      public boolean isShutdown()
+      {
+         return false;
+      }
+
+      public void shutdown()
+      { 
+      }
+
+      public boolean isTerminated()
+      {
+         return false;
+      }
+
+      public List<Runnable> shutdownNow()
+      {
+         return java.util.Collections.<Runnable>emptyList(); // nothing is ever queued; never hand callers null
+      }
+
+      public void execute(Runnable command)
+      {
+         command.run(); // synchronous: returns only after the task has finished
+      }
+   }
+
    
 }




More information about the jboss-cvs-commits mailing list