[hornetq-commits] JBoss hornetq SVN: r10712 - in trunk: tests/integration-tests/src/test/java/org/hornetq/tests/integration/client and 3 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Sat May 21 12:29:15 EDT 2011


Author: clebert.suconic at jboss.com
Date: 2011-05-21 12:29:14 -0400 (Sat, 21 May 2011)
New Revision: 10712

Modified:
   trunk/hornetq.ipr
   trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/PagingTest.java
   trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/TemporaryQueueTest.java
   trunk/tests/unit-tests/src/main/java/org/hornetq/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java
   trunk/tests/unit-tests/src/main/java/org/hornetq/tests/util/RandomUtil.java
   trunk/tests/unit-tests/src/main/java/org/hornetq/tests/util/UnitTestCase.java
   trunk/tests/unit-tests/src/test/java/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java
Log:
fixing build with changes from Branch_2_2 on tests

Modified: trunk/hornetq.ipr
===================================================================
--- trunk/hornetq.ipr	2011-05-20 15:47:57 UTC (rev 10711)
+++ trunk/hornetq.ipr	2011-05-21 16:29:14 UTC (rev 10712)
@@ -1,5 +1,9 @@
 <?xml version="1.0" encoding="UTF-8"?>
 <project version="4">
+  <component name="ASMPluginConfiguration">
+    <asm skipDebug="false" skipFrames="false" skipCode="false" expandFrames="false" />
+    <groovy codeStyle="LEGACY" />
+  </component>
   <component name="AntConfiguration">
     <defaultAnt bundledAnt="true" />
     <buildFile url="file://$PROJECT_DIR$/build-hornetq.xml">
@@ -85,7 +89,7 @@
       <element module="All" copyright="new" />
     </module2copyright>
   </component>
-  <component name="CppTools.Loader" reportImplicitCastToBool="false" warnedAboutFileOutOfSourceRoot="true" version="1" currentProject="$PROJECT_DIR$/native/Makefile" />
+  <component name="CppTools.Loader" reportImplicitCastToBool="false" reportNameReferencedOnce="false" warnedAboutFileOutOfSourceRoot="true" version="3" currentProject="$PROJECT_DIR$/native/Makefile" compilerSelect="AUTO" />
   <component name="DependenciesAnalyzeManager">
     <option name="myForwardDirection" value="false" />
   </component>

Modified: trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/PagingTest.java
===================================================================
--- trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/PagingTest.java	2011-05-20 15:47:57 UTC (rev 10711)
+++ trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/PagingTest.java	2011-05-21 16:29:14 UTC (rev 10712)
@@ -13,10 +13,13 @@
 
 package org.hornetq.tests.integration.client;
 
+import java.io.IOException;
+import java.io.OutputStream;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.LinkedList;
+import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
@@ -42,9 +45,16 @@
 import org.hornetq.core.config.Configuration;
 import org.hornetq.core.config.DivertConfiguration;
 import org.hornetq.core.journal.IOAsyncTask;
+import org.hornetq.core.journal.PreparedTransactionInfo;
+import org.hornetq.core.journal.RecordInfo;
+import org.hornetq.core.journal.impl.JournalImpl;
+import org.hornetq.core.journal.impl.NIOSequentialFileFactory;
 import org.hornetq.core.logging.Logger;
+import org.hornetq.core.paging.Page;
+import org.hornetq.core.paging.PagedMessage;
 import org.hornetq.core.paging.PagingManager;
 import org.hornetq.core.paging.PagingStore;
+import org.hornetq.core.paging.cursor.impl.PagePositionImpl;
 import org.hornetq.core.paging.impl.TestSupportPageStore;
 import org.hornetq.core.persistence.OperationContext;
 import org.hornetq.core.persistence.impl.journal.OperationContextImpl;
@@ -262,8 +272,7 @@
          consumer = session.createConsumer(PagingTest.ADDRESS);
 
          session.start();
-         
-         
+
          assertEquals(numberOfMessages, queue.getMessageCount());
 
          ClientMessage msg = consumer.receive(5000);
@@ -284,7 +293,7 @@
          }
          assertNull(msg);
 
-         for (int i = xids.size() -1 ; i >= 0; i--)
+         for (int i = xids.size() - 1; i >= 0; i--)
          {
             Xid xid = xids.get(i);
             session.rollback(xid);
@@ -298,25 +307,25 @@
          session = sf.createSession(false, false, false);
 
          session.start();
-         
+
          consumer = session.createConsumer(PagingTest.ADDRESS);
-         
+
          for (int i = 0; i < numberOfMessages; i++)
          {
             msg = consumer.receive(1000);
             assertNotNull(msg);
             msg.acknowledge();
-            
+
             assertEquals(i, msg.getIntProperty("id").intValue());
-            
+
             if (i % 500 == 0)
             {
                session.commit();
             }
          }
-         
+
          session.commit();
-         
+
          session.close();
 
          sf.close();
@@ -324,14 +333,13 @@
          locator.close();
 
          assertEquals(0, queue.getMessageCount());
-         
+
          long timeout = System.currentTimeMillis() + 5000;
          while (timeout > System.currentTimeMillis() && queue.getPageSubscription().getPagingStore().isPaging())
          {
             Thread.sleep(100);
          }
-         assertFalse (queue.getPageSubscription().getPagingStore().isPaging());
-         // assertEquals(numberOfMessages, queue.getMessageCount());
+         assertFalse(queue.getPageSubscription().getPagingStore().isPaging());
       }
       finally
       {
@@ -346,6 +354,350 @@
 
    }
 
