[hornetq-commits] JBoss hornetq SVN: r8433 - in trunk: src/main/org/hornetq/core/persistence/impl/journal and 4 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Fri Nov 27 16:20:56 EST 2009


Author: clebert.suconic at jboss.com
Date: 2009-11-27 16:20:56 -0500 (Fri, 27 Nov 2009)
New Revision: 8433

Modified:
   trunk/src/main/org/hornetq/core/persistence/OperationContext.java
   trunk/src/main/org/hornetq/core/persistence/impl/journal/OperationContextImpl.java
   trunk/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java
   trunk/src/main/org/hornetq/core/server/impl/DivertImpl.java
   trunk/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
   trunk/tests/src/org/hornetq/tests/integration/divert/PersistentDivertTest.java
   trunk/tests/src/org/hornetq/tests/integration/paging/PagingSendTest.java
Log:
https://jira.jboss.org/jira/browse/HORNETQ-226 - Large Message and Diverts

Modified: trunk/src/main/org/hornetq/core/persistence/OperationContext.java
===================================================================
--- trunk/src/main/org/hornetq/core/persistence/OperationContext.java	2009-11-27 21:15:05 UTC (rev 8432)
+++ trunk/src/main/org/hornetq/core/persistence/OperationContext.java	2009-11-27 21:20:56 UTC (rev 8433)
@@ -13,8 +13,6 @@
 
 package org.hornetq.core.persistence;
 
-import java.util.concurrent.Executor;
-
 import org.hornetq.core.journal.IOAsyncTask;
 import org.hornetq.core.journal.IOCompletion;
 
@@ -30,10 +28,6 @@
 public interface OperationContext extends IOCompletion
 {
    
-   /** The executor used on responses.
-    *  If this is not set, it will use the current thread. */
-   void setExecutor(Executor executor);
-
    /** Execute the task when all IO operations are complete,
     *  Or execute it immediately if nothing is pending.  */
    void executeOnCompletion(IOAsyncTask runnable);

Modified: trunk/src/main/org/hornetq/core/persistence/impl/journal/OperationContextImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/persistence/impl/journal/OperationContextImpl.java	2009-11-27 21:15:05 UTC (rev 8432)
+++ trunk/src/main/org/hornetq/core/persistence/impl/journal/OperationContextImpl.java	2009-11-27 21:20:56 UTC (rev 8433)
@@ -81,7 +81,7 @@
 
    private String errorMessage = null;
 
-   private Executor executor;
+   private final Executor executor;
 
    private final AtomicInteger executorsPending = new AtomicInteger(0);
 
@@ -102,12 +102,6 @@
       replicationLineUp++;
    }
 
-   /** this method needs to be called before the executor became operational */
-   public void setExecutor(Executor executor)
-   {
-      this.executor = executor;
-   }
-
    public synchronized void replicationDone()
    {
       replicated++;
@@ -137,25 +131,18 @@
          // On this case, we can just execute the context directly
          if (replicationLineUp == replicated && storeLineUp == stored)
          {
-            if (executor != null)
+            // We want to avoid the executor if everything is complete...
+            // However, we can't execute the context if there are executions pending
+            // We need to use the executor on this case
+            if (executorsPending.get() == 0)
             {
-               // We want to avoid the executor if everything is complete...
-               // However, we can't execute the context if there are executions pending
-               // We need to use the executor on this case
-               if (executorsPending.get() == 0)
-               {
-                  // No need to use an executor here or a context switch
-                  // there are no actions pending.. hence we can just execute the task directly on the same thread
-                  executeNow = true;
-               }
-               else
-               {
-                  execute(completion);
-               }
+               // No need to use an executor here or a context switch
+               // there are no actions pending.. hence we can just execute the task directly on the same thread
+               executeNow = true;
             }
             else
             {
-               executeNow = true;
+               execute(completion);
             }
          }
          else

Modified: trunk/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java	2009-11-27 21:15:05 UTC (rev 8432)
+++ trunk/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java	2009-11-27 21:20:56 UTC (rev 8433)
@@ -550,11 +550,6 @@
          throw new IllegalStateException("Message cannot be routed more than once");
       }
 
-      if (message.getMessageID() == 0l)
-      {
-         generateID(message);
-      }
- 
       RoutingContext context = new RoutingContextImpl(tx);
 
       SimpleString address = message.getDestination();

Modified: trunk/src/main/org/hornetq/core/server/impl/DivertImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/DivertImpl.java	2009-11-27 21:15:05 UTC (rev 8432)
+++ trunk/src/main/org/hornetq/core/server/impl/DivertImpl.java	2009-11-27 21:20:56 UTC (rev 8433)
@@ -17,6 +17,7 @@
 
 import org.hornetq.core.filter.Filter;
 import org.hornetq.core.logging.Logger;
