[hornetq-commits] JBoss hornetq SVN: r7912 - in trunk: src/main/org/hornetq/core/server/impl and 1 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Tue Aug 25 11:29:32 EDT 2009


Author: timfox
Date: 2009-08-25 11:29:32 -0400 (Tue, 25 Aug 2009)
New Revision: 7912

Modified:
   trunk/src/main/org/hornetq/core/client/impl/ClientConsumerImpl.java
   trunk/src/main/org/hornetq/core/client/impl/ClientConsumerInternal.java
   trunk/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java
   trunk/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java
   trunk/tests/src/org/hornetq/tests/integration/client/ConsumerTest.java
Log:
https://jira.jboss.org/jira/browse/HORNETQ-111

Modified: trunk/src/main/org/hornetq/core/client/impl/ClientConsumerImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/ClientConsumerImpl.java	2009-08-25 11:24:46 UTC (rev 7911)
+++ trunk/src/main/org/hornetq/core/client/impl/ClientConsumerImpl.java	2009-08-25 15:29:32 UTC (rev 7912)
@@ -71,8 +71,6 @@
 
    private final Runner runner = new Runner();
 
-   private ClientMessageInternal currentChunkMessage;
-
    private LargeMessageBufferImpl currentLargeMessageBuffer;
 
    // When receiving LargeMessages, the user may choose to not read the body, on this case we need to discard the body
@@ -407,7 +405,7 @@
       // Flow control for the first packet, we will have others
       flowControl(packet.getPacketSize(), false);
             
-      currentChunkMessage = new ClientMessageImpl(packet.getDeliveryCount());
+      ClientMessageInternal currentChunkMessage = new ClientMessageImpl(packet.getDeliveryCount());
 
       currentChunkMessage.decodeProperties(ChannelBuffers.wrappedBuffer(packet.getLargeMessageHeader()));
 
@@ -441,12 +439,21 @@
       currentLargeMessageBuffer.addPacket(chunk);
    }
 
-   public void clear()
+   public void clear() throws HornetQException
    {
       synchronized (this)
       {
+         //Need to send credits for the messages in the buffer
+         
+         for (ClientMessageInternal message: this.buffer)
+         {
+            flowControlBeforeConsumption(message);
+         }
+
          buffer.clear();
       }
+      
+      //Need to send credits for the messages in the buffer
 
       waitForOnMessageToComplete();
    }
