[hornetq-commits] JBoss hornetq SVN: r9657 - in branches/Branch_2_1: src/main/org/hornetq/core/transaction and 3 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Wed Sep 8 15:40:35 EDT 2010


Author: clebert.suconic at jboss.com
Date: 2010-09-08 15:40:33 -0400 (Wed, 08 Sep 2010)
New Revision: 9657

Modified:
   branches/Branch_2_1/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java
   branches/Branch_2_1/src/main/org/hornetq/core/transaction/Transaction.java
   branches/Branch_2_1/src/main/org/hornetq/core/transaction/impl/TransactionImpl.java
   branches/Branch_2_1/tests/src/org/hornetq/tests/integration/client/PagingTest.java
   branches/Branch_2_1/tests/src/org/hornetq/tests/unit/core/postoffice/impl/BindingsImplTest.java
Log:
HORNETQ-502 Fixing out of order during paging and transactions

Modified: branches/Branch_2_1/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java
===================================================================
--- branches/Branch_2_1/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java	2010-09-08 19:17:54 UTC (rev 9656)
+++ branches/Branch_2_1/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java	2010-09-08 19:40:33 UTC (rev 9657)
@@ -613,10 +613,18 @@
       {
          Transaction tx = context.getTransaction();
          boolean depage = tx.getProperty(TransactionPropertyIndexes.IS_DEPAGE) != null;
-
-         if (!depage && message.storeIsPaging())
+         
+         // if the TX paged at least one message on a give address, all the other addresses should also go towards paging cache now 
+         boolean alreadyPaging = false;
+         
+         if (tx.isPaging())
          {
-            
+            alreadyPaging = getPageOperation(tx).isPaging(message.getAddress()); 
+         }
+         
+         if (!depage && message.storeIsPaging() || alreadyPaging)
+         {
+            tx.setPaging(true);
             getPageOperation(tx).addMessageToPage(message);
             if (startedTx)
             {
@@ -1104,12 +1112,20 @@
    {
       private final List<ServerMessage> messagesToPage = new ArrayList<ServerMessage>();
       
+      private final HashSet<SimpleString> addressesPaging = new HashSet<SimpleString>();
+      
       private Transaction subTX = null;
       
       void addMessageToPage(final ServerMessage message)
       {
          messagesToPage.add(message);
+         addressesPaging.add(message.getAddress());
       }
+      
+      boolean isPaging(final SimpleString address)
+      {
+         return addressesPaging.contains(address);
+      }
 
       public void afterCommit(final Transaction tx)
       {

Modified: branches/Branch_2_1/src/main/org/hornetq/core/transaction/Transaction.java
===================================================================
--- branches/Branch_2_1/src/main/org/hornetq/core/transaction/Transaction.java	2010-09-08 19:17:54 UTC (rev 9656)
+++ branches/Branch_2_1/src/main/org/hornetq/core/transaction/Transaction.java	2010-09-08 19:40:33 UTC (rev 9657)
@@ -60,6 +60,12 @@
    void removeOperation(TransactionOperation sync);
 
    boolean hasTimedOut(long currentTime, int defaultTimeout);
+   
+   /** We don't want to look on operations at every send, so we keep the paging attribute and will only look at 
+    *  the PagingOperation case this attribute is true*/
+   boolean isPaging();
+   
+   void setPaging(boolean paging);
 
    void putProperty(int index, Object property);
 

Modified: branches/Branch_2_1/src/main/org/hornetq/core/transaction/impl/TransactionImpl.java
===================================================================
--- branches/Branch_2_1/src/main/org/hornetq/core/transaction/impl/TransactionImpl.java	2010-09-08 19:17:54 UTC (rev 9656)
+++ branches/Branch_2_1/src/main/org/hornetq/core/transaction/impl/TransactionImpl.java	2010-09-08 19:40:33 UTC (rev 9657)
@@ -47,6 +47,8 @@
    private final Xid xid;
 
    private final long id;
+   
+   private boolean paging = false;
 
    private volatile State state = State.ACTIVE;
 
@@ -352,7 +354,17 @@
    {
       this.state = state;
    }
+   
+   public boolean isPaging()
+   {
+      return paging;
+   }
 
+   public void setPaging(boolean paging)
+   {
+      this.paging = paging;
+   }
+
    public Xid getXid()
    {
       return xid;

Modified: branches/Branch_2_1/tests/src/org/hornetq/tests/integration/client/PagingTest.java
===================================================================
--- branches/Branch_2_1/tests/src/org/hornetq/tests/integration/client/PagingTest.java	2010-09-08 19:17:54 UTC (rev 9656)
+++ branches/Branch_2_1/tests/src/org/hornetq/tests/integration/client/PagingTest.java	2010-09-08 19:40:33 UTC (rev 9657)
@@ -685,6 +685,133 @@
 
    }
 
+   public void testDepageDuringTransaction3() throws Exception
+   {
+      clearData();
+
+      Configuration config = createDefaultConfig();
+
+      HornetQServer server = createServer(true,
+                                          config,
+                                          PagingTest.PAGE_SIZE,
+                                          PagingTest.PAGE_MAX,
+                                          new HashMap<String, AddressSettings>());
+
+      server.start();
+
+      final int messageSize = 1024; // 1k
+
+      try
+      {
+         ClientSessionFactory sf = createInVMFactory();
+
+         sf.setBlockOnNonDurableSend(true);
+         sf.setBlockOnDurableSend(true);
+         sf.setBlockOnAcknowledge(true);
+
+         byte[] body = new byte[messageSize];
+
+         ClientSession sessionTransacted = sf.createSession(null, null, false, false, false, false, 0);
+         ClientProducer producerTransacted = sessionTransacted.createProducer(PagingTest.ADDRESS);
+
+         ClientSession sessionNonTX = sf.createSession(true, true, 0);
+         sessionNonTX.createQueue(PagingTest.ADDRESS, PagingTest.ADDRESS, null, true);
+
+         ClientProducer producerNonTransacted = sessionNonTX.createProducer(PagingTest.ADDRESS);
+
+         sessionNonTX.start();
+
+         for (int i = 0; i < 50; i++)
+         {
+            System.out.println("Sending " + i);
+            ClientMessage message = sessionNonTX.createMessage(true);
+            message.getBodyBuffer().writeBytes(body);
+            message.putIntProperty(new SimpleString("id"), i);
+
+            producerTransacted.send(message);
+            
+            if (i % 2 == 0)
+            {
+               System.out.println("Sending 20 msgs to make it page");
+               for (int j = 0 ; j < 20; j++)
+               {
+                  ClientMessage msgSend = sessionNonTX.createMessage(true);
+                  msgSend.getBodyBuffer().writeBytes(new byte[10 * 1024]);
+                  producerNonTransacted.send(msgSend);
+               }
+               assertTrue(server.getPostOffice().getPagingManager().getPageStore(PagingTest.ADDRESS).isPaging());
+            }
+            else
+            {
+               System.out.println("Consuming 20 msgs to make it page");
+               ClientConsumer consumer = sessionNonTX.createConsumer(PagingTest.ADDRESS);
+               for (int j = 0 ; j < 20; j++)
+               {
+                  ClientMessage msgReceived = consumer.receive(10000);
+                  assertNotNull(msgReceived);
+                  msgReceived.acknowledge();
+               }
+               consumer.close();
+            }
+         }
+         
+         ClientConsumer consumerNonTX = sessionNonTX.createConsumer(PagingTest.ADDRESS);
+         while (true)
+         {
+            ClientMessage msgReceived = consumerNonTX.receive(1000);
+            if (msgReceived == null)
+            {
+               break;
+            }
+            msgReceived.acknowledge();
+         }
+         consumerNonTX.close();
+         
+
+         ClientConsumer consumer = sessionNonTX.createConsumer(PagingTest.ADDRESS);
+
+         Assert.assertNull(consumer.receiveImmediate());
+
+         sessionTransacted.commit();
+
+         sessionTransacted.close();
+
+         for (int i = 0; i < 50; i++)
+         {
+            ClientMessage message = consumer.receive(PagingTest.RECEIVE_TIMEOUT);
+
+            Assert.assertNotNull(message);
+
+            Integer messageID = (Integer)message.getObjectProperty(new SimpleString("id"));
+
+            // System.out.println(messageID);
+            Assert.assertNotNull(messageID);
+            Assert.assertEquals("message received out of order", i, messageID.intValue());
+            
+            System.out.println("MessageID = " + messageID);
+
+            message.acknowledge();
+         }
+
+         Assert.assertNull(consumer.receiveImmediate());
+
+         consumer.close();
+
+         sessionNonTX.close();
+      }
+      finally
+      {
+         try
+         {
+            server.stop();
+         }
+         catch (Throwable ignored)
+         {
+         }
+      }
+
+   }
+
    public void testPageOnSchedulingNoRestart() throws Exception
    {
       internalTestPageOnScheduling(false);

Modified: branches/Branch_2_1/tests/src/org/hornetq/tests/unit/core/postoffice/impl/BindingsImplTest.java
===================================================================
--- branches/Branch_2_1/tests/src/org/hornetq/tests/unit/core/postoffice/impl/BindingsImplTest.java	2010-09-08 19:17:54 UTC (rev 9656)
+++ branches/Branch_2_1/tests/src/org/hornetq/tests/unit/core/postoffice/impl/BindingsImplTest.java	2010-09-08 19:40:33 UTC (rev 9657)
@@ -321,6 +321,24 @@
       {
       }
 
+      /* (non-Javadoc)
+       * @see org.hornetq.core.transaction.Transaction#isPaging()
+       */
+      public boolean isPaging()
+      {
+         // TODO Auto-generated method stub
+         return false;
+      }
+
+      /* (non-Javadoc)
+       * @see org.hornetq.core.transaction.Transaction#setPaging(boolean)
+       */
+      public void setPaging(boolean paging)
+      {
+         // TODO Auto-generated method stub
+         
+      }
+
    }
 
    class FakeMessage implements ServerMessage



More information about the hornetq-commits mailing list