[hornetq-commits] JBoss hornetq SVN: r11315 - in branches/Branch_2_2_EAP: src/main/org/hornetq/core/persistence/impl/journal and 4 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Fri Sep 9 17:41:06 EDT 2011


Author: clebert.suconic at jboss.com
Date: 2011-09-09 17:41:06 -0400 (Fri, 09 Sep 2011)
New Revision: 11315

Modified:
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/message/impl/MessageImpl.java
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/impl/journal/LargeServerMessageImpl.java
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/core/impl/wireformat/SessionSendContinuationMessage.java
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionBridge.java
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/ServerMessageImpl.java
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
   branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java
   branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/MessageRedistributionTest.java
Log:
HORNETQ-753 and JBPAPP-7115

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/message/impl/MessageImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/message/impl/MessageImpl.java	2011-09-09 20:31:28 UTC (rev 11314)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/message/impl/MessageImpl.java	2011-09-09 21:41:06 UTC (rev 11315)
@@ -143,6 +143,14 @@
     */
    protected MessageImpl(final MessageImpl other)
    {
+      this(other, other.getProperties());
+   }
+
+   /*
+    * Copy constructor
+    */
+   protected MessageImpl(final MessageImpl other, TypedProperties properties)
+   {
       messageID = other.getMessageID();
       userID = other.getUserID();
       address = other.getAddress();
@@ -151,7 +159,7 @@
       expiration = other.getExpiration();
       timestamp = other.getTimestamp();
       priority = other.getPriority();
-      properties = new TypedProperties(other.getProperties());
+      this.properties = new TypedProperties(properties);
 
       // This MUST be synchronized using the monitor on the other message to prevent it running concurrently
       // with getEncodedBuffer(), otherwise can introduce race condition when delivering concurrently to

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/impl/journal/LargeServerMessageImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/impl/journal/LargeServerMessageImpl.java	2011-09-09 20:31:28 UTC (rev 11314)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/impl/journal/LargeServerMessageImpl.java	2011-09-09 21:41:06 UTC (rev 11315)
@@ -26,6 +26,7 @@
 import org.hornetq.core.server.ServerMessage;
 import org.hornetq.core.server.impl.ServerMessageImpl;
 import org.hornetq.utils.DataConstants;
+import org.hornetq.utils.TypedProperties;
 
 /**
  * A LargeServerMessageImpl
@@ -70,12 +71,13 @@
 
    /**
     * Copy constructor
+    * @param properties
     * @param copy
     * @param fileCopy
     */
-   private LargeServerMessageImpl(final LargeServerMessageImpl copy, final SequentialFile fileCopy, final long newID)
+   private LargeServerMessageImpl(final LargeServerMessageImpl copy, TypedProperties properties, final SequentialFile fileCopy, final long newID)
    {
-      super(copy);
+      super(copy, properties);
       linkMessage = copy;
       storageManager = copy.storageManager;
       file = fileCopy;
@@ -281,8 +283,28 @@
          this.removeProperty(Message.HDR_ORIG_MESSAGE_ID); 
       }
    }