+import org.hornetq.core.persistence.StorageManager;
 import org.hornetq.core.postoffice.PostOffice;
 import org.hornetq.core.server.Divert;
 import org.hornetq.core.server.RoutingContext;
@@ -50,6 +51,8 @@
    private final Filter filter;
 
    private final Transformer transformer;
+   
+   private final StorageManager storageManager;
 
    public DivertImpl(final SimpleString forwardAddress,
                      final SimpleString uniqueName,
@@ -57,7 +60,8 @@
                      final boolean exclusive,
                      final Filter filter,
                      final Transformer transformer,
-                     final PostOffice postOffice)
+                     final PostOffice postOffice,
+                     final StorageManager storageManager)
    {
       this.forwardAddress = forwardAddress;
 
@@ -72,6 +76,8 @@
       this.transformer = transformer;
 
       this.postOffice = postOffice;
+      
+      this.storageManager = storageManager;
    }
 
    public void route(final ServerMessage message, final RoutingContext context) throws Exception
@@ -81,17 +87,16 @@
       // We must make a copy of the message, otherwise things like returning credits to the page won't work
       // properly on ack, since the original destination will be overwritten
 
-      // TODO we can optimise this so it doesn't copy if it's not routed anywhere else   
+      // TODO we can optimise this so it doesn't copy if it's not routed anywhere else
       
-      ServerMessage copy = message.copy();
+      long id = storageManager.generateUniqueID();
+      ServerMessage copy = message.copy(id);
       
-      // Setting the messageID to 0. The postOffice should set a new one
-      copy.setMessageID(0);
-
+      // This will set the original MessageId, and the original destination
+      copy.setOriginalHeaders(message, false);
+      
       copy.setDestination(forwardAddress);
 
-      copy.putStringProperty(HDR_ORIGINAL_DESTINATION, originalDestination);
-
       if (transformer != null)
       {
          copy = transformer.transform(copy);

Modified: trunk/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java	2009-11-27 21:15:05 UTC (rev 8432)
+++ trunk/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java	2009-11-27 21:20:56 UTC (rev 8433)
@@ -1414,7 +1414,8 @@
                                         config.isExclusive(),
                                         filter,
                                         transformer,
-                                        postOffice);
+                                        postOffice,
+                                        storageManager);
          // pagingManager,
          // storageManager);
 