+   public void testMissingTXEverythingAcked() throws Exception
+   {
+      clearData();
+
+      Configuration config = createDefaultConfig();
+
+      config.setJournalSyncNonTransactional(false);
+
+      HornetQServer server = createServer(true,
+                                          config,
+                                          PagingTest.PAGE_SIZE,
+                                          PagingTest.PAGE_MAX,
+                                          new HashMap<String, AddressSettings>());
+
+      server.start();
+
+      final int messageSize = 1024;
+
+      final int numberOfMessages = 5000;
+
+      final int numberOfTX = 10;
+
+      final int messagesPerTX = numberOfMessages / numberOfTX;
+
+      try
+      {
+         ServerLocator locator = createInVMNonHALocator();
+
+         locator.setBlockOnNonDurableSend(true);
+         locator.setBlockOnDurableSend(true);
+         locator.setBlockOnAcknowledge(true);
+
+         ClientSessionFactory sf = locator.createSessionFactory();
+
+         ClientSession session = sf.createSession(false, false, false);
+
+         session.createQueue(ADDRESS.toString(), "q1", true);
+
+         session.createQueue(ADDRESS.toString(), "q2", true);
+
+         ClientProducer producer = session.createProducer(PagingTest.ADDRESS);
+
+         ClientMessage message = null;
+
+         byte[] body = new byte[messageSize];
+
+         ByteBuffer bb = ByteBuffer.wrap(body);
+
+         for (int j = 1; j <= messageSize; j++)
+         {
+            bb.put(getSamplebyte(j));
+         }
+
+         for (int i = 0; i < numberOfMessages; i++)
+         {
+            message = session.createMessage(true);
+
+            HornetQBuffer bodyLocal = message.getBodyBuffer();
+
+            bodyLocal.writeBytes(body);
+
+            message.putIntProperty(new SimpleString("id"), i);
+
+            producer.send(message);
+            if (i % messagesPerTX == 0)
+            {
+               session.commit();
+            }
+         }
+         session.commit();
+         session.close();
+      }
+      finally
+      {
+         try
+         {
+            server.stop();
+         }
+         catch (Throwable ignored)
+         {
+         }
+      }
+
+      ArrayList<RecordInfo> records = new ArrayList<RecordInfo>();
+
+      List<PreparedTransactionInfo> list = new ArrayList<PreparedTransactionInfo>();
+
+      JournalImpl jrn = new JournalImpl(config.getJournalFileSize(),
+                                        2,
+                                        0,
+                                        0,
+                                        new NIOSequentialFileFactory(getJournalDir()),
+                                        "hornetq-data",
+                                        "hq",
+                                        1);
+      jrn.start();
+      jrn.load(records, list, null);
+
+      // Delete everything from the journal
+      for (RecordInfo info : records)
+      {
+         if (!info.isUpdate)
+         {
+            jrn.appendDeleteRecord(info.id, false);
+         }
+      }
+
+      jrn.stop();
+
+      server = createServer(true,
+                            config,
+                            PagingTest.PAGE_SIZE,
+                            PagingTest.PAGE_MAX,
+                            new HashMap<String, AddressSettings>());
+
+      server.start();
+
+      Page pg = server.getPagingManager().getPageStore(ADDRESS).getCurrentPage();
+
+      pg.open();
+
+      List<PagedMessage> msgs = pg.read(server.getStorageManager());
+
+      pg.close();
+
+      long queues[] = new long[] { server.locateQueue(new SimpleString("q1")).getID() };
+
+      for (long q : queues)
+      {
+         for (int i = 0; i < msgs.size(); i++)
+         {
+            server.getStorageManager().storeCursorAcknowledge(q, new PagePositionImpl(pg.getPageId(), i));
+         }
+      }
+
+      server.stop();
+
+      ServerLocator locator = createInVMNonHALocator();
+
+      locator.setBlockOnNonDurableSend(true);
+      locator.setBlockOnDurableSend(true);
+      locator.setBlockOnAcknowledge(true);
+
+      server = createServer(true,
+                            config,
+                            PagingTest.PAGE_SIZE,
+                            PagingTest.PAGE_MAX,
+                            new HashMap<String, AddressSettings>());
+
+      server.start();
+
+      ClientSessionFactory csf = locator.createSessionFactory();
+
+      ClientSession sess = csf.createSession();
+
+      sess.start();
+
+      ClientConsumer cons = sess.createConsumer("q1");
+
+      assertNull(cons.receive(500));
+
+      Thread.sleep(5000);
+
+      ClientConsumer cons2 = sess.createConsumer("q2");
+      assertNull(cons2.receive(500));
+
+      long timeout = System.currentTimeMillis() + 5000;
+
+      while (System.currentTimeMillis() < timeout && server.getPagingManager().getPageStore(ADDRESS).isPaging())
+      {
+         Thread.sleep(100);
+      }
+
+      assertFalse(server.getPagingManager().getPageStore(ADDRESS).isPaging());
+
+      sess.close();
+
+      locator.close();
+
+      server.stop();
+   }
+
+   public void testMissingTXEverythingAcked2() throws Exception
+   {
+      clearData();
+
+      Configuration config = createDefaultConfig();
+
+      config.setJournalSyncNonTransactional(false);
+
+      HornetQServer server = createServer(true,
+                                          config,
+                                          PagingTest.PAGE_SIZE,
+                                          PagingTest.PAGE_MAX,
+                                          new HashMap<String, AddressSettings>());
+
+      server.start();
+
+      final int messageSize = 1024;
+
+      final int numberOfMessages = 6;
+
+      final int numberOfTX = 2;
+
+      final int messagesPerTX = numberOfMessages / numberOfTX;
+
+      try
+      {
+         ServerLocator locator = createInVMNonHALocator();
+
+         locator.setBlockOnNonDurableSend(true);
+         locator.setBlockOnDurableSend(true);
+         locator.setBlockOnAcknowledge(true);
+
+         ClientSessionFactory sf = locator.createSessionFactory();
+
+         ClientSession session = sf.createSession(false, false, false);
+
+         session.createQueue(ADDRESS.toString(), "q1", true);
+
+         session.createQueue(ADDRESS.toString(), "q2", true);
+
+         server.getPagingManager().getPageStore(ADDRESS).startPaging();
+
+         ClientProducer producer = session.createProducer(PagingTest.ADDRESS);
+
+         ClientMessage message = null;
+
+         byte[] body = new byte[messageSize];
+
+         ByteBuffer bb = ByteBuffer.wrap(body);
+
+         for (int j = 1; j <= messageSize; j++)
+         {
+            bb.put(getSamplebyte(j));
+         }
+
+         for (int i = 0; i < numberOfMessages; i++)
+         {
+            message = session.createMessage(true);
+
+            HornetQBuffer bodyLocal = message.getBodyBuffer();
+
+            bodyLocal.writeBytes(body);
+
+            message.putStringProperty("id", "str-" + i);
+
+            producer.send(message);
+            if ((i + 1) % messagesPerTX == 0)
+            {
+               session.commit();
+            }
+         }
+         session.commit();
+
+         session.start();
+
+         for (int i = 1; i <= 2; i++)
+         {
+            ClientConsumer cons = session.createConsumer("q" + i);
+
+            for (int j = 0; j < 3; j++)
+            {
+               ClientMessage msg = cons.receive(5000);
+
+               assertNotNull(msg);
+
+               assertEquals("str-" + j, msg.getStringProperty("id"));
+
+               msg.acknowledge();
+            }
+
+            session.commit();
+
+         }
+
+         session.close();
+      }
+      finally
+      {
+         locator.close();
+         try
+         {
+            server.stop();
+         }
+         catch (Throwable ignored)
+         {
+         }
+      }
+
+      server = createServer(true,
+                            config,
+                            PagingTest.PAGE_SIZE,
+                            PagingTest.PAGE_MAX,
+                            new HashMap<String, AddressSettings>());
+
+      server.start();
+
+      ServerLocator locator = createInVMNonHALocator();
+
+      locator.setBlockOnNonDurableSend(true);
+      locator.setBlockOnDurableSend(true);
+      locator.setBlockOnAcknowledge(true);
+
+      ClientSessionFactory csf = locator.createSessionFactory();
+
+      ClientSession session = csf.createSession();
+
+      session.start();
+
+      for (int i = 1; i <= 2; i++)
+      {
+         ClientConsumer cons = session.createConsumer("q" + i);
+
+         for (int j = 3; j < 6; j++)
+         {
+            ClientMessage msg = cons.receive(5000);
+
+            assertNotNull(msg);
+
+            assertEquals("str-" + j, msg.getStringProperty("id"));
+
+            msg.acknowledge();
+         }
+
+         session.commit();
+         assertNull(cons.receive(500));
+
+      }
+
+      session.close();
+
+      long timeout = System.currentTimeMillis() + 5000;
+
+      while (System.currentTimeMillis() < timeout && server.getPagingManager().getPageStore(ADDRESS).isPaging())
+      {
+         Thread.sleep(100);
+      }
+
+      locator.close();
+
+      server.stop();
+   }
+
    public void testTwoQueuesOneNoRouting() throws Exception
    {
       boolean persistentMessages = true;
@@ -2454,6 +2806,8 @@
          catch (Throwable ignored)
          {
          }
+         
+         OperationContextImpl.clearContext();
       }
 
    }