@@ -484,7 +491,7 @@
    }
 
    /** 
-    * flow control is synchornized because of LargeMessage and streaming.
+    * 
     * LargeMessageBuffer will call flowcontrol here, while other handleMessage will also be calling flowControl.
     * So, this operation needs to be atomic.
     * 
@@ -498,7 +505,6 @@
 
          if (creditsToSend >= clientWindowSize)
          {
-
             if (clientWindowSize == 0 && discountSlowConsumer)
             {
                if (trace)

Modified: trunk/src/main/org/hornetq/core/client/impl/ClientConsumerInternal.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/ClientConsumerInternal.java	2009-08-25 11:24:46 UTC (rev 7911)
+++ trunk/src/main/org/hornetq/core/client/impl/ClientConsumerInternal.java	2009-08-25 15:29:32 UTC (rev 7912)
@@ -38,7 +38,7 @@
    
    void flowControl(final int messageBytes, final boolean discountSlowConsumer) throws HornetQException;
 
-   void clear();
+   void clear() throws HornetQException;
 
    int getClientWindowSize();
 

Modified: trunk/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java	2009-08-25 11:24:46 UTC (rev 7911)
+++ trunk/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java	2009-08-25 15:29:32 UTC (rev 7912)
@@ -1204,6 +1204,8 @@
 
       if (windowSize != 0)
       {
+         log.info("Sending " + windowSize + " initial credits");
+         
          channel.send(new SessionConsumerFlowCreditMessage(consumerID, windowSize));
       }
 

Modified: trunk/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java	2009-08-25 11:24:46 UTC (rev 7911)
+++ trunk/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java	2009-08-25 15:29:32 UTC (rev 7912)
@@ -558,6 +558,7 @@
    {
       if (availableCredits != null && availableCredits.get() <= 0)
       {
+        // log.info("busy - available credits is " + availableCredits.get());
          return HandleStatus.BUSY;
       }
 

Modified: trunk/tests/src/org/hornetq/tests/integration/client/ConsumerTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/client/ConsumerTest.java	2009-08-25 11:24:46 UTC (rev 7911)
+++ trunk/tests/src/org/hornetq/tests/integration/client/ConsumerTest.java	2009-08-25 15:29:32 UTC (rev 7912)
@@ -15,6 +15,8 @@
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 
+import javax.jms.ObjectMessage;
+
 import org.hornetq.core.client.ClientConsumer;
 import org.hornetq.core.client.ClientMessage;
 import org.hornetq.core.client.ClientProducer;
@@ -64,8 +66,6 @@
       super.tearDown();
    }
 
-   
-  
    public void testConsumerAckImmediateAutoCommitTrue() throws Exception
    {
       ClientSessionFactory sf = createInVMFactory();
@@ -93,10 +93,8 @@
          assertEquals("m" + i, message2.getBody().readString());
       }
       // assert that all the messages are there and none have been acked
-      assertEquals(0,
-                   ((Queue)server.getPostOffice().getBinding(QUEUE).getBindable()).getDeliveringCount());
-      assertEquals(0,
-                   ((Queue)server.getPostOffice().getBinding(QUEUE).getBindable()).getMessageCount());
+      assertEquals(0, ((Queue)server.getPostOffice().getBinding(QUEUE).getBindable()).getDeliveringCount());
+      assertEquals(0, ((Queue)server.getPostOffice().getBinding(QUEUE).getBindable()).getMessageCount());
 
       session.close();
    }
@@ -129,10 +127,8 @@
          assertEquals("m" + i, message2.getBody().readString());
       }
       // assert that all the messages are there and none have been acked
-      assertEquals(0,
-                   ((Queue)server.getPostOffice().getBinding(QUEUE).getBindable()).getDeliveringCount());
-      assertEquals(0,
-                   ((Queue)server.getPostOffice().getBinding(QUEUE).getBindable()).getMessageCount());
+      assertEquals(0, ((Queue)server.getPostOffice().getBinding(QUEUE).getBindable()).getDeliveringCount());
+      assertEquals(0, ((Queue)server.getPostOffice().getBinding(QUEUE).getBindable()).getMessageCount());
 
       session.close();
    }
@@ -169,10 +165,8 @@
          }
       }
       // assert that all the messages are there and none have been acked
-      assertEquals(0,
-                   ((Queue)server.getPostOffice().getBinding(QUEUE).getBindable()).getDeliveringCount());
-      assertEquals(0,
-                   ((Queue)server.getPostOffice().getBinding(QUEUE).getBindable()).getMessageCount());
+      assertEquals(0, ((Queue)server.getPostOffice().getBinding(QUEUE).getBindable()).getDeliveringCount());
+      assertEquals(0, ((Queue)server.getPostOffice().getBinding(QUEUE).getBindable()).getMessageCount());
 
       session.close();
    }
@@ -209,17 +203,13 @@
          }
       }
       // assert that all the messages are there and none have been acked
-      assertEquals(0,
-                   ((Queue)server.getPostOffice().getBinding(QUEUE).getBindable()).getDeliveringCount());
-      assertEquals(0,
-                   ((Queue)server.getPostOffice().getBinding(QUEUE).getBindable()).getMessageCount());
+      assertEquals(0, ((Queue)server.getPostOffice().getBinding(QUEUE).getBindable()).getDeliveringCount());
+      assertEquals(0, ((Queue)server.getPostOffice().getBinding(QUEUE).getBindable()).getMessageCount());
 
       session.close();
 
-      assertEquals(0,
-                   ((Queue)server.getPostOffice().getBinding(QUEUE).getBindable()).getDeliveringCount());
-      assertEquals(0,
-                   ((Queue)server.getPostOffice().getBinding(QUEUE).getBindable()).getMessageCount());
+      assertEquals(0, ((Queue)server.getPostOffice().getBinding(QUEUE).getBindable()).getDeliveringCount());
+      assertEquals(0, ((Queue)server.getPostOffice().getBinding(QUEUE).getBindable()).getMessageCount());
    }
 
    public void testAcksWithSmallSendWindow() throws Exception
@@ -246,7 +236,7 @@
       {
          public boolean intercept(Packet packet, RemotingConnection connection) throws HornetQException
          {
-            if(packet.getType() == PacketImpl.SESS_ACKNOWLEDGE)
+            if (packet.getType() == PacketImpl.SESS_ACKNOWLEDGE)
             {
                latch.countDown();
             }
@@ -276,7 +266,7 @@
       assertTrue(latch.await(5, TimeUnit.SECONDS));
       sessionRec.close();
    }
-   
+
    public void testClearListener() throws Exception
    {
       ClientSessionFactory sf = createInVMFactory();
@@ -286,7 +276,7 @@
       session.createQueue(QUEUE, QUEUE, null, false);
 
       ClientConsumer consumer = session.createConsumer(QUEUE);
-      
+
       consumer.setMessageHandler(new MessageHandler()
       {
          public void onMessage(ClientMessage msg)
@@ -297,7 +287,7 @@
       consumer.setMessageHandler(null);
       consumer.receiveImmediate();
    }
-   
+
    public void testNoReceiveWithListener() throws Exception
    {
       ClientSessionFactory sf = createInVMFactory();
@@ -307,7 +297,7 @@
       session.createQueue(QUEUE, QUEUE, null, false);
 
       ClientConsumer consumer = session.createConsumer(QUEUE);
-      
+
       consumer.setMessageHandler(new MessageHandler()
       {
          public void onMessage(ClientMessage msg)
@@ -324,7 +314,7 @@
       {
          if (me.getCode() == HornetQException.ILLEGAL_STATE)
          {
-            //Ok
+            // Ok
          }
          else
          {
@@ -333,4 +323,123 @@
       }
    }
 
+   // https://jira.jboss.org/jira/browse/HORNETQ-111
+   // Test that, on rollback credits are released for messages cleared in the buffer
+   public void testConsumerCreditsOnRollback() throws Exception
+   {
+      ClientSessionFactory sf = createInVMFactory();
+
+      sf.setConsumerWindowSize(10000);
+
+      ClientSession session = sf.createTransactedSession();
+
+      session.createQueue(QUEUE, QUEUE, null, false);
+
+      ClientProducer producer = session.createProducer(QUEUE);
+
+      final int numMessages = 100;
+
+      final byte[] bytes = new byte[1000];
+
+      for (int i = 0; i < numMessages; i++)
+      {
+         ClientMessage message = session.createClientMessage(false);
+
+         message.getBody().writeBytes(bytes);
+
+         message.putIntProperty("count", i);
+
+         producer.send(message);
+      }
+
+      session.commit();
+
+      ClientConsumer consumer = session.createConsumer(QUEUE);
+      session.start();
+
+      int commited = 0;
+      int rollbacked = 0;
+      for (int i = 0; i < 110; i++)
+      {
+         ClientMessage message = (ClientMessage)consumer.receive();
+
+         int count = (Integer)message.getProperty("count");
+
+         boolean redelivered = message.getDeliveryCount() > 1;
+
+         if (count % 2 == 0 && !redelivered)
+         {
+            session.rollback();
+            rollbacked++;
+         }
+         else
+         {
+            session.commit();
+            commited++;
+         }
+      }
+
+      session.close();
+   }
+   
+   // https://jira.jboss.org/jira/browse/HORNETQ-111
+   // Test that, on rollback credits are released for messages cleared in the buffer
+   public void testConsumerCreditsOnRollbackLargeMessages() throws Exception
+   {
+      ClientSessionFactory sf = createInVMFactory();
+
+      sf.setConsumerWindowSize(10000);
+      sf.setMinLargeMessageSize(1000);
+
+      ClientSession session = sf.createTransactedSession();
+
+      session.createQueue(QUEUE, QUEUE, null, false);
+
+      ClientProducer producer = session.createProducer(QUEUE);
+
+      final int numMessages = 100;
+
+      final byte[] bytes = new byte[10000];
+
+      for (int i = 0; i < numMessages; i++)
+      {
+         ClientMessage message = session.createClientMessage(false);
+
+         message.getBody().writeBytes(bytes);
+
+         message.putIntProperty("count", i);
+
+         producer.send(message);
+      }
+
+      session.commit();
+
+      ClientConsumer consumer = session.createConsumer(QUEUE);
+      session.start();
+
+      int commited = 0;
+      int rollbacked = 0;
+      for (int i = 0; i < 110; i++)
+      {
+         ClientMessage message = (ClientMessage)consumer.receive();
+
+         int count = (Integer)message.getProperty("count");
+
+         boolean redelivered = message.getDeliveryCount() > 1;
+
+         if (count % 2 == 0 && !redelivered)
+         {
+            session.rollback();
+            rollbacked++;
+         }
+         else
+         {
+            session.commit();
+            commited++;
+         }
+      }
+
+      session.close();
+   }
+
 }



More information about the hornetq-commits mailing list