JBoss hornetq SVN: r8291 - trunk/src/main/org/hornetq/core/server/cluster/impl.
by do-not-reply@jboss.org
Author: jmesnil
Date: 2009-11-16 10:07:00 -0500 (Mon, 16 Nov 2009)
New Revision: 8291
Modified:
trunk/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
Log:
removed erroneous case clauses from MessageFlowRecordImpl onMessage handler
Modified: trunk/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java 2009-11-16 13:29:18 UTC (rev 8290)
+++ trunk/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java 2009-11-16 15:07:00 UTC (rev 8291)
@@ -554,8 +554,6 @@
break;
}
- case SECURITY_AUTHENTICATION_VIOLATION:
- case SECURITY_PERMISSION_VIOLATION:
case PROPOSAL:
doProposalReceived(message);
break;
15 years, 1 month
JBoss hornetq SVN: r8290 - trunk.
by do-not-reply@jboss.org
Author: ataylor
Date: 2009-11-16 08:29:18 -0500 (Mon, 16 Nov 2009)
New Revision: 8290
Modified:
trunk/build-thirdparty.xml
Log:
added maven debug flag
Modified: trunk/build-thirdparty.xml
===================================================================
--- trunk/build-thirdparty.xml 2009-11-16 12:13:10 UTC (rev 8289)
+++ trunk/build-thirdparty.xml 2009-11-16 13:29:18 UTC (rev 8290)
@@ -93,7 +93,7 @@
<target name="maven-install" description="Run the install goal against the maven build"
depends="maven-init">
- <property name="maven.opts" value=""/>
+ <property name="maven.opts" value="--debug"/>
<property name="maven.install.opts" value="-Dintegrated-build ${maven.opts}"/>
<maven basedir="${basedir}"
15 years, 1 month
JBoss hornetq SVN: r8289 - trunk/tests/src/org/hornetq/tests/integration/replication.
by do-not-reply@jboss.org
Author: jmesnil
Date: 2009-11-16 07:13:10 -0500 (Mon, 16 Nov 2009)
New Revision: 8289
Added:
trunk/tests/src/org/hornetq/tests/integration/replication/ReplicationOrderTest.java
Log:
HORNETQ-218: Incorrect order when persistent and non-persistent messages are sent over replication
* added ReplicationOrderTest
Added: trunk/tests/src/org/hornetq/tests/integration/replication/ReplicationOrderTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/replication/ReplicationOrderTest.java (rev 0)
+++ trunk/tests/src/org/hornetq/tests/integration/replication/ReplicationOrderTest.java 2009-11-16 12:13:10 UTC (rev 8289)
@@ -0,0 +1,131 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.tests.integration.replication;
+
+import static org.hornetq.tests.util.RandomUtil.randomString;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.hornetq.core.client.ClientConsumer;
+import org.hornetq.core.client.ClientMessage;
+import org.hornetq.core.client.ClientProducer;
+import org.hornetq.core.client.ClientSession;
+import org.hornetq.core.client.ClientSessionFactory;
+import org.hornetq.core.client.impl.ClientSessionFactoryImpl;
+import org.hornetq.core.config.TransportConfiguration;
+import org.hornetq.core.remoting.impl.invm.InVMAcceptorFactory;
+import org.hornetq.core.remoting.impl.invm.InVMConnectorFactory;
+import org.hornetq.core.remoting.impl.invm.TransportConstants;
+import org.hornetq.tests.integration.cluster.failover.FailoverTestBase;
+
+/**
+ * A ReplicationOrderTest
+ *
+ * @author <a href="mailto:jmesnil@redhat.com">Jeff Mesnil</a>
+ *
+ *
+ */
+public class ReplicationOrderTest extends FailoverTestBase
+{
+
+ public static final int NUM = 100;
+
+ // Constants -----------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ // Public --------------------------------------------------------
+
+ public void _test() throws Exception
+ {
+ for (int i = 0; i < 100; i++)
+ {
+ System.out.println("<<<<<< " + i + " >>>>>>>");
+ testMixedPersistentAndNonPersistentMessagesOrderWithReplicatedBackup();
+ tearDown();
+ setUp();
+ }
+ }
+
+ public void testMixedPersistentAndNonPersistentMessagesOrderWithReplicatedBackup() throws Exception
+ {
+ String address = randomString();
+ String queue = randomString();
+
+ ClientSessionFactory csf = new ClientSessionFactoryImpl(getConnectorTransportConfiguration(true));
+ csf.setBlockOnNonPersistentSend(false);
+ csf.setBlockOnPersistentSend(false);
+ ClientSession session = csf.createSession(true, true);
+ session.createQueue(address, queue, true);
+ ClientProducer producer = session.createProducer(address);
+ for (int i = 0; i < NUM; i++)
+ {
+ boolean durable = (i % 2 == 0);
+ ClientMessage msg = session.createClientMessage(durable);
+ msg.putIntProperty("counter", i);
+ producer.send(msg);
+ }
+ session.close();
+
+ csf = new ClientSessionFactoryImpl(getConnectorTransportConfiguration(true));
+ session = csf.createSession(true, true);
+ session.start();
+ ClientConsumer consumer = session.createConsumer(queue);
+ for (int i = 0; i < NUM; i++)
+ {
+ ClientMessage message = consumer.receive(1000);
+ assertNotNull(message);
+ assertEquals(i, message.getIntProperty("counter").intValue());
+ }
+
+ consumer.close();
+ session.deleteQueue(queue);
+
+ session.close();
+ }
+
+ @Override
+ protected void createConfigs() throws Exception
+ {
+ createReplicatedConfigs();
+ }
+
+ @Override
+ protected TransportConfiguration getAcceptorTransportConfiguration(boolean live)
+ {
+ Map<String, Object> server1Params = new HashMap<String, Object>();
+ if (!live)
+ {
+ server1Params.put(TransportConstants.SERVER_ID_PROP_NAME, 1);
+ }
+ return new TransportConfiguration(InVMAcceptorFactory.class.getName(), server1Params);
+ }
+
+ @Override
+ protected TransportConfiguration getConnectorTransportConfiguration(final boolean live)
+ {
+ Map<String, Object> server1Params = new HashMap<String, Object>();
+ if (!live)
+ {
+ server1Params.put(TransportConstants.SERVER_ID_PROP_NAME, 1);
+ }
+ return new TransportConfiguration(InVMConnectorFactory.class.getName(), server1Params);
+ }
+
+}
15 years, 1 month
JBoss hornetq SVN: r8288 - branches.
by do-not-reply@jboss.org
Author: timfox
Date: 2009-11-16 05:21:48 -0500 (Mon, 16 Nov 2009)
New Revision: 8288
Added:
branches/20-optimisation/
Log:
branch for 2.0 optimisation work
Copied: branches/20-optimisation (from rev 8287, trunk)
15 years, 1 month
JBoss hornetq SVN: r8287 - trunk/tests/src/org/hornetq/tests/integration/cluster/failover.
by do-not-reply@jboss.org
Author: timfox
Date: 2009-11-15 07:09:32 -0500 (Sun, 15 Nov 2009)
New Revision: 8287
Added:
trunk/tests/src/org/hornetq/tests/integration/cluster/failover/ReplicatedDistributionTest.java
Removed:
trunk/tests/src/org/hornetq/tests/integration/cluster/failover/ReplicatedDistrubtionTest.java
Modified:
trunk/tests/src/org/hornetq/tests/integration/cluster/failover/SharedStoreDistributionTest.java
Log:
corrected typo in class name
Copied: trunk/tests/src/org/hornetq/tests/integration/cluster/failover/ReplicatedDistributionTest.java (from rev 8286, trunk/tests/src/org/hornetq/tests/integration/cluster/failover/ReplicatedDistrubtionTest.java)
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/cluster/failover/ReplicatedDistributionTest.java (rev 0)
+++ trunk/tests/src/org/hornetq/tests/integration/cluster/failover/ReplicatedDistributionTest.java 2009-11-15 12:09:32 UTC (rev 8287)
@@ -0,0 +1,309 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.tests.integration.cluster.failover;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import org.hornetq.core.buffers.ChannelBuffers;
+import org.hornetq.core.client.ClientConsumer;
+import org.hornetq.core.client.ClientMessage;
+import org.hornetq.core.client.ClientProducer;
+import org.hornetq.core.client.ClientSession;
+import org.hornetq.core.client.SessionFailureListener;
+import org.hornetq.core.client.impl.ClientSessionInternal;
+import org.hornetq.core.exception.HornetQException;
+import org.hornetq.core.remoting.RemotingConnection;
+import org.hornetq.core.settings.impl.AddressSettings;
+import org.hornetq.tests.integration.cluster.distribution.ClusterTestBase;
+import org.hornetq.utils.SimpleString;
+
+/**
+ * A ReplicatedDistributionTest
+ *
+ * @author <a href="mailto:clebert.suconic@jboss.org">Clebert Suconic</a>
+ *
+ *
+ */
+public class ReplicatedDistributionTest extends ClusterTestBase
+{
+
+ // Constants -----------------------------------------------------
+
+ static final SimpleString ADDRESS = new SimpleString("test.SomeAddress");
+
+ // Attributes ----------------------------------------------------
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ // Public --------------------------------------------------------
+
+ public void testRedistribution() throws Exception
+ {
+ setupSessionFactory(1, 0, true, true);
+ setupSessionFactory(3, 2, true, true);
+
+ ClientSession sessionOne = sfs[1].createSession(true, true);
+
+ ClientSession sessionThree = sfs[3].createSession(false, false);
+
+ sessionOne.createQueue(ADDRESS, ADDRESS, true);
+
+ sessionThree.createQueue(ADDRESS, ADDRESS, true);
+
+ ClientConsumer consThree = sessionThree.createConsumer(ADDRESS);
+
+ sessionThree.start();
+
+ waitForBindings(3, "test.SomeAddress", 1, 1, true);
+ waitForBindings(1, "test.SomeAddress", 1, 1, false);
+ try
+ {
+ ClientProducer producer = sessionOne.createProducer(ADDRESS);
+
+ for (int i = 0; i < 100; i++)
+ {
+ ClientMessage msg = sessionOne.createClientMessage(true);
+
+ msg.setBody(ChannelBuffers.wrappedBuffer(new byte[1024]));
+
+ msg.putIntProperty(new SimpleString("key"), i);
+
+ producer.send(msg);
+ }
+
+ sessionOne.commit();
+
+ for (int i = 0; i < 50; i++)
+ {
+ ClientMessage msg = consThree.receive(15000);
+
+ assertNotNull(msg);
+
+ System.out.println(i + " msg = " + msg);
+
+ int received = msg.getIntProperty("key");
+
+ assertEquals(i, received);
+
+ msg.acknowledge();
+ }
+
+ sessionThree.commit();
+
+ // consThree.close();
+
+ // TODO: Remove this sleep: If a node fails,
+ // Redistribution may lose messages between the nodes.
+ Thread.sleep(500);
+
+ fail(sessionThree);
+
+ // sessionThree.close();
+ //
+ // setupSessionFactory(2, -1, true);
+ //
+ // sessionThree = sfs[2].createSession(true, true);
+ //
+ // sessionThree.start();
+
+ // consThree = sessionThree.createConsumer(ADDRESS);
+
+ for (int i = 50; i < 100; i++)
+ {
+ ClientMessage msg = consThree.receive(15000);
+
+ assertNotNull(msg);
+
+ System.out.println(i + " msg = " + msg);
+
+ int received = (Integer)msg.getObjectProperty(new SimpleString("key"));
+
+ assertEquals(i, received);
+
+ msg.acknowledge();
+ }
+
+ assertNull(consThree.receiveImmediate());
+
+ sessionThree.commit();
+
+ sessionOne.start();
+
+ ClientConsumer consOne = sessionOne.createConsumer(ADDRESS);
+
+ assertNull(consOne.receiveImmediate());
+
+ }
+ finally
+ {
+ sessionOne.close();
+ sessionThree.close();
+ }
+ }
+
+ public void testSimpleRedistribution() throws Exception
+ {
+ setupSessionFactory(1, 0, true, true);
+ setupSessionFactory(3, 2, true, true);
+
+ ClientSession sessionOne = sfs[1].createSession(true, true);
+
+ ClientSession sessionThree = sfs[3].createSession(false, false);
+
+ sessionOne.createQueue(ADDRESS, ADDRESS, true);
+
+ sessionThree.createQueue(ADDRESS, ADDRESS, true);
+
+ ClientConsumer consThree = sessionThree.createConsumer(ADDRESS);
+
+ sessionThree.start();
+
+ waitForBindings(3, "test.SomeAddress", 1, 1, true);
+ waitForBindings(1, "test.SomeAddress", 1, 1, false);
+
+ try
+ {
+ ClientProducer producer = sessionOne.createProducer(ADDRESS);
+
+ for (int i = 0; i < 100; i++)
+ {
+ ClientMessage msg = sessionOne.createClientMessage(true);
+ msg.setBody(ChannelBuffers.wrappedBuffer(new byte[1024]));
+ msg.putIntProperty(new SimpleString("key"), i);
+ producer.send(msg);
+ }
+
+ sessionOne.commit();
+
+ for (int i = 0; i < 100; i++)
+ {
+ ClientMessage msg = consThree.receive(15000);
+
+ assertNotNull(msg);
+
+ System.out.println(i + " msg = " + msg);
+
+ int received = msg.getIntProperty("key");
+
+ if (i != received)
+ {
+ // Shouldn't this be a failure?
+ System.out.println(i + "!=" + received);
+ }
+ msg.acknowledge();
+ }
+
+ sessionThree.commit();
+
+ sessionOne.start();
+
+ ClientConsumer consOne = sessionOne.createConsumer(ADDRESS);
+
+ assertNull(consOne.receiveImmediate());
+
+ }
+ finally
+ {
+ sessionOne.close();
+ sessionThree.close();
+ }
+ }
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+ /**
+ * @param session
+ * @param latch
+ * @throws InterruptedException
+ */
+ private void fail(final ClientSession session) throws InterruptedException
+ {
+
+ final CountDownLatch latch = new CountDownLatch(1);
+
+ class MyListener implements SessionFailureListener
+ {
+ public void connectionFailed(final HornetQException me)
+ {
+ latch.countDown();
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.client.SessionFailureListener#beforeReconnect(org.hornetq.core.exception.HornetQException)
+ */
+ public void beforeReconnect(final HornetQException exception)
+ {
+ }
+ }
+
+ session.addFailureListener(new MyListener());
+
+ RemotingConnection conn = ((ClientSessionInternal)session).getConnection();
+
+ // Simulate failure on connection
+ conn.fail(new HornetQException(HornetQException.NOT_CONNECTED));
+
+ // Wait to be informed of failure
+
+ boolean ok = latch.await(1000, TimeUnit.MILLISECONDS);
+
+ assertTrue(ok);
+ }
+
+ @Override
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+
+ setupServer(1, true, isShared(), true, false, -1);
+ setupServer(2, true, isShared(), true, true, -1);
+ setupServer(3, true, isShared(), true, true, 2);
+
+ setupClusterConnectionWithBackups("test", "test", false, 1, true, 1, new int[] { 3 }, new int[] { 2 });
+
+ AddressSettings as = new AddressSettings();
+ as.setRedistributionDelay(0);
+
+ getServer(1).getAddressSettingsRepository().addMatch("test.*", as);
+ getServer(2).getAddressSettingsRepository().addMatch("test.*", as);
+ getServer(2).getAddressSettingsRepository().addMatch("test.*", as);
+
+ servers[2].start();
+ servers[1].start();
+ servers[3].start();
+ }
+
+ protected boolean isShared()
+ {
+ return false;
+ }
+
+ @Override
+ protected void tearDown() throws Exception
+ {
+ servers[2].stop();
+ servers[1].stop();
+ servers[3].stop();
+ super.tearDown();
+ }
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+
+}
Deleted: trunk/tests/src/org/hornetq/tests/integration/cluster/failover/ReplicatedDistrubtionTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/cluster/failover/ReplicatedDistrubtionTest.java 2009-11-15 10:35:38 UTC (rev 8286)
+++ trunk/tests/src/org/hornetq/tests/integration/cluster/failover/ReplicatedDistrubtionTest.java 2009-11-15 12:09:32 UTC (rev 8287)
@@ -1,309 +0,0 @@
-/*
- * Copyright 2009 Red Hat, Inc.
- * Red Hat licenses this file to you under the Apache License, version
- * 2.0 (the "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * http://www.apache.org/licenses/LICENSE-2.0
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
- * implied. See the License for the specific language governing
- * permissions and limitations under the License.
- */
-
-package org.hornetq.tests.integration.cluster.failover;
-
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-
-import org.hornetq.core.buffers.ChannelBuffers;
-import org.hornetq.core.client.ClientConsumer;
-import org.hornetq.core.client.ClientMessage;
-import org.hornetq.core.client.ClientProducer;
-import org.hornetq.core.client.ClientSession;
-import org.hornetq.core.client.SessionFailureListener;
-import org.hornetq.core.client.impl.ClientSessionInternal;
-import org.hornetq.core.exception.HornetQException;
-import org.hornetq.core.remoting.RemotingConnection;
-import org.hornetq.core.settings.impl.AddressSettings;
-import org.hornetq.tests.integration.cluster.distribution.ClusterTestBase;
-import org.hornetq.utils.SimpleString;
-
-/**
- * A SymmetricFailoverTest
- *
- * @author <mailto:clebert.suconic@jboss.org">Clebert Suconic</a>
- *
- *
- */
-public class ReplicatedDistrubtionTest extends ClusterTestBase
-{
-
- // Constants -----------------------------------------------------
-
- static final SimpleString ADDRESS = new SimpleString("test.SomeAddress");
-
- // Attributes ----------------------------------------------------
-
- // Static --------------------------------------------------------
-
- // Constructors --------------------------------------------------
-
- // Public --------------------------------------------------------
-
- public void testRedistribution() throws Exception
- {
- setupSessionFactory(1, 0, true, true);
- setupSessionFactory(3, 2, true, true);
-
- ClientSession sessionOne = sfs[1].createSession(true, true);
-
- ClientSession sessionThree = sfs[3].createSession(false, false);
-
- sessionOne.createQueue(ADDRESS, ADDRESS, true);
-
- sessionThree.createQueue(ADDRESS, ADDRESS, true);
-
- ClientConsumer consThree = sessionThree.createConsumer(ADDRESS);
-
- sessionThree.start();
-
- waitForBindings(3, "test.SomeAddress", 1, 1, true);
- waitForBindings(1, "test.SomeAddress", 1, 1, false);
- try
- {
- ClientProducer producer = sessionOne.createProducer(ADDRESS);
-
- for (int i = 0; i < 100; i++)
- {
- ClientMessage msg = sessionOne.createClientMessage(true);
-
- msg.setBody(ChannelBuffers.wrappedBuffer(new byte[1024]));
-
- msg.putIntProperty(new SimpleString("key"), i);
-
- producer.send(msg);
- }
-
- sessionOne.commit();
-
- for (int i = 0; i < 50; i++)
- {
- ClientMessage msg = consThree.receive(15000);
-
- assertNotNull(msg);
-
- System.out.println(i + " msg = " + msg);
-
- int received = msg.getIntProperty("key");
-
- assertEquals(i, received);
-
- msg.acknowledge();
- }
-
- sessionThree.commit();
-
- // consThree.close();
-
- // TODO: Remove this sleep: If a node fail,
- // Redistribution may loose messages between the nodes.
- Thread.sleep(500);
-
- fail(sessionThree);
-
- // sessionThree.close();
- //
- // setupSessionFactory(2, -1, true);
- //
- // sessionThree = sfs[2].createSession(true, true);
- //
- // sessionThree.start();
-
- // consThree = sessionThree.createConsumer(ADDRESS);
-
- for (int i = 50; i < 100; i++)
- {
- ClientMessage msg = consThree.receive(15000);
-
- assertNotNull(msg);
-
- System.out.println(i + " msg = " + msg);
-
- int received = (Integer)msg.getObjectProperty(new SimpleString("key"));
-
- assertEquals(i, received);
-
- msg.acknowledge();
- }
-
- assertNull(consThree.receiveImmediate());
-
- sessionThree.commit();
-
- sessionOne.start();
-
- ClientConsumer consOne = sessionOne.createConsumer(ADDRESS);
-
- assertNull(consOne.receiveImmediate());
-
- }
- finally
- {
- sessionOne.close();
- sessionThree.close();
- }
- }
-
- public void testSimpleRedistribution() throws Exception
- {
- setupSessionFactory(1, 0, true, true);
- setupSessionFactory(3, 2, true, true);
-
- ClientSession sessionOne = sfs[1].createSession(true, true);
-
- ClientSession sessionThree = sfs[3].createSession(false, false);
-
- sessionOne.createQueue(ADDRESS, ADDRESS, true);
-
- sessionThree.createQueue(ADDRESS, ADDRESS, true);
-
- ClientConsumer consThree = sessionThree.createConsumer(ADDRESS);
-
- sessionThree.start();
-
- waitForBindings(3, "test.SomeAddress", 1, 1, true);
- waitForBindings(1, "test.SomeAddress", 1, 1, false);
-
- try
- {
- ClientProducer producer = sessionOne.createProducer(ADDRESS);
-
- for (int i = 0; i < 100; i++)
- {
- ClientMessage msg = sessionOne.createClientMessage(true);
- msg.setBody(ChannelBuffers.wrappedBuffer(new byte[1024]));
- msg.putIntProperty(new SimpleString("key"), i);
- producer.send(msg);
- }
-
- sessionOne.commit();
-
- for (int i = 0; i < 100; i++)
- {
- ClientMessage msg = consThree.receive(15000);
-
- assertNotNull(msg);
-
- System.out.println(i + " msg = " + msg);
-
- int received = msg.getIntProperty("key");
-
- if (i != received)
- {
- // Shouldn't this be a failure?
- System.out.println(i + "!=" + received);
- }
- msg.acknowledge();
- }
-
- sessionThree.commit();
-
- sessionOne.start();
-
- ClientConsumer consOne = sessionOne.createConsumer(ADDRESS);
-
- assertNull(consOne.receiveImmediate());
-
- }
- finally
- {
- sessionOne.close();
- sessionThree.close();
- }
- }
-
- // Package protected ---------------------------------------------
-
- // Protected -----------------------------------------------------
- /**
- * @param session
- * @param latch
- * @throws InterruptedException
- */
- private void fail(final ClientSession session) throws InterruptedException
- {
-
- final CountDownLatch latch = new CountDownLatch(1);
-
- class MyListener implements SessionFailureListener
- {
- public void connectionFailed(final HornetQException me)
- {
- latch.countDown();
- }
-
- /* (non-Javadoc)
- * @see org.hornetq.core.client.SessionFailureListener#beforeReconnect(org.hornetq.core.exception.HornetQException)
- */
- public void beforeReconnect(final HornetQException exception)
- {
- }
- }
-
- session.addFailureListener(new MyListener());
-
- RemotingConnection conn = ((ClientSessionInternal)session).getConnection();
-
- // Simulate failure on connection
- conn.fail(new HornetQException(HornetQException.NOT_CONNECTED));
-
- // Wait to be informed of failure
-
- boolean ok = latch.await(1000, TimeUnit.MILLISECONDS);
-
- assertTrue(ok);
- }
-
- @Override
- protected void setUp() throws Exception
- {
- super.setUp();
-
- setupServer(1, true, isShared(), true, false, -1);
- setupServer(2, true, isShared(), true, true, -1);
- setupServer(3, true, isShared(), true, true, 2);
-
- setupClusterConnectionWithBackups("test", "test", false, 1, true, 1, new int[] { 3 }, new int[] { 2 });
-
- AddressSettings as = new AddressSettings();
- as.setRedistributionDelay(0);
-
- getServer(1).getAddressSettingsRepository().addMatch("test.*", as);
- getServer(2).getAddressSettingsRepository().addMatch("test.*", as);
- getServer(2).getAddressSettingsRepository().addMatch("test.*", as);
-
- servers[2].start();
- servers[1].start();
- servers[3].start();
- }
-
- protected boolean isShared()
- {
- return false;
- }
-
- @Override
- protected void tearDown() throws Exception
- {
- servers[2].stop();
- servers[1].stop();
- servers[3].stop();
- super.tearDown();
- }
-
- // Private -------------------------------------------------------
-
- // Inner classes -------------------------------------------------
-
-}
Modified: trunk/tests/src/org/hornetq/tests/integration/cluster/failover/SharedStoreDistributionTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/cluster/failover/SharedStoreDistributionTest.java 2009-11-15 10:35:38 UTC (rev 8286)
+++ trunk/tests/src/org/hornetq/tests/integration/cluster/failover/SharedStoreDistributionTest.java 2009-11-15 12:09:32 UTC (rev 8287)
@@ -20,7 +20,7 @@
*
*
*/
-public class SharedStoreDistributionTest extends ReplicatedDistrubtionTest
+public class SharedStoreDistributionTest extends ReplicatedDistributionTest
{
// Constants -----------------------------------------------------
15 years, 1 month
JBoss hornetq SVN: r8286 - trunk/tests/src/org/hornetq/tests/integration/jms/consumer.
by do-not-reply@jboss.org
Author: timfox
Date: 2009-11-15 05:35:38 -0500 (Sun, 15 Nov 2009)
New Revision: 8286
Modified:
trunk/tests/src/org/hornetq/tests/integration/jms/consumer/ConsumerTest.java
Log:
fixed bogus tests
Modified: trunk/tests/src/org/hornetq/tests/integration/jms/consumer/ConsumerTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/jms/consumer/ConsumerTest.java 2009-11-14 09:11:04 UTC (rev 8285)
+++ trunk/tests/src/org/hornetq/tests/integration/jms/consumer/ConsumerTest.java 2009-11-15 10:35:38 UTC (rev 8286)
@@ -137,8 +137,9 @@
Message m = consumer.receiveNoWait();
assertNull(m);
- SimpleString queueName = new SimpleString(HornetQQueue.JMS_QUEUE_ADDRESS_PREFIX + Q_NAME);
- assertEquals(0, ((Queue)server.getPostOffice().getBinding(queueName).getBindable()).getDeliveringCount());
+ //Asserting delivering count is zero is bogus since messages might still be being delivered and expired at this point
+ //which can cause delivering count to flip to 1
+
conn.close();
}
@@ -164,8 +165,9 @@
Message m = consumer.receiveNoWait();
assertNull(m);
- SimpleString queueName = new SimpleString(HornetQQueue.JMS_QUEUE_ADDRESS_PREFIX + Q_NAME);
- assertEquals(0, ((Queue)server.getPostOffice().getBinding(queueName).getBindable()).getDeliveringCount());
+ //Asserting delivering count is zero is bogus since messages might still be being delivered and expired at this point
+ //which can cause delivering count to flip to 1
+
conn.close();
}
15 years, 1 month
JBoss hornetq SVN: r8285 - trunk/tests/src/org/hornetq/tests/integration/jms/server/management.
by do-not-reply@jboss.org
Author: timfox
Date: 2009-11-14 04:11:04 -0500 (Sat, 14 Nov 2009)
New Revision: 8285
Modified:
trunk/tests/src/org/hornetq/tests/integration/jms/server/management/JMSServerControl2Test.java
Log:
fixed timing issue in test
Modified: trunk/tests/src/org/hornetq/tests/integration/jms/server/management/JMSServerControl2Test.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/jms/server/management/JMSServerControl2Test.java 2009-11-14 05:58:59 UTC (rev 8284)
+++ trunk/tests/src/org/hornetq/tests/integration/jms/server/management/JMSServerControl2Test.java 2009-11-14 09:11:04 UTC (rev 8285)
@@ -62,9 +62,9 @@
// Attributes ----------------------------------------------------
private HornetQServer server;
-
+
JMSServerManagerImpl serverManager;
-
+
private InVMContext context;
// Static --------------------------------------------------------
@@ -147,13 +147,13 @@
{
return ManagementControlHelper.createJMSServerControl(mbeanServer);
}
-
+
protected void tearDown() throws Exception
{
serverManager = null;
-
+
server = null;
-
+
super.tearDown();
}
@@ -174,23 +174,18 @@
String[] connectionIDs = control.listConnectionIDs();
assertEquals(1, connectionIDs.length);
-
+
ConnectionFactory cf2 = JMSUtil.createFactory(connectorFactory, CONNECTION_TTL, PING_PERIOD);
Connection connection2 = cf2.createConnection();
assertEquals(2, control.listConnectionIDs().length);
connection.close();
- Thread.sleep(2 * CONNECTION_TTL);
+
+ waitForConnectionIDs(1, control);
- connectionIDs = control.listConnectionIDs();
- assertEquals("got " + Arrays.asList(connectionIDs), 1, connectionIDs.length);
-
- assertEquals(1, control.listConnectionIDs().length);
-
connection2.close();
- Thread.sleep(2 * CONNECTION_TTL);
-
- assertEquals(0, control.listConnectionIDs().length);
+
+ waitForConnectionIDs(0, control);
}
finally
{
@@ -198,7 +193,7 @@
{
serverManager.stop();
}
-
+
if (server != null)
{
server.stop();
@@ -206,6 +201,26 @@
}
}
+ private void waitForConnectionIDs(final int num, final JMSServerControl control) throws Exception
+ {
+ final long timeout = 10000;
+ long start = System.currentTimeMillis();
+ while (true)
+ {
+ if (control.listConnectionIDs().length == num)
+ {
+ return;
+ }
+
+ if (System.currentTimeMillis() - start > timeout)
+ {
+ throw new IllegalStateException("Timed out waiting for number of connections");
+ }
+
+ Thread.sleep(10);
+ }
+ }
+
private void doListSessions(String acceptorFactory, String connectorFactory) throws Exception
{
try
@@ -231,7 +246,7 @@
connection.close();
- Thread.sleep(2 * CONNECTION_TTL);
+ waitForConnectionIDs(0, control);
assertEquals(0, control.listConnectionIDs().length);
}
@@ -241,8 +256,7 @@
{
serverManager.stop();
}
-
-
+
if (server != null)
{
server.stop();
@@ -270,12 +284,12 @@
{
System.out.println(remoteAddress);
}
-
+
connection.close();
- Thread.sleep(2 * CONNECTION_TTL);
+ waitForConnectionIDs(0, control);
- remoteAddresses = control.listRemoteAddresses();
+ remoteAddresses = control.listRemoteAddresses();
assertEquals("got " + Arrays.asList(remoteAddresses), 0, remoteAddresses.length);
}
finally
@@ -284,8 +298,7 @@
{
serverManager.stop();
}
-
-
+
if (server != null)
{
server.stop();
@@ -327,10 +340,10 @@
boolean gotException = exceptionLatch.await(2 * CONNECTION_TTL, TimeUnit.MILLISECONDS);
assertTrue("did not received the expected JMSException", gotException);
- remoteAddresses = control.listRemoteAddresses();
+ remoteAddresses = control.listRemoteAddresses();
assertEquals("got " + Arrays.asList(remoteAddresses), 0, remoteAddresses.length);
assertEquals(0, server.getConnectionCount());
-
+
connection.close();
}
finally
@@ -339,7 +352,7 @@
{
serverManager.stop();
}
-
+
if (server != null)
{
server.stop();
@@ -383,7 +396,7 @@
assertEquals(1, control.listRemoteAddresses().length);
assertEquals(1, server.getConnectionCount());
-
+
connection.close();
}
@@ -393,7 +406,7 @@
{
serverManager.stop();
}
-
+
if (server != null)
{
server.stop();
15 years, 1 month
JBoss hornetq SVN: r8284 - in trunk/tests/src/org/hornetq/tests: util and 1 other directory.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2009-11-14 00:58:59 -0500 (Sat, 14 Nov 2009)
New Revision: 8284
Added:
trunk/tests/src/org/hornetq/tests/integration/cluster/bridge/NettyBridgeReconnectTest.java
trunk/tests/src/org/hornetq/tests/integration/cluster/bridge/NettyBridgeStartTest.java
trunk/tests/src/org/hornetq/tests/integration/cluster/bridge/NettyBridgeTest.java
trunk/tests/src/org/hornetq/tests/integration/cluster/bridge/NettyBridgeWithDiscoveryGroupStartTest.java
Modified:
trunk/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeReconnectTest.java
trunk/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeStartTest.java
trunk/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeTest.java
trunk/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeTestBase.java
trunk/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeWithDiscoveryGroupStartTest.java
trunk/tests/src/org/hornetq/tests/util/ServiceTestBase.java
Log:
https://jira.jboss.org/jira/browse/HORNETQ-55 - Netty Bridge Tests
Modified: trunk/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeReconnectTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeReconnectTest.java 2009-11-13 20:52:21 UTC (rev 8283)
+++ trunk/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeReconnectTest.java 2009-11-14 05:58:59 UTC (rev 8284)
@@ -31,9 +31,11 @@
import org.hornetq.core.logging.Logger;
import org.hornetq.core.remoting.RemotingConnection;
import org.hornetq.core.remoting.impl.invm.InVMConnector;
+import org.hornetq.core.remoting.impl.invm.InVMConnectorFactory;
import org.hornetq.core.server.HornetQServer;
import org.hornetq.core.server.cluster.Bridge;
import org.hornetq.core.server.cluster.impl.BridgeImpl;
+import org.hornetq.integration.transports.netty.NettyConnectorFactory;
import org.hornetq.utils.Pair;
import org.hornetq.utils.SimpleString;
@@ -50,31 +52,45 @@
{
private static final Logger log = Logger.getLogger(BridgeReconnectTest.class);
+ protected boolean isNetty()
+ {
+ return false;
+ }
+
+ /**
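+ * @return the fully qualified class name of the connector factory to use: Netty when {@link #isNetty()} is true, in-VM otherwise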
+ */
+ private String getConnector()
+ {
+ if (isNetty())
+ {
+ return NettyConnectorFactory.class.getName();
+ }
+ else
+ {
+ return InVMConnectorFactory.class.getName();
+ }
+ }
+
// Fail bridge and reconnecting immediately
public void testFailoverAndReconnectImmediately() throws Exception
{
Map<String, Object> server0Params = new HashMap<String, Object>();
- HornetQServer server0 = createHornetQServer(0, server0Params);
+ HornetQServer server0 = createHornetQServer(0, isNetty(), server0Params);
Map<String, Object> server1Params = new HashMap<String, Object>();
- HornetQServer server1 = createHornetQServer(1, server1Params);
+ HornetQServer server1 = createHornetQServer(1, isNetty(), server1Params);
Map<String, Object> server2Params = new HashMap<String, Object>();
- HornetQServer service2 = createHornetQServer(2, server2Params, true);
+ HornetQServer service2 = createHornetQServer(2, server2Params, isNetty(), true);
- TransportConfiguration server0tc = new TransportConfiguration("org.hornetq.core.remoting.impl.invm.InVMConnectorFactory",
- server0Params,
- "server0tc");
+ TransportConfiguration server0tc = new TransportConfiguration(getConnector(), server0Params, "server0tc");
Map<String, TransportConfiguration> connectors = new HashMap<String, TransportConfiguration>();
- TransportConfiguration server1tc = new TransportConfiguration("org.hornetq.core.remoting.impl.invm.InVMConnectorFactory",
- server1Params,
- "server1tc");
+ TransportConfiguration server1tc = new TransportConfiguration(getConnector(), server1Params, "server1tc");
- TransportConfiguration server2tc = new TransportConfiguration("org.hornetq.core.remoting.impl.invm.InVMConnectorFactory",
- server2Params,
- "server2tc");
+ TransportConfiguration server2tc = new TransportConfiguration(getConnector(), server2Params, "server2tc");
connectors.put(server1tc.getName(), server1tc);
@@ -182,27 +198,21 @@
public void testFailoverAndReconnectAfterAFewTries() throws Exception
{
Map<String, Object> server0Params = new HashMap<String, Object>();
- HornetQServer server0 = createHornetQServer(0, server0Params);
+ HornetQServer server0 = createHornetQServer(0, isNetty(), server0Params);
Map<String, Object> server1Params = new HashMap<String, Object>();
- HornetQServer server1 = createHornetQServer(1, server1Params);
+ HornetQServer server1 = createHornetQServer(1, isNetty(), server1Params);
Map<String, Object> server2Params = new HashMap<String, Object>();
- HornetQServer service2 = createHornetQServer(2, server2Params, true);
+ HornetQServer service2 = createHornetQServer(2, server2Params, isNetty(), true);
- TransportConfiguration server0tc = new TransportConfiguration("org.hornetq.core.remoting.impl.invm.InVMConnectorFactory",
- server0Params,
- "server0tc");
+ TransportConfiguration server0tc = new TransportConfiguration(getConnector(), server0Params, "server0tc");
Map<String, TransportConfiguration> connectors = new HashMap<String, TransportConfiguration>();
- TransportConfiguration server1tc = new TransportConfiguration("org.hornetq.core.remoting.impl.invm.InVMConnectorFactory",
- server1Params,
- "server1tc");
+ TransportConfiguration server1tc = new TransportConfiguration(getConnector(), server1Params, "server1tc");
- TransportConfiguration server2tc = new TransportConfiguration("org.hornetq.core.remoting.impl.invm.InVMConnectorFactory",
- server2Params,
- "server2tc");
+ TransportConfiguration server2tc = new TransportConfiguration(getConnector(), server2Params, "server2tc");
connectors.put(server1tc.getName(), server1tc);
@@ -314,20 +324,16 @@
public void testReconnectSameNode() throws Exception
{
Map<String, Object> server0Params = new HashMap<String, Object>();
- HornetQServer server0 = createHornetQServer(0, server0Params);
+ HornetQServer server0 = createHornetQServer(0, isNetty(), server0Params);
Map<String, Object> server1Params = new HashMap<String, Object>();
- HornetQServer server1 = createHornetQServer(1, server1Params);
+ HornetQServer server1 = createHornetQServer(1, isNetty(), server1Params);
- TransportConfiguration server0tc = new TransportConfiguration("org.hornetq.core.remoting.impl.invm.InVMConnectorFactory",
- server0Params,
- "server0tc");
+ TransportConfiguration server0tc = new TransportConfiguration(getConnector(), server0Params, "server0tc");
Map<String, TransportConfiguration> connectors = new HashMap<String, TransportConfiguration>();
- TransportConfiguration server1tc = new TransportConfiguration("org.hornetq.core.remoting.impl.invm.InVMConnectorFactory",
- server1Params,
- "server1tc");
+ TransportConfiguration server1tc = new TransportConfiguration(getConnector(), server1Params, "server1tc");
connectors.put(server1tc.getName(), server1tc);
@@ -430,20 +436,16 @@
public void testShutdownServerCleanlyAndReconnectSameNode() throws Exception
{
Map<String, Object> server0Params = new HashMap<String, Object>();
- HornetQServer server0 = createHornetQServer(0, server0Params);
+ HornetQServer server0 = createHornetQServer(0, isNetty(), server0Params);
Map<String, Object> server1Params = new HashMap<String, Object>();
- HornetQServer server1 = createHornetQServer(1, server1Params);
+ HornetQServer server1 = createHornetQServer(1, isNetty(), server1Params);
- TransportConfiguration server0tc = new TransportConfiguration("org.hornetq.core.remoting.impl.invm.InVMConnectorFactory",
- server0Params,
- "server0tc");
+ TransportConfiguration server0tc = new TransportConfiguration(getConnector(), server0Params, "server0tc");
Map<String, TransportConfiguration> connectors = new HashMap<String, TransportConfiguration>();
- TransportConfiguration server1tc = new TransportConfiguration("org.hornetq.core.remoting.impl.invm.InVMConnectorFactory",
- server1Params,
- "server1tc");
+ TransportConfiguration server1tc = new TransportConfiguration(getConnector(), server1Params, "server1tc");
connectors.put(server1tc.getName(), server1tc);
@@ -547,18 +549,18 @@
public void testFailoverThenFailAgainAndReconnect() throws Exception
{
Map<String, Object> server0Params = new HashMap<String, Object>();
- HornetQServer server0 = createHornetQServer(0, server0Params);
+ HornetQServer server0 = createHornetQServer(0, isNetty(), server0Params);
Map<String, Object> server1Params = new HashMap<String, Object>();
- HornetQServer server1 = createHornetQServer(1, server1Params);
+ HornetQServer server1 = createHornetQServer(1, isNetty(), server1Params);
- TransportConfiguration server0tc = new TransportConfiguration("org.hornetq.core.remoting.impl.invm.InVMConnectorFactory",
+ TransportConfiguration server0tc = new TransportConfiguration(getConnector(),
server0Params,
"server0tc");
Map<String, TransportConfiguration> connectors = new HashMap<String, TransportConfiguration>();
- TransportConfiguration server1tc = new TransportConfiguration("org.hornetq.core.remoting.impl.invm.InVMConnectorFactory",
+ TransportConfiguration server1tc = new TransportConfiguration(getConnector(),
server1Params,
"server1tc");
Modified: trunk/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeStartTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeStartTest.java 2009-11-13 20:52:21 UTC (rev 8283)
+++ trunk/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeStartTest.java 2009-11-14 05:58:59 UTC (rev 8284)
@@ -32,6 +32,7 @@
import org.hornetq.core.logging.Logger;
import org.hornetq.core.server.HornetQServer;
import org.hornetq.core.server.cluster.Bridge;
+import org.hornetq.integration.transports.netty.NettyConnectorFactory;
import org.hornetq.tests.util.ServiceTestBase;
import org.hornetq.utils.Pair;
import org.hornetq.utils.SimpleString;
@@ -49,14 +50,38 @@
{
private static final Logger log = Logger.getLogger(BridgeStartTest.class);
+ protected boolean isNetty()
+ {
+ return false;
+ }
+
+ private String getConnector()
+ {
+ if (isNetty())
+ {
+ return NettyConnectorFactory.class.getName();
+ }
+ else
+ {
+ return "org.hornetq.core.remoting.impl.invm.InVMConnectorFactory";
+ }
+ }
+
public void testStartStop() throws Exception
{
Map<String, Object> server0Params = new HashMap<String, Object>();
- HornetQServer server0 = createClusteredServerWithParams(0, true, server0Params);
+ HornetQServer server0 = createClusteredServerWithParams(isNetty(), 0, true, server0Params);
Map<String, Object> server1Params = new HashMap<String, Object>();
- server1Params.put(SERVER_ID_PROP_NAME, 1);
- HornetQServer server1 = createClusteredServerWithParams(1, true, server1Params);
+ if (isNetty())
+ {
+ server1Params.put("port", org.hornetq.integration.transports.netty.TransportConstants.DEFAULT_PORT + 1);
+ }
+ else
+ {
+ server1Params.put(SERVER_ID_PROP_NAME, 1);
+ }
+ HornetQServer server1 = createClusteredServerWithParams(isNetty(), 1, true, server1Params);
final String testAddress = "testAddress";
final String queueName0 = "queue0";
@@ -64,10 +89,8 @@
final String queueName1 = "queue1";
Map<String, TransportConfiguration> connectors = new HashMap<String, TransportConfiguration>();
- TransportConfiguration server0tc = new TransportConfiguration("org.hornetq.core.remoting.impl.invm.InVMConnectorFactory",
- server0Params);
- TransportConfiguration server1tc = new TransportConfiguration("org.hornetq.core.remoting.impl.invm.InVMConnectorFactory",
- server1Params);
+ TransportConfiguration server0tc = new TransportConfiguration(getConnector(), server0Params);
+ TransportConfiguration server1tc = new TransportConfiguration(getConnector(), server1Params);
connectors.put(server1tc.getName(), server1tc);
server0.getConfiguration().setConnectorConfigurations(connectors);
@@ -196,11 +219,18 @@
// to be persisted
Map<String, Object> server0Params = new HashMap<String, Object>();
- HornetQServer server0 = createClusteredServerWithParams(0, true, server0Params);
+ HornetQServer server0 = createClusteredServerWithParams(isNetty(), 0, true, server0Params);
Map<String, Object> server1Params = new HashMap<String, Object>();
- server1Params.put(SERVER_ID_PROP_NAME, 1);
- HornetQServer server1 = createClusteredServerWithParams(1, true, server1Params);
+ if (isNetty())
+ {
+ server1Params.put("port", org.hornetq.integration.transports.netty.TransportConstants.DEFAULT_PORT + 1);
+ }
+ else
+ {
+ server1Params.put(SERVER_ID_PROP_NAME, 1);
+ }
+ HornetQServer server1 = createClusteredServerWithParams(isNetty(), 1, true, server1Params);
final String testAddress = "testAddress";
final String queueName0 = "queue0";
@@ -208,10 +238,8 @@
final String queueName1 = "queue1";
Map<String, TransportConfiguration> connectors = new HashMap<String, TransportConfiguration>();
- TransportConfiguration server0tc = new TransportConfiguration("org.hornetq.core.remoting.impl.invm.InVMConnectorFactory",
- server0Params);
- TransportConfiguration server1tc = new TransportConfiguration("org.hornetq.core.remoting.impl.invm.InVMConnectorFactory",
- server1Params);
+ TransportConfiguration server0tc = new TransportConfiguration(getConnector(), server0Params);
+ TransportConfiguration server1tc = new TransportConfiguration(getConnector(), server1Params);
connectors.put(server1tc.getName(), server1tc);
server0.getConfiguration().setConnectorConfigurations(connectors);
@@ -387,11 +415,18 @@
public void testTargetServerNotAvailableNoReconnectTries() throws Exception
{
Map<String, Object> server0Params = new HashMap<String, Object>();
- HornetQServer server0 = createClusteredServerWithParams(0, false, server0Params);
+ HornetQServer server0 = createClusteredServerWithParams(isNetty(), 0, false, server0Params);
Map<String, Object> server1Params = new HashMap<String, Object>();
- server1Params.put(SERVER_ID_PROP_NAME, 1);
- HornetQServer server1 = createClusteredServerWithParams(1, false, server1Params);
+ if (isNetty())
+ {
+ server1Params.put("port", org.hornetq.integration.transports.netty.TransportConstants.DEFAULT_PORT + 1);
+ }
+ else
+ {
+ server1Params.put(SERVER_ID_PROP_NAME, 1);
+ }
+ HornetQServer server1 = createClusteredServerWithParams(isNetty(), 1, false, server1Params);
final String testAddress = "testAddress";
final String queueName0 = "queue0";
@@ -399,10 +434,8 @@
final String queueName1 = "queue1";
Map<String, TransportConfiguration> connectors = new HashMap<String, TransportConfiguration>();
- TransportConfiguration server0tc = new TransportConfiguration("org.hornetq.core.remoting.impl.invm.InVMConnectorFactory",
- server0Params);
- TransportConfiguration server1tc = new TransportConfiguration("org.hornetq.core.remoting.impl.invm.InVMConnectorFactory",
- server1Params);
+ TransportConfiguration server0tc = new TransportConfiguration(getConnector(), server0Params);
+ TransportConfiguration server1tc = new TransportConfiguration(getConnector(), server1Params);
connectors.put(server1tc.getName(), server1tc);
server0.getConfiguration().setConnectorConfigurations(connectors);
@@ -516,11 +549,18 @@
public void testManualStopStart() throws Exception
{
Map<String, Object> server0Params = new HashMap<String, Object>();
- HornetQServer server0 = createClusteredServerWithParams(0, false, server0Params);
+ HornetQServer server0 = createClusteredServerWithParams(isNetty(), 0, false, server0Params);
Map<String, Object> server1Params = new HashMap<String, Object>();
- server1Params.put(SERVER_ID_PROP_NAME, 1);
- HornetQServer server1 = createClusteredServerWithParams(1, false, server1Params);
+ if (isNetty())
+ {
+ server1Params.put("port", org.hornetq.integration.transports.netty.TransportConstants.DEFAULT_PORT + 1);
+ }
+ else
+ {
+ server1Params.put(SERVER_ID_PROP_NAME, 1);
+ }
+ HornetQServer server1 = createClusteredServerWithParams(isNetty(), 1, false, server1Params);
final String testAddress = "testAddress";
final String queueName0 = "queue0";
@@ -528,10 +568,8 @@
final String queueName1 = "queue1";
Map<String, TransportConfiguration> connectors = new HashMap<String, TransportConfiguration>();
- TransportConfiguration server0tc = new TransportConfiguration("org.hornetq.core.remoting.impl.invm.InVMConnectorFactory",
- server0Params);
- TransportConfiguration server1tc = new TransportConfiguration("org.hornetq.core.remoting.impl.invm.InVMConnectorFactory",
- server1Params);
+ TransportConfiguration server0tc = new TransportConfiguration(getConnector(), server0Params);
+ TransportConfiguration server1tc = new TransportConfiguration(getConnector(), server1Params);
connectors.put(server1tc.getName(), server1tc);
server0.getConfiguration().setConnectorConfigurations(connectors);
@@ -616,7 +654,7 @@
Bridge bridge = server0.getClusterManager().getBridges().get(bridgeName);
log.info("stopping bridge manually");
-
+
bridge.stop();
for (int i = numMessages; i < numMessages * 2; i++)
@@ -633,10 +671,10 @@
bridge.start();
log.info("started bridge");
-
- //The previous messages will get resent, but with duplicate detection they will be rejected
- //at the target
+ // The previous messages will get resent, but with duplicate detection they will be rejected
+ // at the target
+
for (int i = numMessages; i < numMessages * 2; i++)
{
ClientMessage message = consumer1.receive(1000);
Modified: trunk/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeTest.java 2009-11-13 20:52:21 UTC (rev 8283)
+++ trunk/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeTest.java 2009-11-14 05:58:59 UTC (rev 8284)
@@ -32,6 +32,7 @@
import org.hornetq.core.config.cluster.QueueConfiguration;
import org.hornetq.core.logging.Logger;
import org.hornetq.core.server.HornetQServer;
+import org.hornetq.integration.transports.netty.NettyConnectorFactory;
import org.hornetq.tests.util.ServiceTestBase;
import org.hornetq.utils.Pair;
import org.hornetq.utils.SimpleString;
@@ -48,7 +49,25 @@
public class BridgeTest extends ServiceTestBase
{
private static final Logger log = Logger.getLogger(BridgeTest.class);
+
+ protected boolean isNetty()
+ {
+ return false;
+ }
+ private String getConnector()
+ {
+ if (isNetty())
+ {
+ return NettyConnectorFactory.class.getName();
+ }
+ else
+ {
+ return "org.hornetq.core.remoting.impl.invm.InVMConnectorFactory";
+ }
+ }
+
+
public void testSimpleBridge() throws Exception
{
internaltestSimpleBridge(false, false);
@@ -77,11 +96,11 @@
try
{
Map<String, Object> server0Params = new HashMap<String, Object>();
- server0 = createClusteredServerWithParams(0, useFiles, server0Params);
+ server0 = createClusteredServerWithParams(isNetty(), 0, useFiles, server0Params);
Map<String, Object> server1Params = new HashMap<String, Object>();
- server1Params.put(SERVER_ID_PROP_NAME, 1);
- server1 = createClusteredServerWithParams(1, useFiles, server1Params);
+ addTargetParameters(server1Params);
+ server1 = createClusteredServerWithParams(isNetty(), 1, useFiles, server1Params);
final String testAddress = "testAddress";
final String queueName0 = "queue0";
@@ -89,10 +108,10 @@
final String queueName1 = "queue1";
Map<String, TransportConfiguration> connectors = new HashMap<String, TransportConfiguration>();
- TransportConfiguration server0tc = new TransportConfiguration("org.hornetq.core.remoting.impl.invm.InVMConnectorFactory",
+ TransportConfiguration server0tc = new TransportConfiguration(getConnector(),
server0Params);
- TransportConfiguration server1tc = new TransportConfiguration("org.hornetq.core.remoting.impl.invm.InVMConnectorFactory",
+ TransportConfiguration server1tc = new TransportConfiguration(getConnector(),
server1Params);
connectors.put(server1tc.getName(), server1tc);
@@ -219,6 +238,21 @@
}
/**
+ * @param server1Params
+ */
+ private void addTargetParameters(Map<String, Object> server1Params)
+ {
+ if (isNetty())
+ {
+ server1Params.put("port", org.hornetq.integration.transports.netty.TransportConstants.DEFAULT_PORT + 1);
+ }
+ else
+ {
+ server1Params.put(SERVER_ID_PROP_NAME, 1);
+ }
+ }
+
+ /**
* @param message
*/
private void readMessages(ClientMessage message)
@@ -260,11 +294,11 @@
{
Map<String, Object> server0Params = new HashMap<String, Object>();
- server0 = createClusteredServerWithParams(0, useFiles, server0Params);
+ server0 = createClusteredServerWithParams(isNetty(), 0, useFiles, server0Params);
Map<String, Object> server1Params = new HashMap<String, Object>();
- server1Params.put(SERVER_ID_PROP_NAME, 1);
- server1 = createClusteredServerWithParams(1, useFiles, server1Params);
+ addTargetParameters(server1Params);
+ server1 = createClusteredServerWithParams(isNetty(), 1, useFiles, server1Params);
final String testAddress = "testAddress";
final String queueName0 = "queue0";
@@ -272,9 +306,9 @@
final String queueName1 = "queue1";
Map<String, TransportConfiguration> connectors = new HashMap<String, TransportConfiguration>();
- TransportConfiguration server0tc = new TransportConfiguration("org.hornetq.core.remoting.impl.invm.InVMConnectorFactory",
+ TransportConfiguration server0tc = new TransportConfiguration(getConnector(),
server0Params);
- TransportConfiguration server1tc = new TransportConfiguration("org.hornetq.core.remoting.impl.invm.InVMConnectorFactory",
+ TransportConfiguration server1tc = new TransportConfiguration(getConnector(),
server1Params);
connectors.put(server1tc.getName(), server1tc);
@@ -430,11 +464,11 @@
public void internaltestWithTransformer(boolean useFiles) throws Exception
{
Map<String, Object> server0Params = new HashMap<String, Object>();
- HornetQServer server0 = createClusteredServerWithParams(0, false, server0Params);
+ HornetQServer server0 = createClusteredServerWithParams(isNetty(), 0, false, server0Params);
Map<String, Object> server1Params = new HashMap<String, Object>();
- server1Params.put(SERVER_ID_PROP_NAME, 1);
- HornetQServer server1 = createClusteredServerWithParams(1, false, server1Params);
+ addTargetParameters(server1Params);
+ HornetQServer server1 = createClusteredServerWithParams(isNetty(), 1, false, server1Params);
final String testAddress = "testAddress";
final String queueName0 = "queue0";
@@ -442,9 +476,9 @@
final String queueName1 = "queue1";
Map<String, TransportConfiguration> connectors = new HashMap<String, TransportConfiguration>();
- TransportConfiguration server0tc = new TransportConfiguration("org.hornetq.core.remoting.impl.invm.InVMConnectorFactory",
+ TransportConfiguration server0tc = new TransportConfiguration(getConnector(),
server0Params);
- TransportConfiguration server1tc = new TransportConfiguration("org.hornetq.core.remoting.impl.invm.InVMConnectorFactory",
+ TransportConfiguration server1tc = new TransportConfiguration(getConnector(),
server1Params);
connectors.put(server1tc.getName(), server1tc);
@@ -557,11 +591,11 @@
{
Map<String, Object> server0Params = new HashMap<String, Object>();
- server0 = createClusteredServerWithParams(0, true, PAGE_SIZE, PAGE_MAX, server0Params);
+ server0 = createClusteredServerWithParams(isNetty(), 0, true, PAGE_SIZE, PAGE_MAX, server0Params);
Map<String, Object> server1Params = new HashMap<String, Object>();
- server1Params.put(SERVER_ID_PROP_NAME, 1);
- server1 = createClusteredServerWithParams(1, true, server1Params);
+ addTargetParameters(server1Params);
+ server1 = createClusteredServerWithParams(isNetty(), 1, true, server1Params);
final String testAddress = "testAddress";
final String queueName0 = "queue0";
@@ -569,10 +603,10 @@
final String queueName1 = "queue1";
Map<String, TransportConfiguration> connectors = new HashMap<String, TransportConfiguration>();
- TransportConfiguration server0tc = new TransportConfiguration("org.hornetq.core.remoting.impl.invm.InVMConnectorFactory",
+ TransportConfiguration server0tc = new TransportConfiguration(getConnector(),
server0Params);
- TransportConfiguration server1tc = new TransportConfiguration("org.hornetq.core.remoting.impl.invm.InVMConnectorFactory",
+ TransportConfiguration server1tc = new TransportConfiguration(getConnector(),
server1Params);
connectors.put(server1tc.getName(), server1tc);
Modified: trunk/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeTestBase.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeTestBase.java 2009-11-13 20:52:21 UTC (rev 8283)
+++ trunk/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeTestBase.java 2009-11-14 05:58:59 UTC (rev 8284)
@@ -21,6 +21,8 @@
import org.hornetq.core.remoting.impl.invm.TransportConstants;
import org.hornetq.core.server.HornetQ;
import org.hornetq.core.server.HornetQServer;
+import org.hornetq.integration.transports.netty.NettyAcceptorFactory;
+import org.hornetq.tests.util.ServiceTestBase;
import org.hornetq.tests.util.UnitTestCase;
/**
@@ -34,12 +36,15 @@
*/
public abstract class BridgeTestBase extends UnitTestCase
{
- protected HornetQServer createHornetQServer(final int id, final Map<String, Object> params)
+ protected HornetQServer createHornetQServer(final int id, final boolean netty, final Map<String, Object> params)
{
- return createHornetQServer(id, params, false);
+ return createHornetQServer(id, params, netty, false);
}
- protected HornetQServer createHornetQServer(final int id, final Map<String, Object> params, final boolean backup)
+ protected HornetQServer createHornetQServer(final int id,
+ final Map<String, Object> params,
+ final boolean netty,
+ final boolean backup)
{
Configuration serviceConf = new ConfigurationImpl();
serviceConf.setClustered(true);
@@ -51,10 +56,20 @@
serviceConf.setJournalDirectory(getJournalDir(id, false));
serviceConf.setPagingDirectory(getPageDir(id, false));
serviceConf.setLargeMessagesDirectory(getLargeMessagesDir(id, false));
-
- params.put(TransportConstants.SERVER_ID_PROP_NAME, id);
- serviceConf.getAcceptorConfigurations()
- .add(new TransportConfiguration("org.hornetq.core.remoting.impl.invm.InVMAcceptorFactory", params));
+
+ if (netty)
+ {
+ params.put(org.hornetq.integration.transports.netty.TransportConstants.PORT_PROP_NAME, org.hornetq.integration.transports.netty.TransportConstants.DEFAULT_PORT + id);
+ serviceConf.getAcceptorConfigurations()
+ .add(new TransportConfiguration(NettyAcceptorFactory.class.getName(), params));
+
+ }
+ else
+ {
+ params.put(org.hornetq.core.remoting.impl.invm.TransportConstants.SERVER_ID_PROP_NAME, id);
+ serviceConf.getAcceptorConfigurations()
+ .add(new TransportConfiguration("org.hornetq.core.remoting.impl.invm.InVMAcceptorFactory", params));
+ }
HornetQServer service = HornetQ.newHornetQServer(serviceConf, true);
return service;
}
Modified: trunk/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeWithDiscoveryGroupStartTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeWithDiscoveryGroupStartTest.java 2009-11-13 20:52:21 UTC (rev 8283)
+++ trunk/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeWithDiscoveryGroupStartTest.java 2009-11-14 05:58:59 UTC (rev 8284)
@@ -34,6 +34,9 @@
import org.hornetq.core.remoting.impl.invm.InVMConnectorFactory;
import org.hornetq.core.server.HornetQServer;
import org.hornetq.core.server.cluster.Bridge;
+import org.hornetq.integration.transports.netty.NettyAcceptorFactory;
+import org.hornetq.integration.transports.netty.NettyConnectorFactory;
+import org.hornetq.integration.transports.netty.TransportConstants;
import org.hornetq.tests.util.ServiceTestBase;
import org.hornetq.utils.Pair;
import org.hornetq.utils.SimpleString;
@@ -46,18 +49,31 @@
*/
public class BridgeWithDiscoveryGroupStartTest extends ServiceTestBase
{
+
+ protected boolean isNetty()
+ {
+ return false;
+ }
+
public void testStartStop() throws Exception
{
Map<String, Object> server0Params = new HashMap<String, Object>();
- HornetQServer server0 = createClusteredServerWithParams(0, true, server0Params);
+ HornetQServer server0 = createClusteredServerWithParams(isNetty(), 0, true, server0Params);
Map<String, Object> server1Params = new HashMap<String, Object>();
- server1Params.put(SERVER_ID_PROP_NAME, 1);
- HornetQServer server1 = createClusteredServerWithParams(1, true, server1Params);
+ if (isNetty())
+ {
+ server1Params.put("port", TransportConstants.DEFAULT_PORT + 1);
+ }
+ else
+ {
+ server1Params.put(SERVER_ID_PROP_NAME, 1);
+ }
+ HornetQServer server1 = createClusteredServerWithParams(isNetty(), 1, true, server1Params);
Map<String, TransportConfiguration> connectors = new HashMap<String, TransportConfiguration>();
- TransportConfiguration server0tc = new TransportConfiguration(InVMConnectorFactory.class.getName(), server0Params);
- TransportConfiguration server1tc = new TransportConfiguration(InVMConnectorFactory.class.getName(), server1Params);
+ TransportConfiguration server0tc = new TransportConfiguration(getConnector(), server0Params);
+ TransportConfiguration server1tc = new TransportConfiguration(getConnector(), server1Params);
connectors.put(server1tc.getName(), server1tc);
server0.getConfiguration().setConnectorConfigurations(connectors);
@@ -201,4 +217,19 @@
server1.stop();
}
+
+ /**
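+ * @return the fully qualified class name of the connector factory to use: Netty when {@link #isNetty()} is true, in-VM otherwise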
+ */
+ private String getConnector()
+ {
+ if (isNetty())
+ {
+ return NettyConnectorFactory.class.getName();
+ }
+ else
+ {
+ return InVMConnectorFactory.class.getName();
+ }
+ }
}
Added: trunk/tests/src/org/hornetq/tests/integration/cluster/bridge/NettyBridgeReconnectTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/cluster/bridge/NettyBridgeReconnectTest.java (rev 0)
+++ trunk/tests/src/org/hornetq/tests/integration/cluster/bridge/NettyBridgeReconnectTest.java 2009-11-14 05:58:59 UTC (rev 8284)
@@ -0,0 +1,51 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.tests.integration.cluster.bridge;
+
+/**
+ * A NettyBridgeReconnectTest
+ *
+ * @author <a href="mailto:clebert.suconic@jboss.org">Clebert Suconic</a>
+ *
+ *
+ */
+public class NettyBridgeReconnectTest extends BridgeReconnectTest
+{
+
+ // Constants -----------------------------------------------------
+
+ @Override
+ protected boolean isNetty()
+ {
+ return true;
+ }
+
+
+ // Attributes ----------------------------------------------------
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ // Public --------------------------------------------------------
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+
+}
Added: trunk/tests/src/org/hornetq/tests/integration/cluster/bridge/NettyBridgeStartTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/cluster/bridge/NettyBridgeStartTest.java (rev 0)
+++ trunk/tests/src/org/hornetq/tests/integration/cluster/bridge/NettyBridgeStartTest.java 2009-11-14 05:58:59 UTC (rev 8284)
@@ -0,0 +1,32 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.tests.integration.cluster.bridge;
+
+/**
+ * A NettyBridgeStartTest
+ *
+ * @author <a href="mailto:clebert.suconic@jboss.org">Clebert Suconic</a>
+ *
+ *
+ */
+public class NettyBridgeStartTest extends BridgeStartTest
+{
+
+ @Override
+ protected boolean isNetty()
+ {
+ return true;
+ }
+
+}
Added: trunk/tests/src/org/hornetq/tests/integration/cluster/bridge/NettyBridgeTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/cluster/bridge/NettyBridgeTest.java (rev 0)
+++ trunk/tests/src/org/hornetq/tests/integration/cluster/bridge/NettyBridgeTest.java 2009-11-14 05:58:59 UTC (rev 8284)
@@ -0,0 +1,31 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.tests.integration.cluster.bridge;
+
+/**
+ * A NettyBridgeTest
+ *
+ * @author <a href="mailto:clebert.suconic@jboss.org">Clebert Suconic</a>
+ *
+ *
+ */
+public class NettyBridgeTest extends BridgeTest
+{
+ @Override
+ protected boolean isNetty()
+ {
+ return true;
+ }
+
+}
Added: trunk/tests/src/org/hornetq/tests/integration/cluster/bridge/NettyBridgeWithDiscoveryGroupStartTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/cluster/bridge/NettyBridgeWithDiscoveryGroupStartTest.java (rev 0)
+++ trunk/tests/src/org/hornetq/tests/integration/cluster/bridge/NettyBridgeWithDiscoveryGroupStartTest.java 2009-11-14 05:58:59 UTC (rev 8284)
@@ -0,0 +1,30 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.tests.integration.cluster.bridge;
+
+/**
+ * A NettyBridgeWithDiscoveryGroupStartTest
+ *
+ * @author <a href="mailto:clebert.suconic@jboss.org">Clebert Suconic</a>
+ *
+ *
+ */
+public class NettyBridgeWithDiscoveryGroupStartTest extends BridgeWithDiscoveryGroupStartTest
+{
+ @Override
+ protected boolean isNetty()
+ {
+ return true;
+ }
+}
Modified: trunk/tests/src/org/hornetq/tests/util/ServiceTestBase.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/util/ServiceTestBase.java 2009-11-13 20:52:21 UTC (rev 8283)
+++ trunk/tests/src/org/hornetq/tests/util/ServiceTestBase.java 2009-11-14 05:58:59 UTC (rev 8284)
@@ -71,12 +71,12 @@
{
super();
}
-
+
public ServiceTestBase(String name)
{
super(name);
}
-
+
// Public --------------------------------------------------------
// Package protected ---------------------------------------------
@@ -146,7 +146,6 @@
return createServer(realFiles, false);
}
-
protected HornetQServer createServer(final boolean realFiles, final boolean netty)
{
return createServer(realFiles, createDefaultConfig(netty), -1, -1, new HashMap<String, AddressSettings>());
@@ -189,28 +188,52 @@
return server;
}
- protected HornetQServer createClusteredServerWithParams(final int index,
+ protected HornetQServer createClusteredServerWithParams(final boolean isNetty,
+ final int index,
final boolean realFiles,
final Map<String, Object> params)
{
- return createServer(realFiles,
- createClusteredDefaultConfig(index, params, INVM_ACCEPTOR_FACTORY),
- -1,
- -1,
- new HashMap<String, AddressSettings>());
+ if (isNetty)
+ {
+ return createServer(realFiles,
+ createClusteredDefaultConfig(index, params, NETTY_ACCEPTOR_FACTORY),
+ -1,
+ -1,
+ new HashMap<String, AddressSettings>());
+ }
+ else
+ {
+ return createServer(realFiles,
+ createClusteredDefaultConfig(index, params, INVM_ACCEPTOR_FACTORY),
+ -1,
+ -1,
+ new HashMap<String, AddressSettings>());
+ }
}
-
- protected HornetQServer createClusteredServerWithParams(final int index,
+
+ protected HornetQServer createClusteredServerWithParams(final boolean isNetty,
+ final int index,
final boolean realFiles,
final int pageSize,
final int maxAddressSize,
final Map<String, Object> params)
{
- return createServer(realFiles,
- createClusteredDefaultConfig(index, params, INVM_ACCEPTOR_FACTORY),
- pageSize,
- maxAddressSize,
- new HashMap<String, AddressSettings>());
+ if (isNetty)
+ {
+ return createServer(realFiles,
+ createClusteredDefaultConfig(index, params, NETTY_ACCEPTOR_FACTORY),
+ pageSize,
+ maxAddressSize,
+ new HashMap<String, AddressSettings>());
+ }
+ else
+ {
+ return createServer(realFiles,
+ createClusteredDefaultConfig(index, params, INVM_ACCEPTOR_FACTORY),
+ -1,
+ -1,
+ new HashMap<String, AddressSettings>());
+ }
}
protected Configuration createDefaultConfig()
@@ -279,7 +302,7 @@
configuration.setLargeMessagesDirectory(getLargeMessagesDir());
configuration.setJournalCompactMinFiles(0);
configuration.setJournalCompactPercentage(0);
-
+
configuration.setFileDeploymentEnabled(false);
configuration.setJournalType(JournalType.ASYNCIO);
@@ -306,7 +329,7 @@
return createInVMFactory();
}
}
-
+
protected ClientSessionFactory createInVMFactory()
{
return createFactory(INVM_CONNECTOR_FACTORY);
@@ -355,7 +378,7 @@
message.getBody().writeBytes(b);
return message;
}
-
+
/**
* Deleting a file on LargeDire is an asynchronous process. We need to keep looking for a while if the file hasn't been deleted yet
*/
@@ -387,7 +410,6 @@
validateNoFilesOnLargeDir(0);
}
-
// Private -------------------------------------------------------
// Inner classes -------------------------------------------------
15 years, 1 month
JBoss hornetq SVN: r8283 - in trunk: examples/javaee/ejb-jms-transaction/server and 30 other directories.
by do-not-reply@jboss.org
Author: timfox
Date: 2009-11-13 15:52:21 -0500 (Fri, 13 Nov 2009)
New Revision: 8283
Removed:
trunk/src/main/org/hornetq/core/server/Distributor.java
trunk/src/main/org/hornetq/core/server/impl/DistributorImpl.java
trunk/src/main/org/hornetq/core/server/impl/RoundRobinDistributor.java
Modified:
trunk/docs/user-manual/en/configuration-index.xml
trunk/docs/user-manual/en/queue-attributes.xml
trunk/examples/javaee/ejb-jms-transaction/server/hornetq-configuration.xml
trunk/examples/javaee/hajndi/config/hornetq-queues.xml
trunk/examples/javaee/mdb-bmt/server/hornetq-configuration.xml
trunk/examples/javaee/mdb-cmt-setrollbackonly/server/hornetq-configuration.xml
trunk/examples/javaee/mdb-cmt-tx-local/server/hornetq-configuration.xml
trunk/examples/javaee/mdb-cmt-tx-not-supported/server/hornetq-configuration.xml
trunk/examples/javaee/mdb-cmt-tx-required/server/hornetq-configuration.xml
trunk/examples/javaee/mdb-message-selector/server/hornetq-configuration.xml
trunk/examples/javaee/mdb-tx-send/server/hornetq-configuration.xml
trunk/examples/javaee/servlet-ssl/server/hornetq-configuration.xml
trunk/examples/javaee/servlet-transport/server/hornetq-configuration.xml
trunk/examples/javaee/xarecovery/server/hornetq-configuration.xml
trunk/src/config/common/schema/hornetq-configuration.xsd
trunk/src/config/jboss-as/clustered/hornetq-configuration.xml
trunk/src/config/jboss-as/non-clustered/hornetq-configuration.xml
trunk/src/config/stand-alone/clustered/hornetq-configuration.xml
trunk/src/config/stand-alone/non-clustered/hornetq-configuration.xml
trunk/src/config/trunk/clustered/hornetq-configuration.xml
trunk/src/config/trunk/non-clustered/hornetq-configuration.xml
trunk/src/main/org/hornetq/core/deployers/impl/AddressSettingsDeployer.java
trunk/src/main/org/hornetq/core/postoffice/impl/LocalQueueBinding.java
trunk/src/main/org/hornetq/core/replication/impl/ReplicationManagerImpl.java
trunk/src/main/org/hornetq/core/server/Queue.java
trunk/src/main/org/hornetq/core/server/impl/QueueFactoryImpl.java
trunk/src/main/org/hornetq/core/server/impl/QueueImpl.java
trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
trunk/src/main/org/hornetq/core/settings/impl/AddressSettings.java
trunk/tests/src/org/hornetq/tests/integration/client/ConsumerWindowSizeTest.java
trunk/tests/src/org/hornetq/tests/integration/client/MessageGroupingTest.java
trunk/tests/src/org/hornetq/tests/integration/client/SessionStopStartTest.java
trunk/tests/src/org/hornetq/tests/integration/remoting/PingTest.java
trunk/tests/src/org/hornetq/tests/unit/core/deployers/impl/AddressSettingsDeployerTest.java
trunk/tests/src/org/hornetq/tests/unit/core/postoffice/impl/FakeQueue.java
trunk/tests/src/org/hornetq/tests/unit/core/server/impl/QueueImplTest.java
trunk/tests/src/org/hornetq/tests/unit/core/settings/impl/AddressSettingsTest.java
Log:
refactored queue delivery logic
Modified: trunk/docs/user-manual/en/configuration-index.xml
===================================================================
--- trunk/docs/user-manual/en/configuration-index.xml 2009-11-13 19:14:02 UTC (rev 8282)
+++ trunk/docs/user-manual/en/configuration-index.xml 2009-11-13 20:52:21 UTC (rev 8283)
@@ -775,15 +775,8 @@
<entry>boolean</entry>
<entry>whether to treat the queue as a last value queue</entry>
<entry>false</entry>
- </row>
+ </row>
<row>
- <entry><link linkend="queue-attributes.address-settings"
- >address-settings.distribution-policy-class</link></entry>
- <entry>String</entry>
- <entry>the class to use for distributing messages to a consumer</entry>
- <entry>RoundRobinDistributor</entry>
- </row>
- <row>
<entry><link linkend="paging"
>address-settings.page-size-bytes</link></entry>
<entry>Long</entry>
Modified: trunk/docs/user-manual/en/queue-attributes.xml
===================================================================
--- trunk/docs/user-manual/en/queue-attributes.xml 2009-11-13 19:14:02 UTC (rev 8282)
+++ trunk/docs/user-manual/en/queue-attributes.xml 2009-11-13 20:52:21 UTC (rev 8283)
@@ -95,8 +95,7 @@
<max-delivery-attempts>3</max-delivery-attempts>
<redelivery-delay>5000</redelivery-delay>
<expiry-address>jms.queue.expiryQueue</expiry-address>
- <last-value-queue>true</last-value-queue>
- <distribution-policy-class>org.hornetq.core.server.impl.RoundRobinDistributor</distribution-policy-class>
+ <last-value-queue>true</last-value-queue>
<max-size-bytes>100000</max-size-bytes>
<page-size-bytes>20000</page-size-bytes>
<redistribution-delay>0</redistribution-delay>
@@ -123,9 +122,6 @@
see <link linkend="message-expiry.configuring">here</link>.</para>
<para><literal>last-value-queue</literal> defines whether a queue only uses last values or
not. see <link linkend="last-value-queues">here</link>.</para>
- <para><literal>distribution-policy-class</literal> defines the class to use for distribution
- of messages by a queue to consumers. By default this is <literal
- >org.hornetq.core.server.impl.RoundRobinDistributor</literal>.</para>
<para><literal>max-size-bytes</literal> and <literal>page-size-bytes</literal> are used to
set paging on an address. This is explained <link linkend="paging">here</link>.</para>
<para><literal>redistribution-delay</literal> defines how long to wait when the last
Modified: trunk/examples/javaee/ejb-jms-transaction/server/hornetq-configuration.xml
===================================================================
--- trunk/examples/javaee/ejb-jms-transaction/server/hornetq-configuration.xml 2009-11-13 19:14:02 UTC (rev 8282)
+++ trunk/examples/javaee/ejb-jms-transaction/server/hornetq-configuration.xml 2009-11-13 20:52:21 UTC (rev 8283)
@@ -51,8 +51,7 @@
<expiry-address>jms.queue.ExpiryQueue</expiry-address>
<redelivery-delay>0</redelivery-delay>
<max-size-bytes>-1</max-size-bytes>
- <page-size-bytes>10485760</page-size-bytes>
- <distribution-policy-class>org.hornetq.core.server.impl.RoundRobinDistributor</distribution-policy-class>
+ <page-size-bytes>10485760</page-size-bytes>
<message-counter-history-day-limit>10</message-counter-history-day-limit>
</address-setting>
</address-settings>
Modified: trunk/examples/javaee/hajndi/config/hornetq-queues.xml
===================================================================
--- trunk/examples/javaee/hajndi/config/hornetq-queues.xml 2009-11-13 19:14:02 UTC (rev 8282)
+++ trunk/examples/javaee/hajndi/config/hornetq-queues.xml 2009-11-13 20:52:21 UTC (rev 8283)
@@ -17,8 +17,7 @@
<expiry-address>jms.queue.ExpiryQueue</expiry-address>
<redelivery-delay>0</redelivery-delay>
<max-size-bytes>-1</max-size-bytes>
- <page-size-bytes>10485760</page-size-bytes>
- <distribution-policy-class>org.hornetq.core.server.impl.RoundRobinDistributor</distribution-policy-class>
+ <page-size-bytes>10485760</page-size-bytes>
<message-counter-history-day-limit>10</message-counter-history-day-limit>
</address-setting>
</address-settings>
Modified: trunk/examples/javaee/mdb-bmt/server/hornetq-configuration.xml
===================================================================
--- trunk/examples/javaee/mdb-bmt/server/hornetq-configuration.xml 2009-11-13 19:14:02 UTC (rev 8282)
+++ trunk/examples/javaee/mdb-bmt/server/hornetq-configuration.xml 2009-11-13 20:52:21 UTC (rev 8283)
@@ -57,8 +57,7 @@
<expiry-address>jms.queue.ExpiryQueue</expiry-address>
<redelivery-delay>0</redelivery-delay>
<max-size-bytes>-1</max-size-bytes>
- <page-size-bytes>10485760</page-size-bytes>
- <distribution-policy-class>org.hornetq.core.server.impl.RoundRobinDistributor</distribution-policy-class>
+ <page-size-bytes>10485760</page-size-bytes>
<message-counter-history-day-limit>10</message-counter-history-day-limit>
</address-setting>
</address-settings>
Modified: trunk/examples/javaee/mdb-cmt-setrollbackonly/server/hornetq-configuration.xml
===================================================================
--- trunk/examples/javaee/mdb-cmt-setrollbackonly/server/hornetq-configuration.xml 2009-11-13 19:14:02 UTC (rev 8282)
+++ trunk/examples/javaee/mdb-cmt-setrollbackonly/server/hornetq-configuration.xml 2009-11-13 20:52:21 UTC (rev 8283)
@@ -57,8 +57,7 @@
<expiry-address>jms.queue.ExpiryQueue</expiry-address>
<redelivery-delay>0</redelivery-delay>
<max-size-bytes>-1</max-size-bytes>
- <page-size-bytes>10485760</page-size-bytes>
- <distribution-policy-class>org.hornetq.core.server.impl.RoundRobinDistributor</distribution-policy-class>
+ <page-size-bytes>10485760</page-size-bytes>
<message-counter-history-day-limit>10</message-counter-history-day-limit>
</address-setting>
</address-settings>
Modified: trunk/examples/javaee/mdb-cmt-tx-local/server/hornetq-configuration.xml
===================================================================
--- trunk/examples/javaee/mdb-cmt-tx-local/server/hornetq-configuration.xml 2009-11-13 19:14:02 UTC (rev 8282)
+++ trunk/examples/javaee/mdb-cmt-tx-local/server/hornetq-configuration.xml 2009-11-13 20:52:21 UTC (rev 8283)
@@ -57,8 +57,7 @@
<expiry-address>jms.queue.ExpiryQueue</expiry-address>
<redelivery-delay>0</redelivery-delay>
<max-size-bytes>-1</max-size-bytes>
- <page-size-bytes>10485760</page-size-bytes>
- <distribution-policy-class>org.hornetq.core.server.impl.RoundRobinDistributor</distribution-policy-class>
+ <page-size-bytes>10485760</page-size-bytes>
<message-counter-history-day-limit>10</message-counter-history-day-limit>
</address-setting>
</address-settings>
Modified: trunk/examples/javaee/mdb-cmt-tx-not-supported/server/hornetq-configuration.xml
===================================================================
--- trunk/examples/javaee/mdb-cmt-tx-not-supported/server/hornetq-configuration.xml 2009-11-13 19:14:02 UTC (rev 8282)
+++ trunk/examples/javaee/mdb-cmt-tx-not-supported/server/hornetq-configuration.xml 2009-11-13 20:52:21 UTC (rev 8283)
@@ -57,8 +57,7 @@
<expiry-address>jms.queue.ExpiryQueue</expiry-address>
<redelivery-delay>0</redelivery-delay>
<max-size-bytes>-1</max-size-bytes>
- <page-size-bytes>10485760</page-size-bytes>
- <distribution-policy-class>org.hornetq.core.server.impl.RoundRobinDistributor</distribution-policy-class>
+ <page-size-bytes>10485760</page-size-bytes>
<message-counter-history-day-limit>10</message-counter-history-day-limit>
</address-setting>
</address-settings>
Modified: trunk/examples/javaee/mdb-cmt-tx-required/server/hornetq-configuration.xml
===================================================================
--- trunk/examples/javaee/mdb-cmt-tx-required/server/hornetq-configuration.xml 2009-11-13 19:14:02 UTC (rev 8282)
+++ trunk/examples/javaee/mdb-cmt-tx-required/server/hornetq-configuration.xml 2009-11-13 20:52:21 UTC (rev 8283)
@@ -57,8 +57,7 @@
<expiry-address>jms.queue.ExpiryQueue</expiry-address>
<redelivery-delay>0</redelivery-delay>
<max-size-bytes>-1</max-size-bytes>
- <page-size-bytes>10485760</page-size-bytes>
- <distribution-policy-class>org.hornetq.core.server.impl.RoundRobinDistributor</distribution-policy-class>
+ <page-size-bytes>10485760</page-size-bytes>
<message-counter-history-day-limit>10</message-counter-history-day-limit>
</address-setting>
</address-settings>
Modified: trunk/examples/javaee/mdb-message-selector/server/hornetq-configuration.xml
===================================================================
--- trunk/examples/javaee/mdb-message-selector/server/hornetq-configuration.xml 2009-11-13 19:14:02 UTC (rev 8282)
+++ trunk/examples/javaee/mdb-message-selector/server/hornetq-configuration.xml 2009-11-13 20:52:21 UTC (rev 8283)
@@ -57,8 +57,7 @@
<expiry-address>jms.queue.ExpiryQueue</expiry-address>
<redelivery-delay>0</redelivery-delay>
<max-size-bytes>-1</max-size-bytes>
- <page-size-bytes>10485760</page-size-bytes>
- <distribution-policy-class>org.hornetq.core.server.impl.RoundRobinDistributor</distribution-policy-class>
+ <page-size-bytes>10485760</page-size-bytes>
<message-counter-history-day-limit>10</message-counter-history-day-limit>
</address-setting>
</address-settings>
Modified: trunk/examples/javaee/mdb-tx-send/server/hornetq-configuration.xml
===================================================================
--- trunk/examples/javaee/mdb-tx-send/server/hornetq-configuration.xml 2009-11-13 19:14:02 UTC (rev 8282)
+++ trunk/examples/javaee/mdb-tx-send/server/hornetq-configuration.xml 2009-11-13 20:52:21 UTC (rev 8283)
@@ -57,8 +57,7 @@
<expiry-address>jms.queue.ExpiryQueue</expiry-address>
<redelivery-delay>0</redelivery-delay>
<max-size-bytes>-1</max-size-bytes>
- <page-size-bytes>10485760</page-size-bytes>
- <distribution-policy-class>org.hornetq.core.server.impl.RoundRobinDistributor</distribution-policy-class>
+ <page-size-bytes>10485760</page-size-bytes>
<message-counter-history-day-limit>10</message-counter-history-day-limit>
</address-setting>
</address-settings>
Modified: trunk/examples/javaee/servlet-ssl/server/hornetq-configuration.xml
===================================================================
--- trunk/examples/javaee/servlet-ssl/server/hornetq-configuration.xml 2009-11-13 19:14:02 UTC (rev 8282)
+++ trunk/examples/javaee/servlet-ssl/server/hornetq-configuration.xml 2009-11-13 20:52:21 UTC (rev 8283)
@@ -57,8 +57,7 @@
<expiry-address>jms.queue.ExpiryQueue</expiry-address>
<redelivery-delay>0</redelivery-delay>
<max-size-bytes>-1</max-size-bytes>
- <page-size-bytes>10485760</page-size-bytes>
- <distribution-policy-class>org.hornetq.core.server.impl.RoundRobinDistributor</distribution-policy-class>
+ <page-size-bytes>10485760</page-size-bytes>
<message-counter-history-day-limit>10</message-counter-history-day-limit>
</address-setting>
</address-settings>
Modified: trunk/examples/javaee/servlet-transport/server/hornetq-configuration.xml
===================================================================
--- trunk/examples/javaee/servlet-transport/server/hornetq-configuration.xml 2009-11-13 19:14:02 UTC (rev 8282)
+++ trunk/examples/javaee/servlet-transport/server/hornetq-configuration.xml 2009-11-13 20:52:21 UTC (rev 8283)
@@ -54,8 +54,7 @@
<expiry-address>jms.queue.ExpiryQueue</expiry-address>
<redelivery-delay>0</redelivery-delay>
<max-size-bytes>-1</max-size-bytes>
- <page-size-bytes>10485760</page-size-bytes>
- <distribution-policy-class>org.hornetq.core.server.impl.RoundRobinDistributor</distribution-policy-class>
+ <page-size-bytes>10485760</page-size-bytes>
<message-counter-history-day-limit>10</message-counter-history-day-limit>
</address-setting>
</address-settings>
Modified: trunk/examples/javaee/xarecovery/server/hornetq-configuration.xml
===================================================================
--- trunk/examples/javaee/xarecovery/server/hornetq-configuration.xml 2009-11-13 19:14:02 UTC (rev 8282)
+++ trunk/examples/javaee/xarecovery/server/hornetq-configuration.xml 2009-11-13 20:52:21 UTC (rev 8283)
@@ -51,8 +51,7 @@
<expiry-address>jms.queue.ExpiryQueue</expiry-address>
<redelivery-delay>0</redelivery-delay>
<max-size-bytes>-1</max-size-bytes>
- <page-size-bytes>10485760</page-size-bytes>
- <distribution-policy-class>org.hornetq.core.server.impl.RoundRobinDistributor</distribution-policy-class>
+ <page-size-bytes>10485760</page-size-bytes>
<message-counter-history-day-limit>10</message-counter-history-day-limit>
</address-setting>
</address-settings>
Modified: trunk/src/config/common/schema/hornetq-configuration.xsd
===================================================================
--- trunk/src/config/common/schema/hornetq-configuration.xsd 2009-11-13 19:14:02 UTC (rev 8282)
+++ trunk/src/config/common/schema/hornetq-configuration.xsd 2009-11-13 20:52:21 UTC (rev 8283)
@@ -454,9 +454,7 @@
<xsd:element maxOccurs="1" minOccurs="0" name="page-size-bytes" type="xsd:int">
</xsd:element>
<xsd:element maxOccurs="1" minOccurs="0" name="address-full-policy" type="addressFullMessagePolicyType">
- </xsd:element>
- <xsd:element maxOccurs="1" minOccurs="0" name="distribution-policy-class" type="xsd:string">
- </xsd:element>
+ </xsd:element>
<xsd:element maxOccurs="1" minOccurs="0" name="message-counter-history-day-limit" type="xsd:int">
</xsd:element>
<xsd:element maxOccurs="1" minOccurs="0" name="last-value-queue" type="xsd:boolean">
Modified: trunk/src/config/jboss-as/clustered/hornetq-configuration.xml
===================================================================
--- trunk/src/config/jboss-as/clustered/hornetq-configuration.xml 2009-11-13 19:14:02 UTC (rev 8282)
+++ trunk/src/config/jboss-as/clustered/hornetq-configuration.xml 2009-11-13 20:52:21 UTC (rev 8283)
@@ -82,8 +82,7 @@
<expiry-address>jms.queue.ExpiryQueue</expiry-address>
<redelivery-delay>0</redelivery-delay>
<max-size-bytes>-1</max-size-bytes>
- <page-size-bytes>10485760</page-size-bytes>
- <distribution-policy-class>org.hornetq.core.server.impl.RoundRobinDistributor</distribution-policy-class>
+ <page-size-bytes>10485760</page-size-bytes>
<message-counter-history-day-limit>10</message-counter-history-day-limit>
</address-setting>
</address-settings>
Modified: trunk/src/config/jboss-as/non-clustered/hornetq-configuration.xml
===================================================================
--- trunk/src/config/jboss-as/non-clustered/hornetq-configuration.xml 2009-11-13 19:14:02 UTC (rev 8282)
+++ trunk/src/config/jboss-as/non-clustered/hornetq-configuration.xml 2009-11-13 20:52:21 UTC (rev 8283)
@@ -56,8 +56,7 @@
<expiry-address>jms.queue.ExpiryQueue</expiry-address>
<redelivery-delay>0</redelivery-delay>
<max-size-bytes>-1</max-size-bytes>
- <page-size-bytes>10485760</page-size-bytes>
- <distribution-policy-class>org.hornetq.core.server.impl.RoundRobinDistributor</distribution-policy-class>
+ <page-size-bytes>10485760</page-size-bytes>
<message-counter-history-day-limit>10</message-counter-history-day-limit>
</address-setting>
</address-settings>
Modified: trunk/src/config/stand-alone/clustered/hornetq-configuration.xml
===================================================================
--- trunk/src/config/stand-alone/clustered/hornetq-configuration.xml 2009-11-13 19:14:02 UTC (rev 8282)
+++ trunk/src/config/stand-alone/clustered/hornetq-configuration.xml 2009-11-13 20:52:21 UTC (rev 8283)
@@ -61,8 +61,7 @@
<expiry-address>jms.queue.ExpiryQueue</expiry-address>
<redelivery-delay>0</redelivery-delay>
<max-size-bytes>-1</max-size-bytes>
- <page-size-bytes>10485760</page-size-bytes>
- <distribution-policy-class>org.hornetq.core.server.impl.RoundRobinDistributor</distribution-policy-class>
+ <page-size-bytes>10485760</page-size-bytes>
<message-counter-history-day-limit>10</message-counter-history-day-limit>
</address-setting>
</address-settings>
Modified: trunk/src/config/stand-alone/non-clustered/hornetq-configuration.xml
===================================================================
--- trunk/src/config/stand-alone/non-clustered/hornetq-configuration.xml 2009-11-13 19:14:02 UTC (rev 8282)
+++ trunk/src/config/stand-alone/non-clustered/hornetq-configuration.xml 2009-11-13 20:52:21 UTC (rev 8283)
@@ -35,8 +35,7 @@
<expiry-address>jms.queue.ExpiryQueue</expiry-address>
<redelivery-delay>0</redelivery-delay>
<max-size-bytes>-1</max-size-bytes>
- <page-size-bytes>10485760</page-size-bytes>
- <distribution-policy-class>org.hornetq.core.server.impl.RoundRobinDistributor</distribution-policy-class>
+ <page-size-bytes>10485760</page-size-bytes>
<message-counter-history-day-limit>10</message-counter-history-day-limit>
</address-setting>
</address-settings>
Modified: trunk/src/config/trunk/clustered/hornetq-configuration.xml
===================================================================
--- trunk/src/config/trunk/clustered/hornetq-configuration.xml 2009-11-13 19:14:02 UTC (rev 8282)
+++ trunk/src/config/trunk/clustered/hornetq-configuration.xml 2009-11-13 20:52:21 UTC (rev 8283)
@@ -61,8 +61,7 @@
<expiry-address>jms.queue.ExpiryQueue</expiry-address>
<redelivery-delay>0</redelivery-delay>
<max-size-bytes>-1</max-size-bytes>
- <page-size-bytes>10485760</page-size-bytes>
- <distribution-policy-class>org.hornetq.core.server.impl.RoundRobinDistributor</distribution-policy-class>
+ <page-size-bytes>10485760</page-size-bytes>
<message-counter-history-day-limit>10</message-counter-history-day-limit>
</address-setting>
</address-settings>
Modified: trunk/src/config/trunk/non-clustered/hornetq-configuration.xml
===================================================================
--- trunk/src/config/trunk/non-clustered/hornetq-configuration.xml 2009-11-13 19:14:02 UTC (rev 8282)
+++ trunk/src/config/trunk/non-clustered/hornetq-configuration.xml 2009-11-13 20:52:21 UTC (rev 8283)
@@ -35,8 +35,7 @@
<expiry-address>jms.queue.ExpiryQueue</expiry-address>
<redelivery-delay>0</redelivery-delay>
<max-size-bytes>-1</max-size-bytes>
- <page-size-bytes>10485760</page-size-bytes>
- <distribution-policy-class>org.hornetq.core.server.impl.RoundRobinDistributor</distribution-policy-class>
+ <page-size-bytes>10485760</page-size-bytes>
<message-counter-history-day-limit>10</message-counter-history-day-limit>
</address-setting>
</address-settings>
Modified: trunk/src/main/org/hornetq/core/deployers/impl/AddressSettingsDeployer.java
===================================================================
--- trunk/src/main/org/hornetq/core/deployers/impl/AddressSettingsDeployer.java 2009-11-13 19:14:02 UTC (rev 8282)
+++ trunk/src/main/org/hornetq/core/deployers/impl/AddressSettingsDeployer.java 2009-11-13 20:52:21 UTC (rev 8283)
@@ -30,7 +30,7 @@
public class AddressSettingsDeployer extends XmlDeployer
{
private static final Logger log = Logger.getLogger(AddressSettingsDeployer.class);
-
+
private static final String DEAD_LETTER_ADDRESS_NODE_NAME = "dead-letter-address";
private static final String EXPIRY_ADDRESS_NODE_NAME = "expiry-address";
@@ -45,14 +45,12 @@
private static final String PAGE_SIZE_BYTES_NODE_NAME = "page-size-bytes";
- private static final String DISTRIBUTION_POLICY_CLASS_NODE_NAME = "distribution-policy-class";
-
private static final String MESSAGE_COUNTER_HISTORY_DAY_LIMIT_NODE_NAME = "message-counter-history-day-limit";
private static final String LVQ_NODE_NAME = "last-value-queue";
-
+
private static final String REDISTRIBUTION_DELAY_NODE_NAME = "redistribution-delay";
-
+
private static final String SEND_TO_DLA_ON_NO_ROUTE = "send-to-dla-on-no-route";
private final HierarchicalRepository<AddressSettings> addressSettingsRepository;
@@ -85,7 +83,7 @@
* @throws Exception .
*/
public void deploy(Node node) throws Exception
- {
+ {
String match = node.getAttributes().getNamedItem(getKeyAttribute()).getNodeValue();
NodeList children = node.getChildNodes();
@@ -118,10 +116,6 @@
{
addressSettings.setPageSizeBytes(Integer.valueOf(child.getTextContent()));
}
- else if (DISTRIBUTION_POLICY_CLASS_NODE_NAME.equalsIgnoreCase(child.getNodeName()))
- {
- addressSettings.setDistributionPolicyClass(child.getTextContent());
- }
else if (MESSAGE_COUNTER_HISTORY_DAY_LIMIT_NODE_NAME.equalsIgnoreCase(child.getNodeName()))
{
addressSettings.setMessageCounterHistoryDayLimit(Integer.valueOf(child.getTextContent()));
@@ -142,7 +136,7 @@
else if (value.equals(AddressFullMessagePolicy.PAGE.toString()))
{
policy = AddressFullMessagePolicy.PAGE;
- }
+ }
addressSettings.setAddressFullMessagePolicy(policy);
}
else if (LVQ_NODE_NAME.equalsIgnoreCase(child.getNodeName()))
@@ -154,11 +148,11 @@
addressSettings.setMaxDeliveryAttempts(Integer.valueOf(child.getTextContent().trim()));
}
else if (REDISTRIBUTION_DELAY_NODE_NAME.equalsIgnoreCase(child.getNodeName()))
- {
+ {
addressSettings.setRedistributionDelay(Long.valueOf(child.getTextContent().trim()));
}
else if (SEND_TO_DLA_ON_NO_ROUTE.equalsIgnoreCase(child.getNodeName()))
- {
+ {
addressSettings.setSendToDLAOnNoRoute(Boolean.valueOf(child.getTextContent().trim()));
}
}
Modified: trunk/src/main/org/hornetq/core/postoffice/impl/LocalQueueBinding.java
===================================================================
--- trunk/src/main/org/hornetq/core/postoffice/impl/LocalQueueBinding.java 2009-11-13 19:14:02 UTC (rev 8282)
+++ trunk/src/main/org/hornetq/core/postoffice/impl/LocalQueueBinding.java 2009-11-13 20:52:21 UTC (rev 8283)
@@ -14,7 +14,7 @@
package org.hornetq.core.postoffice.impl;
-import java.util.Set;
+import java.util.Collection;
import org.hornetq.core.filter.Filter;
import org.hornetq.core.logging.Logger;
@@ -118,31 +118,7 @@
{
//It's a high accept priority if the queue has at least one matching consumer
- Set<Consumer> consumers = queue.getConsumers();
-
- for (Consumer consumer: consumers)
- {
- if (consumer instanceof Redistributor)
- {
- continue;
- }
-
- Filter filter = consumer.getFilter();
-
- if (filter == null)
- {
- return true;
- }
- else
- {
- if (filter.match(message))
- {
- return true;
- }
- }
- }
-
- return false;
+ return queue.hasMatchingConsumer(message);
}
public void route(final ServerMessage message, final RoutingContext context) throws Exception
Modified: trunk/src/main/org/hornetq/core/replication/impl/ReplicationManagerImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/replication/impl/ReplicationManagerImpl.java 2009-11-13 19:14:02 UTC (rev 8282)
+++ trunk/src/main/org/hornetq/core/replication/impl/ReplicationManagerImpl.java 2009-11-13 20:52:21 UTC (rev 8283)
@@ -420,6 +420,7 @@
public void closeContext()
{
final ReplicationContext token = tlReplicationContext.get();
+
if (token != null)
{
// Disassociate thread local
Deleted: trunk/src/main/org/hornetq/core/server/Distributor.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/Distributor.java 2009-11-13 19:14:02 UTC (rev 8282)
+++ trunk/src/main/org/hornetq/core/server/Distributor.java 2009-11-13 20:52:21 UTC (rev 8283)
@@ -1,33 +0,0 @@
-/*
- * Copyright 2009 Red Hat, Inc.
- * Red Hat licenses this file to you under the Apache License, version
- * 2.0 (the "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * http://www.apache.org/licenses/LICENSE-2.0
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
- * implied. See the License for the specific language governing
- * permissions and limitations under the License.
- */
-
-package org.hornetq.core.server;
-
-
-/**
- *
- * A Distributor
- *
- * @author <a href="mailto:tim.fox@jboss.com">Tim Fox</a>
- *
- */
-public interface Distributor
-{
- void addConsumer(Consumer consumer);
-
- boolean removeConsumer(Consumer consumer);
-
- int getConsumerCount();
-
- Consumer getNextConsumer();
-}
Modified: trunk/src/main/org/hornetq/core/server/Queue.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/Queue.java 2009-11-13 19:14:02 UTC (rev 8282)
+++ trunk/src/main/org/hornetq/core/server/Queue.java 2009-11-13 20:52:21 UTC (rev 8283)
@@ -13,9 +13,9 @@
package org.hornetq.core.server;
+import java.util.Collection;
import java.util.Iterator;
import java.util.List;
-import java.util.Set;
import java.util.concurrent.Executor;
import org.hornetq.core.filter.Filter;
@@ -49,8 +49,6 @@
int getConsumerCount();
- Set<Consumer> getConsumers();
-
void addLast(MessageReference ref);
void addFirst(MessageReference ref);
@@ -79,10 +77,10 @@
List<MessageReference> getScheduledMessages();
- Distributor getDistributionPolicy();
+// Distributor getDistributionPolicy();
+//
+// void setDistributionPolicy(Distributor policy);
- void setDistributionPolicy(Distributor policy);
-
int getMessagesAdded();
MessageReference removeReferenceWithID(long id) throws Exception;
@@ -119,10 +117,14 @@
void addRedistributor(long delay, Executor executor);
void cancelRedistributor() throws Exception;
+
+ boolean hasMatchingConsumer(ServerMessage message);
// Only used in testing
void deliverNow();
-
+
+ Collection<Consumer> getConsumers();
+
boolean checkDLQ(MessageReference ref) throws Exception;
void lockDelivery();
Deleted: trunk/src/main/org/hornetq/core/server/impl/DistributorImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/DistributorImpl.java 2009-11-13 19:14:02 UTC (rev 8282)
+++ trunk/src/main/org/hornetq/core/server/impl/DistributorImpl.java 2009-11-13 20:52:21 UTC (rev 8283)
@@ -1,50 +0,0 @@
-/*
- * Copyright 2009 Red Hat, Inc.
- * Red Hat licenses this file to you under the Apache License, version
- * 2.0 (the "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * http://www.apache.org/licenses/LICENSE-2.0
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
- * implied. See the License for the specific language governing
- * permissions and limitations under the License.
- */
-package org.hornetq.core.server.impl;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import org.hornetq.core.logging.Logger;
-import org.hornetq.core.server.Consumer;
-import org.hornetq.core.server.Distributor;
-
-/**
- * @author <a href="mailto:andy.taylor@jboss.org">Andy Taylor</a>
- */
-public abstract class DistributorImpl implements Distributor
-{
- private static final Logger log = Logger.getLogger(DistributorImpl.class);
-
- protected final List<Consumer> consumers = new ArrayList<Consumer>();
-
- public void addConsumer(final Consumer consumer)
- {
- consumers.add(consumer);
- }
-
- public boolean removeConsumer(final Consumer consumer)
- {
- return consumers.remove(consumer);
- }
-
- public int getConsumerCount()
- {
- return consumers.size();
- }
-
- public List<Consumer> getConsumers()
- {
- return consumers;
- }
-}
Modified: trunk/src/main/org/hornetq/core/server/impl/QueueFactoryImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/QueueFactoryImpl.java 2009-11-13 19:14:02 UTC (rev 8282)
+++ trunk/src/main/org/hornetq/core/server/impl/QueueFactoryImpl.java 2009-11-13 20:52:21 UTC (rev 8283)
@@ -96,8 +96,6 @@
addressSettingsRepository);
}
- queue.setDistributionPolicy(addressSettings.getDistributionPolicy());
-
return queue;
}
}
Modified: trunk/src/main/org/hornetq/core/server/impl/QueueImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/QueueImpl.java 2009-11-13 19:14:02 UTC (rev 8282)
+++ trunk/src/main/org/hornetq/core/server/impl/QueueImpl.java 2009-11-13 20:52:21 UTC (rev 8283)
@@ -40,7 +40,6 @@
import org.hornetq.core.postoffice.Bindings;
import org.hornetq.core.postoffice.PostOffice;
import org.hornetq.core.server.Consumer;
-import org.hornetq.core.server.Distributor;
import org.hornetq.core.server.HandleStatus;
import org.hornetq.core.server.MessageReference;
import org.hornetq.core.server.Queue;
@@ -88,14 +87,12 @@
private final PriorityLinkedList<MessageReference> messageReferences = new PriorityLinkedListImpl<MessageReference>(NUM_PRIORITIES);
- private final MessageHandler globalHandler = new NullFilterMessageHandler();
+ private List<MessageHandler> handlers = new ArrayList<MessageHandler>();
private final ConcurrentSet<MessageReference> expiringMessageReferences = new ConcurrentHashSet<MessageReference>();
private final ScheduledDeliveryHandler scheduledDeliveryHandler;
- private volatile Distributor distributionPolicy = new RoundRobinDistributor();
-
private boolean direct;
private boolean promptDelivery;
@@ -128,14 +125,14 @@
// We cache the consumers here since we don't want to include the redistributor
- private final Set<Consumer> consumers = new HashSet<Consumer>();
+ private final Set<Consumer> consumerSet = new HashSet<Consumer>();
- private final Map<Consumer, MessageHandler> messageHandlers = new HashMap<Consumer, MessageHandler>();
-
private final ConcurrentMap<SimpleString, Consumer> groups = new ConcurrentHashMap<SimpleString, Consumer>();
private volatile SimpleString expiryAddress;
+ private int pos;
+
public QueueImpl(final long id,
final SimpleString address,
final SimpleString name,
@@ -276,29 +273,33 @@
{
cancelRedistributor();
- distributionPolicy.addConsumer(consumer);
+ MessageHandler handler;
- consumers.add(consumer);
-
if (consumer.getFilter() != null)
{
- messageHandlers.put(consumer, new FilterMessageHandler(messageReferences.iterator()));
+ handler = new FilterMessageHandler(consumer, messageReferences.iterator());
}
+ else
+ {
+ handler = new NullFilterMessageHandler(consumer);
+ }
+
+ handlers.add(handler);
+
+ consumerSet.add(consumer);
}
public synchronized boolean removeConsumer(final Consumer consumer) throws Exception
{
- boolean removed = distributionPolicy.removeConsumer(consumer);
+ boolean removed = this.removeHandlerGivenConsumer(consumer);
- if (distributionPolicy.getConsumerCount() == 0)
+ if (handlers.isEmpty())
{
promptDelivery = false;
}
- consumers.remove(consumer);
+ consumerSet.remove(consumer);
- messageHandlers.remove(consumer);
-
if (removed)
{
for (SimpleString groupID : groups.keySet())
@@ -330,7 +331,7 @@
if (delay > 0)
{
- if (consumers.size() == 0)
+ if (consumerSet.isEmpty())
{
DelayedAddRedistributor dar = new DelayedAddRedistributor(executor);
@@ -353,7 +354,7 @@
redistributor = null;
- distributionPolicy.removeConsumer(redistributor);
+ removeHandlerGivenConsumer(redistributor);
}
if (future != null)
@@ -366,14 +367,42 @@
public synchronized int getConsumerCount()
{
- return consumers.size();
+ return consumerSet.size();
}
public synchronized Set<Consumer> getConsumers()
{
- return consumers;
+ return consumerSet;
}
+ public synchronized boolean hasMatchingConsumer(final ServerMessage message)
+ {
+ for (MessageHandler handler : handlers)
+ {
+ Consumer consumer = handler.getConsumer();
+
+ if (consumer instanceof Redistributor)
+ {
+ continue;
+ }
+
+ Filter filter = consumer.getFilter();
+
+ if (filter == null)
+ {
+ return true;
+ }
+ else
+ {
+ if (filter.match(message))
+ {
+ return true;
+ }
+ }
+ }
+ return false;
+ }
+
public Iterator<MessageReference> iterator()
{
return new Iterator<MessageReference>()
@@ -604,16 +633,6 @@
deliveringCount.incrementAndGet();
}
- public Distributor getDistributionPolicy()
- {
- return distributionPolicy;
- }
-
- public void setDistributionPolicy(final Distributor distributionPolicy)
- {
- this.distributionPolicy = distributionPolicy;
- }
-
public int getMessagesAdded()
{
return messagesAdded.get();
@@ -862,14 +881,37 @@
// Private
// ------------------------------------------------------------------------------
+ private boolean removeHandlerGivenConsumer(final Consumer consumer)
+ {
+ Iterator<MessageHandler> iter = handlers.iterator();
+
+ boolean removed = false;
+
+ while (iter.hasNext())
+ {
+ MessageHandler handler = iter.next();
+
+ if (handler.getConsumer() == consumer)
+ {
+ iter.remove();
+
+ removed = true;
+
+ break;
+ }
+ }
+
+ return removed;
+ }
+
private void internalAddRedistributor(final Executor executor)
{
// create the redistributor only once if there are no local consumers
- if (consumers.size() == 0 && redistributor == null)
+ if (consumerSet.isEmpty() && redistributor == null)
{
redistributor = new Redistributor(this, storageManager, postOffice, executor, REDISTRIBUTOR_BATCH_SIZE);
- distributionPolicy.addConsumer(redistributor);
+ handlers.add(new NullFilterMessageHandler(redistributor));
redistributor.start();
@@ -979,6 +1021,7 @@
private void sendToDeadLetterAddress(final MessageReference ref) throws Exception
{
SimpleString deadLetterAddress = addressSettingsRepository.getMatch(address.toString()).getDeadLetterAddress();
+
if (deadLetterAddress != null)
{
Bindings bindingList = postOffice.getBindingsForAddress(deadLetterAddress);
@@ -1021,122 +1064,222 @@
tx.commit();
}
+ private MessageHandler getHandlerRoundRobin()
+ {
+ MessageHandler handler = handlers.get(pos);
+
+ pos++;
+
+ if (pos == handlers.size())
+ {
+ pos = 0;
+ }
+
+ return handler;
+ }
+
+ private boolean checkExpired(final MessageReference reference)
+ {
+ if (reference.getMessage().isExpired())
+ {
+ reference.handled();
+
+ try
+ {
+ expire(reference);
+ }
+ catch (Exception e)
+ {
+ log.error("Failed to expire ref", e);
+ }
+
+ return true;
+ }
+ else
+ {
+ return false;
+ }
+ }
+
/*
* Attempt to deliver all the messages in the queue
*/
private synchronized void deliver()
{
- if (paused)
+ if (paused || handlers.isEmpty())
{
return;
}
direct = false;
- if (distributionPolicy.getConsumerCount() == 0)
- {
- return;
- }
-
- Consumer consumer;
-
- MessageReference reference;
-
- // TODO - this needs to be optimised!! Creating too much stuff on an inner loop
- int totalConsumers = distributionPolicy.getConsumerCount();
- Set<Consumer> busyConsumers = new HashSet<Consumer>();
- Set<Consumer> nullReferences = new HashSet<Consumer>();
-
+ int startPos = pos;
+ int totalCount = handlers.size();
+ int nullCount = 0;
+ int busyCount = 0;
while (true)
{
- consumer = distributionPolicy.getNextConsumer();
+ MessageHandler handler = getHandlerRoundRobin();
- MessageHandler handler = messageHandlers.get(consumer);
+ Consumer consumer = handler.getConsumer();
- if (handler == null)
- {
- handler = globalHandler;
- }
+ MessageReference reference = handler.peek(consumer);
- reference = handler.peek(consumer);
-
if (reference == null)
{
- nullReferences.add(consumer);
-
- if (nullReferences.size() + busyConsumers.size() == totalConsumers)
- {
- // We delivered all the messages - go into direct delivery
- direct = true;
-
- promptDelivery = false;
-
- return;
- }
-
- continue;
+ nullCount++;
}
else
{
- nullReferences.remove(consumer);
-
- if (reference.getMessage().isExpired())
+ if (checkExpired(reference))
{
- // We expire messages on the server too
handler.remove();
+ }
+ else
+ {
+ final SimpleString groupID = reference.getMessage().getSimpleStringProperty(MessageImpl.HDR_GROUP_ID);
- reference.handled();
+ boolean tryHandle = true;
- try
+ if (groupID != null)
{
- expire(reference);
+ Consumer groupConsumer = groups.putIfAbsent(groupID, consumer);
+
+ if (groupConsumer != null && groupConsumer != consumer)
+ {
+ tryHandle = false;
+
+ busyCount++;
+ }
}
- catch (Exception e)
+
+ if (tryHandle)
{
- log.error("Failed to expire ref", e);
+ HandleStatus status = handle(reference, consumer);
+
+ if (status == HandleStatus.HANDLED)
+ {
+ handler.remove();
+ }
+ else if (status == HandleStatus.BUSY)
+ {
+ busyCount++;
+
+ handler.reset();
+
+ // if (groupID != null )
+ // {
+ // // group id being set seems to make delivery stop
+ // // FIXME !!! why??
+ // break;
+ // }
+ }
+ else if (status == HandleStatus.NO_MATCH)
+ {
+ // if consumer filter reject the message make sure it won't be assigned the message group
+ if (groupID != null)
+ {
+ groups.remove(groupID);
+ }
+ }
}
-
- continue;
}
}
- final SimpleString groupID = reference.getMessage().getSimpleStringProperty(MessageImpl.HDR_GROUP_ID);
-
- if (groupID != null)
+ if (pos == startPos)
{
- Consumer groupConsumer = groups.putIfAbsent(groupID, consumer);
+ // We've done all the consumers
- if (groupConsumer != null && groupConsumer != consumer)
+ if (nullCount + busyCount == totalCount)
{
- continue;
+ if (nullCount == totalCount)
+ {
+ // We delivered all the messages - go into direct delivery
+ direct = true;
+
+ promptDelivery = false;
+ }
+
+ break;
}
+
+ nullCount = busyCount = 0;
}
+ }
+ }
- HandleStatus status = handle(reference, consumer);
+ private synchronized boolean directDeliver(final MessageReference reference)
+ {
+ if (paused || handlers.isEmpty())
+ {
+ return false;
+ }
- if (status == HandleStatus.HANDLED)
+ int startPos = pos;
+ int busyCount = 0;
+ boolean setPromptDelivery = false;
+ while (true)
+ {
+ MessageHandler handler = getHandlerRoundRobin();
+
+ Consumer consumer = handler.getConsumer();
+
+ if (!checkExpired(reference))
{
- handler.remove();
- }
- else if (status == HandleStatus.BUSY)
- {
- busyConsumers.add(consumer);
+ SimpleString groupID = reference.getMessage().getSimpleStringProperty(MessageImpl.HDR_GROUP_ID);
- handler.reset();
+ boolean tryHandle = true;
- if (groupID != null || busyConsumers.size() == totalConsumers)
+ if (groupID != null)
{
- // when all consumers are busy, we stop
- break;
+ Consumer groupConsumer = groups.putIfAbsent(groupID, consumer);
+
+ if (groupConsumer != null && groupConsumer != consumer)
+ {
+ tryHandle = false;
+ }
}
+
+ if (tryHandle)
+ {
+ HandleStatus status = handle(reference, consumer);
+
+ if (status == HandleStatus.HANDLED)
+ {
+ return true;
+ }
+ else if (status == HandleStatus.BUSY)
+ {
+ busyCount++;
+
+ if (groupID != null)
+ {
+ // If the group has been assigned a consumer there is no point in trying others
+
+ return false;
+ }
+ }
+ else if (status == HandleStatus.NO_MATCH)
+ {
+ // if consumer filter reject the message make sure it won't be assigned the message group
+ if (groupID != null)
+ {
+ groups.remove(groupID);
+ }
+
+ setPromptDelivery = true;
+ }
+ }
}
- else if (status == HandleStatus.NO_MATCH)
+
+ if (pos == startPos)
{
- // if consumer filter reject the message make sure it won't be assigned the message group
- if (groupID != null)
+ if (setPromptDelivery)
{
- groups.remove(consumer);
+ promptDelivery = true;
}
+
+ return false;
}
}
}
@@ -1159,23 +1302,12 @@
{
// Deliver directly
- HandleStatus status = directDeliver(ref);
+ boolean delivered = directDeliver(ref);
- if (status == HandleStatus.HANDLED)
+ if (!delivered)
{
- // Ok
- }
- else if (status == HandleStatus.BUSY)
- {
add = true;
- }
- else if (status == HandleStatus.NO_MATCH)
- {
- add = true;
- }
- if (add)
- {
direct = false;
}
}
@@ -1213,81 +1345,6 @@
}
}
- private synchronized HandleStatus directDeliver(final MessageReference reference)
- {
- if (distributionPolicy.getConsumerCount() == 0)
- {
- return HandleStatus.BUSY;
- }
-
- HandleStatus status;
-
- boolean filterRejected = false;
-
- int consumerCount = 0;
-
- while (true)
- {
- Consumer consumer = distributionPolicy.getNextConsumer();
- consumerCount++;
-
- final SimpleString groupId = reference.getMessage().getSimpleStringProperty(MessageImpl.HDR_GROUP_ID);
-
- if (groupId != null)
- {
- Consumer groupConsumer = groups.putIfAbsent(groupId, consumer);
- if (groupConsumer != null && groupConsumer != consumer)
- {
- continue;
- }
- }
-
- status = handle(reference, consumer);
-
- if (status == HandleStatus.HANDLED)
- {
- break;
- }
- else if (status == HandleStatus.NO_MATCH)
- {
- filterRejected = true;
- if (groupId != null)
- {
- groups.remove(consumer);
- }
- }
- else if (status == HandleStatus.BUSY)
- {
- if (groupId != null)
- {
- break;
- }
- }
- // if we've tried all of them
- if (consumerCount == distributionPolicy.getConsumerCount())
- {
- if (filterRejected)
- {
- status = HandleStatus.NO_MATCH;
- break;
- }
- else
- {
- // Give up - all consumers busy
- status = HandleStatus.BUSY;
- break;
- }
- }
- }
-
- if (status == HandleStatus.NO_MATCH)
- {
- promptDelivery = true;
- }
-
- return status;
- }
-
private synchronized HandleStatus handle(final MessageReference reference, final Consumer consumer)
{
HandleStatus status;
@@ -1392,7 +1449,6 @@
{
public void run()
{
-
// Must be set to false *before* executing to avoid race
waitingToDeliver.set(false);
@@ -1408,7 +1464,7 @@
}
}
- final class RefsOperation implements TransactionOperation
+ private final class RefsOperation implements TransactionOperation
{
List<MessageReference> refsToAck = new ArrayList<MessageReference>();
@@ -1523,21 +1579,38 @@
void remove();
void reset();
+
+ Consumer getConsumer();
}
private class FilterMessageHandler implements MessageHandler
{
+ private final Consumer consumer;
+
private Iterator<MessageReference> iterator;
- public FilterMessageHandler(final Iterator<MessageReference> iterator)
+ private MessageReference lastReference;
+
+ private boolean resetting;
+
+ public FilterMessageHandler(final Consumer consumer, final Iterator<MessageReference> iterator)
{
+ this.consumer = consumer;
+
this.iterator = iterator;
}
public MessageReference peek(final Consumer consumer)
{
+ if (resetting)
+ {
+ resetting = false;
+
+ return lastReference;
+ }
+
MessageReference reference;
-
+
if (iterator.hasNext())
{
reference = iterator.next();
@@ -1554,6 +1627,8 @@
iterator = messageReferences.iterator();
}
}
+ lastReference = reference;
+
return reference;
}
@@ -1564,12 +1639,24 @@
public void reset()
{
- iterator = messageReferences.iterator();
+ resetting = true;
}
+
+ public Consumer getConsumer()
+ {
+ return consumer;
+ }
}
private class NullFilterMessageHandler implements MessageHandler
{
+ private final Consumer consumer;
+
+ NullFilterMessageHandler(final Consumer consumer)
+ {
+ this.consumer = consumer;
+ }
+
public MessageReference peek(final Consumer consumer)
{
return messageReferences.peekFirst();
@@ -1584,5 +1671,10 @@
{
// no-op
}
+
+ public Consumer getConsumer()
+ {
+ return consumer;
+ }
}
}
Deleted: trunk/src/main/org/hornetq/core/server/impl/RoundRobinDistributor.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/RoundRobinDistributor.java 2009-11-13 19:14:02 UTC (rev 8282)
+++ trunk/src/main/org/hornetq/core/server/impl/RoundRobinDistributor.java 2009-11-13 20:52:21 UTC (rev 8283)
@@ -1,68 +0,0 @@
-/*
- * Copyright 2009 Red Hat, Inc.
- * Red Hat licenses this file to you under the Apache License, version
- * 2.0 (the "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * http://www.apache.org/licenses/LICENSE-2.0
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
- * implied. See the License for the specific language governing
- * permissions and limitations under the License.
- */
-
-package org.hornetq.core.server.impl;
-
-import org.hornetq.core.logging.Logger;
-import org.hornetq.core.server.Consumer;
-
-/**
- * A RoundRobinDistributor
- *
- * @author <a href="mailto:tim.fox@jboss.com">Tim Fox</a>
- * @author <a href="mailto:andy.taylor@jboss.org">Andy Taylor</a>
- * @author <a href="mailto:jmesnil@redhat.com">Jeff Mesnil</a>
- */
-public class RoundRobinDistributor extends DistributorImpl
-{
- private static final Logger log = Logger.getLogger(RoundRobinDistributor.class);
-
- protected int pos = 0;
-
- @Override
- public synchronized void addConsumer(final Consumer consumer)
- {
- pos = 0;
- super.addConsumer(consumer);
- }
-
- @Override
- public synchronized boolean removeConsumer(final Consumer consumer)
- {
- pos = 0;
- return super.removeConsumer(consumer);
- }
-
- @Override
- public synchronized int getConsumerCount()
- {
- return super.getConsumerCount();
- }
-
- public synchronized Consumer getNextConsumer()
- {
- Consumer consumer = consumers.get(pos);
- incrementPosition();
- return consumer;
- }
-
- private synchronized void incrementPosition()
- {
- pos++;
-
- if (pos == consumers.size())
- {
- pos = 0;
- }
- }
-}
Modified: trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java 2009-11-13 19:14:02 UTC (rev 8282)
+++ trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java 2009-11-13 20:52:21 UTC (rev 8283)
@@ -1728,6 +1728,7 @@
}
});
+
storageManager.completeReplication();
}
else
@@ -1750,6 +1751,7 @@
if (confirmPacket != null)
{
channel.confirm(confirmPacket);
+
if (flush)
{
channel.flushConfirmations();
Modified: trunk/src/main/org/hornetq/core/settings/impl/AddressSettings.java
===================================================================
--- trunk/src/main/org/hornetq/core/settings/impl/AddressSettings.java 2009-11-13 19:14:02 UTC (rev 8282)
+++ trunk/src/main/org/hornetq/core/settings/impl/AddressSettings.java 2009-11-13 20:52:21 UTC (rev 8283)
@@ -14,8 +14,6 @@
package org.hornetq.core.settings.impl;
import org.hornetq.core.logging.Logger;
-import org.hornetq.core.server.Distributor;
-import org.hornetq.core.server.impl.RoundRobinDistributor;
import org.hornetq.core.settings.Mergeable;
import org.hornetq.utils.SimpleString;
@@ -32,14 +30,12 @@
/**
* defaults used if null, this allows merging
*/
- public static final Class<?> DEFAULT_DISTRIBUTION_POLICY_CLASS = new RoundRobinDistributor().getClass();
-
public static final int DEFAULT_MAX_SIZE_BYTES = -1;
public static final AddressFullMessagePolicy DEFAULT_ADDRESS_FULL_MESSAGE_POLICY = AddressFullMessagePolicy.PAGE;
public static final int DEFAULT_PAGE_SIZE = 10 * 1024 * 1024;
-
+
public static final int DEFAULT_MAX_DELIVERY_ATTEMPTS = 10;
public static final int DEFAULT_MESSAGE_COUNTER_HISTORY_DAY_LIMIT = 0;
@@ -60,8 +56,6 @@
private Boolean dropMessagesWhenFull = null;
- private String distributionPolicyClass = null;
-
private Integer maxDeliveryAttempts = null;
private Integer messageCounterHistoryDayLimit = null;
@@ -149,16 +143,6 @@
this.redeliveryDelay = redeliveryDelay;
}
- public String getDistributionPolicyClass()
- {
- return distributionPolicyClass;
- }
-
- public void setDistributionPolicyClass(final String distributionPolicyClass)
- {
- this.distributionPolicyClass = distributionPolicyClass;
- }
-
public SimpleString getDeadLetterAddress()
{
return deadLetterAddress;
@@ -189,25 +173,6 @@
sendToDLAOnNoRoute = value;
}
- public Distributor getDistributionPolicy()
- {
- try
- {
- if (distributionPolicyClass != null)
- {
- return (Distributor)getClass().getClassLoader().loadClass(distributionPolicyClass).newInstance();
- }
- else
- {
- return (Distributor)DEFAULT_DISTRIBUTION_POLICY_CLASS.newInstance();
- }
- }
- catch (Exception e)
- {
- throw new IllegalArgumentException("Error instantiating distribution policy '" + e + " '");
- }
- }
-
public long getRedistributionDelay()
{
return redistributionDelay != null ? redistributionDelay : DEFAULT_REDISTRIBUTION_DELAY;
@@ -248,10 +213,6 @@
{
redeliveryDelay = merged.redeliveryDelay;
}
- if (distributionPolicyClass == null)
- {
- distributionPolicyClass = merged.distributionPolicyClass;
- }
if (deadLetterAddress == null)
{
deadLetterAddress = merged.deadLetterAddress;
Modified: trunk/tests/src/org/hornetq/tests/integration/client/ConsumerWindowSizeTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/client/ConsumerWindowSizeTest.java 2009-11-13 19:14:02 UTC (rev 8282)
+++ trunk/tests/src/org/hornetq/tests/integration/client/ConsumerWindowSizeTest.java 2009-11-13 20:52:21 UTC (rev 8283)
@@ -12,7 +12,7 @@
*/
package org.hornetq.tests.integration.client;
-import java.util.Set;
+import java.util.Collection;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
@@ -911,7 +911,7 @@
for (Binding binding : bindings.getBindings())
{
- Set<Consumer> consumers = ((QueueBinding)binding).getQueue().getConsumers();
+ Collection<Consumer> consumers = ((QueueBinding)binding).getQueue().getConsumers();
for (Consumer consumer : consumers)
{
Modified: trunk/tests/src/org/hornetq/tests/integration/client/MessageGroupingTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/client/MessageGroupingTest.java 2009-11-13 19:14:02 UTC (rev 8282)
+++ trunk/tests/src/org/hornetq/tests/integration/client/MessageGroupingTest.java 2009-11-13 20:52:21 UTC (rev 8283)
@@ -29,6 +29,7 @@
import org.hornetq.core.config.TransportConfiguration;
import org.hornetq.core.config.impl.ConfigurationImpl;
import org.hornetq.core.exception.HornetQException;
+import org.hornetq.core.logging.Logger;
import org.hornetq.core.message.impl.MessageImpl;
import org.hornetq.core.server.HornetQ;
import org.hornetq.core.server.HornetQServer;
@@ -41,6 +42,8 @@
*/
public class MessageGroupingTest extends UnitTestCase
{
+ private static final Logger log = Logger.getLogger(MessageGroupingTest.class);
+
private HornetQServer server;
private ClientSession clientSession;
@@ -85,13 +88,13 @@
for (int i = 0; i < numMessages; i++)
{
ClientMessage message = createTextMessage("m" + i, clientSession);
- if( i % 2 == 0 || i == 0)
+ if (i % 2 == 0 || i == 0)
{
message.putStringProperty(MessageImpl.HDR_GROUP_ID, groupId);
}
else
{
- message.putStringProperty(MessageImpl.HDR_GROUP_ID, groupId2);
+ message.putStringProperty(MessageImpl.HDR_GROUP_ID, groupId2);
}
clientProducer.send(message);
}
@@ -130,13 +133,13 @@
for (int i = 0; i < numMessages; i++)
{
ClientMessage message = createTextMessage("m" + i, clientSession);
- if( i % 2 == 0 || i == 0)
+ if (i % 2 == 0 || i == 0)
{
message.putStringProperty(MessageImpl.HDR_GROUP_ID, groupId);
}
else
{
- message.putStringProperty(MessageImpl.HDR_GROUP_ID, groupId2);
+ message.putStringProperty(MessageImpl.HDR_GROUP_ID, groupId2);
}
clientProducer.send(message);
}
@@ -178,18 +181,18 @@
for (int i = 0; i < numMessages; i++)
{
ClientMessage message = createTextMessage("m" + i, clientSession);
- if( i % 2 == 0 || i == 0)
+ if (i % 2 == 0 || i == 0)
{
message.putStringProperty(MessageImpl.HDR_GROUP_ID, groupId);
}
else
{
- message.putStringProperty(MessageImpl.HDR_GROUP_ID, groupId2);
+ message.putStringProperty(MessageImpl.HDR_GROUP_ID, groupId2);
}
clientProducer.send(message);
}
- for(int i = 0; i < numMessages/2; i++)
+ for (int i = 0; i < numMessages / 2; i++)
{
ClientMessage cm = consumer.receive(500);
assertNotNull(cm);
@@ -199,18 +202,25 @@
assertNotNull(cm);
assertEquals(cm.getBody().readString(), "m" + i);
}
-
+
+ log.info("closing consumers");
+
consumer2.close();
+
+ log.info("closed consumer 2");
+
consumer.close();
- //check that within their groups the messages are still in the correct order
+
+ log.info("closed consuemrs");
+ // check that within their groups the messages are still in the correct order
consumer = clientSession.createConsumer(qName);
- for(int i = 0; i < numMessages; i+=2)
+ for (int i = 0; i < numMessages; i += 2)
{
ClientMessage cm = consumer.receive(500);
assertNotNull(cm);
assertEquals(cm.getBody().readString(), "m" + i);
}
- for(int i = 1; i < numMessages; i+=2)
+ for (int i = 1; i < numMessages; i += 2)
{
ClientMessage cm = consumer.receive(500);
assertNotNull(cm);
@@ -230,13 +240,13 @@
for (int i = 0; i < numMessages; i++)
{
ClientMessage message = createTextMessage("m" + i, clientSession);
- if( i % 2 == 0 || i == 0)
+ if (i % 2 == 0 || i == 0)
{
message.putStringProperty(MessageImpl.HDR_GROUP_ID, groupId);
}
else
{
- message.putStringProperty(MessageImpl.HDR_GROUP_ID, groupId2);
+ message.putStringProperty(MessageImpl.HDR_GROUP_ID, groupId2);
}
clientProducer.send(message);
}
@@ -269,13 +279,13 @@
for (int i = 0; i < numMessages; i++)
{
ClientMessage message = createTextMessage("m" + i, clientSession);
- if( i % 2 == 0 || i == 0)
+ if (i % 2 == 0 || i == 0)
{
message.putStringProperty(MessageImpl.HDR_GROUP_ID, groupId);
}
else
{
- message.putStringProperty(MessageImpl.HDR_GROUP_ID, groupId2);
+ message.putStringProperty(MessageImpl.HDR_GROUP_ID, groupId2);
}
clientProducer.send(message);
}
@@ -322,13 +332,13 @@
for (int i = 0; i < numMessages; i++)
{
ClientMessage message = createTextMessage("m" + i, clientSession);
- if( i % 2 == 0 || i == 0)
+ if (i % 2 == 0 || i == 0)
{
message.putStringProperty(MessageImpl.HDR_GROUP_ID, groupId);
}
else
{
- message.putStringProperty(MessageImpl.HDR_GROUP_ID, groupId2);
+ message.putStringProperty(MessageImpl.HDR_GROUP_ID, groupId2);
}
clientProducer.send(message);
}
@@ -393,13 +403,13 @@
for (int i = 0; i < numMessages; i++)
{
ClientMessage message = createTextMessage("m" + i, clientSession);
- if( i % 2 == 0 || i == 0)
+ if (i % 2 == 0 || i == 0)
{
message.putStringProperty(MessageImpl.HDR_GROUP_ID, groupId);
}
else
{
- message.putStringProperty(MessageImpl.HDR_GROUP_ID, groupId2);
+ message.putStringProperty(MessageImpl.HDR_GROUP_ID, groupId2);
}
clientProducer.send(message);
}
@@ -451,13 +461,13 @@
for (int i = 0; i < numMessages; i++)
{
ClientMessage message = createTextMessage("m" + i, clientSession);
- if( i % 2 == 0 || i == 0)
+ if (i % 2 == 0 || i == 0)
{
message.putStringProperty(MessageImpl.HDR_GROUP_ID, groupId);
}
else
{
- message.putStringProperty(MessageImpl.HDR_GROUP_ID, groupId2);
+ message.putStringProperty(MessageImpl.HDR_GROUP_ID, groupId2);
}
clientProducer.send(message);
}
@@ -536,14 +546,14 @@
}
server = null;
clientSession = null;
-
+
super.tearDown();
}
protected void setUp() throws Exception
{
super.setUp();
-
+
ConfigurationImpl configuration = new ConfigurationImpl();
configuration.setSecurityEnabled(false);
TransportConfiguration transportConfig = new TransportConfiguration(INVM_ACCEPTOR_FACTORY);
@@ -578,13 +588,13 @@
if (acknowledge)
{
try
- {
- message.acknowledge();
- }
- catch (HornetQException e)
{
- //ignore
+ message.acknowledge();
}
+ catch (HornetQException e)
+ {
+ // ignore
+ }
}
latch.countDown();
}
Modified: trunk/tests/src/org/hornetq/tests/integration/client/SessionStopStartTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/client/SessionStopStartTest.java 2009-11-13 19:14:02 UTC (rev 8282)
+++ trunk/tests/src/org/hornetq/tests/integration/client/SessionStopStartTest.java 2009-11-13 20:52:21 UTC (rev 8283)
@@ -149,7 +149,7 @@
session.close();
}
-
+
public void testStopStartConsumerAsyncSyncStoppedByHandler() throws Exception
{
ClientSessionFactory sf = createInVMFactory();
@@ -182,6 +182,8 @@
boolean failed;
boolean started = true;
+
+ int count = 0;
public void onMessage(final ClientMessage message)
{
@@ -192,16 +194,17 @@
{
failed = true;
}
-
- latch.countDown();
-
- if (latch.getCount() == 0)
+
+ count++;
+
+ if (count == 10)
{
-
message.acknowledge();
session.stop();
started = false;
}
+
+ latch.countDown();
}
catch (Exception e)
{
@@ -215,8 +218,6 @@
latch.await();
- Thread.sleep(100);
-
assertFalse(handler.failed);
// Make sure no exceptions were thrown from onMessage
Modified: trunk/tests/src/org/hornetq/tests/integration/remoting/PingTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/remoting/PingTest.java 2009-11-13 19:14:02 UTC (rev 8282)
+++ trunk/tests/src/org/hornetq/tests/integration/remoting/PingTest.java 2009-11-13 20:52:21 UTC (rev 8283)
@@ -281,7 +281,7 @@
csf.close();
}
-
+
/*
* Test the client triggering failure due to no ping from server received in time
*/
@@ -325,7 +325,7 @@
//Setting the handler to null will prevent server sending pings back to client
serverConn.getChannel(0, -1).setHandler(null);
- for (int i = 0; i < 1000; i++)
+ for (int i = 0; i < 2000; i++)
{
// a few tries to avoid a possible race caused by GCs or similar issues
if (server.getRemotingService().getConnections().isEmpty() && clientListener.getException() != null)
Modified: trunk/tests/src/org/hornetq/tests/unit/core/deployers/impl/AddressSettingsDeployerTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/unit/core/deployers/impl/AddressSettingsDeployerTest.java 2009-11-13 19:14:02 UTC (rev 8282)
+++ trunk/tests/src/org/hornetq/tests/unit/core/deployers/impl/AddressSettingsDeployerTest.java 2009-11-13 20:52:21 UTC (rev 8283)
@@ -33,8 +33,7 @@
" <dead-letter-address>DLQtest</dead-letter-address>\n" +
" <expiry-address>ExpiryQueueTest</expiry-address>\n" +
" <redelivery-delay>100</redelivery-delay>\n" +
- " <max-size-bytes>-100</max-size-bytes>\n" +
- " <distribution-policy-class>org.hornetq.core.impl.RoundRobinDistributionPolicy</distribution-policy-class>\n" +
+ " <max-size-bytes>-100</max-size-bytes>\n" +
" <message-counter-history-day-limit>1000</message-counter-history-day-limit>\n" +
" </address-settings>";
@@ -57,8 +56,7 @@
AddressSettings as = repository.getMatch("queues.aq");
assertNotNull(as);
assertEquals(100, as.getRedeliveryDelay());
- assertEquals(-100, as.getMaxSizeBytes());
- assertEquals("org.hornetq.core.impl.RoundRobinDistributionPolicy", as.getDistributionPolicyClass());
+ assertEquals(-100, as.getMaxSizeBytes());
assertEquals(1000, as.getMessageCounterHistoryDayLimit());
assertEquals(new SimpleString("DLQtest"), as.getDeadLetterAddress());
assertEquals(new SimpleString("ExpiryQueueTest"), as.getExpiryAddress());
@@ -72,8 +70,7 @@
+ " <dead-letter-address>DLQtest</dead-letter-address>\n"
+ " <expiry-address>ExpiryQueueTest</expiry-address>\n"
+ " <redelivery-delay>100</redelivery-delay>\n"
- + " <max-size-bytes>-100</max-size-bytes>\n"
- + " <distribution-policy-class>org.hornetq.core.impl.RoundRobinDistributionPolicy</distribution-policy-class>"
+ + " <max-size-bytes>-100</max-size-bytes>\n"
+ " <message-counter-history-day-limit>1000</message-counter-history-day-limit>"
+ " </address-setting>"
+ "</address-settings>"
@@ -88,8 +85,7 @@
AddressSettings as = repository.getMatch("queues.aq");
assertNotNull(as);
assertEquals(100, as.getRedeliveryDelay());
- assertEquals(-100, as.getMaxSizeBytes());
- assertEquals("org.hornetq.core.impl.RoundRobinDistributionPolicy", as.getDistributionPolicyClass());
+ assertEquals(-100, as.getMaxSizeBytes());
assertEquals(1000, as.getMessageCounterHistoryDayLimit());
assertEquals(new SimpleString("DLQtest"), as.getDeadLetterAddress());
assertEquals(new SimpleString("ExpiryQueueTest"), as.getExpiryAddress());
Modified: trunk/tests/src/org/hornetq/tests/unit/core/postoffice/impl/FakeQueue.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/unit/core/postoffice/impl/FakeQueue.java 2009-11-13 19:14:02 UTC (rev 8282)
+++ trunk/tests/src/org/hornetq/tests/unit/core/postoffice/impl/FakeQueue.java 2009-11-13 20:52:21 UTC (rev 8283)
@@ -20,7 +20,6 @@
import org.hornetq.core.filter.Filter;
import org.hornetq.core.server.Consumer;
-import org.hornetq.core.server.Distributor;
import org.hornetq.core.server.MessageReference;
import org.hornetq.core.server.Queue;
import org.hornetq.core.server.RoutingContext;
@@ -31,7 +30,7 @@
public class FakeQueue implements Queue
{
private SimpleString name;
-
+
public FakeQueue(SimpleString name)
{
this.name = name;
@@ -43,7 +42,7 @@
public void acknowledge(MessageReference ref) throws Exception
{
// TODO Auto-generated method stub
-
+
}
/* (non-Javadoc)
@@ -52,7 +51,7 @@
public void acknowledge(Transaction tx, MessageReference ref) throws Exception
{
// TODO Auto-generated method stub
-
+
}
/* (non-Javadoc)
@@ -61,7 +60,7 @@
public void addConsumer(Consumer consumer) throws Exception
{
// TODO Auto-generated method stub
-
+
}
/* (non-Javadoc)
@@ -70,7 +69,7 @@
public void addFirst(MessageReference ref)
{
// TODO Auto-generated method stub
-
+
}
/* (non-Javadoc)
@@ -79,7 +78,7 @@
public void addLast(MessageReference ref)
{
// TODO Auto-generated method stub
-
+
}
/* (non-Javadoc)
@@ -88,7 +87,7 @@
public void addRedistributor(long delay, Executor executor)
{
// TODO Auto-generated method stub
-
+
}
/* (non-Javadoc)
@@ -97,7 +96,7 @@
public void cancel(MessageReference reference) throws Exception
{
// TODO Auto-generated method stub
-
+
}
/* (non-Javadoc)
@@ -106,7 +105,7 @@
public void cancel(Transaction tx, MessageReference ref) throws Exception
{
// TODO Auto-generated method stub
-
+
}
/* (non-Javadoc)
@@ -115,7 +114,7 @@
public void cancelRedistributor() throws Exception
{
// TODO Auto-generated method stub
-
+
}
/* (non-Javadoc)
@@ -169,7 +168,7 @@
public void deliverAsync(Executor executor)
{
// TODO Auto-generated method stub
-
+
}
/* (non-Javadoc)
@@ -178,7 +177,7 @@
public void deliverNow()
{
// TODO Auto-generated method stub
-
+
}
/* (non-Javadoc)
@@ -187,7 +186,7 @@
public void expire(MessageReference ref) throws Exception
{
// TODO Auto-generated method stub
-
+
}
/* (non-Javadoc)
@@ -205,7 +204,7 @@
public void expireReferences() throws Exception
{
// TODO Auto-generated method stub
-
+
}
/* (non-Javadoc)
@@ -245,15 +244,6 @@
}
/* (non-Javadoc)
- * @see org.hornetq.core.server.Queue#getDistributionPolicy()
- */
- public Distributor getDistributionPolicy()
- {
- // TODO Auto-generated method stub
- return null;
- }
-
- /* (non-Javadoc)
* @see org.hornetq.core.server.Queue#getFilter()
*/
public Filter getFilter()
@@ -375,7 +365,7 @@
public void lockDelivery()
{
// TODO Auto-generated method stub
-
+
}
/* (non-Javadoc)
@@ -402,7 +392,7 @@
public void pause()
{
// TODO Auto-generated method stub
-
+
}
/* (non-Javadoc)
@@ -411,7 +401,7 @@
public void reacknowledge(Transaction tx, MessageReference ref) throws Exception
{
// TODO Auto-generated method stub
-
+
}
/* (non-Javadoc)
@@ -420,7 +410,7 @@
public void referenceHandled()
{
// TODO Auto-generated method stub
-
+
}
/* (non-Javadoc)
@@ -456,7 +446,7 @@
public void resume()
{
// TODO Auto-generated method stub
-
+
}
/* (non-Javadoc)
@@ -469,32 +459,23 @@
}
/* (non-Javadoc)
- * @see org.hornetq.core.server.Queue#setDistributionPolicy(org.hornetq.core.server.Distributor)
- */
- public void setDistributionPolicy(Distributor policy)
- {
- // TODO Auto-generated method stub
-
- }
-
- /* (non-Javadoc)
* @see org.hornetq.core.server.Queue#setExpiryAddress(org.hornetq.utils.SimpleString)
*/
public void setExpiryAddress(SimpleString expiryAddress)
{
// TODO Auto-generated method stub
-
+
}
- // TODO Auto-generated method stub
-
+ // TODO Auto-generated method stub
+
/* (non-Javadoc)
* @see org.hornetq.core.server.Queue#unlockDelivery()
*/
public void unlockDelivery()
{
// TODO Auto-generated method stub
-
+
}
/* (non-Javadoc)
@@ -503,9 +484,13 @@
public void route(ServerMessage message, RoutingContext context) throws Exception
{
// TODO Auto-generated method stub
-
+
}
-
-
+ public boolean hasMatchingConsumer(ServerMessage message)
+ {
+ // TODO Auto-generated method stub
+ return false;
+ }
+
}
\ No newline at end of file
Modified: trunk/tests/src/org/hornetq/tests/unit/core/server/impl/QueueImplTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/unit/core/server/impl/QueueImplTest.java 2009-11-13 19:14:02 UTC (rev 8282)
+++ trunk/tests/src/org/hornetq/tests/unit/core/server/impl/QueueImplTest.java 2009-11-13 20:52:21 UTC (rev 8283)
@@ -23,13 +23,11 @@
import org.hornetq.core.filter.Filter;
import org.hornetq.core.filter.impl.FilterImpl;
import org.hornetq.core.server.Consumer;
-import org.hornetq.core.server.Distributor;
import org.hornetq.core.server.HandleStatus;
import org.hornetq.core.server.MessageReference;
import org.hornetq.core.server.Queue;
import org.hornetq.core.server.ServerMessage;
import org.hornetq.core.server.impl.QueueImpl;
-import org.hornetq.core.server.impl.RoundRobinDistributor;
import org.hornetq.tests.unit.core.server.impl.fakes.FakeConsumer;
import org.hornetq.tests.unit.core.server.impl.fakes.FakeFilter;
import org.hornetq.tests.unit.core.server.impl.fakes.FakePostOffice;
@@ -118,21 +116,6 @@
assertFalse(queue.removeConsumer(cons3));
}
- public void testGetSetDistributionPolicy()
- {
- Queue queue = new QueueImpl(1, address1, queue1, null, false, true, scheduledExecutor, null, null, null);
-
- assertNotNull(queue.getDistributionPolicy());
-
- assertTrue(queue.getDistributionPolicy() instanceof RoundRobinDistributor);
-
- Distributor policy = new DummyDistributionPolicy();
-
- queue.setDistributionPolicy(policy);
-
- assertEquals(policy, queue.getDistributionPolicy());
- }
-
public void testGetFilter()
{
Queue queue = new QueueImpl(1, address1, queue1, null, false, true, scheduledExecutor, null, null, null);
@@ -608,8 +591,6 @@
{
Queue queue = new QueueImpl(1, address1, queue1, null, false, true, scheduledExecutor, null, null, null);
- assertTrue(queue.getDistributionPolicy() instanceof RoundRobinDistributor);
-
final int numMessages = 10;
List<MessageReference> refs = new ArrayList<MessageReference>();
@@ -653,8 +634,6 @@
{
Queue queue = new QueueImpl(1, address1, queue1, null, false, true, scheduledExecutor, null, null, null);
- assertTrue(queue.getDistributionPolicy() instanceof RoundRobinDistributor);
-
final int numMessages = 10;
List<MessageReference> refs = new ArrayList<MessageReference>();
@@ -943,7 +922,7 @@
int currId = 0;
for (MessageReference receeivedRef : receeivedRefs)
{
- assertEquals("messages received out of order", receeivedRef.getMessage().getMessageID() , currId++);
+ assertEquals("messages received out of order", receeivedRef.getMessage().getMessageID(), currId++);
}
}
@@ -1017,7 +996,7 @@
int currId = 10;
for (MessageReference receeivedRef : receeivedRefs)
{
- assertEquals("messages received out of order", receeivedRef.getMessage().getMessageID() , currId++);
+ assertEquals("messages received out of order", receeivedRef.getMessage().getMessageID(), currId++);
}
}
@@ -1248,7 +1227,7 @@
public void testPauseAndResumeWithAsync() throws Exception
{
Queue queue = new QueueImpl(1, address1, queue1, null, false, true, scheduledExecutor, null, null, null);
-
+
// pauses the queue
queue.pause();
@@ -1377,34 +1356,4 @@
}
}
- class DummyDistributionPolicy implements Distributor
- {
- Consumer consumer;
-
- public List<Consumer> getConsumers()
- {
- return null;
- }
-
- public void addConsumer(Consumer consumer)
- {
- this.consumer = consumer;
- }
-
- public boolean removeConsumer(Consumer consumer)
- {
- return false;
- }
-
- public int getConsumerCount()
- {
- return 0;
- }
-
- public Consumer getNextConsumer()
- {
- return consumer;
- }
- }
-
}
Modified: trunk/tests/src/org/hornetq/tests/unit/core/settings/impl/AddressSettingsTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/unit/core/settings/impl/AddressSettingsTest.java 2009-11-13 19:14:02 UTC (rev 8282)
+++ trunk/tests/src/org/hornetq/tests/unit/core/settings/impl/AddressSettingsTest.java 2009-11-13 20:52:21 UTC (rev 8283)
@@ -9,7 +9,7 @@
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
* implied. See the License for the specific language governing
* permissions and limitations under the License.
- */
+ */
package org.hornetq.tests.unit.core.settings.impl;
@@ -26,14 +26,13 @@
public void testDefaults()
{
AddressSettings addressSettings = new AddressSettings();
- assertEquals(AddressSettings.DEFAULT_DISTRIBUTION_POLICY_CLASS, addressSettings.getDistributionPolicy().getClass());
- assertEquals(null, addressSettings.getDistributionPolicyClass());
assertEquals(null, addressSettings.getDeadLetterAddress());
assertEquals(null, addressSettings.getExpiryAddress());
assertEquals(AddressSettings.DEFAULT_MAX_DELIVERY_ATTEMPTS, addressSettings.getMaxDeliveryAttempts());
assertEquals(addressSettings.getMaxSizeBytes(), AddressSettings.DEFAULT_MAX_SIZE_BYTES);
assertEquals(AddressSettings.DEFAULT_PAGE_SIZE, addressSettings.getPageSizeBytes());
- assertEquals(AddressSettings.DEFAULT_MESSAGE_COUNTER_HISTORY_DAY_LIMIT, addressSettings.getMessageCounterHistoryDayLimit());
+ assertEquals(AddressSettings.DEFAULT_MESSAGE_COUNTER_HISTORY_DAY_LIMIT,
+ addressSettings.getMessageCounterHistoryDayLimit());
assertEquals(AddressSettings.DEFAULT_REDELIVER_DELAY, addressSettings.getRedeliveryDelay());
}
@@ -53,8 +52,6 @@
addressSettingsToMerge.setRedeliveryDelay((long)1003);
addressSettingsToMerge.setPageSizeBytes(1004);
addressSettings.merge(addressSettingsToMerge);
- assertEquals(addressSettings.getDistributionPolicy().getClass(), AddressSettings.DEFAULT_DISTRIBUTION_POLICY_CLASS);
- assertEquals(addressSettings.getDistributionPolicyClass(), null);
assertEquals(addressSettings.getDeadLetterAddress(), DLQ);
assertEquals(addressSettings.getExpiryAddress(), exp);
assertEquals(addressSettings.getMaxDeliveryAttempts(), 1000);
@@ -86,8 +83,6 @@
addressSettingsToMerge2.setRedeliveryDelay((long)2003);
addressSettings.merge(addressSettingsToMerge2);
- assertEquals(addressSettings.getDistributionPolicy().getClass(), AddressSettings.DEFAULT_DISTRIBUTION_POLICY_CLASS);
- assertEquals(addressSettings.getDistributionPolicyClass(), null);
assertEquals(addressSettings.getDeadLetterAddress(), DLQ);
assertEquals(addressSettings.getExpiryAddress(), exp);
assertEquals(addressSettings.getMaxDeliveryAttempts(), 1000);
@@ -122,14 +117,12 @@
addressSettingsToMerge.setAddressFullMessagePolicy(AddressFullMessagePolicy.PAGE);
addressSettings.merge(addressSettingsToMerge2);
- assertEquals(addressSettings.getDistributionPolicy().getClass(), AddressSettings.DEFAULT_DISTRIBUTION_POLICY_CLASS);
- assertEquals(addressSettings.getDistributionPolicyClass(), null);
assertEquals(addressSettings.getDeadLetterAddress(), DLQ);
assertEquals(addressSettings.getExpiryAddress(), exp);
assertEquals(addressSettings.getMaxDeliveryAttempts(), 2000);
assertEquals(addressSettings.getMaxSizeBytes(), 1001);
assertEquals(addressSettings.getMessageCounterHistoryDayLimit(), 2002);
- assertEquals(addressSettings.getRedeliveryDelay(),1003);
+ assertEquals(addressSettings.getRedeliveryDelay(), 1003);
assertEquals(AddressFullMessagePolicy.DROP, addressSettings.getAddressFullMessagePolicy());
}
}
15 years, 1 month
JBoss hornetq SVN: r8282 - trunk/tests/src/org/hornetq/tests/integration/replication.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2009-11-13 14:14:02 -0500 (Fri, 13 Nov 2009)
New Revision: 8282
Modified:
trunk/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java
Log:
Adding a test that I had here to the ReplicationTest
Modified: trunk/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java 2009-11-13 18:57:52 UTC (rev 8281)
+++ trunk/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java 2009-11-13 19:14:02 UTC (rev 8282)
@@ -504,6 +504,89 @@
}
}
+ public void testOrderOnNonPersistency() throws Exception
+ {
+
+ Configuration config = createDefaultConfig(false);
+
+ config.setBackup(true);
+
+ HornetQServer server = new HornetQServerImpl(config);
+
+ server.start();
+
+ FailoverManager failoverManager = createFailoverManager();
+
+ final ArrayList<Integer> executions = new ArrayList<Integer>();
+
+ try
+ {
+ ReplicationManagerImpl manager = new ReplicationManagerImpl(failoverManager,
+ ConfigurationImpl.DEFAULT_BACKUP_WINDOW_SIZE);
+ manager.start();
+
+ Journal replicatedJournal = new ReplicatedJournal((byte)1, new FakeJournal(), manager);
+
+ int numberOfAdds = 200;
+
+ final CountDownLatch latch = new CountDownLatch(numberOfAdds);
+
+ for (int i = 0; i < numberOfAdds; i++)
+ {
+ final int nAdd = i;
+
+ if (i % 2 == 0)
+ {
+ replicatedJournal.appendPrepareRecord(i, new FakeData(), false);
+ }
+ else
+ {
+ manager.sync();
+ }
+
+
+ manager.afterReplicated(new Runnable()
+ {
+
+ public void run()
+ {
+ executions.add(nAdd);
+ latch.countDown();
+ }
+
+ });
+
+ manager.closeContext();
+ }
+
+ assertTrue(latch.await(10, TimeUnit.SECONDS));
+
+
+ for (int i = 0; i < numberOfAdds; i++)
+ {
+ assertEquals(i, executions.get(i).intValue());
+ }
+
+ for (int i = 0; i < 100; i++)
+ {
+ // This is asynchronous. Have to wait completion
+ if (manager.getActiveTokens().size() == 0)
+ {
+ break;
+ }
+ Thread.sleep(1);
+ }
+
+
+ assertEquals(0, manager.getActiveTokens().size());
+ manager.stop();
+ }
+ finally
+ {
+ server.stop();
+ }
+ }
+
class FakeData implements EncodingSupport
{
15 years, 1 month