@@ -2787,8 +3141,7 @@
          }
       }
    }
-   
-   
+
    public void testPageAndDepageRapidly() throws Exception
    {
       boolean persistentMessages = true;
@@ -2800,11 +3153,7 @@
       config.setJournalSyncNonTransactional(false);
       config.setJournalFileSize(10 * 1024 * 1024);
 
-      HornetQServer server = createServer(true,
-                                          config,
-                                          512 * 1024,
-                                          1024 * 1024,
-                                          new HashMap<String, AddressSettings>());
+      HornetQServer server = createServer(true, config, 512 * 1024, 1024 * 1024, new HashMap<String, AddressSettings>());
 
       server.start();
 
@@ -2827,9 +3176,9 @@
          session.createQueue(PagingTest.ADDRESS, PagingTest.ADDRESS, null, true);
 
          ClientProducer producer = session.createProducer(PagingTest.ADDRESS);
-         
+
          final AtomicInteger errors = new AtomicInteger(0);
-         
+
          Thread consumeThread = new Thread()
          {
             public void run()
@@ -2839,16 +3188,16 @@
                {
                   sessionConsumer = sf.createSession(false, false);
                   sessionConsumer.start();
-                  
+
                   ClientConsumer cons = sessionConsumer.createConsumer(ADDRESS);
-                  
+
                   for (int i = 0; i < numberOfMessages; i++)
                   {
                      ClientMessage msg = cons.receive(PagingTest.RECEIVE_TIMEOUT);
                      System.out.println("Message " + i + " consumed");
                      assertNotNull(msg);
                      msg.acknowledge();
-                     
+
                      if (i % 20 == 0)
                      {
                         System.out.println("Commit consumer");
@@ -2874,10 +3223,10 @@
                      errors.incrementAndGet();
                   }
                }
-               
+
             }
          };
-         
+
          consumeThread.start();
 
          ClientMessage message = null;
@@ -2887,7 +3236,7 @@
          for (int i = 0; i < numberOfMessages; i++)
          {
             message = session.createMessage(persistentMessages);
-            
+
             System.out.println("Message " + i + " sent");
 
             HornetQBuffer bodyLocal = message.getBodyBuffer();
@@ -2897,23 +3246,24 @@
             message.putIntProperty(new SimpleString("id"), i);
 
             producer.send(message);
-            
+
             Thread.sleep(50);
          }
 
-         
          consumeThread.join();
-         
+
          long timeout = System.currentTimeMillis() + 5000;
-         
-         while (System.currentTimeMillis() < timeout && (server.getPagingManager().getPageStore(ADDRESS).isPaging() || server.getPagingManager().getPageStore(ADDRESS).getNumberOfPages() != 1))
+
+         while (System.currentTimeMillis() < timeout && (server.getPagingManager().getPageStore(ADDRESS).isPaging() || server.getPagingManager()
+                                                                                                                             .getPageStore(ADDRESS)
+                                                                                                                             .getNumberOfPages() != 1))
          {
             Thread.sleep(1);
          }
 
          // It's async, so need to wait a bit for it happening
          assertFalse(server.getPagingManager().getPageStore(ADDRESS).isPaging());
-         
+
          assertEquals(1, server.getPagingManager().getPageStore(ADDRESS).getNumberOfPages());
 
          sf.close();
@@ -2933,7 +3283,6 @@
 
    }
 
