Author: clebert.suconic(a)jboss.com
Date: 2011-03-03 18:15:11 -0500 (Thu, 03 Mar 2011)
New Revision: 10286
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/ServerMessage.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/RemoteQueueBindingImpl.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/ServerMessageImpl.java
branches/Branch_2_2_EAP/src/main/org/hornetq/utils/TypedProperties.java
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/unit/core/postoffice/impl/BindingsImplTest.java
Log:
https://issues.jboss.org/browse/JBPAPP-6009 - Making sure there are no issues with
ROUTE_TO information on the cluster communication
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java
===================================================================
---
branches/Branch_2_2_EAP/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java 2011-03-03
18:09:53 UTC (rev 10285)
+++
branches/Branch_2_2_EAP/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java 2011-03-03
23:15:11 UTC (rev 10286)
@@ -583,6 +583,15 @@
return;
}
+
+ if (message.hasInternalProperties())
+ {
+ // We need to perform some cleanup on internal properties,
+ // but we don't do it every time, otherwise it wouldn't be optimal
+ cleanupInternalPropertiesBeforeRouting(message);
+ }
+
+
Bindings bindings = addressManager.getBindingsForRoutingAddress(address);
if (bindings != null)
@@ -792,6 +801,23 @@
// Private -----------------------------------------------------------------
+ /**
+ * @param message
+ */
+ protected void cleanupInternalPropertiesBeforeRouting(final ServerMessage message)
+ {
+ for (SimpleString name : message.getPropertyNames())
+ {
+ // We use properties to establish routing context on clustering.
+ // However if the client resends the message after receiving, it needs to be
removed
+ if (name.startsWith(MessageImpl.HDR_ROUTE_TO_IDS) &&
!name.equals(MessageImpl.HDR_ROUTE_TO_IDS))
+ {
+ message.removeProperty(name);
+ }
+ }
+ }
+
+
private void setPagingStore(final ServerMessage message) throws Exception
{
PagingStore store = pagingManager.getPageStore(message.getAddress());
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/ServerMessage.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/ServerMessage.java 2011-03-03
18:09:53 UTC (rev 10285)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/ServerMessage.java 2011-03-03
23:15:11 UTC (rev 10286)
@@ -54,6 +54,9 @@
void setPagingStore(PagingStore store);
PagingStore getPagingStore();
+
+ // Is there any _HQ_ property being used
+ boolean hasInternalProperties();
boolean storeIsPaging();
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java
===================================================================
---
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java 2011-03-03
18:09:53 UTC (rev 10285)
+++
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java 2011-03-03
23:15:11 UTC (rev 10286)
@@ -319,26 +319,16 @@
/* Hook for processing message before forwarding */
protected ServerMessage beforeForward(ServerMessage message)
{
- if (useDuplicateDetection &&
!message.containsProperty(Message.HDR_DUPLICATE_DETECTION_ID))
- {
- // If we are using duplicate detection and there's not already a duplicate
detection header, then
- // we add a header composed of the persistent node id and the message id, which
makes it globally unique
- // between restarts.
- // If you use a cluster connection then a guid based duplicate id will be used
since it is added *before*
- // the
- // message goes into the store and forward queue.
- // But with this technique it also works when the messages don't already
have such a header in them in the
- // queue.
- byte[] bytes = new byte[24];
+ // We keep our own DuplicateID for the Bridge, so bouncing back and forths will
work fine
+ byte[] bytes = new byte[24];
- ByteBuffer bb = ByteBuffer.wrap(bytes);
+ ByteBuffer bb = ByteBuffer.wrap(bytes);
- bb.put(nodeUUID.asBytes());
+ bb.put(nodeUUID.asBytes());
- bb.putLong(message.getMessageID());
+ bb.putLong(message.getMessageID());
- message.putBytesProperty(MessageImpl.HDR_BRIDGE_DUPLICATE_ID, bytes);
- }
+ message.putBytesProperty(MessageImpl.HDR_BRIDGE_DUPLICATE_ID, bytes);
if (transformer != null)
{
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/RemoteQueueBindingImpl.java
===================================================================
---
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/RemoteQueueBindingImpl.java 2011-03-03
18:09:53 UTC (rev 10285)
+++
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/RemoteQueueBindingImpl.java 2011-03-03
23:15:11 UTC (rev 10286)
@@ -180,26 +180,7 @@
public void route(final ServerMessage message, final RoutingContext context)
{
- byte[] ids = message.getBytesProperty(idsHeaderName);
-
- if (ids == null)
- {
- ids = new byte[8];
- }
- else
- {
- byte[] newIds = new byte[ids.length + 8];
-
- System.arraycopy(ids, 0, newIds, 8, ids.length);
-
- ids = newIds;
- }
-
- ByteBuffer buff = ByteBuffer.wrap(ids);
-
- buff.putLong(remoteQueueID);
-
- message.putBytesProperty(idsHeaderName, ids);
+ addRouteContextToMessage(message);
List<Queue> durableQueuesOnContext = context.getDurableQueues(address);
@@ -298,4 +279,35 @@
storeAndForwardQueue.close();
}
+
+ /**
+ * This will add routing information to the message.
+ * This will be later processed during the delivery between the nodes. Because of that
this has to be persisted as a property on the message.
+ * @param message
+ */
+ private void addRouteContextToMessage(final ServerMessage message)
+ {
+ byte[] ids = message.getBytesProperty(idsHeaderName);
+
+ if (ids == null)
+ {
+ ids = new byte[8];
+ }
+ else
+ {
+ byte[] newIds = new byte[ids.length + 8];
+
+ System.arraycopy(ids, 0, newIds, 8, ids.length);
+
+ ids = newIds;
+ }
+
+ ByteBuffer buff = ByteBuffer.wrap(ids);
+
+ buff.putLong(remoteQueueID);
+
+ message.putBytesProperty(idsHeaderName, ids);
+ }
+
+
}
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/ServerMessageImpl.java
===================================================================
---
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/ServerMessageImpl.java 2011-03-03
18:09:53 UTC (rev 10285)
+++
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/ServerMessageImpl.java 2011-03-03
23:15:11 UTC (rev 10286)
@@ -106,6 +106,12 @@
return ref;
}
+
+ public boolean hasInternalProperties()
+ {
+ return properties.hasInternalProperties();
+ }
+
public synchronized int incrementRefCount() throws Exception
{
refCount++;
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/utils/TypedProperties.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/utils/TypedProperties.java 2011-03-03
18:09:53 UTC (rev 10285)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/utils/TypedProperties.java 2011-03-03
23:15:11 UTC (rev 10286)
@@ -49,10 +49,14 @@
public class TypedProperties
{
private static final Logger log = Logger.getLogger(TypedProperties.class);
+
+ private static final SimpleString HQ_PROPNAME = new SimpleString("_HQ_");
private Map<SimpleString, PropertyValue> properties;
private volatile int size;
+
+ private boolean internalProperties;
public TypedProperties()
{
@@ -72,6 +76,11 @@
properties = other.properties == null ? null : new HashMap<SimpleString,
PropertyValue>(other.properties);
size = other.size;
}
+
+ public boolean hasInternalProperties()
+ {
+ return internalProperties;
+ }
public void putBooleanProperty(final SimpleString key, final boolean value)
{
@@ -610,6 +619,11 @@
private synchronized void doPutValue(final SimpleString key, final PropertyValue
value)
{
+ if (key.startsWith(HQ_PROPNAME))
+ {
+ internalProperties = true;
+ }
+
PropertyValue oldValue = properties.put(key, value);
if (oldValue != null)
{
Modified:
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java
===================================================================
---
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java 2011-03-03
18:09:53 UTC (rev 10285)
+++
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java 2011-03-03
23:15:11 UTC (rev 10286)
@@ -759,13 +759,8 @@
Assert.assertNotNull("consumer " + consumerID + " did not
receive message " + j, message);
}
-// Set<SimpleString> names = message.getPropertyNames();
-// for (SimpleString name : names)
-// {
-// assertFalse("Property " + name + " still defined what
could be dangerous on resending the message",
name.toString().startsWith("_HQ_ROUTE_TO"));
-// }
-//
+
if (ack)
{
message.acknowledge();
Modified:
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/unit/core/postoffice/impl/BindingsImplTest.java
===================================================================
---
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/unit/core/postoffice/impl/BindingsImplTest.java 2011-03-03
18:09:53 UTC (rev 10285)
+++
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/unit/core/postoffice/impl/BindingsImplTest.java 2011-03-03
23:15:11 UTC (rev 10286)
@@ -1010,6 +1010,15 @@
return null;
}
+ /* (non-Javadoc)
+ * @see org.hornetq.core.server.ServerMessage#hasInternalProperties()
+ */
+ public boolean hasInternalProperties()
+ {
+ // TODO Auto-generated method stub
+ return false;
+ }
+
}
class FakeFilter implements Filter