[hornetq-commits] JBoss hornetq SVN: r9272 - in trunk: src/main/org/hornetq/core/server/impl and 1 other directory.

do-not-reply at jboss.org do-not-reply at jboss.org
Thu May 27 08:43:14 EDT 2010


Author: ataylor
Date: 2010-05-27 08:43:14 -0400 (Thu, 27 May 2010)
New Revision: 9272

Modified:
   trunk/src/main/org/hornetq/core/client/impl/ClientConsumerImpl.java
   trunk/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java
   trunk/tests/src/org/hornetq/tests/integration/client/ConsumerWindowSizeTest.java
Log:
https://jira.jboss.org/browse/HORNETQ-385 - reset window size for slow consumers when forcing redelivery

Modified: trunk/src/main/org/hornetq/core/client/impl/ClientConsumerImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/ClientConsumerImpl.java	2010-05-27 07:46:08 UTC (rev 9271)
+++ trunk/src/main/org/hornetq/core/client/impl/ClientConsumerImpl.java	2010-05-27 12:43:14 UTC (rev 9272)
@@ -264,7 +264,11 @@
                   if (seq >= forceDeliveryCount.longValue())
                   {
                      // forced delivery messages are discarded, nothing has been delivered by the queue
-                     return null;
+                     if (forcingDelivery)
+                     {
+                        resetIfSlowConsumer();
+                        return null;
+                     }
                   }
                   else
                   {
@@ -307,6 +311,7 @@
             }
             else
             {
+               resetIfSlowConsumer();
                return null;
             }
          }
@@ -703,6 +708,15 @@
       }
    }
 
+   private void resetIfSlowConsumer()
+   {
+      if(clientWindowSize == 0)
+      {
+         slowConsumerInitialCreditSent = false;
+         sendCredits(0);
+      }
+   }
+
    private void requeueExecutors()
    {
       for (int i = 0; i < buffer.size(); i++)

Modified: trunk/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java	2010-05-27 07:46:08 UTC (rev 9271)
+++ trunk/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java	2010-05-27 12:43:14 UTC (rev 9272)
@@ -460,6 +460,11 @@
          // No flow control
          availableCredits = null;
       }
+      else if(credits == 0)
+      {
+         //reset, used on slow consumers
+         availableCredits.set(0);
+      }
       else
       {
          int previous = availableCredits.getAndAdd(credits);

Modified: trunk/tests/src/org/hornetq/tests/integration/client/ConsumerWindowSizeTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/client/ConsumerWindowSizeTest.java	2010-05-27 07:46:08 UTC (rev 9271)
+++ trunk/tests/src/org/hornetq/tests/integration/client/ConsumerWindowSizeTest.java	2010-05-27 12:43:14 UTC (rev 9272)
@@ -73,7 +73,7 @@
 
    
    // https://jira.jboss.org/jira/browse/HORNETQ-385
-   public void disabled_testReceiveImmediateWithZeroWindow() throws Exception
+   public void testReceiveImmediateWithZeroWindow() throws Exception
    {
       HornetQServer server = createServer(false, isNetty());
       try
@@ -110,7 +110,7 @@
 
          ClientMessage sent = senderSession.createMessage(true);
          sent.putStringProperty("hello", "world");
-
+         System.out.println("sending message");
          producer.send(sent);
 
          senderSession.commit();
@@ -137,7 +137,7 @@
    }
    
    // https://jira.jboss.org/jira/browse/HORNETQ-385
-   public void disabled_testReceiveImmediateWithZeroWindow2() throws Exception
+   public void testReceiveImmediateWithZeroWindow2() throws Exception
    {
       HornetQServer server = createServer(true);
 
@@ -175,7 +175,7 @@
          prod.send(msg);
          sessionProd.commit();
 
-         message = consumer.receive(1000);
+         message = consumer.receive(10000);
          assertNotNull(message);
          System.out.println(message.getStringProperty("hello"));
          message.acknowledge();
@@ -191,7 +191,7 @@
    }
    
    // https://jira.jboss.org/jira/browse/HORNETQ-385
-   public void disabled_testReceiveImmediateWithZeroWindow3() throws Exception
+   public void testReceiveImmediateWithZeroWindow3() throws Exception
    {
       HornetQServer server = createServer(false, isNetty());
       try
@@ -254,7 +254,7 @@
 
    }
    
-   public void disabled_testReceiveImmediateWithZeroWindow4() throws Exception
+   public void testReceiveImmediateWithZeroWindow4() throws Exception
    {
       HornetQServer server = createServer(false, isNetty());
       try



More information about the hornetq-commits mailing list