-
    public void testTwoQueuesDifferentFilters() throws Exception
    {
       boolean persistentMessages = true;
@@ -2959,7 +3308,7 @@
       try
       {
          ServerLocator locator = createInVMNonHALocator();
-         
+
          locator.setClientFailureCheckPeriod(120000);
          locator.setConnectionTTL(5000000);
          locator.setCallTimeout(120000);
@@ -2971,14 +3320,16 @@
          ClientSessionFactory sf = locator.createSessionFactory();
 
          ClientSession session = sf.createSession(false, false, false);
-         
+
          // note: if you want to change this, numberOfMessages has to be a multiple of NQUEUES
          int NQUEUES = 2;
-         
 
-         for (int i = 0 ; i < NQUEUES; i++)
+         for (int i = 0; i < NQUEUES; i++)
          {
-            session.createQueue(PagingTest.ADDRESS, PagingTest.ADDRESS.concat("=" + i), new SimpleString("propTest=" + i), true);
+            session.createQueue(PagingTest.ADDRESS,
+                                PagingTest.ADDRESS.concat("=" + i),
+                                new SimpleString("propTest=" + i),
+                                true);
          }
 
          ClientProducer producer = session.createProducer(PagingTest.ADDRESS);
@@ -3012,20 +3363,20 @@
          for (int nqueue = 0; nqueue < NQUEUES; nqueue++)
          {
             ClientConsumer consumer = session.createConsumer(PagingTest.ADDRESS.concat("=" + nqueue));
-   
-            for (int i = 0; i < (numberOfMessages /NQUEUES); i++)
+
+            for (int i = 0; i < (numberOfMessages / NQUEUES); i++)
             {
                message = consumer.receive(500000);
                assertNotNull(message);
                message.acknowledge();
-   
+
                assertEquals(nqueue, message.getIntProperty("propTest").intValue());
             }
-            
+
             assertNull(consumer.receiveImmediate());
-            
+
             consumer.close();
-   
+
             session.commit();
          }
 
@@ -3038,7 +3389,6 @@
             Thread.sleep(100);
          }
 
-         
          // It's async, so need to wait a bit for it happening
          assertFalse(server.getPagingManager().getPageStore(ADDRESS).isPaging());
 
@@ -3056,11 +3406,8 @@
          {
          }
       }
-
    }
 
-
-
    public void testTwoQueues() throws Exception
    {
       boolean persistentMessages = true;
@@ -3086,7 +3433,7 @@
       try
       {
          ServerLocator locator = createInVMNonHALocator();
-         
+
          locator.setClientFailureCheckPeriod(120000);
          locator.setConnectionTTL(5000000);
          locator.setCallTimeout(120000);
@@ -3098,7 +3445,6 @@
          ClientSessionFactory sf = locator.createSessionFactory();
 
          ClientSession session = sf.createSession(false, false, false);
-         
 
          session.createQueue(PagingTest.ADDRESS, PagingTest.ADDRESS.concat("=1"), null, true);
          session.createQueue(PagingTest.ADDRESS, PagingTest.ADDRESS.concat("=2"), null, true);
@@ -3133,22 +3479,22 @@
          for (int msg = 1; msg <= 2; msg++)
          {
             ClientConsumer consumer = session.createConsumer(PagingTest.ADDRESS.concat("=" + msg));
-   
+
             for (int i = 0; i < numberOfMessages; i++)
             {
                message = consumer.receive(500000);
                assertNotNull(message);
                message.acknowledge();
-   
-               //assertEquals(msg, message.getIntProperty("propTest").intValue());
-               
+
+               // assertEquals(msg, message.getIntProperty("propTest").intValue());
+
                System.out.println("i = " + i + " msg = " + message.getIntProperty("propTest"));
             }
-   
+
             session.commit();
-            
+
             assertNull(consumer.receiveImmediate());
-            
+
             consumer.close();
          }
 
@@ -3162,10 +3508,9 @@
          }
 
          store.getCursorProvier().cleanup();
-         
+
          Thread.sleep(1000);
-         
-         
+
          // It's async, so need to wait a bit for it happening
          assertFalse(server.getPagingManager().getPageStore(ADDRESS).isPaging());
 
@@ -3183,11 +3528,374 @@
          {
          }
       }
+   }
 
