[hornetq-commits] JBoss hornetq SVN: r10286 - in branches/Branch_2_2_EAP: src/main/org/hornetq/core/server and 5 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Thu Mar 3 18:15:11 EST 2011


Author: clebert.suconic at jboss.com
Date: 2011-03-03 18:15:11 -0500 (Thu, 03 Mar 2011)
New Revision: 10286

Modified:
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/ServerMessage.java
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/RemoteQueueBindingImpl.java
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/ServerMessageImpl.java
   branches/Branch_2_2_EAP/src/main/org/hornetq/utils/TypedProperties.java
   branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java
   branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/unit/core/postoffice/impl/BindingsImplTest.java
Log:
https://issues.jboss.org/browse/JBPAPP-6009 - Making sure there are no issues with ROUTE_TO information on the cluster communication

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java	2011-03-03 18:09:53 UTC (rev 10285)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java	2011-03-03 23:15:11 UTC (rev 10286)
@@ -583,6 +583,15 @@
          return;
       }
 
+      
+      if (message.hasInternalProperties())
+      {
+         // We need to perform some cleanup on internal properties,
+         // but we don't do it every time, otherwise it wouldn't be optimal
+         cleanupInternalPropertiesBeforeRouting(message);
+      }
+
+
       Bindings bindings = addressManager.getBindingsForRoutingAddress(address);
 
       if (bindings != null)
@@ -792,6 +801,23 @@
 
    // Private -----------------------------------------------------------------
 
