[hornetq-commits] JBoss hornetq SVN: r11282 - in branches/one-offs/Hornetq_2_2_5_EAP_JBPAPP_7116: src/main/org/hornetq/core/persistence/impl/journal and 3 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Thu Sep 1 18:59:45 EDT 2011


Author: clebert.suconic at jboss.com
Date: 2011-09-01 18:59:45 -0400 (Thu, 01 Sep 2011)
New Revision: 11282

Added:
   branches/one-offs/Hornetq_2_2_5_EAP_JBPAPP_7116/tests/src/org/hornetq/tests/integration/cluster/distribution/LargeMessageRedistributionTest.java
Modified:
   branches/one-offs/Hornetq_2_2_5_EAP_JBPAPP_7116/src/main/org/hornetq/core/message/impl/MessageImpl.java
   branches/one-offs/Hornetq_2_2_5_EAP_JBPAPP_7116/src/main/org/hornetq/core/persistence/impl/journal/LargeServerMessageImpl.java
   branches/one-offs/Hornetq_2_2_5_EAP_JBPAPP_7116/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java
   branches/one-offs/Hornetq_2_2_5_EAP_JBPAPP_7116/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionBridge.java
   branches/one-offs/Hornetq_2_2_5_EAP_JBPAPP_7116/src/main/org/hornetq/core/server/impl/ServerMessageImpl.java
   branches/one-offs/Hornetq_2_2_5_EAP_JBPAPP_7116/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java
   branches/one-offs/Hornetq_2_2_5_EAP_JBPAPP_7116/tests/src/org/hornetq/tests/integration/cluster/distribution/MessageRedistributionTest.java
Log:
back porting JBPAPP-7115

Modified: branches/one-offs/Hornetq_2_2_5_EAP_JBPAPP_7116/src/main/org/hornetq/core/message/impl/MessageImpl.java
===================================================================
--- branches/one-offs/Hornetq_2_2_5_EAP_JBPAPP_7116/src/main/org/hornetq/core/message/impl/MessageImpl.java	2011-09-01 22:50:36 UTC (rev 11281)
+++ branches/one-offs/Hornetq_2_2_5_EAP_JBPAPP_7116/src/main/org/hornetq/core/message/impl/MessageImpl.java	2011-09-01 22:59:45 UTC (rev 11282)
@@ -143,6 +143,14 @@
     */
    protected MessageImpl(final MessageImpl other)
    {
+      this(other, other.getProperties());
+   }
+
+   /*
+    * Copy constructor
+    */
+   protected MessageImpl(final MessageImpl other, TypedProperties properties)
+   {
       messageID = other.getMessageID();
       userID = other.getUserID();
       address = other.getAddress();
@@ -151,7 +159,7 @@
       expiration = other.getExpiration();
       timestamp = other.getTimestamp();
       priority = other.getPriority();
-      properties = new TypedProperties(other.getProperties());
+      this.properties = new TypedProperties(properties);
 
       // This MUST be synchronized using the monitor on the other message to prevent it running concurrently
       // with getEncodedBuffer(), otherwise can introduce race condition when delivering concurrently to

Modified: branches/one-offs/Hornetq_2_2_5_EAP_JBPAPP_7116/src/main/org/hornetq/core/persistence/impl/journal/LargeServerMessageImpl.java
===================================================================
--- branches/one-offs/Hornetq_2_2_5_EAP_JBPAPP_7116/src/main/org/hornetq/core/persistence/impl/journal/LargeServerMessageImpl.java	2011-09-01 22:50:36 UTC (rev 11281)
+++ branches/one-offs/Hornetq_2_2_5_EAP_JBPAPP_7116/src/main/org/hornetq/core/persistence/impl/journal/LargeServerMessageImpl.java	2011-09-01 22:59:45 UTC (rev 11282)
@@ -26,6 +26,7 @@
 import org.hornetq.core.server.ServerMessage;
 import org.hornetq.core.server.impl.ServerMessageImpl;
 import org.hornetq.utils.DataConstants;
+import org.hornetq.utils.TypedProperties;
 
 /**
  * A LargeServerMessageImpl
@@ -70,12 +71,13 @@
 
    /**
     * Copy constructor
+    * @param properties
     * @param copy
     * @param fileCopy
     */
-   private LargeServerMessageImpl(final LargeServerMessageImpl copy, final SequentialFile fileCopy, final long newID)
+   private LargeServerMessageImpl(final LargeServerMessageImpl copy, TypedProperties properties, final SequentialFile fileCopy, final long newID)
    {
-      super(copy);
+      super(copy, properties);
       linkMessage = copy;
       storageManager = copy.storageManager;
       file = fileCopy;
@@ -281,8 +283,30 @@
          this.removeProperty(Message.HDR_ORIG_MESSAGE_ID); 
       }
    }