+   public void testDLAOnLargeMessageAndPaging() throws Exception
+   {
+      clearData();
+
+      Configuration config = createDefaultConfig();
+
+      config.setJournalSyncNonTransactional(false);
+
+      Map<String, AddressSettings> settings = new HashMap<String, AddressSettings>();
+      AddressSettings dla = new AddressSettings();
+      dla.setMaxDeliveryAttempts(5);
+      dla.setDeadLetterAddress(new SimpleString("DLA"));
+      settings.put(ADDRESS.toString(), dla);
+
+      final HornetQServer server = createServer(true, config, PagingTest.PAGE_SIZE, PagingTest.PAGE_MAX, settings);
+
+      server.start();
+
+      final int messageSize = 1024;
+
+      try
+      {
+         ServerLocator locator = createInVMNonHALocator();
+
+         locator.setBlockOnNonDurableSend(true);
+         locator.setBlockOnDurableSend(true);
+         locator.setBlockOnAcknowledge(true);
+
+         ClientSessionFactory sf = locator.createSessionFactory();
+
+         ClientSession session = sf.createSession(false, false, false);
+
+         session.createQueue(ADDRESS, ADDRESS, true);
+
+         session.createQueue("DLA", "DLA");
+
+         PagingStore pgStoreAddress = server.getPagingManager().getPageStore(ADDRESS);
+         pgStoreAddress.startPaging();
+         PagingStore pgStoreDLA = server.getPagingManager().getPageStore(new SimpleString("DLA"));
+
+         ClientProducer producer = session.createProducer(PagingTest.ADDRESS);
+
+         ClientMessage message = null;
+
+         for (int i = 0; i < 100; i++)
+         {
+            log.info("send message #" + i);
+            message = session.createMessage(true);
+
+            message.putStringProperty("id", "str" + i);
+
+            message.setBodyInputStream(createFakeLargeStream(messageSize));
+
+            producer.send(message);
+            
+            if ((i + 1) % 2 == 0)
+            {
+               session.commit();
+            }
+         }
+
+         session.commit();
+
+         session.start();
+
+         ClientConsumer cons = session.createConsumer(ADDRESS);
+
+         ClientMessage msg = null;
+         
+         for (int msgNr = 0 ; msgNr < 2; msgNr++)
+         {
+            for (int i = 0 ; i < 5; i++)
+            {
+               msg = cons.receive(5000);
+      
+               assertNotNull(msg);
+      
+               msg.acknowledge();
+      
+               assertEquals("str" + msgNr, msg.getStringProperty("id"));
+   
+               for (int j = 0; j < messageSize; j++)
+               {
+                  assertEquals(getSamplebyte(j), msg.getBodyBuffer().readByte());
+               }
+   
+               session.rollback();
+            }
+            
+            pgStoreDLA.startPaging();
+         }
+
+         for (int i = 2; i < 100; i++)
+         {
+            log.info("Received message " + i);
+            message = cons.receive(5000);
+            assertNotNull(message);
+            message.acknowledge();
+
+            message.saveToOutputStream(new OutputStream()
+            {
+               @Override
+               public void write(int b) throws IOException
+               {
+
+               }
+            });
+
+         }
+         
+         assertNull(cons.receiveImmediate());
+
+         cons.close();
+         
+         sf.close();
+         
+         locator.close();
+         
+         server.stop();
+         
+         server.start();
+         
+         locator = createInVMNonHALocator();
+         
+         sf = locator.createSessionFactory();
+         
+         session = sf.createSession(false, false);
+         
+         session.start();
+         
+         cons = session.createConsumer(ADDRESS);
+
+         for (int i = 2; i < 100; i++)
+         {
+            log.info("Received message " + i);
+            message = cons.receive(5000);
+            assertNotNull(message);
+            message.acknowledge();
+
+            message.saveToOutputStream(new OutputStream()
+            {
+               @Override
+               public void write(int b) throws IOException
+               {
+
+               }
+            });
+         }
+         
+         cons.close();
+         
+         cons = session.createConsumer("DLA");
+
+         for (int msgNr = 0 ; msgNr < 2; msgNr++)
+         {
+            msg = cons.receive(5000);
+
+            assertNotNull(msg);
+            
+            assertEquals("str" + msgNr, msg.getStringProperty("id"));
+
+            for (int i = 0; i < messageSize; i++)
+            {
+               assertEquals(getSamplebyte(i), msg.getBodyBuffer().readByte());
+            }
+   
+            msg.acknowledge();
+         }
+         
+         cons.close();
+         
+         cons = session.createConsumer(ADDRESS);
+         
+         session.commit();
+         
+         assertNull(cons.receiveImmediate());
+         
+         long timeout = System.currentTimeMillis() + 5000;
+         
+         pgStoreAddress = server.getPagingManager().getPageStore(ADDRESS);
+         
+         pgStoreAddress.getCursorProvier().getSubscription(server.locateQueue(ADDRESS).getID()).cleanupEntries();
+         
+         pgStoreAddress.getCursorProvier().cleanup();
+         
+         while (timeout > System.currentTimeMillis() && pgStoreAddress.isPaging())
+         {
+            Thread.sleep(50);
+         }
+         
+         assertFalse(pgStoreAddress.isPaging());
+
+         session.commit();
+
+         session.close();
+      }
+      finally
+      {
+         locator.close();
+         try
+         {
+            server.stop();
+         }
+         catch (Throwable ignored)
+         {
+         }
+      }
    }
 