+   /**
+    * @param message
+    */
+   protected void cleanupInternalPropertiesBeforeRouting(final ServerMessage message)
+   {
+      for (SimpleString name : message.getPropertyNames())
+      {
+         // We use properties to establish routing context on clustering.
+         // However if the client resends the message after receiving, it needs to be removed
+         if (name.startsWith(MessageImpl.HDR_ROUTE_TO_IDS) && !name.equals(MessageImpl.HDR_ROUTE_TO_IDS))
+         {
+            message.removeProperty(name);
+         }
+      }
+   }
+
+
    private void setPagingStore(final ServerMessage message) throws Exception
    {
       PagingStore store = pagingManager.getPageStore(message.getAddress());

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/ServerMessage.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/ServerMessage.java	2011-03-03 18:09:53 UTC (rev 10285)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/ServerMessage.java	2011-03-03 23:15:11 UTC (rev 10286)
@@ -54,6 +54,9 @@
    void setPagingStore(PagingStore store);
 
    PagingStore getPagingStore();
+   
+   // Is there any _HQ_ property being used
+   boolean hasInternalProperties();
 
    boolean storeIsPaging();
 

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java	2011-03-03 18:09:53 UTC (rev 10285)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java	2011-03-03 23:15:11 UTC (rev 10286)
@@ -319,26 +319,16 @@
    /* Hook for processing message before forwarding */
    protected ServerMessage beforeForward(ServerMessage message)
    {
-      if (useDuplicateDetection && !message.containsProperty(Message.HDR_DUPLICATE_DETECTION_ID))
-      {
-         // If we are using duplicate detection and there's not already a duplicate detection header, then
-         // we add a header composed of the persistent node id and the message id, which makes it globally unique
-         // between restarts.
-         // If you use a cluster connection then a guid based duplicate id will be used since it is added *before*
-         // the
-         // message goes into the store and forward queue.
-         // But with this technique it also works when the messages don't already have such a header in them in the
-         // queue.
-         byte[] bytes = new byte[24];
+      // We keep our own DuplicateID for the Bridge, so bouncing back and forths will work fine
+      byte[] bytes = new byte[24];
 
-         ByteBuffer bb = ByteBuffer.wrap(bytes);
+      ByteBuffer bb = ByteBuffer.wrap(bytes);
 
-         bb.put(nodeUUID.asBytes());
+      bb.put(nodeUUID.asBytes());
 
-         bb.putLong(message.getMessageID());
+      bb.putLong(message.getMessageID());
 
-         message.putBytesProperty(MessageImpl.HDR_BRIDGE_DUPLICATE_ID, bytes);
-      }
+      message.putBytesProperty(MessageImpl.HDR_BRIDGE_DUPLICATE_ID, bytes);
 
       if (transformer != null)
       {

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/RemoteQueueBindingImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/RemoteQueueBindingImpl.java	2011-03-03 18:09:53 UTC (rev 10285)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/RemoteQueueBindingImpl.java	2011-03-03 23:15:11 UTC (rev 10286)
@@ -180,26 +180,7 @@
 
    public void route(final ServerMessage message, final RoutingContext context)
    {
-      byte[] ids = message.getBytesProperty(idsHeaderName);
-
-      if (ids == null)
-      {
-         ids = new byte[8];
-      }
-      else
-      {
-         byte[] newIds = new byte[ids.length + 8];
-
-         System.arraycopy(ids, 0, newIds, 8, ids.length);
-
-         ids = newIds;
-      }
-
-      ByteBuffer buff = ByteBuffer.wrap(ids);
-
-      buff.putLong(remoteQueueID);
-
-      message.putBytesProperty(idsHeaderName, ids);
+      addRouteContextToMessage(message);
          
       List<Queue> durableQueuesOnContext = context.getDurableQueues(address);
 
@@ -298,4 +279,35 @@
       storeAndForwardQueue.close();
    }
 
+   
+   /**
+    * This will add routing information to the message.
+    * This will be later processed during the delivery between the nodes. Because of that this has to be persisted as a property on the message.
+    * @param message
+    */
+   private void addRouteContextToMessage(final ServerMessage message)
+   {
+      byte[] ids = message.getBytesProperty(idsHeaderName);
+
+      if (ids == null)
+      {
+         ids = new byte[8];
+      }
+      else
+      {
+         byte[] newIds = new byte[ids.length + 8];
+
+         System.arraycopy(ids, 0, newIds, 8, ids.length);
+
+         ids = newIds;
+      }
+
+      ByteBuffer buff = ByteBuffer.wrap(ids);
+
+      buff.putLong(remoteQueueID);
+
+      message.putBytesProperty(idsHeaderName, ids);
+   }
+
+
 }

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/ServerMessageImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/ServerMessageImpl.java	2011-03-03 18:09:53 UTC (rev 10285)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/ServerMessageImpl.java	2011-03-03 23:15:11 UTC (rev 10286)
@@ -106,6 +106,12 @@
       return ref;
    }
 
+   
+   public boolean hasInternalProperties()
+   {
+      return properties.hasInternalProperties();
+   }
+   
    public synchronized int incrementRefCount() throws Exception
    {
       refCount++;

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/utils/TypedProperties.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/utils/TypedProperties.java	2011-03-03 18:09:53 UTC (rev 10285)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/utils/TypedProperties.java	2011-03-03 23:15:11 UTC (rev 10286)
@@ -49,10 +49,14 @@
 public class TypedProperties
 {
    private static final Logger log = Logger.getLogger(TypedProperties.class);
+   
+   private static final SimpleString HQ_PROPNAME = new SimpleString("_HQ_");
 
    private Map<SimpleString, PropertyValue> properties;
 
    private volatile int size;
+   
+   private boolean internalProperties;
 
    public TypedProperties()
    {
@@ -72,6 +76,11 @@
       properties = other.properties == null ? null : new HashMap<SimpleString, PropertyValue>(other.properties);
       size = other.size;
    }
+   
+   public boolean hasInternalProperties()
+   {
+      return internalProperties;
+   }
 
    public void putBooleanProperty(final SimpleString key, final boolean value)
    {
@@ -610,6 +619,11 @@
 
    private synchronized void doPutValue(final SimpleString key, final PropertyValue value)
    {
+      if (key.startsWith(HQ_PROPNAME))
+      {
+         internalProperties = true;
+      }
+      
       PropertyValue oldValue = properties.put(key, value);
       if (oldValue != null)
       {

Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java	2011-03-03 18:09:53 UTC (rev 10285)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java	2011-03-03 23:15:11 UTC (rev 10286)
@@ -759,13 +759,8 @@
                Assert.assertNotNull("consumer " + consumerID + " did not receive message " + j, message);
             }
 
-//            Set<SimpleString> names = message.getPropertyNames();
-//            for (SimpleString name : names)
-//            {
-//               assertFalse("Property " + name + " still defined what could be dangerous on resending the message", name.toString().startsWith("_HQ_ROUTE_TO"));
-//            }
-//
 
+
             if (ack)
             {
                message.acknowledge();

Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/unit/core/postoffice/impl/BindingsImplTest.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/unit/core/postoffice/impl/BindingsImplTest.java	2011-03-03 18:09:53 UTC (rev 10285)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/unit/core/postoffice/impl/BindingsImplTest.java	2011-03-03 23:15:11 UTC (rev 10286)
@@ -1010,6 +1010,15 @@
          return null;
       }
 
+      /* (non-Javadoc)
+       * @see org.hornetq.core.server.ServerMessage#hasInternalProperties()
+       */
+      public boolean hasInternalProperties()
+      {
+         // TODO Auto-generated method stub
+         return false;
+      }
+
    }
 
    class FakeFilter implements Filter



More information about the hornetq-commits mailing list