[hornetq-commits] JBoss hornetq SVN: r10528 - in branches/Branch_2_2_EAP: tests/src/org/hornetq/tests/integration/client and 1 other directory.

do-not-reply at jboss.org do-not-reply at jboss.org
Mon Apr 18 23:31:49 EDT 2011


Author: clebert.suconic at jboss.com
Date: 2011-04-18 23:31:49 -0400 (Mon, 18 Apr 2011)
New Revision: 10528

Modified:
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/cursor/impl/LivePageCacheImpl.java
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionImpl.java
   branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/LargeMessageTest.java
   branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/PagingTest.java
Log:
JBPAPP-6237 - Fixing Large Message over Live Cache

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/cursor/impl/LivePageCacheImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/cursor/impl/LivePageCacheImpl.java	2011-04-18 16:52:01 UTC (rev 10527)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/cursor/impl/LivePageCacheImpl.java	2011-04-19 03:31:49 UTC (rev 10528)
@@ -19,6 +19,7 @@
 import org.hornetq.core.paging.Page;
 import org.hornetq.core.paging.PagedMessage;
 import org.hornetq.core.paging.cursor.LivePageCache;
+import org.hornetq.core.server.LargeServerMessage;
 
 /**
  * This is the same as PageCache, however this is for the page that's being currently written.
@@ -132,6 +133,10 @@
     */
    public synchronized void addLiveMessage(PagedMessage message)
    {
+      if (message.getMessage().isLargeMessage())
+      {
+         ((LargeServerMessage)message.getMessage()).incrementDelayDeletionCount();
+      }
       this.messages.add(message);
    }
 

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionImpl.java	2011-04-18 16:52:01 UTC (rev 10527)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionImpl.java	2011-04-19 03:31:49 UTC (rev 10528)
@@ -17,6 +17,7 @@
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.LinkedHashSet;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
@@ -231,8 +232,8 @@
             {
                if (entry.getKey() == lastAckedPosition.getPageNr())
                {
-                  // PageSubscriptionImpl.trace("We can't clear page " + entry.getKey() +
-                  // " now since it's the current page");
+                   PageSubscriptionImpl.trace("We can't clear page " + entry.getKey() +
+                   " now since it's the current page");
                }
                else
                {
@@ -846,7 +847,7 @@
       private final long pageId;
 
       // Confirmed ACKs on this page
-      private final List<PagePosition> acks = Collections.synchronizedList(new LinkedList<PagePosition>());
+      private final Set<PagePosition> acks = Collections.synchronizedSet(new LinkedHashSet<PagePosition>());
 
       private WeakReference<PageCache> cache;
 
@@ -934,8 +935,6 @@
 
       public void addACK(final PagePosition posACK)
       {
-         removedReferences.add(posACK);
-         acks.add(posACK);
 
          if (isTrace)
          {
@@ -944,11 +943,14 @@
                     (confirmed.get() + 1) +
                     " pendingTX = " + pendingTX +
                     ", page = " +
-                    pageId);
+                    pageId + " posACK = " + posACK);
          }
 
+         removedReferences.add(posACK);
+         boolean added = acks.add(posACK);
+
          // Negative could mean a bookmark on the first element for the page (example -1)
-         if (posACK.getMessageNr() >= 0)
+         if (added && posACK.getMessageNr() >= 0)
          {
             confirmed.incrementAndGet();
             checkDone();

Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/LargeMessageTest.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/LargeMessageTest.java	2011-04-18 16:52:01 UTC (rev 10527)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/LargeMessageTest.java	2011-04-19 03:31:49 UTC (rev 10528)
@@ -2450,6 +2450,7 @@
       }
    }
    
+   // JBPAPP-6237
    public void testPageOnLargeMessageMultipleQueues() throws Exception
    {
       Configuration config = createDefaultConfig(isNetty());
@@ -2602,6 +2603,144 @@
 
    }
 