+   public void testExpireLargeMessageOnPaging() throws Exception
+   {
+      clearData();
 
+      Configuration config = createDefaultConfig();
+      config.setMessageExpiryScanPeriod(500);
 
+      config.setJournalSyncNonTransactional(false);
+
+      Map<String, AddressSettings> settings = new HashMap<String, AddressSettings>();
+      AddressSettings dla = new AddressSettings();
+      dla.setMaxDeliveryAttempts(5);
+      dla.setDeadLetterAddress(new SimpleString("DLA"));
+      dla.setExpiryAddress(new SimpleString("DLA"));
+      settings.put(ADDRESS.toString(), dla);
+
+      final HornetQServer server = createServer(true, config, PagingTest.PAGE_SIZE, PagingTest.PAGE_MAX, settings);
+
+      server.start();
+
+      final int messageSize = 20;
+
+      try
+      {
+         ServerLocator locator = createInVMNonHALocator();
+
+         locator.setBlockOnNonDurableSend(true);
+         locator.setBlockOnDurableSend(true);
+         locator.setBlockOnAcknowledge(true);
+
+         ClientSessionFactory sf = locator.createSessionFactory();
+
+         ClientSession session = sf.createSession(false, false, false);
+
+         session.createQueue(ADDRESS, ADDRESS, true);
+
+         session.createQueue("DLA", "DLA");
+
+         PagingStore pgStoreAddress = server.getPagingManager().getPageStore(ADDRESS);
+         pgStoreAddress.startPaging();
+         PagingStore pgStoreDLA = server.getPagingManager().getPageStore(new SimpleString("DLA"));
+
+         ClientProducer producer = session.createProducer(PagingTest.ADDRESS);
+
+         ClientMessage message = null;
+
+         for (int i = 0; i < 500; i++)
+         {
+            if (i % 100 == 0) log.info("send message #" + i);
+            message = session.createMessage(true);
+
+            message.putStringProperty("id", "str" + i);
+            
+            message.setExpiration(System.currentTimeMillis() + 2000);
+
+            if (i % 2 == 0)
+            {
+               message.setBodyInputStream(createFakeLargeStream(messageSize));
+            }
+            else
+            {
+               byte bytes[] = new byte[messageSize];
+               for (int s = 0 ; s < bytes.length; s++)
+               {
+                  bytes[s] = getSamplebyte(s);
+               }
+               message.getBodyBuffer().writeBytes(bytes);
+            }
+
+            producer.send(message);
+            
+            if ((i + 1) % 2 == 0)
+            {
+               session.commit();
+               if (i < 400)
+               {
+                  pgStoreAddress.forceAnotherPage();
+               }
+            }
+         }
+
+         session.commit();
+         
+         sf.close();
+         
+         locator.close();
+         
+         server.stop();
+         
+         Thread.sleep(3000);
+         
+         server.start();
+         
+         locator = createInVMNonHALocator();
+         
+         sf = locator.createSessionFactory();
+         
+         session = sf.createSession(false, false);
+         
+         session.start();
+         
+         ClientConsumer consAddr = session.createConsumer(ADDRESS);
+         
+         assertNull(consAddr.receive(1000));
+         
+         
+         ClientConsumer cons = session.createConsumer("DLA");
+
+         for (int i = 0; i < 500; i++)
+         {
+            log.info("Received message " + i);
+            message = cons.receive(5000);
+            assertNotNull(message);
+            message.acknowledge();
+
+            message.saveToOutputStream(new OutputStream()
+            {
+               @Override
+               public void write(int b) throws IOException
+               {
+
+               }
+            });
+         }
+         
+         assertNull(cons.receiveImmediate());
+         
+         session.commit();
+         
+         cons.close();
+         
+         long timeout = System.currentTimeMillis() + 5000;
+         
+         pgStoreAddress = server.getPagingManager().getPageStore(ADDRESS);
+         
+         while (timeout > System.currentTimeMillis() && pgStoreAddress.isPaging())
+         {
+            Thread.sleep(50);
+         }
+         
+         assertFalse(pgStoreAddress.isPaging());
+
+         session.close();
+      }
+      finally
+      {
+         locator.close();
+         try
+         {
+            server.stop();
+         }
+         catch (Throwable ignored)
+         {
+         }
+      }
+   }
+
    // Package protected ---------------------------------------------
 
    // Protected -----------------------------------------------------

Modified: trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/TemporaryQueueTest.java
===================================================================
--- trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/TemporaryQueueTest.java	2011-05-20 15:47:57 UTC (rev 10711)
+++ trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/TemporaryQueueTest.java	2011-05-21 16:29:14 UTC (rev 10712)
@@ -16,6 +16,7 @@
 import java.util.Arrays;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import junit.framework.Assert;
 
@@ -29,6 +30,7 @@
 import org.hornetq.api.core.client.ClientSession;
 import org.hornetq.api.core.client.ClientSessionFactory;
 import org.hornetq.api.core.client.HornetQClient;
+import org.hornetq.api.core.client.MessageHandler;
 import org.hornetq.api.core.client.ServerLocator;
 import org.hornetq.core.client.impl.ClientSessionInternal;
 import org.hornetq.core.config.Configuration;
@@ -363,7 +365,126 @@
       
       
    }
