[hornetq-commits] JBoss hornetq SVN: r11914 - in branches/Branch_2_2_EAP: src/main/org/hornetq/core/server/impl and 1 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Mon Dec 19 12:57:47 EST 2011


Author: clebert.suconic at jboss.com
Date: 2011-12-19 12:57:47 -0500 (Mon, 19 Dec 2011)
New Revision: 11914

Modified:
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/QueueImpl.java
   branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/InterruptedLargeMessageTest.java
Log:
https://issues.jboss.org/browse/JBPAPP-7161 - improving things

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java	2011-12-19 14:51:20 UTC (rev 11913)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java	2011-12-19 17:57:47 UTC (rev 11914)
@@ -1276,7 +1276,7 @@
       {
          if (msg.getRefCount() == 0)
          {
-            JournalStorageManager.log.debug("Large message: " + msg.getMessageID() +
+            JournalStorageManager.log.info("Large message: " + msg.getMessageID() +
                                             " didn't have any associated reference, file will be deleted");
             msg.decrementDelayDeletionCount();
          }

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java	2011-12-19 14:51:20 UTC (rev 11913)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java	2011-12-19 17:57:47 UTC (rev 11914)
@@ -1187,6 +1187,15 @@
          return "HornetQServerImpl::" + (nodeManager != null ? "serverUUID=" + nodeManager.getUUID() : "");
       }
    }
+   
+   /**
+    * Replaces the queue factory held by this server. For tests only; this method is not part of the API.
+    * @param factory the factory that is stored in the {@code queueFactory} field
+    */
+   public void replaceQueueFactory(QueueFactory factory)
+   {
+      this.queueFactory = factory;
+   }
 
    // Package protected
    // ----------------------------------------------------------------------------
@@ -1198,26 +1207,6 @@
     * Protected so tests can change this behaviour
     * @param backupConnector
     */
-   // protected FailoverManagerImpl createBackupConnectionFailoverManager(final TransportConfiguration backupConnector,
-   // final ExecutorService threadPool,
-   // final ScheduledExecutorService scheduledPool)
-   // {
-   // return new FailoverManagerImpl((ClientSessionFactory)null,
-   // backupConnector,
-   // null,
-   // false,
-   // HornetQClient.DEFAULT_CALL_TIMEOUT,
-   // HornetQClient.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD,
-   // HornetQClient.DEFAULT_CONNECTION_TTL,
-   // 0,
-   // 1.0d,
-   // 0,
-   // 1,
-   // false,
-   // threadPool,
-   // scheduledPool,
-   // null);
-   // }
 
    protected PagingManager createPagingManager()
    {

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/QueueImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/QueueImpl.java	2011-12-19 14:51:20 UTC (rev 11913)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/QueueImpl.java	2011-12-19 17:57:47 UTC (rev 11914)
@@ -2203,7 +2203,8 @@
       return status;
    }
 
-   private void postAcknowledge(final MessageReference ref)
+   // Protected as testcases may change this behaviour
+   protected void postAcknowledge(final MessageReference ref)
    {
       QueueImpl queue = (QueueImpl)ref.getQueue();
 
@@ -2219,6 +2220,15 @@
 
       boolean durableRef = message.isDurable() && queue.durable;
 
+      try
+      {
+         message.decrementRefCount();
+      }
+      catch (Exception e)
+      {
+         QueueImpl.log.warn("Unable to decrement reference counting", e);
+      }
+
       if (durableRef)
       {
          int count = message.decrementDurableRefCount();
@@ -2250,15 +2260,6 @@
             }
          }
       }
-
-      try
-      {
-         message.decrementRefCount();
-      }
-      catch (Exception e)
-      {
-         QueueImpl.log.warn("Unable to decrement reference counting", e);
-      }
    }
 
    void postRollback(final LinkedList<MessageReference> refs)

Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/InterruptedLargeMessageTest.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/InterruptedLargeMessageTest.java	2011-12-19 14:51:20 UTC (rev 11913)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/InterruptedLargeMessageTest.java	2011-12-19 17:57:47 UTC (rev 11914)
@@ -13,7 +13,10 @@
 
 package org.hornetq.tests.integration.client;
 
+import java.io.IOException;
 import java.util.HashMap;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ScheduledExecutorService;
 
 import javax.transaction.xa.XAResource;
 import javax.transaction.xa.Xid;
@@ -29,14 +32,25 @@
 import org.hornetq.api.core.client.ClientSessionFactory;
 import org.hornetq.api.core.client.HornetQClient;
 import org.hornetq.api.core.client.ServerLocator;
+import org.hornetq.core.filter.Filter;
 import org.hornetq.core.logging.Logger;
