[jboss-cvs] JBoss Messaging SVN: r6613 - in trunk: tests/src/org/jboss/messaging/tests/integration/client and 1 other directory.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Tue Apr 28 23:55:48 EDT 2009


Author: clebert.suconic at jboss.com
Date: 2009-04-28 23:55:48 -0400 (Tue, 28 Apr 2009)
New Revision: 6613

Modified:
   trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerImpl.java
   trunk/tests/src/org/jboss/messaging/tests/integration/client/ConsumerWindowSizeTest.java
Log:
Fixing ConsumerWindowSizeTest (LargeMessage & no-buffering (slow consumer))

Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerImpl.java	2009-04-29 01:16:13 UTC (rev 6612)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerImpl.java	2009-04-29 03:55:48 UTC (rev 6613)
@@ -483,7 +483,12 @@
       }
    }
 
-   public void flowControl(final int messageBytes, final boolean isLargeMessage) throws MessagingException
+   /** 
+    * flow control is synchornized because of LargeMessage and streaming.
+    * LargeMessageBuffer will call flowcontrol here, while other handleMessage will also be calling flowControl.
+    * So, this operation needs to be atomic.
+    * */
+   public synchronized void flowControl(final int messageBytes, final boolean isLargeMessage) throws MessagingException
    {
       if (clientWindowSize >= 0)
       {

Modified: trunk/tests/src/org/jboss/messaging/tests/integration/client/ConsumerWindowSizeTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/client/ConsumerWindowSizeTest.java	2009-04-29 01:16:13 UTC (rev 6612)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/client/ConsumerWindowSizeTest.java	2009-04-29 03:55:48 UTC (rev 6613)
@@ -47,13 +47,11 @@
    private final SimpleString queueA = new SimpleString("queueA");
 
    private final int TIMEOUT = 5;
-   
+
    private static final Logger log = Logger.getLogger(ConsumerWindowSizeTest.class);
-   
+
    private static final boolean isTrace = log.isTraceEnabled();
 
-
-
    /*
    * tests send window size. we do this by having 2 receivers on the q. since we roundrobin the consumer for delivery we
    * know if consumer 1 has received n messages then consumer 2 must have also have received n messages or at least up
@@ -306,7 +304,6 @@
       internalTestSlowConsumerNoBuffer2(false);
    }
 
-   
    public void testSlowConsumerNoBuffer2LargeMessages() throws Exception
    {
       internalTestSlowConsumerNoBuffer2(true);
@@ -391,17 +388,16 @@
          }
 
          session1.close(); // just to make sure everything is flushed and no pending packets on the sending buffer, or
-                           // the getMessageCount would fail
+         // the getMessageCount would fail
          session2.close();
 
          session1 = sf.createSession(false, true, true);
          session1.start();
          session2 = sf.createSession(false, true, true);
          session2.start();
-         
+
          prod = session1.createProducer(ADDRESS);
 
-
          assertEquals(0, getMessageCount(server, ADDRESS.toString()));
 
          // This should also work the other way around
@@ -489,7 +485,7 @@
    {
       internalTestSlowConsumerOnMessageHandlerNoBuffers(false);
    }
-   
+
    public void testSlowConsumerOnMessageHandlerNoBuffersLargeMessage() throws Exception
    {
       internalTestSlowConsumerOnMessageHandlerNoBuffers(true);
@@ -497,7 +493,7 @@
 
    public void internalTestSlowConsumerOnMessageHandlerNoBuffers(boolean largeMessages) throws Exception
    {
-      
+
       MessagingServer server = createServer(false);
 
       ClientSession sessionB = null;
@@ -517,7 +513,6 @@
             sf.setMinLargeMessageSize(100);
          }
 
-
          session = sf.createSession(false, true, true);
 
          SimpleString ADDRESS = new SimpleString("some-queue");
@@ -752,14 +747,13 @@
          consReceiveOneAndHold.setMessageHandler(handler);
 
          assertTrue(latchReceived.await(TIMEOUT, TimeUnit.SECONDS));
-         
-         
+
          long timeout = System.currentTimeMillis() + 1000 * TIMEOUT;
          while (consReceiveOneAndHold.getBufferSize() == 0 && System.currentTimeMillis() < timeout)
          {
             Thread.sleep(10);
          }
-         
+
          assertEquals(1, consReceiveOneAndHold.getBufferSize());
 
          ClientConsumer cons1 = session.createConsumer(ADDRESS);




More information about the jboss-cvs-commits mailing list