+   
+   /**
+    * Repeatedly creates two temporary queues on the same address, one filtered on
+    * color='red' and one on color='blue', both consumed through handlers on a single
+    * session, and checks that each handler only ever sees messages of its own color.
+    */
+   public void testTemoraryQueuesWithFilter() throws Exception
+   {
+      int countTmpQueue = 0;
+
+      // Filter violations and handler failures, counted across every iteration
+      final AtomicInteger errors = new AtomicInteger(0);
+
+      class MyHandler implements MessageHandler
+      {
+         final String color;
+         final CountDownLatch latch;
+         final ClientSession sess;
+
+         public MyHandler(ClientSession sess, String color, int expectedMessages)
+         {
+            this.sess = sess;
+            latch = new CountDownLatch(expectedMessages);
+            this.color = color;
+         }
+
+         /** @return true if every expected message arrived within 10 seconds */
+         public boolean waitCompletion() throws Exception
+         {
+            return latch.await(10, TimeUnit.SECONDS);
+         }
+
+         public void onMessage(ClientMessage message)
+         {
+            try
+            {
+               message.acknowledge();
+               sess.commit();
+
+               // Compare from the expected side so a missing property is logged as a wrong color
+               if (!color.equals(message.getStringProperty("color")))
+               {
+                  log.warn("Unexpected color " + message.getStringProperty("color") + " when we were expecting " + color);
+                  errors.incrementAndGet();
+               }
+            }
+            catch (Exception e)
+            {
+               log.warn(e.getMessage(), e);
+               errors.incrementAndGet();
+            }
+            finally
+            {
+               // Count down only after the error has been recorded, otherwise the test
+               // thread could read the error counter before this message was validated
+               latch.countDown();
+            }
+         }
+      }
 
+      String address = "AD_test";
+      int iterations = 100;
+      int msgs = 100;
+
+      // Will be using a single Session as this is how an issue was raised
+      for (int i = 0; i < iterations; i++)
+      {
+         log.info("Iteration " + i);
+         ClientSessionFactory clientsConnection = locator.createSessionFactory();
+         try
+         {
+            ClientSession localSession = clientsConnection.createSession();
+            try
+            {
+               ClientProducer prod = localSession.createProducer(address);
+               localSession.start();
+
+               String queueRed = address + "_red_" + (countTmpQueue++);
+               String queueBlue = address + "_blue_" + (countTmpQueue++);
+
+               localSession.createTemporaryQueue(address, queueRed, "color='red'");
+               MyHandler redHandler = new MyHandler(localSession, "red", msgs);
+               localSession.createConsumer(queueRed).setMessageHandler(redHandler);
+
+               localSession.createTemporaryQueue(address, queueBlue, "color='blue'");
+               MyHandler blueHandler = new MyHandler(localSession, "blue", msgs);
+               localSession.createConsumer(queueBlue).setMessageHandler(blueHandler);
+
+               ClientMessage msgBlue = session.createMessage(false);
+               msgBlue.putStringProperty("color", "blue");
+
+               ClientMessage msgRed = session.createMessage(false);
+               msgRed.putStringProperty("color", "red");
+
+               for (int nmsg = 0; nmsg < msgs; nmsg++)
+               {
+                  prod.send(msgBlue);
+                  prod.send(msgRed);
+                  // NOTE(review): commits the fixture session, not localSession - confirm intent
+                  session.commit();
+               }
+
+               // A timeout must fail the test instead of being silently ignored
+               assertTrue("blue consumer timed out on iteration " + i, blueHandler.waitCompletion());
+               assertTrue("red consumer timed out on iteration " + i, redHandler.waitCompletion());
+
+               assertEquals(0, errors.get());
+            }
+            finally
+            {
+               localSession.close();
+            }
+         }
+         finally
+         {
+            clientsConnection.close();
+         }
+      }
+   }
+
    public void testDeleteTemporaryQueueWhenClientCrash() throws Exception
    {
       session.close();

Modified: trunk/tests/unit-tests/src/main/java/org/hornetq/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java
===================================================================
--- trunk/tests/unit-tests/src/main/java/org/hornetq/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java	2011-05-20 15:47:57 UTC (rev 10711)
+++ trunk/tests/unit-tests/src/main/java/org/hornetq/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java	2011-05-21 16:29:14 UTC (rev 10712)
@@ -674,10 +674,13 @@
       {
       }
 
-      @Override
-      public void copyTo(SequentialFile newFileName) throws Exception
+      /* (non-Javadoc)
+       * @see org.hornetq.core.journal.SequentialFile#copyTo(org.hornetq.core.journal.SequentialFile)
+       */
+      public void copyTo(SequentialFile newFileName)
       {
-         throw new UnsupportedOperationException();
+         // TODO Auto-generated method stub
+         
       }
 
    }

Modified: trunk/tests/unit-tests/src/main/java/org/hornetq/tests/util/RandomUtil.java
===================================================================
--- trunk/tests/unit-tests/src/main/java/org/hornetq/tests/util/RandomUtil.java	2011-05-20 15:47:57 UTC (rev 10711)
+++ trunk/tests/unit-tests/src/main/java/org/hornetq/tests/util/RandomUtil.java	2011-05-21 16:29:14 UTC (rev 10712)
@@ -1,14 +1,14 @@
 /*
  * Copyright 2009 Red Hat, Inc.
- *  Red Hat licenses this file to you under the Apache License, version
- *  2.0 (the "License"); you may not use this file except in compliance
- *  with the License.  You may obtain a copy of the License at
- *     http://www.apache.org/licenses/LICENSE-2.0
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
- *  implied.  See the License for the specific language governing
- *  permissions and limitations under the License.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
  */
 
 package org.hornetq.tests.util;

Modified: trunk/tests/unit-tests/src/main/java/org/hornetq/tests/util/UnitTestCase.java
===================================================================
--- trunk/tests/unit-tests/src/main/java/org/hornetq/tests/util/UnitTestCase.java	2011-05-20 15:47:57 UTC (rev 10711)
+++ trunk/tests/unit-tests/src/main/java/org/hornetq/tests/util/UnitTestCase.java	2011-05-21 16:29:14 UTC (rev 10712)
@@ -1,14 +1,14 @@
 /*
  * Copyright 2009 Red Hat, Inc.
- *  Red Hat licenses this file to you under the Apache License, version
- *  2.0 (the "License"); you may not use this file except in compliance
- *  with the License.  You may obtain a copy of the License at
- *     http://www.apache.org/licenses/LICENSE-2.0
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
- *  implied.  See the License for the specific language governing
- *  permissions and limitations under the License.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
  */
 
 package org.hornetq.tests.util;