+import org.hornetq.core.paging.cursor.PageSubscription;
+import org.hornetq.core.persistence.StorageManager;
+import org.hornetq.core.postoffice.PostOffice;
 import org.hornetq.core.protocol.core.Packet;
 import org.hornetq.core.protocol.core.impl.wireformat.SessionSendContinuationMessage;
+import org.hornetq.core.server.MessageReference;
+import org.hornetq.core.server.Queue;
+import org.hornetq.core.server.QueueFactory;
 import org.hornetq.core.server.ServerSession;
+import org.hornetq.core.server.impl.HornetQServerImpl;
+import org.hornetq.core.server.impl.QueueImpl;
 import org.hornetq.core.server.impl.ServerSessionImpl;
+import org.hornetq.core.settings.HierarchicalRepository;
 import org.hornetq.core.settings.impl.AddressSettings;
 import org.hornetq.spi.core.protocol.RemotingConnection;
 import org.hornetq.tests.integration.largemessage.LargeMessageTestBase;
+import org.hornetq.utils.ExecutorFactory;
 
 /**
  * A LargeMessageTest
@@ -128,9 +142,9 @@
          }
 
          server.stop(false);
-         
+
          forceGC();
-         
+
          server.start();
 
          server.stop();
@@ -254,8 +268,12 @@
 
       try
       {
-         server = createServer(true, createDefaultConfig(isNetty()), 10000, 20000, new HashMap<String, AddressSettings>());
-         
+         server = createServer(true,
+                               createDefaultConfig(isNetty()),
+                               10000,
+                               20000,
+                               new HashMap<String, AddressSettings>());
+
          // server.getConfiguration()
          // .getInterceptorClassNames()
          // .add(LargeMessageTestInterceptorIgnoreLastPacket.class.getName());
@@ -270,7 +288,7 @@
          session = sf.createSession(false, true, true);
 
          session.createQueue(LargeMessageTest.ADDRESS, LargeMessageTest.ADDRESS, true);
-         
+
          server.getPagingManager().getPageStore(ADDRESS).startPaging();
 
          ClientProducer producer = session.createProducer(LargeMessageTest.ADDRESS);
@@ -282,19 +300,19 @@
             producer.send(clientFile);
          }
          session.commit();
-         
+
          validateNoFilesOnLargeDir(10);
 
          for (int h = 0; h < 5; h++)
          {
             session.close();
-            
+
             sf.close();
-            
+
             server.stop();
-            
+
             server.start();
-            
+
             sf = locator.createSessionFactory();
 
             session = sf.createSession(false, false);
@@ -321,11 +339,11 @@
             {
                session.rollback();
             }
-            
+
             session.close();
             sf.close();
          }
-         
+
          server.stop(false);
          server.start();
 
@@ -363,8 +381,12 @@
 
       try
       {
-         server = createServer(true, createDefaultConfig(isNetty()), 10000, 20000, new HashMap<String, AddressSettings>());
-         
+         server = createServer(true,
+                               createDefaultConfig(isNetty()),
+                               10000,
+                               20000,
+                               new HashMap<String, AddressSettings>());
+
          // server.getConfiguration()
          // .getInterceptorClassNames()
          // .add(LargeMessageTestInterceptorIgnoreLastPacket.class.getName());
@@ -377,7 +399,7 @@
          ClientSessionFactory sf = locator.createSessionFactory();
 
          session = sf.createSession(true, false, false);
-         
+
          Xid xid1 = newXID();
          Xid xid2 = newXID();
 
@@ -394,13 +416,11 @@
             producer.send(clientFile);
          }
          session.end(xid1, XAResource.TMSUCCESS);
-         
+
          session.prepare(xid1);
 
-
          session.start(xid2, XAResource.TMNOFLAGS);
 
-
          for (int i = 0; i < 10; i++)
          {
             Message clientFile = createLargeClientMessage(session, messageSize, true);
@@ -409,32 +429,32 @@
             producer.send(clientFile);
          }
          session.end(xid2, XAResource.TMSUCCESS);
-         
+
          session.prepare(xid2);
-         
+
          session.close();
          sf.close();
-         
+
          server.stop(false);
          server.start();
-         
-         for (int start = 0 ; start < 2; start++)
+
+         for (int start = 0; start < 2; start++)
          {
             System.out.println("Start " + start);
-            
+
             sf = locator.createSessionFactory();
-            
+
             if (start == 0)
             {
                session = sf.createSession(true, false, false);
                session.commit(xid1, false);
                session.close();
             }
-            
+
             session = sf.createSession(false, false, false);
             ClientConsumer cons1 = session.createConsumer(ADDRESS);
             session.start();
-            for (int i = 0 ; i < 10; i++)
+            for (int i = 0; i < 10; i++)
             {
                log.info("I = " + i);
                ClientMessage msg = cons1.receive(5000);
@@ -442,7 +462,7 @@
                assertEquals(1, msg.getIntProperty("txid").intValue());
                msg.acknowledge();
             }
-            
+
             if (start == 1)
             {
                session.commit();
@@ -451,26 +471,26 @@
             {
                session.rollback();
             }
-            
+
             session.close();
             sf.close();
-   
+
             server.stop();
             server.start();
          }
          server.stop();
-         
+
          validateNoFilesOnLargeDir(10);
-         
+
          server.start();
 
          sf = locator.createSessionFactory();
-         
+
          session = sf.createSession(true, false, false);
          session.rollback(xid2);
-         
+
          sf.close();
-         
+
          server.stop();
          server.start();
          server.stop();
@@ -497,6 +517,296 @@
       }
    }
 
+   public void testRestartBeforeDelete() throws Exception
+   {
+      // Acks large messages on queues whose postAcknowledge is a no-op, restarts, then calls validateNoFilesOnLargeDir().
+      class NoPostACKQueue extends QueueImpl
+      {
+         // Passes every constructor argument straight through to QueueImpl.
+         public NoPostACKQueue(long id,
+                               SimpleString address,
+                               SimpleString name,
+                               Filter filter,
+                               PageSubscription pageSubscription,
+                               boolean durable,
+                               boolean temporary,
+                               ScheduledExecutorService scheduledExecutor,
+                               PostOffice postOffice,
+                               StorageManager storageManager,
+                               HierarchicalRepository<AddressSettings> addressSettingsRepository,
+                               Executor executor)
+         {
+            super(id,
+                  address,
+                  name,
+                  filter,
+                  pageSubscription,
+                  durable,
+                  temporary,
+                  scheduledExecutor,
+                  postOffice,
+                  storageManager,
+                  addressSettingsRepository,
+                  executor);
+         }
+         // Replaces the inherited post-acknowledge work: only prints the reference and never calls super.
+         protected void postAcknowledge(final MessageReference ref)
+         {
+            System.out.println("Ignoring postACK on message " + ref);
+         }
+      }
+      // QueueFactory that builds NoPostACKQueue instances from the collaborators captured at construction.
+      class NoPostACKQueueFactory implements QueueFactory
+      {
+
+         final StorageManager storageManager;
+
+         final PostOffice postOffice;
+
+         final ScheduledExecutorService scheduledExecutor;
+
+         final HierarchicalRepository<AddressSettings> addressSettingsRepository;
+
+         final ExecutorFactory execFactory;
+
+         public NoPostACKQueueFactory(StorageManager storageManager,
+                                      PostOffice postOffice,
+                                      ScheduledExecutorService scheduledExecutor,
+                                      HierarchicalRepository<AddressSettings> addressSettingsRepository,
+                                      final ExecutorFactory execFactory)
+         {
+            this.storageManager = storageManager;
+            this.postOffice = postOffice;
+            this.scheduledExecutor = scheduledExecutor;
+            this.addressSettingsRepository = addressSettingsRepository;
+            this.execFactory = execFactory;
+         }
+
+         public Queue createQueue(long persistenceID,
+                                  SimpleString address,
+                                  SimpleString name,
+                                  Filter filter,
+                                  PageSubscription pageSubscription,
+                                  boolean durable,
+                                  boolean temporary)
+         {
+            // Returns the no-op-postACK queue; the commented-out block after it is the same call building a plain QueueImpl.
+             return new NoPostACKQueue(persistenceID,
+             address,
+             name,
+             filter,
+             pageSubscription,
+             durable,
+             temporary,
+             scheduledExecutor,
+             postOffice,
+             storageManager,
+             addressSettingsRepository,
+             execFactory.getExecutor());
+//            return new QueueImpl(persistenceID,
+//                                 address,
+//                                 name,
+//                                 filter,
+//                                 pageSubscription,
+//                                 durable,
+//                                 temporary,
+//                                 scheduledExecutor,
+//                                 postOffice,
+//                                 storageManager,
+//                                 addressSettingsRepository,
+//                                 execFactory.getExecutor());
+         }
+
+         /* No-op: the argument is ignored; the final postOffice field keeps the value given to the constructor.
+          * @see org.hornetq.core.server.QueueFactory#setPostOffice(org.hornetq.core.postoffice.PostOffice)
+          */
+         public void setPostOffice(PostOffice postOffice)
+         {
+         }
+
+      }
+      final int messageSize = 3 * HornetQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE;
+
+      ClientSession session = null;
+
+      LargeMessageTestInterceptorIgnoreLastPacket.interruptMessages = false;
+
+      try
+      {
+         server = createServer(true, isNetty());
+         server.start();
+         // Keep the factory the server started with so it can be put back before the restart below.
+         QueueFactory original = server.getQueueFactory();
+         // Install the test factory that builds NoPostACKQueue instances.
+         ((HornetQServerImpl)server).replaceQueueFactory(new NoPostACKQueueFactory(server.getStorageManager(),
+                                                                                   server.getPostOffice(),
+                                                                                   server.getScheduledPool(),
+                                                                                   server.getAddressSettingsRepository(),
+                                                                                   server.getExecutorFactory()));
+
+         locator.setBlockOnNonDurableSend(true);
+         locator.setBlockOnDurableSend(true);
+
+         ClientSessionFactory sf = locator.createSessionFactory();
+
+         session = sf.createSession(false, true, true);
+
+         session.createQueue(LargeMessageTest.ADDRESS, LargeMessageTest.ADDRESS, true);
+
+         ClientProducer producer = session.createProducer(LargeMessageTest.ADDRESS);
+
+         for (int i = 0; i < 10; i++)
+         {
+            Message clientFile = createLargeClientMessage(session, messageSize, true);
+
+            producer.send(clientFile);
+         }
+         session.commit();
+
+         session.close();
+
+         session = sf.createSession(false, false);
+
+         ClientConsumer cons = session.createConsumer(LargeMessageTest.ADDRESS);
+
+         session.start();
+         // Receive each of the 10 messages, stream its body into a discarding OutputStream, ack it and commit.
+         for (int i = 0; i < 10; i++)
+         {
+            ClientMessage msg = cons.receive(5000);
+            assertNotNull(msg);
+            msg.saveToOutputStream(new java.io.OutputStream()
+            {
+               @Override
+               public void write(int b) throws IOException
+               {
+               }
+            });
+            msg.acknowledge();
+            session.commit();
+         }
+         // Put the original factory back before stopping and restarting the server.
+         ((HornetQServerImpl)server).replaceQueueFactory(original);
+         server.stop(false);
+         server.start();
+
+         server.stop();
+
+         validateNoFilesOnLargeDir();
+      }
+      finally
+      {
+         try
+         {
+            server.stop();
+         }
+         catch (Throwable ignored)
+         {
+         }
+         // Best-effort close; any Throwable (including an NPE if session was never assigned) is swallowed.
+         try
+         {
+            session.close();
+         }
+         catch (Throwable ignored)
+         {
+         }
+      }
+
+   }
+
+   public void testConsumeAfterRestart() throws Exception
+   {
+      final int messageSize = 3 * HornetQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE;
+      // Sends 10 large messages, restarts, consumes and acks them, restarts again, then calls validateNoFilesOnLargeDir().
+      ClientSession session = null;
+
+      LargeMessageTestInterceptorIgnoreLastPacket.interruptMessages = false;
+
+      try
+      {
+         server = createServer(true, isNetty());
+         server.start();
+         // NOTE(review): the factory is never swapped in this test, so saving it here and restoring it below looks redundant.
+         QueueFactory original = server.getQueueFactory();
+
+         locator.setBlockOnNonDurableSend(true);
+         locator.setBlockOnDurableSend(true);
+
+         ClientSessionFactory sf = locator.createSessionFactory();
+
+         session = sf.createSession(false, true, true);
+
+         session.createQueue(LargeMessageTest.ADDRESS, LargeMessageTest.ADDRESS, true);
+
+         ClientProducer producer = session.createProducer(LargeMessageTest.ADDRESS);
+
+         for (int i = 0; i < 10; i++)
+         {
+            Message clientFile = createLargeClientMessage(session, messageSize, true);
+
+            producer.send(clientFile);
+         }
+         session.commit();
+
+         session.close();
+         sf.close();
+         // Restart the server between producing and consuming.
+         server.stop();
+         server.start();
+
+         sf = locator.createSessionFactory();
+
+         session = sf.createSession(false, false);
+
+         ClientConsumer cons = session.createConsumer(LargeMessageTest.ADDRESS);
+
+         session.start();
+         // Receive each of the 10 messages, stream its body into a discarding OutputStream, ack it and commit.
+         for (int i = 0; i < 10; i++)
+         {
+            ClientMessage msg = cons.receive(5000);
+            assertNotNull(msg);
+            msg.saveToOutputStream(new java.io.OutputStream()
+            {
+               @Override
+               public void write(int b) throws IOException
+               {
+               }
+            });
+            msg.acknowledge();
+            session.commit();
+         }
+
+         ((HornetQServerImpl)server).replaceQueueFactory(original);
+         server.stop(false);
+         server.start();
+
+         server.stop();
+
+         validateNoFilesOnLargeDir();
+      }
+      finally
+      {
+         try
+         {
+            server.stop();
+         }
+         catch (Throwable ignored)
+         {
+         }
+         // Best-effort close; any Throwable (including an NPE if session was never assigned) is swallowed.
+         try
+         {
+            session.close();
+         }
+         catch (Throwable ignored)
+         {
+         }
+      }
+
+   }
+
    public static class LargeMessageTestInterceptorIgnoreLastPacket implements Interceptor
    {
 



More information about the hornetq-commits mailing list