[hornetq-commits] JBoss hornetq SVN: r10169 - in branches/Branch_2_2_EAP: src/main/org/hornetq/core/server/impl and 2 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Tue Feb 1 22:36:20 EST 2011


Author: clebert.suconic at jboss.com
Date: 2011-02-01 22:36:20 -0500 (Tue, 01 Feb 2011)
New Revision: 10169

Modified:
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
   branches/Branch_2_2_EAP/src/main/org/hornetq/jms/server/impl/JMSServerManagerImpl.java
   branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/PagingTest.java
Log:
Make paging ignore topic subscriptions that use the invalid selector

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java	2011-02-02 02:21:13 UTC (rev 10168)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java	2011-02-02 03:36:20 UTC (rev 10169)
@@ -272,7 +272,7 @@
                if (complete)
                {
 
-                  log.debug("Address " + pagingStore.getAddress() + " is leaving page mode as all messages are consumed and acknowledged from the page store");
+                  log.info("Address " + pagingStore.getAddress() + " is leaving page mode as all messages are consumed and acknowledged from the page store");
                   pagingStore.forceAnotherPage();
 
                   Page currentPage = pagingStore.getCurrentPage();

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java	2011-02-02 02:21:13 UTC (rev 10168)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java	2011-02-02 03:36:20 UTC (rev 10169)
@@ -131,6 +131,13 @@
    // ------------------------------------------------------------------------------------
 
    private static final Logger log = Logger.getLogger(HornetQServerImpl.class);
+   
+   // JMS topics (which are outside the scope of the core API) require, in the current version, a dummy subscription with a dummy filter
+   // as a way to keep the topic's existence valid and to satisfy the TCK tests.
+   // That subscription needs an invalid filter; however, paging needs to ignore any subscription that uses this filter.
+   // For that reason, this filter must be rejected by paging and by any other component of the system, and simply be ignored for all purposes.
+   // It is declared here because this filter is considered a global ignore.
+   public static final String GENERIC_IGNORED_FILTER = "__HQX=-1";
 
    // Static
    // ---------------------------------------------------------------------------------------
@@ -1629,7 +1636,17 @@
       
       long queueID = storageManager.generateUniqueID();
 
-      PageSubscription pageSubscription = pagingManager.getPageStore(address).getCursorProvier().createSubscription(queueID, filter, durable);
+      PageSubscription pageSubscription;
+      
+      
+      if (filterString != null && filterString.toString().equals(GENERIC_IGNORED_FILTER))
+      {
+         pageSubscription = null;
+      }
+      else
+      {
+         pageSubscription = pagingManager.getPageStore(address).getCursorProvier().createSubscription(queueID, filter, durable);
+      }
 
       final Queue queue = queueFactory.createQueue(queueID,
                                                    address,

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/jms/server/impl/JMSServerManagerImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/jms/server/impl/JMSServerManagerImpl.java	2011-02-02 02:21:13 UTC (rev 10168)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/jms/server/impl/JMSServerManagerImpl.java	2011-02-02 03:36:20 UTC (rev 10169)
@@ -40,6 +40,7 @@
 import org.hornetq.core.security.Role;
 import org.hornetq.core.server.ActivateCallback;
 import org.hornetq.core.server.HornetQServer;
+import org.hornetq.core.server.impl.HornetQServerImpl;
 import org.hornetq.core.settings.impl.AddressSettings;
 import org.hornetq.core.transaction.ResourceManager;
 import org.hornetq.core.transaction.TransactionDetail;
@@ -89,7 +90,7 @@
 {
    private static final Logger log = Logger.getLogger(JMSServerManagerImpl.class);
 
-   private static final String REJECT_FILTER = "__HQX=-1";
+   private static final String REJECT_FILTER = HornetQServerImpl.GENERIC_IGNORED_FILTER;
 
    private BindingRegistry registry;
 

Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/PagingTest.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/PagingTest.java	2011-02-02 02:21:13 UTC (rev 10168)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/PagingTest.java	2011-02-02 03:36:20 UTC (rev 10169)
@@ -49,6 +49,7 @@
 import org.hornetq.core.persistence.impl.journal.OperationContextImpl;
 import org.hornetq.core.server.HornetQServer;
 import org.hornetq.core.server.Queue;
+import org.hornetq.core.server.impl.HornetQServerImpl;
 import org.hornetq.core.settings.impl.AddressFullMessagePolicy;
 import org.hornetq.core.settings.impl.AddressSettings;
 import org.hornetq.tests.util.ServiceTestBase;
@@ -110,16 +111,16 @@
 
       super.tearDown();
    }
-   
+
    public void testPreparePersistent() throws Exception
    {
       boolean persistentMessages = true;
-      
+
       System.out.println("PageDir:" + getPageDir());
       clearData();
 
       Configuration config = createDefaultConfig();
-      
+
       config.setJournalSyncNonTransactional(false);
 
       HornetQServer server = createServer(true,
@@ -129,7 +130,7 @@
                                           new HashMap<String, AddressSettings>());
 
       server.start();
-      
+
       final int messageSize = 1024;
 
       final int numberOfMessages = 10000;
@@ -180,7 +181,7 @@
          session.commit();
          session.close();
          session = null;
-         
+
          sf.close();
          locator.close();
 
@@ -195,16 +196,15 @@
 
          locator = createInVMNonHALocator();
          sf = locator.createSessionFactory();
-         
+
          Queue queue = server.locateQueue(ADDRESS);
-         
+
          assertEquals(numberOfMessages, queue.getMessageCount());
 
-         
          LinkedList<Xid> xids = new LinkedList<Xid>();
-         
+
          int msgReceived = 0;
-         for (int i = 0 ; i < numberOfMessages / 999; i++)
+         for (int i = 0; i < numberOfMessages / 999; i++)
          {
             ClientSession sessionConsumer = sf.createSession(true, false, false);
             Xid xid = newXID();
@@ -212,7 +212,7 @@
             sessionConsumer.start(xid, XAResource.TMNOFLAGS);
             sessionConsumer.start();
             ClientConsumer consumer = sessionConsumer.createConsumer(PagingTest.ADDRESS);
-            for (int msgCount = 0 ; msgCount < 1000; i++)
+            for (int msgCount = 0; msgCount < 1000; i++)
             {
                if (msgReceived == numberOfMessages)
                {
@@ -227,18 +227,17 @@
             sessionConsumer.prepare(xid);
             sessionConsumer.close();
          }
-         
-         
+
          ClientSession sessionCheck = sf.createSession(true, true);
-         
+
          ClientConsumer consumer = sessionCheck.createConsumer(PagingTest.ADDRESS);
-         
+
          assertNull(consumer.receiveImmediate());
 
          sessionCheck.close();
-         
+
          System.out.println(queue.getMessagesAdded());
-         
+
          assertEquals(numberOfMessages, queue.getMessageCount());
 
          sf.close();
@@ -263,26 +262,26 @@
          consumer = session.createConsumer(PagingTest.ADDRESS);
 
          session.start();
-         
+
          assertNull(consumer.receiveImmediate());
-         
+
          for (Xid xid : xids)
          {
             session.rollback(xid);
          }
-         
+
          xids.clear();
-         
+
          assertNotNull(consumer.receiveImmediate());
 
          session.close();
-         
+
          sf.close();
-         
+
          locator.close();
-         
+
          queue.getMessageCount();
-         //assertEquals(numberOfMessages, queue.getMessageCount());
+         // assertEquals(numberOfMessages, queue.getMessageCount());
       }
       finally
       {
@@ -297,7 +296,121 @@
 
    }
 
+   public void testTwoQueuesOneNoRouting() throws Exception
+   {
+      boolean persistentMessages = true;
 
+      clearData();
+
+      Configuration config = createDefaultConfig();
+
+      config.setJournalSyncNonTransactional(false);
+
+      HornetQServer server = createServer(true,
+                                          config,
+                                          PagingTest.PAGE_SIZE,
+                                          PagingTest.PAGE_MAX,
+                                          new HashMap<String, AddressSettings>());
+
+      server.start();
+
+      final int messageSize = 1024;
+
+      final int numberOfMessages = 1000;
+
+      try
+      {
+         ServerLocator locator = createInVMNonHALocator();
+
+         locator.setBlockOnNonDurableSend(true);
+         locator.setBlockOnDurableSend(true);
+         locator.setBlockOnAcknowledge(true);
+
+         ClientSessionFactory sf = locator.createSessionFactory();
+
+         ClientSession session = sf.createSession(false, false, false);
+
+         session.createQueue(PagingTest.ADDRESS, PagingTest.ADDRESS, null, true);
+         session.createQueue(PagingTest.ADDRESS, PagingTest.ADDRESS.concat("-invalid"), new SimpleString(HornetQServerImpl.GENERIC_IGNORED_FILTER), true);
+
+         ClientProducer producer = session.createProducer(PagingTest.ADDRESS);
+
+         ClientMessage message = null;
+
+         byte[] body = new byte[messageSize];
+
+         for (int i = 0; i < numberOfMessages; i++)
+         {
+            message = session.createMessage(persistentMessages);
+
+            HornetQBuffer bodyLocal = message.getBodyBuffer();
+
+            bodyLocal.writeBytes(body);
+
+            message.putIntProperty(new SimpleString("id"), i);
+
+            producer.send(message);
+            if (i % 1000 == 0)
+            {
+               session.commit();
+            }
+         }
+
+         session.commit();
+
+         session.start();
+
+         ClientConsumer consumer = session.createConsumer(PagingTest.ADDRESS);
+
+         for (int i = 0; i < numberOfMessages; i++)
+         {
+            message = consumer.receive(5000);
+            assertNotNull(message);
+            message.acknowledge();
+
+            assertEquals(i, message.getIntProperty("id").intValue());
+            if (i % 1000 == 0)
+            {
+               session.commit();
+            }
+         }
+         
+         session.commit();
+         
+         session.commit();
+
+         session.commit();
+         
+         PagingStore store = server.getPagingManager().getPageStore(ADDRESS);
+         store.getCursorProvier().cleanup();
+         
+         long timeout = System.currentTimeMillis() + 5000;
+         while (store.isPaging() && timeout > System.currentTimeMillis())
+         {
+            Thread.sleep(100);
+         }
+
+         // It's async, so we need to wait a bit for it to happen
+         assertFalse(server.getPagingManager().getPageStore(ADDRESS).isPaging());
+
+         sf.close();
+
+         locator.close();
+      }
+      finally
+      {
+         try
+         {
+            server.stop();
+           // System.exit(-1);
+         }
+         catch (Throwable ignored)
+         {
+         }
+      }
+
+   }
+
    public void testSendReceivePagingPersistent() throws Exception
    {
       internaltestSendReceivePaging(true);
@@ -312,18 +425,18 @@
    {
       internalMultiQueuesTest(true);
    }
-   
+
    public void testWithMultiQueues() throws Exception
    {
       internalMultiQueuesTest(false);
    }
-   
+
    public void internalMultiQueuesTest(final boolean divert) throws Exception
    {
       clearData();
 
       Configuration config = createDefaultConfig();
-      
+
       config.setJournalSyncNonTransactional(false);
 
       HornetQServer server = createServer(true,
@@ -333,7 +446,7 @@
                                           new HashMap<String, AddressSettings>());
 
       if (divert)
-      {   
+      {
          DivertConfiguration divert1 = new DivertConfiguration("dv1",
                                                                "nm1",
                                                                PagingTest.ADDRESS.toString(),
@@ -341,7 +454,7 @@
                                                                true,
                                                                null,
                                                                null);
-   
+
          DivertConfiguration divert2 = new DivertConfiguration("dv2",
                                                                "nm2",
                                                                PagingTest.ADDRESS.toString(),
@@ -349,11 +462,11 @@
                                                                true,
                                                                null,
                                                                null);
-   
+
          ArrayList<DivertConfiguration> divertList = new ArrayList<DivertConfiguration>();
          divertList.add(divert1);
          divertList.add(divert2);
-   
+
          config.setDivertConfigurations(divertList);
       }
 
@@ -424,7 +537,7 @@
             session.close();
 
             server.stop();
-            
+
             sf.close();
             locator.close();
          }
@@ -472,7 +585,8 @@
 
                         Assert.assertNotNull(message2);
 
-                        if (i % 1000 == 0) session.commit();
+                        if (i % 1000 == 0)
+                           session.commit();
 
                         try
                         {
@@ -487,7 +601,7 @@
                            throw e;
                         }
                      }
-                     
+
                      session.commit();
 
                      consumer.close();
@@ -514,13 +628,12 @@
             threads[i].join();
          }
 
-         
          sf2.close();
          locator.close();
 
          assertEquals(0, errors.get());
-         
-         for (int i = 0 ; i < 20 && server.getPostOffice().getPagingManager().getTransactions().size() != 0; i++)
+
+         for (int i = 0; i < 20 && server.getPostOffice().getPagingManager().getTransactions().size() != 0; i++)
          {
             if (server.getPostOffice().getPagingManager().getTransactions().size() != 0)
             {
@@ -547,12 +660,12 @@
 
    private void internaltestSendReceivePaging(final boolean persistentMessages) throws Exception
    {
-      
+
       System.out.println("PageDir:" + getPageDir());
       clearData();
 
       Configuration config = createDefaultConfig();
-      
+
       config.setJournalSyncNonTransactional(false);
 
       HornetQServer server = createServer(true,
@@ -644,7 +757,8 @@
 
             Assert.assertNotNull(message2);
 
-            if (i % 1000 == 0) session.commit();
+            if (i % 1000 == 0)
+               session.commit();
 
             try
             {
@@ -663,9 +777,9 @@
          consumer.close();
 
          session.close();
-         
+
          sf.close();
-         
+
          locator.close();
       }
       finally
@@ -689,7 +803,7 @@
 
       UnitTestCase.assertEqualsByteArrays(body, other);
    }
-   
+
    /**
     * - Make a destination in page mode
     * - Add stuff to a transaction
@@ -813,9 +927,9 @@
          consumer.close();
 
          session.close();
-         
+
          sf.close();
-         
+
          locator.close();
       }
       finally
@@ -925,12 +1039,14 @@
                   assertFalse(msg.getBooleanProperty("new"));
                   Assert.assertNotNull(msg);
                }
-               
+
                ClientMessage msgReceived = consumer.receiveImmediate();
-               
+
                if (msgReceived != null)
                {
-                  System.out.println("new = " + msgReceived.getBooleanProperty("new") + " id = " + msgReceived.getIntProperty("id"));
+                  System.out.println("new = " + msgReceived.getBooleanProperty("new") +
+                                     " id = " +
+                                     msgReceived.getIntProperty("id"));
                }
 
                Assert.assertNull(msgReceived);
@@ -972,9 +1088,9 @@
          consumer.close();
 
          session.close();
-         
+
          sf.close();
-         
+
          locator.close();
       }
       finally
@@ -1013,7 +1129,6 @@
          locator.setBlockOnDurableSend(true);
          locator.setBlockOnAcknowledge(true);
 
-         
          ClientSessionFactory sf = locator.createSessionFactory();
 
          byte[] body = new byte[messageSize];
@@ -1034,7 +1149,7 @@
             ClientMessage message = sessionNonTX.createMessage(true);
             message.getBodyBuffer().writeBytes(body);
             message.putIntProperty(new SimpleString("id"), i);
-            message.putStringProperty(new SimpleString("tst"),  new SimpleString("i=" + i));
+            message.putStringProperty(new SimpleString("tst"), new SimpleString("i=" + i));
 
             producerTransacted.send(message);
 
@@ -1044,7 +1159,7 @@
                for (int j = 0; j < 20; j++)
                {
                   ClientMessage msgSend = sessionNonTX.createMessage(true);
-                  msgSend.putStringProperty(new SimpleString("tst"),  new SimpleString("i=" + i + ", j=" + j));
+                  msgSend.putStringProperty(new SimpleString("tst"), new SimpleString("i=" + i + ", j=" + j));
                   msgSend.getBodyBuffer().writeBytes(new byte[10 * 1024]);
                   producerNonTransacted.send(msgSend);
                }
@@ -1106,9 +1221,9 @@
          consumer.close();
 
          sessionNonTX.close();
-         
+
          sf.close();
-         
+
          locator.close();
       }
       finally
@@ -1154,7 +1269,7 @@
 
       try
       {
-         
+
          final ClientSessionFactory sf = locator.createSessionFactory();
 
          final byte[] body = new byte[messageSize];
@@ -1220,14 +1335,14 @@
          ClientConsumer consumer = session.createConsumer(PagingTest.ADDRESS);
 
          for (int i = 0; i < numberOfMessages; i++)
-         {  
+         {
             ClientMessage msg = consumer.receive(5000);
             assertNotNull(msg);
             assertEquals(i, msg.getIntProperty("count").intValue());
             msg.acknowledge();
             if (i > 0 && i % 10 == 0)
             {
-              session.commit();
+               session.commit();
             }
          }
          session.commit();
@@ -1235,9 +1350,9 @@
          session.close();
 
          producerThread.join();
-         
+
          locator.close();
-         
+
          sf.close();
 
          assertEquals(0, errors.get());
@@ -1396,7 +1511,7 @@
       clearData();
 
       Configuration config = createDefaultConfig();
-      
+
       config.setJournalSyncNonTransactional(false);
 
       HornetQServer server = createServer(true,
@@ -1650,28 +1765,27 @@
          }
 
          session.commit();
-         
+
          session.close();
-         
+
          locator.close();
-         
+
          locator = createInVMNonHALocator();
-         
+
          server.stop();
-         
-         
+
          server = createServer(true,
                                config,
                                PagingTest.PAGE_SIZE,
                                PagingTest.PAGE_MAX,
                                new HashMap<String, AddressSettings>());
-         
+
          server.start();
 
          sf = locator.createSessionFactory();
-         
-         session =  sf.createSession(null, null, false, false, false, false, 0);
 
+         session = sf.createSession(null, null, false, false, false, false, 0);
+
          ClientConsumer consumer = session.createConsumer(PagingTest.ADDRESS);
 
          session.start();
@@ -1745,27 +1859,27 @@
          }
 
          session.commit();
-         
+
          session.close();
-         
+
          locator.close();
-         
+
          server.stop();
-         
+
          server = createServer(true,
                                config,
                                PagingTest.PAGE_SIZE,
                                PagingTest.PAGE_MAX,
                                new HashMap<String, AddressSettings>());
-         
+
          server.start();
-         
+
          locator = createInVMNonHALocator();
 
          sf = locator.createSessionFactory();
-         
-         session =  sf.createSession(null, null, false, false, false, false, 0);
 
+         session = sf.createSession(null, null, false, false, false, false, 0);
+
          ClientConsumer consumer = session.createConsumer(PagingTest.ADDRESS);
 
          session.start();
@@ -1781,25 +1895,25 @@
          }
 
          session.close();
-         
+
          locator.close();
-         
+
          server.stop();
-         
+
          server = createServer(true,
                                config,
                                PagingTest.PAGE_SIZE,
                                PagingTest.PAGE_MAX,
                                new HashMap<String, AddressSettings>());
-         
+
          server.start();
-         
+
          locator = createInVMNonHALocator();
 
          sf = locator.createSessionFactory();
-         
-         session =  sf.createSession(null, null, false, false, false, false, 0);
 
+         session = sf.createSession(null, null, false, false, false, false, 0);
+
          consumer = session.createConsumer(PagingTest.ADDRESS);
 
          session.start();
@@ -1979,7 +2093,7 @@
       }
 
    }
-   
+
    public void testDropMessagesExpiring() throws Exception
    {
       clearData();
@@ -2189,7 +2303,7 @@
       }
 
    }
-   
+
    public void testSyncPage() throws Exception
    {
       Configuration config = createDefaultConfig();
@@ -2205,74 +2319,73 @@
       try
       {
          server.createQueue(PagingTest.ADDRESS, PagingTest.ADDRESS, null, true, false);
-         
+
          final CountDownLatch pageUp = new CountDownLatch(0);
          final CountDownLatch pageDone = new CountDownLatch(1);
-         
+
          OperationContext ctx = new OperationContext()
          {
-            
+
             public void onError(int errorCode, String errorMessage)
             {
             }
-            
+
             public void done()
             {
             }
-            
+
             public void storeLineUp()
             {
             }
-            
+
             public boolean waitCompletion(long timeout) throws Exception
             {
                return false;
             }
-            
+
             public void waitCompletion() throws Exception
             {
-               
+
             }
-            
+
             public void replicationLineUp()
             {
-               
+
             }
-            
+
             public void replicationDone()
             {
-               
+
             }
-            
+
             public void pageSyncLineUp()
             {
                pageUp.countDown();
             }
-            
+
             public void pageSyncDone()
             {
                pageDone.countDown();
             }
-            
+
             public void executeOnCompletion(IOAsyncTask runnable)
             {
-               
+
             }
          };
 
-         
          OperationContextImpl.setContext(ctx);
-         
+
          PagingManager paging = server.getPagingManager();
-         
+
          PagingStore store = paging.getPageStore(ADDRESS);
-         
+
          store.sync();
-         
+
          assertTrue(pageUp.await(10, TimeUnit.SECONDS));
-         
+
          assertTrue(pageDone.await(10, TimeUnit.SECONDS));
-         
+
          server.stop();
 
       }
@@ -2289,7 +2402,6 @@
 
    }
 
-
    public void testSyncPageTX() throws Exception
    {
       Configuration config = createDefaultConfig();
@@ -2305,74 +2417,73 @@
       try
       {
          server.createQueue(PagingTest.ADDRESS, PagingTest.ADDRESS, null, true, false);
-         
+
          final CountDownLatch pageUp = new CountDownLatch(0);
          final CountDownLatch pageDone = new CountDownLatch(1);
-         
+
          OperationContext ctx = new OperationContext()
          {
-            
+
             public void onError(int errorCode, String errorMessage)
             {
             }
-            
+
             public void done()
             {
             }
-            
+
             public void storeLineUp()
             {
             }
-            
+
             public boolean waitCompletion(long timeout) throws Exception
             {
                return false;
             }
-            
+
             public void waitCompletion() throws Exception
             {
-               
+
             }
-            
+
             public void replicationLineUp()
             {
-               
+
             }
-            
+
             public void replicationDone()
             {
-               
+
             }
-            
+
             public void pageSyncLineUp()
             {
                pageUp.countDown();
             }
-            
+
             public void pageSyncDone()
             {
                pageDone.countDown();
             }
-            
+
             public void executeOnCompletion(IOAsyncTask runnable)
             {
-               
+
             }
          };
 
-         
          OperationContextImpl.setContext(ctx);
-         
+
          PagingManager paging = server.getPagingManager();
-         
+
          PagingStore store = paging.getPageStore(ADDRESS);
-         
+
          store.sync();
-         
+
          assertTrue(pageUp.await(10, TimeUnit.SECONDS));
-         
+
          assertTrue(pageDone.await(10, TimeUnit.SECONDS));
-         
+
          server.stop();
 
       }
@@ -2389,7 +2500,6 @@
 
    }
 
-
    public void testPagingOneDestinationOnly() throws Exception
    {
       SimpleString PAGED_ADDRESS = new SimpleString("paged");
@@ -2464,7 +2574,7 @@
          }
 
          consumerNonPaged.close();
-         
+
          session.commit();
 
          ackList = null;



More information about the hornetq-commits mailing list