+
+   // JBPAPP-6237
+   public void testPageOnLargeMessageMultipleQueues2() throws Exception
+   {
+      Configuration config = createDefaultConfig(isNetty());
+
+      final int PAGE_MAX = 20 * 1024;
+
+      final int PAGE_SIZE = 10 * 1024;
+
+      HashMap<String, AddressSettings> map = new HashMap<String, AddressSettings>();
+
+      AddressSettings value = new AddressSettings();
+      map.put(LargeMessageTest.ADDRESS.toString(), value);
+      server = createServer(true, config, PAGE_SIZE, PAGE_MAX, map);
+      server.start();
+
+      final int numberOfBytes = 1024;
+
+      final int numberOfBytesBigMessage = 400000;
+
+      try
+      {
+
+         locator.setBlockOnNonDurableSend(true);
+         locator.setBlockOnDurableSend(true);
+         locator.setBlockOnAcknowledge(true);
+         locator.setCompressLargeMessage(true);
+
+         ClientSessionFactory sf = locator.createSessionFactory();
+
+         ClientSession session = sf.createSession(false, true, true);
+
+         session.createQueue(LargeMessageTest.ADDRESS, LargeMessageTest.ADDRESS.concat("-0"), null, true);
+         session.createQueue(LargeMessageTest.ADDRESS, LargeMessageTest.ADDRESS.concat("-1"), null, true);
+
+         ClientProducer producer = session.createProducer(LargeMessageTest.ADDRESS);
+         int msgId = 0;
+
+         for (int i = 0; i < 100; i++)
+         {
+            ClientMessage message = session.createMessage(true);
+            
+            message.putIntProperty("msgID", msgId++);
+
+            message.putBooleanProperty("TestLarge", false);
+
+            message.getBodyBuffer().writerIndex(0);
+
+            message.getBodyBuffer().writeBytes(new byte[numberOfBytes]);
+
+            for (int j = 1; j <= numberOfBytes; j++)
+            {
+               message.getBodyBuffer().writeInt(j);
+            }
+
+            producer.send(message);
+         }
+
+
+         for (int i = 0; i < 10; i++)
+         {
+            ClientMessage clientFile = createLargeClientMessage(session, numberOfBytesBigMessage);
+            clientFile.putBooleanProperty("TestLarge", true);
+            producer.send(clientFile);
+         }
+
+         session.close();
+
+         for (int ad = 0; ad < 2; ad++)
+         {
+            session = sf.createSession(false, false, false);
+
+            ClientConsumer consumer = session.createConsumer(LargeMessageTest.ADDRESS.concat("-" + ad));
+
+            session.start();
+            
+            for (int received = 0 ; received < 5; received++)
+            {
+               for (int i = 0; i < 100; i++)
+               {
+                  ClientMessage message2 = consumer.receive(LargeMessageTest.RECEIVE_WAIT_TIME);
+   
+                  Assert.assertNotNull(message2);
+                  
+                  assertFalse(message2.getBooleanProperty("TestLarge"));
+   
+                  message2.acknowledge();
+   
+                  Assert.assertNotNull(message2);
+               }
+   
+               for (int i = 0; i < 10; i++)
+               {
+                  ClientMessage messageLarge = consumer.receive(RECEIVE_WAIT_TIME);
+   
+                  Assert.assertNotNull(messageLarge);
+                  
+                  assertTrue(messageLarge.getBooleanProperty("TestLarge"));
+   
+                  ByteArrayOutputStream bout = new ByteArrayOutputStream();
+                  
+                  messageLarge.acknowledge();
+                  
+                  messageLarge.saveToOutputStream(bout);
+                  byte[] body = bout.toByteArray();
+                  assertEquals(numberOfBytesBigMessage, body.length);
+                  for (int bi = 0; bi < body.length; bi++)
+                  {
+                     assertEquals(getSamplebyte(bi), body[bi]);
+                  }
+               }
+               
+               session.rollback();
+            }
+
+            session.commit();
+
+            consumer.close();
+
+            session.close();
+
+         }
+      }
+      finally
+      {
+         locator.close();
+         try
+         {
+            server.stop();
+         }
+         catch (Throwable ignored)
+         {
+         }
+      }
+
+   }
+
    public void testSendStreamingSingleMessage() throws Exception
    {
       ClientSession session = null;

Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/PagingTest.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/PagingTest.java	2011-04-18 16:52:01 UTC (rev 10527)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/PagingTest.java	2011-04-19 03:31:49 UTC (rev 10528)
@@ -2934,7 +2934,260 @@
    }
 
 
