Author: timfox
Date: 2010-01-18 06:27:20 -0500 (Mon, 18 Jan 2010)
New Revision: 8804
Added:
trunk/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterHeadersRemovedTest.java
Modified:
trunk/src/main/org/hornetq/core/message/impl/MessageImpl.java
trunk/src/main/org/hornetq/core/postoffice/impl/BindingsImpl.java
trunk/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java
trunk/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java
Log:
https://jira.jboss.org/jira/browse/HORNETQ-271
Modified: trunk/src/main/org/hornetq/core/message/impl/MessageImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/message/impl/MessageImpl.java 2010-01-14 12:06:43 UTC (rev 8803)
+++ trunk/src/main/org/hornetq/core/message/impl/MessageImpl.java 2010-01-18 11:27:20 UTC (rev 8804)
@@ -53,8 +53,6 @@
public static final SimpleString HDR_ROUTE_TO_IDS = new SimpleString("_HQ_ROUTE_TO");
- public static final SimpleString HDR_FROM_CLUSTER = new SimpleString("_HQ_FROM_CLUSTER");
-
protected long messageID;
protected SimpleString address;
Modified: trunk/src/main/org/hornetq/core/postoffice/impl/BindingsImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/postoffice/impl/BindingsImpl.java 2010-01-14 12:06:43 UTC (rev 8803)
+++ trunk/src/main/org/hornetq/core/postoffice/impl/BindingsImpl.java 2010-01-18 11:27:20 UTC (rev 8804)
@@ -238,7 +238,9 @@
if (!routed)
{
- if (message.containsProperty(MessageImpl.HDR_FROM_CLUSTER))
+ //TODO this is a little inefficient since we do the lookup once to see if the property
+ //is there, then do it again to remove the actual property
+ if (message.containsProperty(MessageImpl.HDR_ROUTE_TO_IDS))
{
routeFromCluster(message, context);
}
Modified: trunk/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java 2010-01-14 12:06:43 UTC (rev 8803)
+++ trunk/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java 2010-01-18 11:27:20 UTC (rev 8804)
@@ -427,8 +427,6 @@
}
message.putBytesProperty(MessageImpl.HDR_ROUTE_TO_IDS, queueIds);
-
- message.putBooleanProperty(MessageImpl.HDR_FROM_CLUSTER, Boolean.TRUE);
}
if (useDuplicateDetection && !message.containsProperty(Message.HDR_DUPLICATE_DETECTION_ID))
Added: trunk/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterHeadersRemovedTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterHeadersRemovedTest.java (rev 0)
+++ trunk/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterHeadersRemovedTest.java 2010-01-18 11:27:20 UTC (rev 8804)
@@ -0,0 +1,104 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.tests.integration.cluster.distribution;
+
+import org.hornetq.api.core.client.ClientConsumer;
+import org.hornetq.api.core.client.ClientMessage;
+import org.hornetq.api.core.client.ClientProducer;
+import org.hornetq.api.core.client.ClientSession;
+import org.hornetq.api.core.client.ClientSessionFactory;
+import org.hornetq.core.logging.Logger;
+import org.hornetq.core.message.impl.MessageImpl;
+
+public class ClusterHeadersRemovedTest extends ClusterTestBase
+{
+ private static final Logger log = Logger.getLogger(ClusterHeadersRemovedTest.class);
+
+ @Override
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+
+ setupServer(0, isFileStorage(), isNetty());
+ setupServer(1, isFileStorage(), isNetty());
+ }
+
+ @Override
+ protected void tearDown() throws Exception
+ {
+ closeAllConsumers();
+
+ closeAllSessionFactories();
+
+ stopServers(0, 1);
+
+ super.tearDown();
+ }
+
+ protected boolean isNetty()
+ {
+ return false;
+ }
+
+ public void testHeadersRemoved() throws Exception
+ {
+ setupClusterConnection("cluster1", 0, 1, "queues", false, 1, isNetty());
+ startServers(1, 0);
+
+ setupSessionFactory(0, isNetty());
+ setupSessionFactory(1, isNetty());
+
+ createQueue(0, "queues.testaddress", "queue0", null, false);
+ createQueue(1, "queues.testaddress", "queue0", null, false);
+
+ addConsumer(1, 1, "queue0", null);
+
+ waitForBindings(0, "queues.testaddress", 1, 0, true);
+ waitForBindings(0, "queues.testaddress", 1, 1, false);
+
+ waitForBindings(1, "queues.testaddress", 1, 1, true);
+
+ ClientSessionFactory sf = sfs[0];
+
+ ClientSession session0 = sf.createSession(false, true, true);
+
+ try
+ {
+ ClientProducer producer = session0.createProducer("queues.testaddress");
+
+ for (int i = 0; i < 10; i++)
+ {
+ ClientMessage message = session0.createMessage(true);
+
+ producer.send(message);
+ }
+ }
+ finally
+ {
+ session0.close();
+ }
+
+ ClientConsumer consumer = super.getConsumer(1);
+
+ for (int i = 0; i < 10; i++)
+ {
+ ClientMessage message = consumer.receive(5000);
+
+ assertNotNull(message);
+
+ assertFalse(message.containsProperty(MessageImpl.HDR_ROUTE_TO_IDS));
+ }
+ }
+
+}
Modified: trunk/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java 2010-01-14 12:06:43 UTC (rev 8803)
+++ trunk/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java 2010-01-18 11:27:20 UTC (rev 8804)
@@ -28,7 +28,12 @@
import org.hornetq.api.core.Pair;
import org.hornetq.api.core.SimpleString;
import org.hornetq.api.core.TransportConfiguration;
-import org.hornetq.api.core.client.*;
+import org.hornetq.api.core.client.ClientConsumer;
+import org.hornetq.api.core.client.ClientMessage;
+import org.hornetq.api.core.client.ClientProducer;
+import org.hornetq.api.core.client.ClientSession;
+import org.hornetq.api.core.client.ClientSessionFactory;
+import org.hornetq.api.core.client.HornetQClient;
import org.hornetq.core.config.Configuration;
import org.hornetq.core.config.impl.ConfigurationImpl;
import org.hornetq.core.logging.Logger;
@@ -144,6 +149,11 @@
protected ClientSessionFactory[] sfs;
+ protected ClientConsumer getConsumer(final int node)
+ {
+ return consumers[node].consumer;
+ }
+
protected void waitForMessages(final int node, final String address, final int count) throws Exception
{
HornetQServer server = servers[node];