[hornetq-commits] JBoss hornetq SVN: r10784 - in branches/Branch_2_2_EAP: src/main/org/hornetq/core/paging/impl and 4 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Tue Jun 7 15:36:03 EDT 2011


Author: clebert.suconic at jboss.com
Date: 2011-06-07 15:36:02 -0400 (Tue, 07 Jun 2011)
New Revision: 10784

Modified:
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/PageTransactionInfo.java
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/impl/PageTransactionInfoImpl.java
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/RouteContextList.java
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/RoutingContextImpl.java
   branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/PagingTest.java
   branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java
Log:
JBPAPP-6646 - performance issue on paging - avoiding non-persistence page transactions

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/PageTransactionInfo.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/PageTransactionInfo.java	2011-06-07 18:16:11 UTC (rev 10783)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/PageTransactionInfo.java	2011-06-07 19:36:02 UTC (rev 10784)
@@ -53,9 +53,9 @@
    // To be used after the update was stored or reload
    void onUpdate(int update, StorageManager storageManager, PagingManager pagingManager);
 
-   void increment();
+   void increment(boolean persistent);
    
-   void increment(int size);
+   void increment(int durableSize, int nonDurableSize);
 
    int getNumberOfMessages();
 

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/impl/PageTransactionInfoImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/impl/PageTransactionInfoImpl.java	2011-06-07 18:16:11 UTC (rev 10783)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/impl/PageTransactionInfoImpl.java	2011-06-07 19:36:02 UTC (rev 10784)
@@ -59,6 +59,8 @@
 
    private AtomicInteger numberOfMessages = new AtomicInteger(0);
    
+   private AtomicInteger numberOfPersistentMessages = new AtomicInteger(0);
+   
    private List<Pair<PageSubscription, PagePosition>> lateDeliveries;
 
    // Static --------------------------------------------------------
@@ -110,14 +112,19 @@
       }
    }
 
-   public void increment()
+   public void increment(final boolean persistent)
    {
+      if (persistent)
+      {
+         numberOfPersistentMessages.incrementAndGet();
+      }
       numberOfMessages.incrementAndGet();
    }
    
-   public void increment(final int size)
+   public void increment(final int durableSize, final int nonDurableSize)
    {
-      numberOfMessages.addAndGet(size);
+      numberOfPersistentMessages.addAndGet(durableSize);
+      numberOfMessages.addAndGet(durableSize + nonDurableSize);
    }
 
    public int getNumberOfMessages()
@@ -131,13 +138,14 @@
    {
       transactionID = buffer.readLong();
       numberOfMessages.set(buffer.readInt());
+      numberOfPersistentMessages.set(numberOfMessages.get());
       committed = true;
    }
 
    public synchronized void encode(final HornetQBuffer buffer)
    {
       buffer.writeLong(transactionID);
-      buffer.writeInt(numberOfMessages.get());
+      buffer.writeInt(numberOfPersistentMessages.get());
    }
 
    public synchronized int getEncodeSize()

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java	2011-06-07 18:16:11 UTC (rev 10783)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java	2011-06-07 19:36:02 UTC (rev 10784)
@@ -938,7 +938,7 @@
       }
 
       pgOper.addStore(this);