+   public void testTwoQueuesDifferentFilters() throws Exception
+   {
+      boolean persistentMessages = true;
 
+      clearData();
+
+      Configuration config = createDefaultConfig();
+
+      config.setJournalSyncNonTransactional(false);
+
+      HornetQServer server = createServer(true,
+                                          config,
+                                          PagingTest.PAGE_SIZE,
+                                          PagingTest.PAGE_MAX,
+                                          new HashMap<String, AddressSettings>());
+
+      server.start();
+
+      final int messageSize = 1024;
+
+      final int numberOfMessages = 200;
+
+      try
+      {
+         ServerLocator locator = createInVMNonHALocator();
+         
+         locator.setClientFailureCheckPeriod(120000);
+         locator.setConnectionTTL(5000000);
+         locator.setCallTimeout(120000);
+
+         locator.setBlockOnNonDurableSend(true);
+         locator.setBlockOnDurableSend(true);
+         locator.setBlockOnAcknowledge(true);
+
+         ClientSessionFactory sf = locator.createSessionFactory();
+
+         ClientSession session = sf.createSession(false, false, false);
+         
+         // note: if you want to change this, numberOfMessages has to be a multiple of NQUEUES
+         int NQUEUES = 2;
+         
+
+         for (int i = 0 ; i < NQUEUES; i++)
+         {
+            session.createQueue(PagingTest.ADDRESS, PagingTest.ADDRESS.concat("=" + i), new SimpleString("propTest=" + i), true);
+         }
+
+         ClientProducer producer = session.createProducer(PagingTest.ADDRESS);
+
+         ClientMessage message = null;
+
+         byte[] body = new byte[messageSize];
+
+         for (int i = 0; i < numberOfMessages; i++)
+         {
+            message = session.createMessage(persistentMessages);
+
+            HornetQBuffer bodyLocal = message.getBodyBuffer();
+
+            bodyLocal.writeBytes(body);
+
+            message.putIntProperty("propTest", i % NQUEUES);
+            message.putIntProperty("id", i);
+
+            producer.send(message);
+            if (i % 1000 == 0)
+            {
+               session.commit();
+            }
+         }
+
+         session.commit();
+
+         session.start();
+
+         for (int nqueue = 0; nqueue < NQUEUES; nqueue++)
+         {
+            ClientConsumer consumer = session.createConsumer(PagingTest.ADDRESS.concat("=" + nqueue));
+   
+            for (int i = 0; i < (numberOfMessages /NQUEUES); i++)
+            {
+               message = consumer.receive(500000);
+               assertNotNull(message);
+               message.acknowledge();
+   
+               assertEquals(nqueue, message.getIntProperty("propTest").intValue());
+            }
+            
+            assertNull(consumer.receiveImmediate());
+            
+            consumer.close();
+   
+            session.commit();
+         }
+
+         PagingStore store = server.getPagingManager().getPageStore(ADDRESS);
+         store.getCursorProvier().cleanup();
+
+         long timeout = System.currentTimeMillis() + 5000;
+         while (store.isPaging() && timeout > System.currentTimeMillis())
+         {
+            Thread.sleep(100);
+         }
+
+         
+         // It's async, so need to wait a bit for it happening
+         assertFalse(server.getPagingManager().getPageStore(ADDRESS).isPaging());
+
+         sf.close();
+
+         locator.close();
+      }
+      finally
+      {
+         try
+         {
+            server.stop();
+         }
+         catch (Throwable ignored)
+         {
+         }
+      }
+
+   }
+
+
+
+   public void testTwoQueues() throws Exception
+   {
+      boolean persistentMessages = true;
+
+      clearData();
+
+      Configuration config = createDefaultConfig();
+
+      config.setJournalSyncNonTransactional(false);
+
+      HornetQServer server = createServer(true,
+                                          config,
+                                          PagingTest.PAGE_SIZE,
+                                          PagingTest.PAGE_MAX,
+                                          new HashMap<String, AddressSettings>());
+
+      server.start();
+
+      final int messageSize = 1024;
+
+      final int numberOfMessages = 1000;
+
+      try
+      {
+         ServerLocator locator = createInVMNonHALocator();
+         
+         locator.setClientFailureCheckPeriod(120000);
+         locator.setConnectionTTL(5000000);
+         locator.setCallTimeout(120000);
+
+         locator.setBlockOnNonDurableSend(true);
+         locator.setBlockOnDurableSend(true);
+         locator.setBlockOnAcknowledge(true);
+
+         ClientSessionFactory sf = locator.createSessionFactory();
+
+         ClientSession session = sf.createSession(false, false, false);
+         
+
+         session.createQueue(PagingTest.ADDRESS, PagingTest.ADDRESS.concat("=1"), null, true);
+         session.createQueue(PagingTest.ADDRESS, PagingTest.ADDRESS.concat("=2"), null, true);
+
+         ClientProducer producer = session.createProducer(PagingTest.ADDRESS);
+
+         ClientMessage message = null;
+
+         byte[] body = new byte[messageSize];
+
+         for (int i = 0; i < numberOfMessages; i++)
+         {
+            message = session.createMessage(persistentMessages);
+
+            HornetQBuffer bodyLocal = message.getBodyBuffer();
+
+            bodyLocal.writeBytes(body);
+
+            message.putIntProperty("propTest", i % 2 == 0 ? 1 : 2);
+
+            producer.send(message);
+            if (i % 1000 == 0)
+            {
+               session.commit();
+            }
+         }
+
+         session.commit();
+
+         session.start();
+
+         for (int msg = 1; msg <= 2; msg++)
+         {
+            ClientConsumer consumer = session.createConsumer(PagingTest.ADDRESS.concat("=" + msg));
+   
+            for (int i = 0; i < numberOfMessages; i++)
+            {
+               message = consumer.receive(500000);
+               assertNotNull(message);
+               message.acknowledge();
+   
+               //assertEquals(msg, message.getIntProperty("propTest").intValue());
+               
+               System.out.println("i = " + i + " msg = " + message.getIntProperty("propTest"));
+            }
+   
+            session.commit();
+            
+            assertNull(consumer.receiveImmediate());
+            
+            consumer.close();
+         }
+
+         PagingStore store = server.getPagingManager().getPageStore(ADDRESS);
+         store.getCursorProvier().cleanup();
+
+         long timeout = System.currentTimeMillis() + 5000;
+         while (store.isPaging() && timeout > System.currentTimeMillis())
+         {
+            Thread.sleep(100);
+         }
+
+         store.getCursorProvier().cleanup();
+         
+         Thread.sleep(1000);
+         
+         
+         // It's async, so need to wait a bit for it happening
+         assertFalse(server.getPagingManager().getPageStore(ADDRESS).isPaging());
+
+         sf.close();
+
+         locator.close();
+      }
+      finally
+      {
+         try
+         {
+            server.stop();
+         }
+         catch (Throwable ignored)
+         {
+         }
+      }
+
+   }
+
+
+
    // Package protected ---------------------------------------------
 
    // Protected -----------------------------------------------------



More information about the hornetq-commits mailing list