@@ -541,8 +541,7 @@
          }
          catch (Exception e)
          {
-            System.out.println("port " + port + " is already bound");
-            System.exit(0);
+            throw new IllegalStateException("port " + port + " is already bound");
          }
          finally
          {
@@ -791,11 +790,11 @@
 
    /** It validates a Bean (POJO) using simple setters and getters with random values.
     *  You can pass a list of properties to be ignored, as some properties will have a pre-defined domain (not being possible to use random-values on them) */
-   protected void validateGettersAndSetters(final Object pojo, final String... IgnoredProperties) throws Exception
+   protected void validateGettersAndSetters(final Object pojo, final String... ignoredProperties) throws Exception
    {
       HashSet<String> ignoreSet = new HashSet<String>();
 
-      for (String ignore : IgnoredProperties)
+      for (String ignore : ignoredProperties)
       {
          ignoreSet.add(ignore);
       }
@@ -860,6 +859,8 @@
    protected void setUp() throws Exception
    {
       super.setUp();
+      
+      OperationContextImpl.clearContext();
 
       deleteDirectory(new File(getTestDir()));
 

Modified: trunk/tests/unit-tests/src/test/java/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java
===================================================================
--- trunk/tests/unit-tests/src/test/java/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java	2011-05-20 15:47:57 UTC (rev 10711)
+++ trunk/tests/unit-tests/src/test/java/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java	2011-05-21 16:29:14 UTC (rev 10712)
@@ -143,12 +143,11 @@
                                                   100,
                                                   createMockManager(),
                                                   createStorageManagerMock(),
-                                                  createPostOfficeMock(),
                                                   factory,
                                                   null,
                                                   PagingStoreImplTest.destinationTestName,
                                                   addressSettings,
-                                                  getExecutorFactory(),
+                                                  getExecutorFactory().getExecutor(),
                                                   true);
 
       storeImpl.start();
@@ -181,12 +180,11 @@
                                                            100,
                                                            createMockManager(),
                                                            createStorageManagerMock(),
-                                                           createPostOfficeMock(),
                                                            factory,
                                                            storeFactory,
                                                            PagingStoreImplTest.destinationTestName,
                                                            addressSettings,
-                                                           getExecutorFactory(),
+                                                           getExecutorFactory().getExecutor(),
                                                            true);
 
       storeImpl.start();
@@ -219,12 +217,11 @@
                                       100,
                                       createMockManager(),
                                       createStorageManagerMock(),
-                                      createPostOfficeMock(),
                                       factory,
                                       null,
                                       PagingStoreImplTest.destinationTestName,
                                       addressSettings,
-                                      getExecutorFactory(),
+                                      getExecutorFactory().getExecutor(),
                                       true);
 
       storeImpl.start();
@@ -248,12 +245,11 @@
                                                            100,
                                                            createMockManager(),
                                                            createStorageManagerMock(),
-                                                           createPostOfficeMock(),
                                                            factory,
                                                            storeFactory,
                                                            PagingStoreImplTest.destinationTestName,
                                                            addressSettings,
-                                                           getExecutorFactory(),
+                                                           getExecutorFactory().getExecutor(),
                                                            true);
 
       storeImpl.start();
@@ -325,12 +321,11 @@
                                                            100,
                                                            createMockManager(),
                                                            createStorageManagerMock(),
-                                                           createPostOfficeMock(),
                                                            factory,
                                                            storeFactory,
                                                            PagingStoreImplTest.destinationTestName,
                                                            addressSettings,
-                                                           getExecutorFactory(),
+                                                           getExecutorFactory().getExecutor(),
                                                            true);
 
       storeImpl.start();
@@ -474,12 +469,11 @@
                                                                  100,
                                                                  createMockManager(),
                                                                  createStorageManagerMock(),
-                                                                 createPostOfficeMock(),
                                                                  factory,
                                                                  storeFactory,
                                                                  new SimpleString("test"),
                                                                  settings,
-                                                                 getExecutorFactory(),
+                                                                 getExecutorFactory().getExecutor(),
                                                                  true);
 
       storeImpl.start();
@@ -639,12 +633,11 @@
                                                             100,
                                                             createMockManager(),
                                                             createStorageManagerMock(),
-                                                            createPostOfficeMock(),
                                                             factory,
                                                             storeFactory,
                                                             new SimpleString("test"),
                                                             settings,
-                                                            getExecutorFactory(),
+                                                            getExecutorFactory().getExecutor(),
                                                             true);
       storeImpl2.start();
 
@@ -726,12 +719,11 @@
                                                                  100,
                                                                  createMockManager(),
                                                                  createStorageManagerMock(),
-                                                                 createPostOfficeMock(),
                                                                  factory,
                                                                  storeFactory,
                                                                  new SimpleString("test"),
                                                                  settings,
-                                                                 getExecutorFactory(),
+                                                                 getExecutorFactory().getExecutor(),
                                                                  true);
 
       storeImpl.start();
@@ -770,12 +762,11 @@
                                                                  100,
                                                                  createMockManager(),
                                                                  createStorageManagerMock(),
-                                                                 createPostOfficeMock(),
                                                                  factory,
                                                                  storeFactory,
                                                                  new SimpleString("test"),
                                                                  settings,
-                                                                 getExecutorFactory(),
+                                                                 getExecutorFactory().getExecutor(),
                                                                  false);
 
       storeImpl.start();



More information about the hornetq-commits mailing list