-      pgOper.pageTransaction.increment(listCtx.getNumberOfQueues());
+      pgOper.pageTransaction.increment(listCtx.getNumberOfDurableQueues(), listCtx.getNumberOfNonDurableQueues());
 
       return;
    }

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/RouteContextList.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/RouteContextList.java	2011-06-07 18:16:11 UTC (rev 10783)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/RouteContextList.java	2011-06-07 19:36:02 UTC (rev 10784)
@@ -25,8 +25,10 @@
 public interface RouteContextList
 {
    
-   int getNumberOfQueues();
+   int getNumberOfNonDurableQueues();
 
+   int getNumberOfDurableQueues();
+
    List<Queue> getDurableQueues();
    
    List<Queue> getNonDurableQueues();

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/RoutingContextImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/RoutingContextImpl.java	2011-06-07 18:16:11 UTC (rev 10783)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/RoutingContextImpl.java	2011-06-07 19:36:02 UTC (rev 10784)
@@ -125,10 +125,15 @@
       
       private List<Queue> nonDurableQueue = new ArrayList<Queue>(1);
       
-      public int getNumberOfQueues()
+      public int getNumberOfDurableQueues()
       {
-         return durableQueue.size() + nonDurableQueue.size();
+         return durableQueue.size();
       }
+      
+      public int getNumberOfNonDurableQueues()
+      {
+         return nonDurableQueue.size();
+      }
 
       /* (non-Javadoc)
        * @see org.hornetq.core.server.RouteContextList#getDurableQueues()

Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/PagingTest.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/PagingTest.java	2011-06-07 18:16:11 UTC (rev 10783)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/PagingTest.java	2011-06-07 19:36:02 UTC (rev 10784)
@@ -419,7 +419,7 @@
          }
          session.commit();
          session.close();
-         
+
          session = null;
 
          sf.close();
@@ -459,16 +459,16 @@
                fail("Didn't receive a message");
             }
             msg.acknowledge();
-            
+
             if (msgCount % 5 == 0)
             {
                log.info("commit");
                sessionConsumer.commit();
             }
          }
-         
+
          sessionConsumer.commit();
-         
+
          sessionConsumer.close();
 
          sf.close();
@@ -1205,6 +1205,189 @@
 
    }
 
+   public void testMultiQueuesNonPersistentAndPersistent() throws Exception
+   {
+      clearData();
+
+      Configuration config = createDefaultConfig();
+
+      config.setJournalSyncNonTransactional(false);
+
+      HornetQServer server = createServer(true,
+                                          config,
+                                          PagingTest.PAGE_SIZE,
+                                          PagingTest.PAGE_MAX,
+                                          new HashMap<String, AddressSettings>());
+
+      server.start();
+
+      final int messageSize = 1024;
+
+      final int numberOfMessages = 3000;
+
+      final byte[] body = new byte[messageSize];
+
+      ByteBuffer bb = ByteBuffer.wrap(body);
+
+      for (int j = 1; j <= messageSize; j++)
+      {
+         bb.put(getSamplebyte(j));
+      }
+
+      try
+      {
+         {
+            ServerLocator locator = createInVMNonHALocator();
+
+            locator.setBlockOnNonDurableSend(true);
+            locator.setBlockOnDurableSend(true);
+            locator.setBlockOnAcknowledge(true);
+
+            ClientSessionFactory sf = locator.createSessionFactory();
+
+            ClientSession session = sf.createSession(false, false, false);
+
+            session.createQueue(PagingTest.ADDRESS.toString(), PagingTest.ADDRESS + "-1", null, true);
+
+            session.createQueue(PagingTest.ADDRESS.toString(), PagingTest.ADDRESS + "-2", null, false);
+
+            ClientProducer producer = session.createProducer(PagingTest.ADDRESS);
+
+            ClientMessage message = null;
+
+            for (int i = 0; i < numberOfMessages; i++)
+            {
+               if (i % 500 == 0)
+               {
+                  session.commit();
+               }
+               message = session.createMessage(true);
+
+               HornetQBuffer bodyLocal = message.getBodyBuffer();
+
+               bodyLocal.writeBytes(body);
+
+               message.putIntProperty(new SimpleString("id"), i);
+
+               producer.send(message);
+            }
+
+            session.commit();
+
+            session.close();
+
+            server.stop();
+
+            sf.close();
+            locator.close();
+         }
+         
+         server = createServer(true,
+                               config,
+                               PagingTest.PAGE_SIZE,
+                               PagingTest.PAGE_MAX,
+                               new HashMap<String, AddressSettings>());
+         server.start();
+
+         ServerLocator locator = createInVMNonHALocator();
+         final ClientSessionFactory sf2 = locator.createSessionFactory();
+
+         final AtomicInteger errors = new AtomicInteger(0);
+
+         Thread t = new Thread()
+         {
+            public void run()
+            {
+               try
+               {
+                  ClientSession session = sf2.createSession(null, null, false, true, true, false, 0);
+
+                  ClientConsumer consumer = session.createConsumer(PagingTest.ADDRESS + "-1");
+
+                  session.start();
+
+                  for (int i = 0; i < numberOfMessages; i++)
+                  {
+                     ClientMessage message2 = consumer.receive(PagingTest.RECEIVE_TIMEOUT);
+
+                     Assert.assertNotNull(message2);
+
+                     Assert.assertEquals(i, message2.getIntProperty("id").intValue());
+
+                     message2.acknowledge();
+
+                     Assert.assertNotNull(message2);
+
+                     if (i % 1000 == 0)
+                        session.commit();
+
+                     try
+                     {
+                        assertBodiesEqual(body, message2.getBodyBuffer());
+                     }
+                     catch (AssertionFailedError e)
+                     {
+                        PagingTest.log.info("Expected buffer:" + UnitTestCase.dumbBytesHex(body, 40));
+                        PagingTest.log.info("Arriving buffer:" + UnitTestCase.dumbBytesHex(message2.getBodyBuffer()
+                                                                                                   .toByteBuffer()
+                                                                                                   .array(), 40));
+                        throw e;
+                     }
+                  }
+
+                  session.commit();
+
+                  consumer.close();
+
+                  session.close();
+               }
+               catch (Throwable e)
+               {
+                  e.printStackTrace();
+                  errors.incrementAndGet();
+               }
+
+            }
+         };
+
+         t.start();
+         t.join();
+
+         
+         assertEquals(0, errors.get());
+
+         for (int i = 0; i < 20 && server.getPostOffice().getPagingManager().getPageStore(ADDRESS).isPaging(); i++)
+         {
+            // The delete may be asynchronous; give it some time in case it eventually happens asynchronously
+            Thread.sleep(500);
+         }
+         
+         assertFalse (server.getPostOffice().getPagingManager().getPageStore(ADDRESS).isPaging());
+
+
+         for (int i = 0; i < 20 && server.getPostOffice().getPagingManager().getTransactions().size() != 0; i++)
+         {
+            // The delete may be asynchronous; give it some time in case it eventually happens asynchronously
+            Thread.sleep(500);
+         }
+
+         assertEquals(0, server.getPostOffice().getPagingManager().getTransactions().size());
+         
+
+      }
+      finally
+      {
+         try
+         {
+            server.stop();
+         }
+         catch (Throwable ignored)
+         {
+         }
+      }
+
+   }
+
    private void internaltestSendReceivePaging(final boolean persistentMessages) throws Exception
    {
 
@@ -1915,7 +2098,7 @@
          }
       }
    }
-   
+
    public void testOrderingNonTX() throws Exception
    {
       clearData();
@@ -2024,7 +2207,7 @@
             {
                log.info("###### different");
             }
-            //assertEquals(i, msg.getIntProperty("count").intValue());
+            // assertEquals(i, msg.getIntProperty("count").intValue());
             msg.acknowledge();
          }
 
@@ -2953,7 +3136,7 @@
          catch (Throwable ignored)
          {
          }
-         
+
          OperationContextImpl.clearContext();
       }
 
@@ -3676,7 +3859,7 @@
          }
       }
    }
-   
+
    public void testDLAOnLargeMessageAndPaging() throws Exception
    {
       clearData();
@@ -3723,18 +3906,17 @@
 
          ClientProducer producer = session.createProducer(PagingTest.ADDRESS);
 
-
          for (int i = 0; i < 100; i++)
          {
             log.debug("send message #" + i);
             ClientMessage message = session.createMessage(true);
 
             message.putStringProperty("id", "str" + i);
-            
+
             message.setBodyInputStream(createFakeLargeStream(messageSize));
 
             producer.send(message);
-            
+
             if ((i + 1) % 2 == 0)
             {
                session.commit();
@@ -3746,27 +3928,27 @@
          session.start();
 
          ClientConsumer cons = session.createConsumer(ADDRESS);
-         
-         for (int msgNr = 0 ; msgNr < 2; msgNr++)
+
+         for (int msgNr = 0; msgNr < 2; msgNr++)
          {
-            for (int i = 0 ; i < 5; i++)
+            for (int i = 0; i < 5; i++)
             {
                ClientMessage msg = cons.receive(5000);
-      
+
                assertNotNull(msg);
-      
+
                msg.acknowledge();
-      
+
                assertEquals("str" + msgNr, msg.getStringProperty("id"));
-   
+
                for (int j = 0; j < messageSize; j++)
                {
                   assertEquals(getSamplebyte(j), msg.getBodyBuffer().readByte());
                }
-   
+
                session.rollback();
             }
-            
+
             pgStoreDLA.startPaging();
          }
 
@@ -3776,9 +3958,9 @@
             ClientMessage message = cons.receive(5000);
             assertNotNull("Message " + i + " wasn't received", message);
             message.acknowledge();
-            
+
             final AtomicInteger bytesOutput = new AtomicInteger(0);
-            
+
             message.setOutputStream(new OutputStream()
             {
                @Override
@@ -3800,41 +3982,42 @@
             {
                log.info("output bytes = " + bytesOutput);
                log.info(threadDump("dump"));
-               fail("Couldn't finish large message receiving for id=" + 
-                    message.getStringProperty("id") + " with messageID=" + message.getMessageID());
+               fail("Couldn't finish large message receiving for id=" + message.getStringProperty("id") +
+                    " with messageID=" +
+                    message.getMessageID());
             }
 
          }
-         
+
          assertNull(cons.receiveImmediate());
 
          cons.close();
-         
+
          cons = session.createConsumer("DLA");
-         
-         for (int i = 0 ; i < 2; i++)
+
+         for (int i = 0; i < 2; i++)
          {
             assertNotNull(cons.receive(5000));
          }
-         
+
          sf.close();
-         
+
          session.close();
-         
+
          locator.close();
-         
+
          server.stop();
-         
+
          server.start();
-         
+
          locator = createInVMNonHALocator();
-         
+
          sf = locator.createSessionFactory();
-         
+
          session = sf.createSession(false, false);
-         
+
          session.start();
-         
+
          cons = session.createConsumer(ADDRESS);
 
          for (int i = 2; i < 100; i++)
@@ -3842,7 +4025,7 @@
             log.debug("Received message " + i);
             ClientMessage message = cons.receive(5000);
             assertNotNull(message);
-            
+
             assertEquals("str" + i, message.getStringProperty("id"));
 
             message.acknowledge();
@@ -3855,53 +4038,53 @@
 
                }
             });
-            
+
             assertTrue(message.waitOutputStreamCompletion(5000));
          }
-         
+
          assertNull(cons.receiveImmediate());
-         
+
          cons.close();
-         
+
          cons = session.createConsumer("DLA");
 
-         for (int msgNr = 0 ; msgNr < 2; msgNr++)
+         for (int msgNr = 0; msgNr < 2; msgNr++)
          {
             ClientMessage msg = cons.receive(10000);
 
             assertNotNull(msg);
-            
+
             assertEquals("str" + msgNr, msg.getStringProperty("id"));
 
             for (int i = 0; i < messageSize; i++)
             {
                assertEquals(getSamplebyte(i), msg.getBodyBuffer().readByte());
             }
-   
+
             msg.acknowledge();
          }
-         
+
          cons.close();
-         
+
          cons = session.createConsumer(ADDRESS);
-         
+
          session.commit();
-         
+
          assertNull(cons.receiveImmediate());
-         
+
          long timeout = System.currentTimeMillis() + 5000;
-         
+
          pgStoreAddress = server.getPagingManager().getPageStore(ADDRESS);
-         
+
          pgStoreAddress.getCursorProvier().getSubscription(server.locateQueue(ADDRESS).getID()).cleanupEntries();
-         
+
          pgStoreAddress.getCursorProvier().cleanup();
-         
+
          while (timeout > System.currentTimeMillis() && pgStoreAddress.isPaging())
          {
             Thread.sleep(50);
          }
-         
+
          assertFalse(pgStoreAddress.isPaging());
 
          session.commit();
@@ -3969,11 +4152,12 @@
 
          for (int i = 0; i < 500; i++)
          {
-            if (i % 100 == 0) log.info("send message #" + i);
+            if (i % 100 == 0)
+               log.info("send message #" + i);
             message = session.createMessage(true);
 
             message.putStringProperty("id", "str" + i);
-            
+
             message.setExpiration(System.currentTimeMillis() + 2000);
 
             if (i % 2 == 0)
@@ -3983,7 +4167,7 @@
             else
             {
                byte bytes[] = new byte[messageSize];
-               for (int s = 0 ; s < bytes.length; s++)
+               for (int s = 0; s < bytes.length; s++)
                {
                   bytes[s] = getSamplebyte(s);
                }
@@ -3991,7 +4175,7 @@
             }
 
             producer.send(message);
-            
+
             if ((i + 1) % 2 == 0)
             {
                session.commit();
@@ -4003,30 +4187,29 @@
          }
 
          session.commit();
-         
+
          sf.close();
-         
+
          locator.close();
-         
+
          server.stop();
-         
+
          Thread.sleep(3000);
-         
+
          server.start();
-         
+
          locator = createInVMNonHALocator();
-         
+
          sf = locator.createSessionFactory();
-         
+
          session = sf.createSession(false, false);
-         
+
          session.start();
-         
+
          ClientConsumer consAddr = session.createConsumer(ADDRESS);
-         
+
          assertNull(consAddr.receive(1000));
-         
-         
+
          ClientConsumer cons = session.createConsumer("DLA");
 
          for (int i = 0; i < 500; i++)
@@ -4045,22 +4228,22 @@
                }
             });
          }
-         
+
          assertNull(cons.receiveImmediate());
-         
+
          session.commit();
-         
+
          cons.close();
-         
+
          long timeout = System.currentTimeMillis() + 5000;
-         
+
          pgStoreAddress = server.getPagingManager().getPageStore(ADDRESS);
-         
+
          while (timeout > System.currentTimeMillis() && pgStoreAddress.isPaging())
          {
             Thread.sleep(50);
          }
-         
+
          assertFalse(pgStoreAddress.isPaging());
 
          session.close();

Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java	2011-06-07 18:16:11 UTC (rev 10783)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java	2011-06-07 19:36:02 UTC (rev 10784)
@@ -113,7 +113,7 @@
 
       for (int i = 0; i < nr1; i++)
       {
-         trans.increment();
+         trans.increment(true);
       }
 
       Assert.assertEquals(nr1, trans.getNumberOfMessages());



More information about the hornetq-commits mailing list