Modified: trunk/tests/src/org/hornetq/tests/integration/divert/PersistentDivertTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/divert/PersistentDivertTest.java	2009-11-27 21:15:05 UTC (rev 8432)
+++ trunk/tests/src/org/hornetq/tests/integration/divert/PersistentDivertTest.java	2009-11-27 21:20:56 UTC (rev 8433)
@@ -45,8 +45,20 @@
 {
    private static final Logger log = Logger.getLogger(DivertTest.class);
    
+   final int minLargeMessageSize = ClientSessionFactoryImpl.DEFAULT_MIN_LARGE_MESSAGE_SIZE * 2;
+   
    public void testPersistentDivert() throws Exception
    {
+      doTestPersistentDivert(false);
+   }
+   
+   public void testPersistentDiverLargeMessage() throws Exception
+   {
+      doTestPersistentDivert(true);
+   }
+   
+   public void doTestPersistentDivert(boolean largeMessage) throws Exception
+   {
       Configuration conf = createDefaultConfig();
       
       conf.setClustered(true);
@@ -121,6 +133,11 @@
       {
          ClientMessage message = session.createClientMessage(true);
          
+         if (largeMessage)
+         {
+            message.setBodyInputStream(createFakeLargeStream(minLargeMessageSize));
+         }
+         
          message.putIntProperty(propKey, i);
          
          producer.send(message);
@@ -128,12 +145,17 @@
       
       for (int i = 0; i < numMessages; i++)
       {
-         ClientMessage message = consumer1.receive(200);
+         ClientMessage message = consumer1.receive(5000);
          
          assertNotNull(message);
          
          assertEquals((Integer)i, (Integer)message.getObjectProperty(propKey));
          
+         if (largeMessage)
+         {
+            checkLargeMessage(message);
+         }
+         
          message.acknowledge();
       }
       
@@ -141,12 +163,17 @@
       
       for (int i = 0; i < numMessages; i++)
       {
-         ClientMessage message = consumer2.receive(200);
+         ClientMessage message = consumer2.receive(5000);
          
          assertNotNull(message);
          
          assertEquals((Integer)i, (Integer)message.getObjectProperty(propKey));
          
+         if (largeMessage)
+         {
+            checkLargeMessage(message);
+         }
+
          message.acknowledge();
       }
       
@@ -154,12 +181,17 @@
       
       for (int i = 0; i < numMessages; i++)
       {
-         ClientMessage message = consumer3.receive(200);
+         ClientMessage message = consumer3.receive(5000);
          
          assertNotNull(message);
          
          assertEquals((Integer)i, (Integer)message.getObjectProperty(propKey));
          
+         if (largeMessage)
+         {
+            checkLargeMessage(message);
+         }
+
          message.acknowledge();
       }
       
@@ -167,12 +199,17 @@
       
       for (int i = 0; i < numMessages; i++)
       {
-         ClientMessage message = consumer4.receive(200);
+         ClientMessage message = consumer4.receive(5000);
          
          assertNotNull(message);
          
          assertEquals((Integer)i, (Integer)message.getObjectProperty(propKey));
          
+         if (largeMessage)
+         {
+            checkLargeMessage(message);
+         }
+
          message.acknowledge();
       }
       
@@ -183,9 +220,30 @@
       
       messagingService.stop();
    }
+
+   /**
+    * @param message
+    */
+   private void checkLargeMessage(ClientMessage message)
+   {
+      for (int j = 0 ; j < minLargeMessageSize; j++)
+      {
+         assertEquals(getSamplebyte(j), message.getBodyBuffer().readByte());
+      }
+   }
    
    public void testPersistentDivertRestartBeforeConsume() throws Exception
    {
+      doTestPersistentDivertRestartBeforeConsume(false);
+   }
+   
+   public void testPersistentDivertRestartBeforeConsumeLargeMessage() throws Exception
+   {
+      doTestPersistentDivertRestartBeforeConsume(true);
+   }
+   
+   public void doTestPersistentDivertRestartBeforeConsume(boolean largeMessage) throws Exception
+   {
       Configuration conf = createDefaultConfig();
       
       conf.setClustered(true);
@@ -251,6 +309,12 @@
          ClientMessage message = session.createClientMessage(true);
          
          message.putIntProperty(propKey, i);
+
+         if (largeMessage)
+         {
+            message.setBodyInputStream(createFakeLargeStream(minLargeMessageSize));
+         }
+
          
          producer.send(message);
       }
@@ -281,10 +345,15 @@
       
       for (int i = 0; i < numMessages; i++)
       {
-         ClientMessage message = consumer1.receive(200);
+         ClientMessage message = consumer1.receive(5000);
          
          assertNotNull(message);
          
+         if (largeMessage)
+         {
+            checkLargeMessage(message);
+         }
+
          assertEquals((Integer)i, (Integer)message.getObjectProperty(propKey));
          
          message.acknowledge();
@@ -294,10 +363,15 @@
       
       for (int i = 0; i < numMessages; i++)
       {
-         ClientMessage message = consumer2.receive(200);
+         ClientMessage message = consumer2.receive(5000);
          
          assertNotNull(message);
          
+         if (largeMessage)
+         {
+            checkLargeMessage(message);
+         }
+
          assertEquals((Integer)i, (Integer)message.getObjectProperty(propKey));
          
          message.acknowledge();
@@ -307,10 +381,15 @@
       
       for (int i = 0; i < numMessages; i++)
       {
-         ClientMessage message = consumer3.receive(200);
+         ClientMessage message = consumer3.receive(5000);
          
          assertNotNull(message);
          
+         if (largeMessage)
+         {
+            checkLargeMessage(message);
+         }
+
          assertEquals((Integer)i, (Integer)message.getObjectProperty(propKey));
          
          message.acknowledge();
@@ -320,10 +399,15 @@
       
       for (int i = 0; i < numMessages; i++)
       {
-         ClientMessage message = consumer4.receive(200);
+         ClientMessage message = consumer4.receive(5000);
          
          assertNotNull(message);
          
+         if (largeMessage)
+         {
+            checkLargeMessage(message);
+         }
+
          assertEquals((Integer)i, (Integer)message.getObjectProperty(propKey));
          
          message.acknowledge();

Modified: trunk/tests/src/org/hornetq/tests/integration/paging/PagingSendTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/paging/PagingSendTest.java	2009-11-27 21:15:05 UTC (rev 8432)
+++ trunk/tests/src/org/hornetq/tests/integration/paging/PagingSendTest.java	2009-11-27 21:20:56 UTC (rev 8433)
@@ -112,7 +112,6 @@
 
          for (int i = 0; i < 200; i++)
          {
-            System.out.println("Sent " + i);
             producer.send(message);
          }
 



More information about the hornetq-commits mailing list