+   
+   @Override
+   public synchronized ServerMessage copy()
+   {
+      long idToUse = messageID;
 
+      if (linkMessage != null)
+      {
+         idToUse = linkMessage.getMessageID();
+      }
 
+      SequentialFile newfile = storageManager.createFileForLargeMessage(idToUse, durable);
+
+      ServerMessage newMessage = new LargeServerMessageImpl(linkMessage == null ? this
+                                                                               : (LargeServerMessageImpl)linkMessage,
+                                                            properties,
+                                                            newfile,
+                                                            messageID);
+      return newMessage;
+   }
+
+
    @Override
    public synchronized ServerMessage copy(final long newID)
    {
@@ -301,6 +323,7 @@
    
          ServerMessage newMessage = new LargeServerMessageImpl(linkMessage == null ? this
                                                                                   : (LargeServerMessageImpl)linkMessage,
+                                                               properties,
                                                                newfile,
                                                                newID);
          return newMessage;
@@ -317,7 +340,7 @@
             
             file.copyTo(newFile);
             
-            LargeServerMessageImpl newMessage = new LargeServerMessageImpl(this, newFile, newID);
+            LargeServerMessageImpl newMessage = new LargeServerMessageImpl(this, properties, newFile, newID);
             
             newMessage.linkMessage = null;
             
@@ -341,9 +364,9 @@
    @Override
    public String toString()
    {
-      return "ServerMessage[messageID=" + messageID + ",priority=" + this.getPriority() + 
+      return "LargeServerMessage[messageID=" + messageID + ",priority=" + this.getPriority() + 
       ",expiration=[" + (this.getExpiration() != 0 ? new java.util.Date(this.getExpiration()) : "null") + "]" +
-      ", durable=" + durable + ", address=" + getAddress()  + ",properties=" + properties.toString() + "]";
+      ", durable=" + durable + ", address=" + getAddress()  + ",properties=" + properties.toString() + "]@" + System.identityHashCode(this);
    }
 
 

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/core/impl/wireformat/SessionSendContinuationMessage.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/core/impl/wireformat/SessionSendContinuationMessage.java	2011-09-09 20:31:28 UTC (rev 11314)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/core/impl/wireformat/SessionSendContinuationMessage.java	2011-09-09 21:41:06 UTC (rev 11315)
@@ -61,6 +61,7 @@
    {
       super(PacketImpl.SESS_SEND_CONTINUATION, body, continues);
       this.requiresResponse = requiresResponse;
+      this.message = message;
    }
 
    /**

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java	2011-09-09 20:31:28 UTC (rev 11314)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java	2011-09-09 21:41:06 UTC (rev 11315)
@@ -455,7 +455,7 @@
    // Consumer implementation ---------------------------------------
 
    /* Hook for processing message before forwarding */
-   protected ServerMessage beforeForward(ServerMessage message)
+   protected ServerMessage beforeForward(final ServerMessage message)
    {
       if (useDuplicateDetection)
       {
@@ -467,10 +467,20 @@
 
       if (transformer != null)
       {
-         message = transformer.transform(message);
+         final ServerMessage transformedMessage = transformer.transform(message);
+         if (transformedMessage != message)
+         {
+            if (log.isDebugEnabled())
+            {
+               log.debug("The transformer " + transformer + " made a copy of the message " + message + " as transformedMessage");
+            }
+         }
+         return transformedMessage;
       }
-
-      return message;
+      else
+      {
+         return message;
+      }
    }
 
    /**
@@ -535,6 +545,12 @@
          // that this will throw a disconnect, we need to remove the message
          // from the acks so it will get resent, duplicate detection will cope
          // with any messages resent
+         
+         if (log.isTraceEnabled())
+         {
+            log.trace("XXX going to send message " + message);
+         }
+         
          try
          {
             producer.send(dest, message);

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionBridge.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionBridge.java	2011-09-09 20:31:28 UTC (rev 11314)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionBridge.java	2011-09-09 21:41:06 UTC (rev 11315)
@@ -169,34 +169,50 @@
    }
 
    @Override
-   protected ServerMessage beforeForward(ServerMessage message)
+   protected ServerMessage beforeForward(final ServerMessage message)
    {
       // We make a copy of the message, then we strip out the unwanted routing id headers and leave
       // only
       // the one pertinent for the address node - this is important since different queues on different
       // nodes could have same queue ids
       // Note we must copy since same message may get routed to other nodes which require different headers
-      message = message.copy();
+      ServerMessage messageCopy = message.copy();
+      
+      if (log.isTraceEnabled())
+      {
+         log.trace("Clustered bridge  copied message " + message + " as " + messageCopy + " before delivery");
+      }
 
       // TODO - we can optimise this
 
-      Set<SimpleString> propNames = new HashSet<SimpleString>(message.getPropertyNames());
+      Set<SimpleString> propNames = new HashSet<SimpleString>(messageCopy.getPropertyNames());
 
       byte[] queueIds = message.getBytesProperty(idsHeaderName);
+      
+      if (queueIds == null)
+      {
+         // Sanity check only
+         log.warn("no queue IDs defined!,  originalMessage  = " + message +
+                  ", copiedMessage = " +
+                  messageCopy +
+                  ", props=" +
+                  idsHeaderName);
+         throw new IllegalStateException("no queueIDs defined");
+      }
 
       for (SimpleString propName : propNames)
       {
          if (propName.startsWith(MessageImpl.HDR_ROUTE_TO_IDS))
          {
-            message.removeProperty(propName);
+            messageCopy.removeProperty(propName);
          }
       }
 
-      message.putBytesProperty(MessageImpl.HDR_ROUTE_TO_IDS, queueIds);
+      messageCopy.putBytesProperty(MessageImpl.HDR_ROUTE_TO_IDS, queueIds);
 
-      message = super.beforeForward(message);
-
-      return message;
+      messageCopy = super.beforeForward(messageCopy);
+ 
+      return messageCopy;
    }
 
    private void setupNotificationConsumer() throws Exception

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/ServerMessageImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/ServerMessageImpl.java	2011-09-09 20:31:28 UTC (rev 11314)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/ServerMessageImpl.java	2011-09-09 21:41:06 UTC (rev 11315)
@@ -25,6 +25,7 @@
 import org.hornetq.core.server.ServerMessage;
 import org.hornetq.utils.DataConstants;
 import org.hornetq.utils.MemorySize;
+import org.hornetq.utils.TypedProperties;
 
 /**
  * 
@@ -89,6 +90,14 @@
       super(other);
    }
 
+   /*
+    * Copy constructor
+    */
+   protected ServerMessageImpl(final ServerMessageImpl other, TypedProperties properties)
+   {
+      super(other, properties);
+   }
+
    public boolean isServerMessage()
    {
       return true;
@@ -193,6 +202,7 @@
 
    public ServerMessage copy()
    {
+      // This is a simple copy, used only to avoid changing original properties
       return new ServerMessageImpl(this);
    }
 
@@ -275,7 +285,7 @@
    {
       return "ServerMessage[messageID=" + messageID + ",priority=" + this.getPriority() + 
           ",expiration=" + (this.getExpiration() != 0 ? new java.util.Date(this.getExpiration()) : 0) + 
-          ", durable=" + durable + ", address=" + getAddress()  + ",properties=" + properties.toString() + "]";
+          ", durable=" + durable + ", address=" + getAddress()  + ",properties=" + properties.toString() + "]@" + System.identityHashCode(this);
    }
 
    // FIXME - this is stuff that is only used in large messages

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java	2011-09-09 20:31:28 UTC (rev 11314)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java	2011-09-09 21:41:06 UTC (rev 11315)
@@ -1073,6 +1073,11 @@
 
       LargeServerMessage largeMsg = storageManager.createLargeMessage(id, message);
 
+      if (log.isTraceEnabled())
+      {
+         log.trace("sendLarge::" + largeMsg);
+      }
+      
       if (currentLargeMessage != null)
       {
          ServerSessionImpl.log.warn("Replacing incomplete LargeMessage with ID=" + currentLargeMessage.getMessageID());

Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java	2011-09-09 20:31:28 UTC (rev 11314)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java	2011-09-09 21:41:06 UTC (rev 11315)
@@ -74,7 +74,17 @@
 public abstract class ClusterTestBase extends ServiceTestBase
 {
    private final Logger log = Logger.getLogger(this.getClass());
+   public ClusterTestBase()
+   {
+      super();
+   }
 
+   public ClusterTestBase(String name)
+   {
+      super(name);
+   }
+
+
    private static final int[] PORTS = { TransportConstants.DEFAULT_PORT,
                                        TransportConstants.DEFAULT_PORT + 1,
                                        TransportConstants.DEFAULT_PORT + 2,
@@ -87,7 +97,18 @@
                                        TransportConstants.DEFAULT_PORT + 9, };
 
    private static final long WAIT_TIMEOUT = 10000;
+  
+   protected int getLargeMessageSize()
+   {
+      return 500;
+   }
    
+   protected boolean isLargeMessage()
+   {
+      return false;
+   }
+   
+
    private static final long TIMEOUT_START_SERVER = 10;
 
    @Override
@@ -635,6 +656,11 @@
             }
 
             message.putIntProperty(ClusterTestBase.COUNT_PROP, i);
+            
+            if (isLargeMessage())
+            {
+               message.setBodyInputStream(createFakeLargeStream(getLargeMessageSize()));
+            }
 
             producer.send(message);
 
@@ -686,9 +712,15 @@
          for (int i = msgStart; i < msgEnd; i++)
          {
             ClientMessage message = session.createMessage(durable);
+            
+            if (isLargeMessage())
+            {
+               message.setBodyInputStream(createFakeLargeStream(getLargeMessageSize()));
+            }
 
             message.putStringProperty(key, val);
             message.putIntProperty(ClusterTestBase.COUNT_PROP, i);
+            
             producer.send(message);
          }
       }
@@ -881,6 +913,12 @@
 
             log.info("msg on ClusterTestBase = " + message);
 
+            
+            if (isLargeMessage())
+            {
+               checkMessageBody(message);
+            }
+
             if (ack)
             {
                message.acknowledge();
@@ -1180,7 +1218,10 @@
             if (message != null)
             {
                int count = (Integer)message.getObjectProperty(ClusterTestBase.COUNT_PROP);
+               
+               checkMessageBody(message);
 
+
                // log.info("consumer " + consumerIDs[i] + " received message " + count);
 
                Assert.assertFalse(counts.contains(count));
@@ -1241,6 +1282,20 @@
 
    }
 
+   /**
+    * @param message
+    */
+   private void checkMessageBody(ClientMessage message)
+   {
+      if (isLargeMessage())
+      {
+         for (int posMsg = 0 ; posMsg < getLargeMessageSize(); posMsg++)
+         {
+            assertEquals(getSamplebyte(posMsg), message.getBodyBuffer().readByte());
+         }
+      }
+   }
+
    protected void verifyReceiveRoundRobinInSomeOrderNoAck(final int numMessages, final int... consumerIDs) throws Exception
    {
       if (numMessages < consumerIDs.length)
@@ -1274,6 +1329,12 @@
          message = consumer.consumer.receive(500);
          if (message != null)
          {
+            
+            if (isLargeMessage())
+            {
+               checkMessageBody(message);
+            }
+            
             if (ack)
             {
                message.acknowledge();

Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/MessageRedistributionTest.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/MessageRedistributionTest.java	2011-09-09 20:31:28 UTC (rev 11314)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/MessageRedistributionTest.java	2011-09-09 21:41:06 UTC (rev 11315)
@@ -25,9 +25,6 @@
 import org.hornetq.api.core.client.ClientSession;
 import org.hornetq.core.logging.Logger;
 import org.hornetq.core.message.impl.MessageImpl;
-import org.hornetq.core.server.cluster.ClusterConnection;
-import org.hornetq.core.server.cluster.MessageFlowRecord;
-import org.hornetq.core.server.cluster.impl.ClusterConnectionImpl;
 import org.hornetq.core.server.impl.QueueImpl;
 import org.hornetq.core.settings.impl.AddressSettings;
 
@@ -44,6 +41,19 @@
 {
    private static final Logger log = Logger.getLogger(MessageRedistributionTest.class);
 
+   public MessageRedistributionTest()
+   {
+      super();
+   }
+
+   /**
+    * @param name
+    */
+   public MessageRedistributionTest(String name)
+   {
+      super(name);
+   }
+
    @Override
    protected void setUp() throws Exception
    {
@@ -113,101 +123,11 @@
 
       removeConsumer(1);
 
-      verifyReceiveRoundRobinInSomeOrderWithCounts(false, ids1, 0, 2);
+      verifyReceiveRoundRobinInSomeOrderWithCounts(false, ids1, 0,  2);
 
       MessageRedistributionTest.log.info("Test done");
    }
 
-   // https://issues.jboss.org/browse/HORNETQ-654
-   public void testRedistributionWhenConsumerIsClosedAndRestart() throws Exception
-   {
-      setupCluster(false);
-
-      MessageRedistributionTest.log.info("Doing test");
-
-      startServers(0, 1, 2);
-
-      setupSessionFactory(0, isNetty());
-      setupSessionFactory(1, isNetty());
-      setupSessionFactory(2, isNetty());
-
-      createQueue(0, "queues.testaddress", "queue0", null, true);
-      createQueue(1, "queues.testaddress", "queue0", null, true);
-      createQueue(2, "queues.testaddress", "queue0", null, true);
-
-      addConsumer(0, 0, "queue0", null);
-      addConsumer(1, 1, "queue0", null);
-      addConsumer(2, 2, "queue0", null);
-
-      waitForBindings(0, "queues.testaddress", 1, 1, true);
-      waitForBindings(1, "queues.testaddress", 1, 1, true);
-      waitForBindings(2, "queues.testaddress", 1, 1, true);
-
-      waitForBindings(0, "queues.testaddress", 2, 2, false);
-      waitForBindings(1, "queues.testaddress", 2, 2, false);
-      waitForBindings(2, "queues.testaddress", 2, 2, false);
-
-      send(0, "queues.testaddress", 20, true, null);
-
-      getReceivedOrder(0, true);
-      int[] ids1 = getReceivedOrder(1, false);
-      getReceivedOrder(2, true);
-
-      for (ClusterConnection conn : servers[1].getClusterManager().getClusterConnections())
-      {
-         ClusterConnectionImpl impl = (ClusterConnectionImpl)conn;
-         for (MessageFlowRecord record : impl.getRecords().values())
-         {
-            if (record.getBridge() != null)
-            {
-               System.out.println("stop record bridge");
-               record.getBridge().stop();
-            }
-         }
-      }
-
-      removeConsumer(1);
-      
-      // Need to wait some time as we need to handle all redistributions before we stop the servers
-      Thread.sleep(1000);
-
-      for (int i = 0; i <= 2; i++)
-      {
-         servers[i].stop();
-         servers[i] = null;
-      }
-      
-      setupServers();
-      
-      setupCluster(false);
-      
-      startServers(0, 1, 2);
-      
-      for (int i = 0 ; i <= 2; i++)
-      {
-         consumers[i] = null;
-         sfs[i] = null;
-      }
-
-      setupSessionFactory(0, isNetty());
-      setupSessionFactory(1, isNetty());
-      setupSessionFactory(2, isNetty());
-
-      addConsumer(0, 0, "queue0", null);
-      addConsumer(2, 2, "queue0", null);
-
-      waitForBindings(0, "queues.testaddress", 1, 1, true);
-      waitForBindings(2, "queues.testaddress", 1, 1, true);
-
-      waitForBindings(0, "queues.testaddress", 2, 1, false);
-      waitForBindings(1, "queues.testaddress", 2, 2, false);
-      waitForBindings(2, "queues.testaddress", 2, 1, false);
-
-      verifyReceiveRoundRobinInSomeOrderWithCounts(false, ids1, 0, 2);
-
-      MessageRedistributionTest.log.info("Test done");
-   }
-
    public void testRedistributionWhenConsumerIsClosedNotConsumersOnAllNodes() throws Exception
    {
       setupCluster(false);



More information about the hornetq-commits mailing list