+   
+   @Override
+   public synchronized ServerMessage copy()
+   {
+      incrementDelayDeletionCount();
 
+      long idToUse = messageID;
 
+      if (linkMessage != null)
+      {
+         idToUse = linkMessage.getMessageID();
+      }
+
+      SequentialFile newfile = storageManager.createFileForLargeMessage(idToUse, durable);
+
+      ServerMessage newMessage = new LargeServerMessageImpl(linkMessage == null ? this
+                                                                               : (LargeServerMessageImpl)linkMessage,
+                                                            properties,
+                                                            newfile,
+                                                            messageID);
+      return newMessage;
+   }
+
+
    @Override
    public synchronized ServerMessage copy(final long newID)
    {
@@ -301,6 +325,7 @@
    
          ServerMessage newMessage = new LargeServerMessageImpl(linkMessage == null ? this
                                                                                   : (LargeServerMessageImpl)linkMessage,
+                                                               properties,
                                                                newfile,
                                                                newID);
          return newMessage;
@@ -317,7 +342,7 @@
             
             file.copyTo(newFile);
             
-            LargeServerMessageImpl newMessage = new LargeServerMessageImpl(this, newFile, newID);
+            LargeServerMessageImpl newMessage = new LargeServerMessageImpl(this, properties, newFile, newID);
             
             newMessage.linkMessage = null;
             

Modified: branches/one-offs/Hornetq_2_2_5_EAP_JBPAPP_7116/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java
===================================================================
--- branches/one-offs/Hornetq_2_2_5_EAP_JBPAPP_7116/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java	2011-09-01 22:50:36 UTC (rev 11281)
+++ branches/one-offs/Hornetq_2_2_5_EAP_JBPAPP_7116/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java	2011-09-01 22:59:45 UTC (rev 11282)
@@ -325,7 +325,7 @@
    // Consumer implementation ---------------------------------------
 
    /* Hook for processing message before forwarding */
-   protected ServerMessage beforeForward(ServerMessage message)
+   protected ServerMessage beforeForward(final ServerMessage message)
    {
       if (useDuplicateDetection)
       {
@@ -337,10 +337,20 @@
 
       if (transformer != null)
       {
-         message = transformer.transform(message);
+         final ServerMessage transformedMessage = transformer.transform(message);
+         if (transformedMessage != message)
+         {
+            if (log.isDebugEnabled())
+            {
+               log.debug("The transformer " + transformer + " made a copy of the message " + message + " as transformedMessage");
+            }
+         }
+         return transformedMessage;
       }
-
-      return message;
+      else
+      {
+         return message;
+      }
    }
 
    /**

Modified: branches/one-offs/Hornetq_2_2_5_EAP_JBPAPP_7116/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionBridge.java
===================================================================
--- branches/one-offs/Hornetq_2_2_5_EAP_JBPAPP_7116/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionBridge.java	2011-09-01 22:50:36 UTC (rev 11281)
+++ branches/one-offs/Hornetq_2_2_5_EAP_JBPAPP_7116/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionBridge.java	2011-09-01 22:59:45 UTC (rev 11282)
@@ -113,34 +113,50 @@
    }
 
    @Override
-   protected ServerMessage beforeForward(ServerMessage message)
+   protected ServerMessage beforeForward(final ServerMessage message)
    {
       // We make a copy of the message, then we strip out the unwanted routing id headers and leave
       // only
       // the one pertinent for the address node - this is important since different queues on different
       // nodes could have same queue ids
       // Note we must copy since same message may get routed to other nodes which require different headers
-      message = message.copy();
+      ServerMessage messageCopy = message.copy();
+      
+      if (log.isTraceEnabled())
+      {
+         log.trace("Clustered bridge  copied message " + message + " as " + messageCopy + " before delivery");
+      }
 
       // TODO - we can optimise this
 
-      Set<SimpleString> propNames = new HashSet<SimpleString>(message.getPropertyNames());
+      Set<SimpleString> propNames = new HashSet<SimpleString>(messageCopy.getPropertyNames());
 
       byte[] queueIds = message.getBytesProperty(idsHeaderName);
+      
+      if (queueIds == null)
+      {
+         // Sanity check only
+         log.warn("no queue IDs defined!,  originalMessage  = " + message +
+                  ", copiedMessage = " +
+                  messageCopy +
+                  ", props=" +
+                  idsHeaderName);
+         throw new IllegalStateException("no queueIDs defined");
+      }
 
       for (SimpleString propName : propNames)
       {
          if (propName.startsWith(MessageImpl.HDR_ROUTE_TO_IDS))
          {
-            message.removeProperty(propName);
+            messageCopy.removeProperty(propName);
          }
       }
 
-      message.putBytesProperty(MessageImpl.HDR_ROUTE_TO_IDS, queueIds);
+      messageCopy.putBytesProperty(MessageImpl.HDR_ROUTE_TO_IDS, queueIds);
 
-      message = super.beforeForward(message);
-
-      return message;
+      messageCopy = super.beforeForward(messageCopy);
+ 
+      return messageCopy;
    }
 
    private void setupNotificationConsumer() throws Exception

Modified: branches/one-offs/Hornetq_2_2_5_EAP_JBPAPP_7116/src/main/org/hornetq/core/server/impl/ServerMessageImpl.java
===================================================================
--- branches/one-offs/Hornetq_2_2_5_EAP_JBPAPP_7116/src/main/org/hornetq/core/server/impl/ServerMessageImpl.java	2011-09-01 22:50:36 UTC (rev 11281)
+++ branches/one-offs/Hornetq_2_2_5_EAP_JBPAPP_7116/src/main/org/hornetq/core/server/impl/ServerMessageImpl.java	2011-09-01 22:59:45 UTC (rev 11282)
@@ -25,6 +25,7 @@
 import org.hornetq.core.server.ServerMessage;
 import org.hornetq.utils.DataConstants;
 import org.hornetq.utils.MemorySize;
+import org.hornetq.utils.TypedProperties;
 
 /**
  * 
@@ -89,6 +90,14 @@
       super(other);
    }
 
+   /*
+    * Copy constructor
+    */
+   protected ServerMessageImpl(final ServerMessageImpl other, TypedProperties properties)
+   {
+      super(other, properties);
+   }
+
    public boolean isServerMessage()
    {
       return true;
@@ -193,6 +202,7 @@
 
    public ServerMessage copy()
    {
+      // This is a simple copy, used only to avoid changing original properties
       return new ServerMessageImpl(this);
    }
 

Modified: branches/one-offs/Hornetq_2_2_5_EAP_JBPAPP_7116/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java
===================================================================
--- branches/one-offs/Hornetq_2_2_5_EAP_JBPAPP_7116/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java	2011-09-01 22:50:36 UTC (rev 11281)
+++ branches/one-offs/Hornetq_2_2_5_EAP_JBPAPP_7116/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java	2011-09-01 22:59:45 UTC (rev 11282)
@@ -38,7 +38,6 @@
 import org.hornetq.core.config.BroadcastGroupConfiguration;
 import org.hornetq.core.config.ClusterConnectionConfiguration;
 import org.hornetq.core.config.Configuration;
-import org.hornetq.core.config.impl.ConfigurationImpl;
 import org.hornetq.core.logging.Logger;
 import org.hornetq.core.postoffice.Binding;
 import org.hornetq.core.postoffice.Bindings;
@@ -71,6 +70,16 @@
 {
    private static final Logger log = Logger.getLogger(ClusterTestBase.class);
 
+   public ClusterTestBase()
+   {
+      super();
+   }
+
+   public ClusterTestBase(String name)
+   {
+      super(name);
+   }
+
    private static final int[] PORTS = { TransportConstants.DEFAULT_PORT,
                                        TransportConstants.DEFAULT_PORT + 1,
                                        TransportConstants.DEFAULT_PORT + 2,
@@ -690,6 +699,11 @@
                Assert.assertNotNull("consumer " + consumerIDs[i] + " did not receive message " + j, message);
             }
 
+            if (isLargeMessage())
+            {
+               validateLargeMessage(message);
+            }
+
             if (ack)
             {
                message.acknowledge();
@@ -748,6 +762,11 @@
                   Assert.fail("consumer " + i + " did not receive all messages");
                }
 
+               if (isLargeMessage())
+               {
+                  validateLargeMessage(message);
+               }
+
                if (ack)
                {
                   message.acknowledge();
@@ -792,10 +811,7 @@
 
             if (isLargeMessage())
             {
-               for (int posMsg = 0; posMsg < getLargeMessageSize(); posMsg++)
-               {
-                  assertEquals(getSamplebyte(posMsg), message.getBodyBuffer().readByte());
-               }
+               validateLargeMessage(message);
             }
 
             if (ack)
@@ -891,6 +907,11 @@
 
             if (message != null)
             {
+               if (isLargeMessage())
+               {
+                  validateLargeMessage(message);
+               }
+
                ClusterTestBase.log.info("check receive Consumer " + consumerID +
                                         " received message " +
                                         message.getObjectProperty(ClusterTestBase.COUNT_PROP));
@@ -925,6 +946,10 @@
          Assert.assertEquals("consumer " + consumerIDs[count] + " message " + i,
                              i,
                              message.getObjectProperty(ClusterTestBase.COUNT_PROP));
+         if (isLargeMessage())
+         {
+            validateLargeMessage(message);
+         }
 
          count++;
 
@@ -981,6 +1006,10 @@
          ClientMessage msg = holder.consumer.receive(10000);
 
          Assert.assertNotNull(msg);
+         if (isLargeMessage())
+         {
+            validateLargeMessage(msg);
+         }
 
          int count = msg.getIntProperty(ClusterTestBase.COUNT_PROP);
 
@@ -1024,6 +1053,10 @@
             ClientMessage msg = holder.consumer.consumer.receive(10000);
 
             Assert.assertNotNull(msg);
+            if (isLargeMessage())
+            {
+               validateLargeMessage(msg);
+            }
 
             int p = msg.getIntProperty(ClusterTestBase.COUNT_PROP);
 
@@ -1078,6 +1111,11 @@
             {
                int count = (Integer)message.getObjectProperty(ClusterTestBase.COUNT_PROP);
 
+               if (isLargeMessage())
+               {
+                  validateLargeMessage(message);
+               }
+
                // log.info("consumer " + consumerIDs[i] + " received message " + count);
 
                Assert.assertFalse(counts.contains(count));
@@ -1171,6 +1209,12 @@
          message = consumer.consumer.receive(500);
          if (message != null)
          {
+
+            if (isLargeMessage())
+            {
+               validateLargeMessage(message);
+            }
+
             if (ack)
             {
                message.acknowledge();
@@ -1356,7 +1400,6 @@
       configuration.setJournalFileSize(100 * 1024);
       configuration.setJournalType(getDefaultJournalType());
       configuration.setSharedStore(sharedStorage);
-      configuration.setThreadPoolMaxSize(10);
       if (sharedStorage)
       {
          // Shared storage will share the node between the backup and live node
@@ -1904,6 +1947,17 @@
       }
    }
 
+   /**
+    * @param message
+    */
+   private void validateLargeMessage(ClientMessage message)
+   {
+      for (int posMsg = 0; posMsg < getLargeMessageSize(); posMsg++)
+      {
+         assertEquals(getSamplebyte(posMsg), message.getBodyBuffer().readByte());
+      }
+   }
+
    protected boolean isFileStorage()
    {
       return true;

Added: branches/one-offs/Hornetq_2_2_5_EAP_JBPAPP_7116/tests/src/org/hornetq/tests/integration/cluster/distribution/LargeMessageRedistributionTest.java
===================================================================
--- branches/one-offs/Hornetq_2_2_5_EAP_JBPAPP_7116/tests/src/org/hornetq/tests/integration/cluster/distribution/LargeMessageRedistributionTest.java	                        (rev 0)
+++ branches/one-offs/Hornetq_2_2_5_EAP_JBPAPP_7116/tests/src/org/hornetq/tests/integration/cluster/distribution/LargeMessageRedistributionTest.java	2011-09-01 22:59:45 UTC (rev 11282)
@@ -0,0 +1,59 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.tests.integration.cluster.distribution;
+
+
+/**
+ * A LargeMessageRedistributionTest
+ *
+ * @author clebert
+ *
+ *
+ */
+public class LargeMessageRedistributionTest extends MessageRedistributionTest
+{
+
+   public LargeMessageRedistributionTest()
+   {
+      super();
+   }
+
+   public LargeMessageRedistributionTest(String name)
+   {
+      super(name);
+   }
+
+   protected boolean isLargeMessage()
+   {
+      return true;
+   }
+   // Constants -----------------------------------------------------
+
+   // Attributes ----------------------------------------------------
+
+   // Static --------------------------------------------------------
+
+   // Constructors --------------------------------------------------
+
+   // Public --------------------------------------------------------
+
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+
+   // Private -------------------------------------------------------
+
+   // Inner classes -------------------------------------------------
+
+}
\ No newline at end of file

Modified: branches/one-offs/Hornetq_2_2_5_EAP_JBPAPP_7116/tests/src/org/hornetq/tests/integration/cluster/distribution/MessageRedistributionTest.java
===================================================================
--- branches/one-offs/Hornetq_2_2_5_EAP_JBPAPP_7116/tests/src/org/hornetq/tests/integration/cluster/distribution/MessageRedistributionTest.java	2011-09-01 22:50:36 UTC (rev 11281)
+++ branches/one-offs/Hornetq_2_2_5_EAP_JBPAPP_7116/tests/src/org/hornetq/tests/integration/cluster/distribution/MessageRedistributionTest.java	2011-09-01 22:59:45 UTC (rev 11282)
@@ -25,9 +25,6 @@
 import org.hornetq.api.core.client.ClientSession;
 import org.hornetq.core.logging.Logger;
 import org.hornetq.core.message.impl.MessageImpl;
-import org.hornetq.core.server.cluster.ClusterConnection;
-import org.hornetq.core.server.cluster.MessageFlowRecord;
-import org.hornetq.core.server.cluster.impl.ClusterConnectionImpl;
 import org.hornetq.core.server.impl.QueueImpl;
 import org.hornetq.core.settings.impl.AddressSettings;
 
@@ -44,6 +41,19 @@
 {
    private static final Logger log = Logger.getLogger(MessageRedistributionTest.class);
 
+   public MessageRedistributionTest()
+   {
+      super();
+   }
+
+   /**
+    * @param name
+    */
+   public MessageRedistributionTest(String name)
+   {
+      super(name);
+   }
+
    @Override
    protected void setUp() throws Exception
    {
@@ -118,96 +128,6 @@
       MessageRedistributionTest.log.info("Test done");
    }
 
-   // https://issues.jboss.org/browse/HORNETQ-654
-   public void testRedistributionWhenConsumerIsClosedAndRestart() throws Exception
-   {
-      setupCluster(false);
-
-      MessageRedistributionTest.log.info("Doing test");
-
-      startServers(0, 1, 2);
-
-      setupSessionFactory(0, isNetty());
-      setupSessionFactory(1, isNetty());
-      setupSessionFactory(2, isNetty());
-
-      createQueue(0, "queues.testaddress", "queue0", null, true);
-      createQueue(1, "queues.testaddress", "queue0", null, true);
-      createQueue(2, "queues.testaddress", "queue0", null, true);
-
-      addConsumer(0, 0, "queue0", null);
-      addConsumer(1, 1, "queue0", null);
-      addConsumer(2, 2, "queue0", null);
-
-      waitForBindings(0, "queues.testaddress", 1, 1, true);
-      waitForBindings(1, "queues.testaddress", 1, 1, true);
-      waitForBindings(2, "queues.testaddress", 1, 1, true);
-
-      waitForBindings(0, "queues.testaddress", 2, 2, false);
-      waitForBindings(1, "queues.testaddress", 2, 2, false);
-      waitForBindings(2, "queues.testaddress", 2, 2, false);
-
-      send(0, "queues.testaddress", 20, true, null);
-
-      getReceivedOrder(0, true);
-      int[] ids1 = getReceivedOrder(1, false);
-      getReceivedOrder(2, true);
-
-      for (ClusterConnection conn : servers[1].getClusterManager().getClusterConnections())
-      {
-         ClusterConnectionImpl impl = (ClusterConnectionImpl)conn;
-         for (MessageFlowRecord record : impl.getRecords().values())
-         {
-            if (record.getBridge() != null)
-            {
-               System.out.println("stop record bridge");
-               record.getBridge().stop();
-            }
-         }
-      }
-
-      removeConsumer(1);
-      
-      // Need to wait some time as we need to handle all redistributions before we stop the servers
-      Thread.sleep(5000);
-
-      for (int i = 0; i <= 2; i++)
-      {
-         servers[i].stop();
-         servers[i] = null;
-      }
-      
-      setupServers();
-      
-      setupCluster(false);
-      
-      startServers(0, 1, 2);
-      
-      for (int i = 0 ; i <= 2; i++)
-      {
-         consumers[i] = null;
-         sfs[i] = null;
-      }
-
-      setupSessionFactory(0, isNetty());
-      setupSessionFactory(1, isNetty());
-      setupSessionFactory(2, isNetty());
-
-      addConsumer(0, 0, "queue0", null);
-      addConsumer(2, 2, "queue0", null);
-
-      waitForBindings(0, "queues.testaddress", 1, 1, true);
-      waitForBindings(2, "queues.testaddress", 1, 1, true);
-
-      waitForBindings(0, "queues.testaddress", 2, 1, false);
-      waitForBindings(1, "queues.testaddress", 2, 2, false);
-      waitForBindings(2, "queues.testaddress", 2, 1, false);
-
-      verifyReceiveRoundRobinInSomeOrderWithCounts(false, ids1, 0, 2);
-
-      MessageRedistributionTest.log.info("Test done");
-   }
-
    public void testRedistributionWhenConsumerIsClosedNotConsumersOnAllNodes() throws Exception
    {
       setupCluster(false);



More information about the hornetq-commits mailing list