JBoss hornetq SVN: r8135 - trunk/docs/user-manual/en.
by do-not-reply@jboss.org
Author: jmesnil
Date: 2009-10-22 04:37:05 -0400 (Thu, 22 Oct 2009)
New Revision: 8135
Modified:
trunk/docs/user-manual/en/intercepting-operations.xml
Log:
client-side interceptors documentation
Modified: trunk/docs/user-manual/en/intercepting-operations.xml
===================================================================
--- trunk/docs/user-manual/en/intercepting-operations.xml 2009-10-22 07:45:44 UTC (rev 8134)
+++ trunk/docs/user-manual/en/intercepting-operations.xml 2009-10-22 08:37:05 UTC (rev 8135)
@@ -21,8 +21,8 @@
<chapter id="intercepting-operations">
<title>Intercepting Operations</title>
<para>HornetQ supports <emphasis>interceptors</emphasis> to intercept packets entering
- and leaving the server. Any supplied interceptors would be called for any packet entering or
- leaving the server, this allows custom code to be executed, e.g. for auditing packets,
+ the server. Any supplied interceptors would be called for any packet entering
+ the server, this allows custom code to be executed, e.g. for auditing packets,
filtering or other reasons. Interceptors can change the packets they intercept.</para>
<section>
<title>Implementing The Interceptors</title>
@@ -61,6 +61,14 @@
to be properly instantiated and called.</para>
</section>
<section>
+ <title>Interceptors on the Client Side</title>
+ <para>The interceptors can also be run on the client side to intercept packets
+ <emphasis>sent by the server</emphasis> by adding the interceptor to the <code>ClientSessionFactory</code>
+ with the <code>addInterceptor()</code> method.</para>
+ <para>The interceptor classes (and their dependencies) must be added to the client classpath
+ to be properly instantiated and called from the client side.</para>
+ </section>
+ <section>
<title>Example</title>
<para>See <xref linkend="examples.interceptor" /> for an example which
shows how to use interceptors to add properties to a message on the server.</para>
14 years, 6 months
JBoss hornetq SVN: r8134 - trunk/tests/src/org/hornetq/tests/util.
by do-not-reply@jboss.org
Author: jmesnil
Date: 2009-10-22 03:45:44 -0400 (Thu, 22 Oct 2009)
New Revision: 8134
Modified:
trunk/tests/src/org/hornetq/tests/util/ServiceTestBase.java
Log:
https://jira.jboss.org/jira/browse/HORNETQ-182 - fixing testcase
* added missing method ServiceTestBase.createClusteredServerWithParams(int, boolean, int, int, Map)
Modified: trunk/tests/src/org/hornetq/tests/util/ServiceTestBase.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/util/ServiceTestBase.java 2009-10-22 03:09:56 UTC (rev 8133)
+++ trunk/tests/src/org/hornetq/tests/util/ServiceTestBase.java 2009-10-22 07:45:44 UTC (rev 8134)
@@ -193,6 +193,19 @@
-1,
new HashMap<String, AddressSettings>());
}
+
+ protected HornetQServer createClusteredServerWithParams(final int index,
+ final boolean realFiles,
+ final int pageSize,
+ final int maxAddressSize,
+ final Map<String, Object> params)
+ {
+ return createServer(realFiles,
+ createClusteredDefaultConfig(index, params, INVM_ACCEPTOR_FACTORY),
+ pageSize,
+ maxAddressSize,
+ new HashMap<String, AddressSettings>());
+ }
protected Configuration createDefaultConfig()
{
14 years, 6 months
JBoss hornetq SVN: r8133 - trunk/tests/src/org/hornetq/tests/integration/cluster/bridge.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2009-10-21 23:09:56 -0400 (Wed, 21 Oct 2009)
New Revision: 8133
Modified:
trunk/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeTest.java
Log:
https://jira.jboss.org/jira/browse/HORNETQ-182 - Adding testcase
Modified: trunk/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeTest.java 2009-10-21 09:11:46 UTC (rev 8132)
+++ trunk/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeTest.java 2009-10-22 03:09:56 UTC (rev 8133)
@@ -20,6 +20,7 @@
import java.util.List;
import java.util.Map;
+import org.hornetq.core.buffers.ChannelBuffers;
import org.hornetq.core.client.ClientConsumer;
import org.hornetq.core.client.ClientMessage;
import org.hornetq.core.client.ClientProducer;
@@ -534,6 +535,145 @@
server1.stop();
}
+
+ // https://jira.jboss.org/jira/browse/HORNETQ-182
+ public void disabled_testBridgeWithPaging() throws Exception
+ {
+ HornetQServer server0 = null;
+ HornetQServer server1 = null;
+
+ final int PAGE_MAX = 100 * 1024;
+
+ final int PAGE_SIZE = 10 * 1024;
+
+ try
+ {
+
+ Map<String, Object> server0Params = new HashMap<String, Object>();
+ server0 = createClusteredServerWithParams(0, true, PAGE_SIZE, PAGE_MAX, server0Params);
+
+ Map<String, Object> server1Params = new HashMap<String, Object>();
+ server1Params.put(SERVER_ID_PROP_NAME, 1);
+ server1 = createClusteredServerWithParams(1, true, server1Params);
+
+ final String testAddress = "testAddress";
+ final String queueName0 = "queue0";
+ final String forwardAddress = "forwardAddress";
+ final String queueName1 = "queue1";
+
+ Map<String, TransportConfiguration> connectors = new HashMap<String, TransportConfiguration>();
+ TransportConfiguration server0tc = new TransportConfiguration("org.hornetq.core.remoting.impl.invm.InVMConnectorFactory",
+ server0Params);
+
+ TransportConfiguration server1tc = new TransportConfiguration("org.hornetq.core.remoting.impl.invm.InVMConnectorFactory",
+ server1Params);
+ connectors.put(server1tc.getName(), server1tc);
+
+ server0.getConfiguration().setConnectorConfigurations(connectors);
+
+ Pair<String, String> connectorPair = new Pair<String, String>(server1tc.getName(), null);
+
+ BridgeConfiguration bridgeConfiguration = new BridgeConfiguration("bridge1",
+ queueName0,
+ forwardAddress,
+ null,
+ null,
+ 1000,
+ 1d,
+ -1,
+ true,
+ false,
+ connectorPair);
+
+ List<BridgeConfiguration> bridgeConfigs = new ArrayList<BridgeConfiguration>();
+ bridgeConfigs.add(bridgeConfiguration);
+ server0.getConfiguration().setBridgeConfigurations(bridgeConfigs);
+
+ QueueConfiguration queueConfig0 = new QueueConfiguration(testAddress, queueName0, null, true);
+ List<QueueConfiguration> queueConfigs0 = new ArrayList<QueueConfiguration>();
+ queueConfigs0.add(queueConfig0);
+ server0.getConfiguration().setQueueConfigurations(queueConfigs0);
+
+ QueueConfiguration queueConfig1 = new QueueConfiguration(forwardAddress, queueName1, null, true);
+ List<QueueConfiguration> queueConfigs1 = new ArrayList<QueueConfiguration>();
+ queueConfigs1.add(queueConfig1);
+ server1.getConfiguration().setQueueConfigurations(queueConfigs1);
+
+ server1.start();
+ server0.start();
+
+ ClientSessionFactory sf0 = new ClientSessionFactoryImpl(server0tc);
+
+ ClientSessionFactory sf1 = new ClientSessionFactoryImpl(server1tc);
+
+ ClientSession session0 = sf0.createSession(false, true, true);
+
+ ClientSession session1 = sf1.createSession(false, true, true);
+
+ ClientProducer producer0 = session0.createProducer(new SimpleString(testAddress));
+
+ ClientConsumer consumer1 = session1.createConsumer(queueName1);
+
+ session1.start();
+
+ final int numMessages = 500;
+
+ final SimpleString propKey = new SimpleString("testkey");
+
+ for (int i = 0; i < numMessages; i++)
+ {
+ ClientMessage message = session0.createClientMessage(false);
+
+ message.setBody(ChannelBuffers.wrappedBuffer(new byte[1024]));
+
+ message.putIntProperty(propKey, i);
+
+ producer0.send(message);
+ }
+
+ for (int i = 0; i < numMessages; i++)
+ {
+ ClientMessage message = consumer1.receive(200);
+
+ assertNotNull(message);
+
+ assertEquals((Integer)i, (Integer)message.getProperty(propKey));
+
+ message.acknowledge();
+ }
+
+ assertNull(consumer1.receive(200));
+
+ session0.close();
+
+ session1.close();
+
+ sf0.close();
+
+ sf1.close();
+
+ }
+ finally
+ {
+ try
+ {
+ server0.stop();
+ }
+ catch (Throwable ignored)
+ {
+ }
+
+ try
+ {
+ server1.stop();
+ }
+ catch (Throwable ignored)
+ {
+ }
+ }
+
+ }
+
protected void setUp() throws Exception
{
14 years, 6 months
JBoss hornetq SVN: r8132 - trunk/tests/src/org/hornetq/tests/unit/core/message/impl.
by do-not-reply@jboss.org
Author: jmesnil
Date: 2009-10-21 05:11:46 -0400 (Wed, 21 Oct 2009)
New Revision: 8132
Added:
trunk/tests/src/org/hornetq/tests/unit/core/message/impl/MessageImplTest.java
Removed:
trunk/tests/src/org/hornetq/tests/unit/core/message/impl/MessageImplTestBase.java
Log:
Core Message unit tests
* renamed MessageImplTestBase to MessageImplTest and made it concrete
Copied: trunk/tests/src/org/hornetq/tests/unit/core/message/impl/MessageImplTest.java (from rev 8128, trunk/tests/src/org/hornetq/tests/unit/core/message/impl/MessageImplTestBase.java)
===================================================================
--- trunk/tests/src/org/hornetq/tests/unit/core/message/impl/MessageImplTest.java (rev 0)
+++ trunk/tests/src/org/hornetq/tests/unit/core/message/impl/MessageImplTest.java 2009-10-21 09:11:46 UTC (rev 8132)
@@ -0,0 +1,343 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.tests.unit.core.message.impl;
+
+import static org.hornetq.tests.util.RandomUtil.randomBoolean;
+import static org.hornetq.tests.util.RandomUtil.randomByte;
+import static org.hornetq.tests.util.RandomUtil.randomBytes;
+import static org.hornetq.tests.util.RandomUtil.randomDouble;
+import static org.hornetq.tests.util.RandomUtil.randomFloat;
+import static org.hornetq.tests.util.RandomUtil.randomInt;
+import static org.hornetq.tests.util.RandomUtil.randomLong;
+import static org.hornetq.tests.util.RandomUtil.randomShort;
+import static org.hornetq.tests.util.RandomUtil.randomString;
+
+import java.util.Set;
+
+import org.hornetq.core.buffers.ChannelBuffers;
+import org.hornetq.core.client.impl.ClientMessageImpl;
+import org.hornetq.core.journal.EncodingSupport;
+import org.hornetq.core.logging.Logger;
+import org.hornetq.core.message.Message;
+import org.hornetq.core.remoting.spi.HornetQBuffer;
+import org.hornetq.core.server.impl.ServerMessageImpl;
+import org.hornetq.tests.util.UnitTestCase;
+import org.hornetq.utils.SimpleString;
+
+/**
+ *
+ * @author <a href="mailto:tim.fox@jboss.com">Tim Fox</a>
+ *
+ */
+public class MessageImplTest extends UnitTestCase
+{
+ private static final Logger log = Logger.getLogger(MessageImplTest.class);
+
+ public void testEncodeDecode()
+ {
+ for (int j = 0; j < 10; j++)
+ {
+ byte[] bytes = new byte[1000];
+ for (int i = 0; i < bytes.length; i++)
+ {
+ bytes[i] = randomByte();
+ }
+ HornetQBuffer body = ChannelBuffers.wrappedBuffer(bytes);
+ Message message1 = new ClientMessageImpl(randomByte(), randomBoolean(), randomLong(), randomLong(), randomByte(), body);
+ Message message = message1;
+ message.setDestination(new SimpleString("oasoas"));
+
+ message.putStringProperty(new SimpleString("prop1"), new SimpleString("blah1"));
+ message.putStringProperty(new SimpleString("prop2"), new SimpleString("blah2"));
+ HornetQBuffer buffer = ChannelBuffers.buffer(message.getEncodeSize());
+ message.encode(buffer);
+ Message message2 = new ClientMessageImpl(false);
+ message2.decode(buffer);
+ assertMessagesEquivalent(message, message2);
+ }
+ }
+
+ public void getSetAttributes()
+ {
+ for (int j = 0; j < 10; j++)
+ {
+ byte[] bytes = new byte[1000];
+ for (int i = 0; i < bytes.length; i++)
+ {
+ bytes[i] = randomByte();
+ }
+ HornetQBuffer body = ChannelBuffers.wrappedBuffer(bytes);
+
+ final byte type = randomByte();
+ final boolean durable = randomBoolean();
+ final long expiration = randomLong();
+ final long timestamp = randomLong();
+ final byte priority = randomByte();
+ Message message1 = new ClientMessageImpl(type, durable, expiration, timestamp, priority, body);
+
+ Message message = message1;
+
+ assertEquals(type, message.getType());
+ assertEquals(durable, message.isDurable());
+ assertEquals(expiration, message.getExpiration());
+ assertEquals(timestamp, message.getTimestamp());
+ assertEquals(priority, message.getPriority());
+
+ final SimpleString destination = new SimpleString(randomString());
+ final boolean durable2 = randomBoolean();
+ final long expiration2 = randomLong();
+ final long timestamp2 = randomLong();
+ final byte priority2 = randomByte();
+
+ message.setDestination(destination);
+ assertEquals(destination, message.getDestination());
+
+ message.setDurable(durable2);
+ assertEquals(durable2, message.isDurable());
+
+ message.setExpiration(expiration2);
+ assertEquals(expiration2, message.getExpiration());
+
+ message.setTimestamp(timestamp2);
+ assertEquals(timestamp2, message.getTimestamp());
+
+ message.setPriority(priority2);
+ assertEquals(priority2, message.getPriority());
+
+ message.setBody(body);
+ assertTrue(body == message.getBody());
+ }
+ }
+
+ public void testExpired()
+ {
+ Message message = new ClientMessageImpl(false);
+
+ assertEquals(0, message.getExpiration());
+ assertFalse(message.isExpired());
+
+ message.setExpiration(System.currentTimeMillis() + 1000);
+ assertFalse(message.isExpired());
+
+ message.setExpiration(System.currentTimeMillis() - 1);
+ assertTrue(message.isExpired());
+
+ message.setExpiration(System.currentTimeMillis() - 1000);
+ assertTrue(message.isExpired());
+
+ message.setExpiration(0);
+ assertFalse(message.isExpired());
+ }
+
+
+ public void testEncodingMessage() throws Exception
+ {
+
+ SimpleString address = new SimpleString("Simple Destination ");
+
+ Message msg = new ClientMessageImpl(false);
+
+ byte[] bytes = new byte[]{(byte)1, (byte)2, (byte)3};
+ msg.setBody(ChannelBuffers.wrappedBuffer(bytes));
+
+ msg.setDestination(address);
+ msg.putStringProperty(new SimpleString("Key"), new SimpleString("This String is worthless!"));
+ msg.putStringProperty(new SimpleString("Key"), new SimpleString("This String is worthless and bigger!"));
+ msg.putStringProperty(new SimpleString("Key2"), new SimpleString("This String is worthless and bigger and bigger!"));
+ msg.removeProperty(new SimpleString("Key2"));
+
+ checkSizes(msg, new ServerMessageImpl());
+
+ msg.removeProperty(new SimpleString("Key"));
+
+ checkSizes(msg, new ServerMessageImpl());
+ }
+
+ public void testProperties()
+ {
+ for (int j = 0; j < 10; j++)
+ {
+ Message msg = new ClientMessageImpl(false);
+
+ SimpleString prop1 = new SimpleString("prop1");
+ boolean val1 = randomBoolean();
+ msg.putBooleanProperty(prop1, val1);
+
+ SimpleString prop2 = new SimpleString("prop2");
+ byte val2 = randomByte();
+ msg.putByteProperty(prop2, val2);
+
+ SimpleString prop3 = new SimpleString("prop3");
+ byte[] val3 = randomBytes();
+ msg.putBytesProperty(prop3, val3);
+
+ SimpleString prop4 = new SimpleString("prop4");
+ double val4 = randomDouble();
+ msg.putDoubleProperty(prop4, val4);
+
+ SimpleString prop5 = new SimpleString("prop5");
+ float val5 = randomFloat();
+ msg.putFloatProperty(prop5, val5);
+
+ SimpleString prop6 = new SimpleString("prop6");
+ int val6 = randomInt();
+ msg.putIntProperty(prop6, val6);
+
+ SimpleString prop7 = new SimpleString("prop7");
+ long val7 = randomLong();
+ msg.putLongProperty(prop7, val7);
+
+ SimpleString prop8 = new SimpleString("prop8");
+ short val8 = randomShort();
+ msg.putShortProperty(prop8, val8);
+
+ SimpleString prop9 = new SimpleString("prop9");
+ SimpleString val9 = new SimpleString(randomString());
+ msg.putStringProperty(prop9, val9);
+
+ assertEquals(9, msg.getPropertyNames().size());
+ assertTrue(msg.getPropertyNames().contains(prop1));
+ assertTrue(msg.getPropertyNames().contains(prop2));
+ assertTrue(msg.getPropertyNames().contains(prop3));
+ assertTrue(msg.getPropertyNames().contains(prop4));
+ assertTrue(msg.getPropertyNames().contains(prop5));
+ assertTrue(msg.getPropertyNames().contains(prop6));
+ assertTrue(msg.getPropertyNames().contains(prop7));
+ assertTrue(msg.getPropertyNames().contains(prop8));
+ assertTrue(msg.getPropertyNames().contains(prop9));
+
+ assertTrue(msg.containsProperty(prop1));
+ assertTrue(msg.containsProperty(prop2));
+ assertTrue(msg.containsProperty(prop3));
+ assertTrue(msg.containsProperty(prop4));
+ assertTrue(msg.containsProperty(prop5));
+ assertTrue(msg.containsProperty(prop6));
+ assertTrue(msg.containsProperty(prop7));
+ assertTrue(msg.containsProperty(prop8));
+ assertTrue(msg.containsProperty(prop9));
+
+ assertEquals(val1, msg.getProperty(prop1));
+ assertEquals(val2, msg.getProperty(prop2));
+ assertEquals(val3, msg.getProperty(prop3));
+ assertEquals(val4, msg.getProperty(prop4));
+ assertEquals(val5, msg.getProperty(prop5));
+ assertEquals(val6, msg.getProperty(prop6));
+ assertEquals(val7, msg.getProperty(prop7));
+ assertEquals(val8, msg.getProperty(prop8));
+ assertEquals(val9, msg.getProperty(prop9));
+
+ SimpleString val10 = new SimpleString(randomString());
+ //test overwrite
+ msg.putStringProperty(prop9, val10);
+ assertEquals(val10, msg.getProperty(prop9));
+
+ int val11 = randomInt();
+ msg.putIntProperty(prop9, val11);
+ assertEquals(val11, msg.getProperty(prop9));
+
+ msg.removeProperty(prop1);
+ assertEquals(8, msg.getPropertyNames().size());
+ assertTrue(msg.getPropertyNames().contains(prop2));
+ assertTrue(msg.getPropertyNames().contains(prop3));
+ assertTrue(msg.getPropertyNames().contains(prop4));
+ assertTrue(msg.getPropertyNames().contains(prop5));
+ assertTrue(msg.getPropertyNames().contains(prop6));
+ assertTrue(msg.getPropertyNames().contains(prop7));
+ assertTrue(msg.getPropertyNames().contains(prop8));
+ assertTrue(msg.getPropertyNames().contains(prop9));
+
+ msg.removeProperty(prop2);
+ assertEquals(7, msg.getPropertyNames().size());
+ assertTrue(msg.getPropertyNames().contains(prop3));
+ assertTrue(msg.getPropertyNames().contains(prop4));
+ assertTrue(msg.getPropertyNames().contains(prop5));
+ assertTrue(msg.getPropertyNames().contains(prop6));
+ assertTrue(msg.getPropertyNames().contains(prop7));
+ assertTrue(msg.getPropertyNames().contains(prop8));
+ assertTrue(msg.getPropertyNames().contains(prop9));
+
+ msg.removeProperty(prop9);
+ assertEquals(6, msg.getPropertyNames().size());
+ assertTrue(msg.getPropertyNames().contains(prop3));
+ assertTrue(msg.getPropertyNames().contains(prop4));
+ assertTrue(msg.getPropertyNames().contains(prop5));
+ assertTrue(msg.getPropertyNames().contains(prop6));
+ assertTrue(msg.getPropertyNames().contains(prop7));
+ assertTrue(msg.getPropertyNames().contains(prop8));
+
+ msg.removeProperty(prop3);
+ msg.removeProperty(prop4);
+ msg.removeProperty(prop5);
+ msg.removeProperty(prop6);
+ msg.removeProperty(prop7);
+ msg.removeProperty(prop8);
+ assertEquals(0, msg.getPropertyNames().size());
+ }
+ }
+
+ // Protected -------------------------------------------------------------------------------
+
+ protected void assertMessagesEquivalent(final Message msg1, final Message msg2)
+ {
+ assertEquals(msg1.isDurable(), msg2.isDurable());
+
+ assertEquals(msg1.getExpiration(), msg2.getExpiration());
+
+ assertEquals(msg1.getTimestamp(), msg2.getTimestamp());
+
+ assertEquals(msg1.getPriority(), msg2.getPriority());
+
+ assertEquals(msg1.getType(), msg2.getType());
+
+ assertEqualsByteArrays(msg1.getBody().array(), msg2.getBody().array());
+
+ assertEquals(msg1.getDestination(), msg2.getDestination());
+
+ Set<SimpleString> props1 = msg1.getPropertyNames();
+
+ Set<SimpleString> props2 = msg2.getPropertyNames();
+
+ assertEquals(props1.size(), props2.size());
+
+ for (SimpleString propname: props1)
+ {
+ Object val1 = msg1.getProperty(propname);
+
+ Object val2 = msg2.getProperty(propname);
+
+ assertEquals(val1, val2);
+ }
+ }
+
+ // Private ----------------------------------------------------------------------------------
+
+ private void checkSizes(final Message obj, final EncodingSupport newObject)
+ {
+ HornetQBuffer buffer = ChannelBuffers.buffer(1024);
+ obj.encode(buffer);
+ assertEquals (buffer.writerIndex(), obj.getEncodeSize());
+ int originalSize = buffer.writerIndex();
+
+ buffer.resetReaderIndex();
+ newObject.decode(buffer);
+
+
+ HornetQBuffer newBuffer = ChannelBuffers.buffer(1024);
+
+ newObject.encode(newBuffer);
+
+ assertEquals(newObject.getEncodeSize(), newBuffer.writerIndex());
+ assertEquals(originalSize, newBuffer.writerIndex());
+ }
+}
Deleted: trunk/tests/src/org/hornetq/tests/unit/core/message/impl/MessageImplTestBase.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/unit/core/message/impl/MessageImplTestBase.java 2009-10-20 15:54:02 UTC (rev 8131)
+++ trunk/tests/src/org/hornetq/tests/unit/core/message/impl/MessageImplTestBase.java 2009-10-21 09:11:46 UTC (rev 8132)
@@ -1,347 +0,0 @@
-/*
- * Copyright 2009 Red Hat, Inc.
- * Red Hat licenses this file to you under the Apache License, version
- * 2.0 (the "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * http://www.apache.org/licenses/LICENSE-2.0
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
- * implied. See the License for the specific language governing
- * permissions and limitations under the License.
- */
-
-package org.hornetq.tests.unit.core.message.impl;
-
-import static org.hornetq.tests.util.RandomUtil.randomBoolean;
-import static org.hornetq.tests.util.RandomUtil.randomByte;
-import static org.hornetq.tests.util.RandomUtil.randomBytes;
-import static org.hornetq.tests.util.RandomUtil.randomDouble;
-import static org.hornetq.tests.util.RandomUtil.randomFloat;
-import static org.hornetq.tests.util.RandomUtil.randomInt;
-import static org.hornetq.tests.util.RandomUtil.randomLong;
-import static org.hornetq.tests.util.RandomUtil.randomShort;
-import static org.hornetq.tests.util.RandomUtil.randomString;
-
-import java.util.Set;
-
-import org.hornetq.core.buffers.ChannelBuffers;
-import org.hornetq.core.journal.EncodingSupport;
-import org.hornetq.core.logging.Logger;
-import org.hornetq.core.message.Message;
-import org.hornetq.core.remoting.spi.HornetQBuffer;
-import org.hornetq.core.server.impl.ServerMessageImpl;
-import org.hornetq.tests.util.UnitTestCase;
-import org.hornetq.utils.SimpleString;
-
-/**
- *
- * @author <a href="mailto:tim.fox@jboss.com">Tim Fox</a>
- *
- */
-public abstract class MessageImplTestBase extends UnitTestCase
-{
- private static final Logger log = Logger.getLogger(MessageImplTestBase.class);
-
- protected abstract Message createMessage(final byte type, final boolean durable, final long expiration,
- final long timestamp, final byte priority, HornetQBuffer buffer);
-
- protected abstract Message createMessage();
-
- public void testEncodeDecode()
- {
- for (int j = 0; j < 10; j++)
- {
- byte[] bytes = new byte[1000];
- for (int i = 0; i < bytes.length; i++)
- {
- bytes[i] = randomByte();
- }
- HornetQBuffer body = ChannelBuffers.wrappedBuffer(bytes);
- Message message = createMessage(randomByte(), randomBoolean(), randomLong(),
- randomLong(), randomByte(), body);
- message.setDestination(new SimpleString("oasoas"));
-
- message.putStringProperty(new SimpleString("prop1"), new SimpleString("blah1"));
- message.putStringProperty(new SimpleString("prop2"), new SimpleString("blah2"));
- HornetQBuffer buffer = ChannelBuffers.buffer(message.getEncodeSize());
- message.encode(buffer);
- Message message2 = createMessage();
- message2.decode(buffer);
- assertMessagesEquivalent(message, message2);
- }
- }
-
- public void getSetAttributes()
- {
- for (int j = 0; j < 10; j++)
- {
- byte[] bytes = new byte[1000];
- for (int i = 0; i < bytes.length; i++)
- {
- bytes[i] = randomByte();
- }
- HornetQBuffer body = ChannelBuffers.wrappedBuffer(bytes);
-
- final byte type = randomByte();
- final boolean durable = randomBoolean();
- final long expiration = randomLong();
- final long timestamp = randomLong();
- final byte priority = randomByte();
-
- Message message = createMessage(type, durable, expiration,
- timestamp, priority, body);
-
- assertEquals(type, message.getType());
- assertEquals(durable, message.isDurable());
- assertEquals(expiration, message.getExpiration());
- assertEquals(timestamp, message.getTimestamp());
- assertEquals(priority, message.getPriority());
-
- final SimpleString destination = new SimpleString(randomString());
- final boolean durable2 = randomBoolean();
- final long expiration2 = randomLong();
- final long timestamp2 = randomLong();
- final byte priority2 = randomByte();
-
- message.setDestination(destination);
- assertEquals(destination, message.getDestination());
-
- message.setDurable(durable2);
- assertEquals(durable2, message.isDurable());
-
- message.setExpiration(expiration2);
- assertEquals(expiration2, message.getExpiration());
-
- message.setTimestamp(timestamp2);
- assertEquals(timestamp2, message.getTimestamp());
-
- message.setPriority(priority2);
- assertEquals(priority2, message.getPriority());
-
- message.setBody(body);
- assertTrue(body == message.getBody());
- }
- }
-
- public void testExpired()
- {
- Message message = createMessage();
-
- assertEquals(0, message.getExpiration());
- assertFalse(message.isExpired());
-
- message.setExpiration(System.currentTimeMillis() + 1000);
- assertFalse(message.isExpired());
-
- message.setExpiration(System.currentTimeMillis() - 1);
- assertTrue(message.isExpired());
-
- message.setExpiration(System.currentTimeMillis() - 1000);
- assertTrue(message.isExpired());
-
- message.setExpiration(0);
- assertFalse(message.isExpired());
- }
-
-
- public void testEncodingMessage() throws Exception
- {
-
- SimpleString address = new SimpleString("Simple Destination ");
-
- Message msg = createMessage();
-
- byte[] bytes = new byte[]{(byte)1, (byte)2, (byte)3};
- msg.setBody(ChannelBuffers.wrappedBuffer(bytes));
-
- msg.setDestination(address);
- msg.putStringProperty(new SimpleString("Key"), new SimpleString("This String is worthless!"));
- msg.putStringProperty(new SimpleString("Key"), new SimpleString("This String is worthless and bigger!"));
- msg.putStringProperty(new SimpleString("Key2"), new SimpleString("This String is worthless and bigger and bigger!"));
- msg.removeProperty(new SimpleString("Key2"));
-
- checkSizes(msg, new ServerMessageImpl());
-
- msg.removeProperty(new SimpleString("Key"));
-
- checkSizes(msg, new ServerMessageImpl());
- }
-
- public void testProperties()
- {
- for (int j = 0; j < 10; j++)
- {
- Message msg = createMessage();
-
- SimpleString prop1 = new SimpleString("prop1");
- boolean val1 = randomBoolean();
- msg.putBooleanProperty(prop1, val1);
-
- SimpleString prop2 = new SimpleString("prop2");
- byte val2 = randomByte();
- msg.putByteProperty(prop2, val2);
-
- SimpleString prop3 = new SimpleString("prop3");
- byte[] val3 = randomBytes();
- msg.putBytesProperty(prop3, val3);
-
- SimpleString prop4 = new SimpleString("prop4");
- double val4 = randomDouble();
- msg.putDoubleProperty(prop4, val4);
-
- SimpleString prop5 = new SimpleString("prop5");
- float val5 = randomFloat();
- msg.putFloatProperty(prop5, val5);
-
- SimpleString prop6 = new SimpleString("prop6");
- int val6 = randomInt();
- msg.putIntProperty(prop6, val6);
-
- SimpleString prop7 = new SimpleString("prop7");
- long val7 = randomLong();
- msg.putLongProperty(prop7, val7);
-
- SimpleString prop8 = new SimpleString("prop8");
- short val8 = randomShort();
- msg.putShortProperty(prop8, val8);
-
- SimpleString prop9 = new SimpleString("prop9");
- SimpleString val9 = new SimpleString(randomString());
- msg.putStringProperty(prop9, val9);
-
- assertEquals(9, msg.getPropertyNames().size());
- assertTrue(msg.getPropertyNames().contains(prop1));
- assertTrue(msg.getPropertyNames().contains(prop2));
- assertTrue(msg.getPropertyNames().contains(prop3));
- assertTrue(msg.getPropertyNames().contains(prop4));
- assertTrue(msg.getPropertyNames().contains(prop5));
- assertTrue(msg.getPropertyNames().contains(prop6));
- assertTrue(msg.getPropertyNames().contains(prop7));
- assertTrue(msg.getPropertyNames().contains(prop8));
- assertTrue(msg.getPropertyNames().contains(prop9));
-
- assertTrue(msg.containsProperty(prop1));
- assertTrue(msg.containsProperty(prop2));
- assertTrue(msg.containsProperty(prop3));
- assertTrue(msg.containsProperty(prop4));
- assertTrue(msg.containsProperty(prop5));
- assertTrue(msg.containsProperty(prop6));
- assertTrue(msg.containsProperty(prop7));
- assertTrue(msg.containsProperty(prop8));
- assertTrue(msg.containsProperty(prop9));
-
- assertEquals(val1, msg.getProperty(prop1));
- assertEquals(val2, msg.getProperty(prop2));
- assertEquals(val3, msg.getProperty(prop3));
- assertEquals(val4, msg.getProperty(prop4));
- assertEquals(val5, msg.getProperty(prop5));
- assertEquals(val6, msg.getProperty(prop6));
- assertEquals(val7, msg.getProperty(prop7));
- assertEquals(val8, msg.getProperty(prop8));
- assertEquals(val9, msg.getProperty(prop9));
-
- SimpleString val10 = new SimpleString(randomString());
- //test overwrite
- msg.putStringProperty(prop9, val10);
- assertEquals(val10, msg.getProperty(prop9));
-
- int val11 = randomInt();
- msg.putIntProperty(prop9, val11);
- assertEquals(val11, msg.getProperty(prop9));
-
- msg.removeProperty(prop1);
- assertEquals(8, msg.getPropertyNames().size());
- assertTrue(msg.getPropertyNames().contains(prop2));
- assertTrue(msg.getPropertyNames().contains(prop3));
- assertTrue(msg.getPropertyNames().contains(prop4));
- assertTrue(msg.getPropertyNames().contains(prop5));
- assertTrue(msg.getPropertyNames().contains(prop6));
- assertTrue(msg.getPropertyNames().contains(prop7));
- assertTrue(msg.getPropertyNames().contains(prop8));
- assertTrue(msg.getPropertyNames().contains(prop9));
-
- msg.removeProperty(prop2);
- assertEquals(7, msg.getPropertyNames().size());
- assertTrue(msg.getPropertyNames().contains(prop3));
- assertTrue(msg.getPropertyNames().contains(prop4));
- assertTrue(msg.getPropertyNames().contains(prop5));
- assertTrue(msg.getPropertyNames().contains(prop6));
- assertTrue(msg.getPropertyNames().contains(prop7));
- assertTrue(msg.getPropertyNames().contains(prop8));
- assertTrue(msg.getPropertyNames().contains(prop9));
-
- msg.removeProperty(prop9);
- assertEquals(6, msg.getPropertyNames().size());
- assertTrue(msg.getPropertyNames().contains(prop3));
- assertTrue(msg.getPropertyNames().contains(prop4));
- assertTrue(msg.getPropertyNames().contains(prop5));
- assertTrue(msg.getPropertyNames().contains(prop6));
- assertTrue(msg.getPropertyNames().contains(prop7));
- assertTrue(msg.getPropertyNames().contains(prop8));
-
- msg.removeProperty(prop3);
- msg.removeProperty(prop4);
- msg.removeProperty(prop5);
- msg.removeProperty(prop6);
- msg.removeProperty(prop7);
- msg.removeProperty(prop8);
- assertEquals(0, msg.getPropertyNames().size());
- }
- }
-
- // Protected -------------------------------------------------------------------------------
-
- protected void assertMessagesEquivalent(final Message msg1, final Message msg2)
- {
- assertEquals(msg1.isDurable(), msg2.isDurable());
-
- assertEquals(msg1.getExpiration(), msg2.getExpiration());
-
- assertEquals(msg1.getTimestamp(), msg2.getTimestamp());
-
- assertEquals(msg1.getPriority(), msg2.getPriority());
-
- assertEquals(msg1.getType(), msg2.getType());
-
- assertEqualsByteArrays(msg1.getBody().array(), msg2.getBody().array());
-
- assertEquals(msg1.getDestination(), msg2.getDestination());
-
- Set<SimpleString> props1 = msg1.getPropertyNames();
-
- Set<SimpleString> props2 = msg2.getPropertyNames();
-
- assertEquals(props1.size(), props2.size());
-
- for (SimpleString propname: props1)
- {
- Object val1 = msg1.getProperty(propname);
-
- Object val2 = msg2.getProperty(propname);
-
- assertEquals(val1, val2);
- }
- }
-
- // Private ----------------------------------------------------------------------------------
-
- private void checkSizes(final Message obj, final EncodingSupport newObject)
- {
- HornetQBuffer buffer = ChannelBuffers.buffer(1024);
- obj.encode(buffer);
- assertEquals (buffer.writerIndex(), obj.getEncodeSize());
- int originalSize = buffer.writerIndex();
-
- buffer.resetReaderIndex();
- newObject.decode(buffer);
-
-
- HornetQBuffer newBuffer = ChannelBuffers.buffer(1024);
-
- newObject.encode(newBuffer);
-
- assertEquals(newObject.getEncodeSize(), newBuffer.writerIndex());
- assertEquals(originalSize, newBuffer.writerIndex());
- }
-}
14 years, 6 months
JBoss hornetq SVN: r8131 - trunk/tests/src/org/hornetq/tests/integration/replication.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2009-10-20 11:54:02 -0400 (Tue, 20 Oct 2009)
New Revision: 8131
Modified:
trunk/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java
Log:
Fixing the test
Modified: trunk/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java 2009-10-20 12:43:46 UTC (rev 8130)
+++ trunk/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java 2009-10-20 15:54:02 UTC (rev 8131)
@@ -270,32 +270,19 @@
Configuration config = createDefaultConfig(false);
config.setBackup(true);
-
- final AtomicBoolean returnIntercept = new AtomicBoolean(true);
- final Interceptor intercept = new Interceptor()
- {
-
- public boolean intercept(Packet packet, RemotingConnection connection) throws HornetQException
- {
- if (returnIntercept.get())
- {
- System.out.println("Returning true");
- }
- return returnIntercept.get();
- }
-
- };
-
+ ArrayList<String> intercepts = new ArrayList<String>();
+
+ intercepts.add(TestInterceptor.class.getName());
+
+ config.setInterceptorClassNames(intercepts);
+
HornetQServer server = new HornetQServerImpl(config);
server.start();
- final ArrayList<Interceptor> listInterceptor = new ArrayList<Interceptor>();
- listInterceptor.add(intercept);
+ FailoverManager failoverManager = createFailoverManager();
- FailoverManager failoverManager = createFailoverManager(listInterceptor);
-
try
{
ReplicationManagerImpl manager = new ReplicationManagerImpl(failoverManager, executor);
@@ -304,7 +291,7 @@
Journal replicatedJournal = new ReplicatedJournal((byte)1, new FakeJournal(), manager);
Thread.sleep(100);
- returnIntercept.set(false);
+ TestInterceptor.value.set(false);
for (int i = 0; i < 500; i++)
{
@@ -507,7 +494,18 @@
// Private -------------------------------------------------------
// Inner classes -------------------------------------------------
+ public static class TestInterceptor implements Interceptor
+ {
+ static AtomicBoolean value = new AtomicBoolean(true);
+ public boolean intercept(Packet packet, RemotingConnection connection) throws HornetQException
+ {
+ return value.get();
+ }
+
+ };
+
+
static class FakeJournal implements Journal
{
14 years, 6 months
JBoss hornetq SVN: r8130 - in trunk: src/main/org/hornetq/core/remoting/impl/invm and 5 other directories.
by do-not-reply@jboss.org
Author: jmesnil
Date: 2009-10-20 08:43:46 -0400 (Tue, 20 Oct 2009)
New Revision: 8130
Modified:
trunk/src/main/org/hornetq/core/management/NotificationType.java
trunk/src/main/org/hornetq/core/remoting/impl/invm/InVMAcceptor.java
trunk/src/main/org/hornetq/core/remoting/server/impl/RemotingServiceImpl.java
trunk/src/main/org/hornetq/core/remoting/spi/Acceptor.java
trunk/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
trunk/src/main/org/hornetq/integration/transports/netty/NettyAcceptor.java
trunk/tests/src/org/hornetq/tests/integration/management/AcceptorControlTest.java
trunk/tests/src/org/hornetq/tests/integration/management/ClusterConnectionControlTest.java
Log:
https://jira.jboss.org/jira/browse/HORNETQ-37: Add additional core management notifications
* added notifications when acceptors & cluster connections are started/stopped
Modified: trunk/src/main/org/hornetq/core/management/NotificationType.java
===================================================================
--- trunk/src/main/org/hornetq/core/management/NotificationType.java 2009-10-20 12:33:47 UTC (rev 8129)
+++ trunk/src/main/org/hornetq/core/management/NotificationType.java 2009-10-20 12:43:46 UTC (rev 8130)
@@ -30,8 +30,13 @@
BROADCAST_GROUP_STARTED(10),
BROADCAST_GROUP_STOPPED(11),
BRIDGE_STARTED(12),
- BRIDGE_STOPPED(13);
+ BRIDGE_STOPPED(13),
+ CLUSTER_CONNECTION_STARTED(14),
+ CLUSTER_CONNECTION_STOPPED(15),
+ ACCEPTOR_STARTED(16),
+ ACCEPTOR_STOPPED(17);
+
private final int value;
private NotificationType(int value)
Modified: trunk/src/main/org/hornetq/core/remoting/impl/invm/InVMAcceptor.java
===================================================================
--- trunk/src/main/org/hornetq/core/remoting/impl/invm/InVMAcceptor.java 2009-10-20 12:33:47 UTC (rev 8129)
+++ trunk/src/main/org/hornetq/core/remoting/impl/invm/InVMAcceptor.java 2009-10-20 12:43:46 UTC (rev 8130)
@@ -19,6 +19,9 @@
import org.hornetq.core.exception.HornetQException;
import org.hornetq.core.logging.Logger;
+import org.hornetq.core.management.Notification;
+import org.hornetq.core.management.NotificationService;
+import org.hornetq.core.management.NotificationType;
import org.hornetq.core.remoting.spi.Acceptor;
import org.hornetq.core.remoting.spi.BufferHandler;
import org.hornetq.core.remoting.spi.Connection;
@@ -26,6 +29,8 @@
import org.hornetq.utils.ConfigurationHelper;
import org.hornetq.utils.ExecutorFactory;
import org.hornetq.utils.OrderedExecutorFactory;
+import org.hornetq.utils.SimpleString;
+import org.hornetq.utils.TypedProperties;
/**
* A InVMAcceptor
@@ -51,6 +56,8 @@
private boolean paused;
+ private NotificationService notificationService;
+
public InVMAcceptor(final Map<String, Object> configuration,
final BufferHandler handler,
final ConnectionLifeCycleListener listener,
@@ -74,6 +81,15 @@
InVMRegistry.instance.registerAcceptor(id, this);
+ if (notificationService != null)
+ {
+ TypedProperties props = new TypedProperties();
+ props.putStringProperty(new SimpleString("factory"), new SimpleString(InVMAcceptorFactory.class.getName()));
+ props.putIntProperty(new SimpleString("id"), id);
+ Notification notification = new Notification(null, NotificationType.ACCEPTOR_STARTED, props);
+ notificationService.sendNotification(notification);
+ }
+
started = true;
paused = false;
@@ -98,6 +114,23 @@
connections.clear();
+ if (notificationService != null)
+ {
+ TypedProperties props = new TypedProperties();
+ props.putStringProperty(new SimpleString("factory"), new SimpleString(InVMAcceptorFactory.class.getName()));
+ props.putIntProperty(new SimpleString("id"), id);
+ Notification notification = new Notification(null, NotificationType.ACCEPTOR_STOPPED, props);
+ try
+ {
+ notificationService.sendNotification(notification);
+ }
+ catch (Exception e)
+ {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ }
+ }
+
started = false;
paused = false;
@@ -134,6 +167,11 @@
paused = false;
}
+
+ public void setNotificationService(NotificationService notificationService)
+ {
+ this.notificationService = notificationService;
+ }
public BufferHandler getHandler()
{
Modified: trunk/src/main/org/hornetq/core/remoting/server/impl/RemotingServiceImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/remoting/server/impl/RemotingServiceImpl.java 2009-10-20 12:33:47 UTC (rev 8129)
+++ trunk/src/main/org/hornetq/core/remoting/server/impl/RemotingServiceImpl.java 2009-10-20 12:43:46 UTC (rev 8130)
@@ -174,6 +174,7 @@
if (managementService != null)
{
+ acceptor.setNotificationService(managementService);
managementService.registerAcceptor(acceptor, info);
}
}
Modified: trunk/src/main/org/hornetq/core/remoting/spi/Acceptor.java
===================================================================
--- trunk/src/main/org/hornetq/core/remoting/spi/Acceptor.java 2009-10-20 12:33:47 UTC (rev 8129)
+++ trunk/src/main/org/hornetq/core/remoting/spi/Acceptor.java 2009-10-20 12:43:46 UTC (rev 8130)
@@ -13,10 +13,11 @@
package org.hornetq.core.remoting.spi;
+import org.hornetq.core.management.NotificationService;
import org.hornetq.core.server.HornetQComponent;
/**
- * An Acceptor is used tby the Remoting Service to allow clients to connect. It should take care of dispatchin client requests
+ * An Acceptor is used by the Remoting Service to allow clients to connect. It should take care of dispatching client requests
* to the Remoting Service's Dispatcher.
*
* @author <a href="ataylor(a)redhat.com">Andy Taylor</a>
@@ -27,4 +28,6 @@
void pause();
void resume();
+
+ void setNotificationService(NotificationService notificationService);
}
Modified: trunk/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java 2009-10-20 12:33:47 UTC (rev 8129)
+++ trunk/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java 2009-10-20 12:43:46 UTC (rev 8130)
@@ -47,6 +47,7 @@
import org.hornetq.utils.ExecutorFactory;
import org.hornetq.utils.Pair;
import org.hornetq.utils.SimpleString;
+import org.hornetq.utils.TypedProperties;
import org.hornetq.utils.UUID;
/**
@@ -218,6 +219,14 @@
}
started = true;
+
+ if (managementService != null)
+ {
+ TypedProperties props = new TypedProperties();
+ props.putStringProperty(new SimpleString("name"), name);
+ Notification notification = new Notification(nodeUUID.toString(), NotificationType.CLUSTER_CONNECTION_STARTED, props);
+ managementService.sendNotification(notification);
+ }
}
public synchronized void stop() throws Exception
@@ -243,6 +252,14 @@
}
}
+ if (managementService != null)
+ {
+ TypedProperties props = new TypedProperties();
+ props.putStringProperty(new SimpleString("name"), name);
+ Notification notification = new Notification(nodeUUID.toString(), NotificationType.CLUSTER_CONNECTION_STOPPED, props);
+ managementService.sendNotification(notification);
+ }
+
started = false;
}
Modified: trunk/src/main/org/hornetq/integration/transports/netty/NettyAcceptor.java
===================================================================
--- trunk/src/main/org/hornetq/integration/transports/netty/NettyAcceptor.java 2009-10-20 12:33:47 UTC (rev 8129)
+++ trunk/src/main/org/hornetq/integration/transports/netty/NettyAcceptor.java 2009-10-20 12:43:46 UTC (rev 8130)
@@ -31,12 +31,17 @@
import org.hornetq.core.config.TransportConfiguration;
import org.hornetq.core.exception.HornetQException;
import org.hornetq.core.logging.Logger;
+import org.hornetq.core.management.Notification;
+import org.hornetq.core.management.NotificationService;
+import org.hornetq.core.management.NotificationType;
import org.hornetq.core.remoting.impl.ssl.SSLSupport;
import org.hornetq.core.remoting.spi.Acceptor;
import org.hornetq.core.remoting.spi.BufferHandler;
import org.hornetq.core.remoting.spi.Connection;
import org.hornetq.core.remoting.spi.ConnectionLifeCycleListener;
import org.hornetq.utils.ConfigurationHelper;
+import org.hornetq.utils.SimpleString;
+import org.hornetq.utils.TypedProperties;
import org.hornetq.utils.VersionLoader;
import org.jboss.netty.bootstrap.ServerBootstrap;
import org.jboss.netty.channel.Channel;
@@ -121,6 +126,8 @@
private final Executor threadPool;
+ private NotificationService notificationService;
+
public NettyAcceptor(final Map<String, Object> configuration,
final BufferHandler handler,
final ConnectionLifeCycleListener listener,
@@ -303,6 +310,16 @@
log.warn("Unexpected Netty Version was expecting " + VersionLoader.getVersion().getNettyVersion() + " using " + Version.ID);
}
+ if (notificationService != null)
+ {
+ TypedProperties props = new TypedProperties();
+ props.putStringProperty(new SimpleString("factory"), new SimpleString(NettyAcceptorFactory.class.getName()));
+ props.putStringProperty(new SimpleString("host"), new SimpleString(host));
+ props.putIntProperty(new SimpleString("port"), port);
+ Notification notification = new Notification(null, NotificationType.ACCEPTOR_STARTED, props);
+ notificationService.sendNotification(notification);
+ }
+
log.info("Started Netty Acceptor version " + Version.ID);
}
@@ -364,6 +381,24 @@
}
connections.clear();
+
+ if (notificationService != null)
+ {
+ TypedProperties props = new TypedProperties();
+ props.putStringProperty(new SimpleString("factory"), new SimpleString(NettyAcceptorFactory.class.getName()));
+ props.putStringProperty(new SimpleString("host"), new SimpleString(host));
+ props.putIntProperty(new SimpleString("port"), port);
+ Notification notification = new Notification(null, NotificationType.ACCEPTOR_STOPPED, props);
+ try
+ {
+ notificationService.sendNotification(notification);
+ }
+ catch (Exception e)
+ {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ }
+ }
paused = false;
}
@@ -414,6 +449,11 @@
paused = false;
}
+ public void setNotificationService(final NotificationService notificationService)
+ {
+ this.notificationService = notificationService;
+ }
+
// Inner classes -----------------------------------------------------------------------------
@ChannelPipelineCoverage("one")
Modified: trunk/tests/src/org/hornetq/tests/integration/management/AcceptorControlTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/management/AcceptorControlTest.java 2009-10-20 12:33:47 UTC (rev 8129)
+++ trunk/tests/src/org/hornetq/tests/integration/management/AcceptorControlTest.java 2009-10-20 12:43:46 UTC (rev 8130)
@@ -25,10 +25,14 @@
import org.hornetq.core.config.impl.ConfigurationImpl;
import org.hornetq.core.logging.Logger;
import org.hornetq.core.management.AcceptorControl;
+import org.hornetq.core.management.Notification;
+import org.hornetq.core.management.NotificationType;
import org.hornetq.core.remoting.impl.invm.InVMAcceptorFactory;
import org.hornetq.core.remoting.impl.invm.InVMConnectorFactory;
import org.hornetq.core.server.HornetQ;
import org.hornetq.core.server.HornetQServer;
+import org.hornetq.tests.integration.SimpleNotificationService;
+import org.hornetq.utils.SimpleString;
/**
* A AcceptorControlTest
@@ -133,7 +137,43 @@
}
}
+
+ public void testNotifications() throws Exception
+ {
+ TransportConfiguration acceptorConfig = new TransportConfiguration(InVMAcceptorFactory.class.getName(),
+ new HashMap<String, Object>(),
+ randomString());
+ Configuration conf = new ConfigurationImpl();
+ conf.setSecurityEnabled(false);
+ conf.setJMXManagementEnabled(true);
+ conf.getAcceptorConfigurations().add(acceptorConfig);
+ service = HornetQ.newHornetQServer(conf, mbeanServer, false);
+ service.start();
+ AcceptorControl acceptorControl = createManagementControl(acceptorConfig.getName());
+
+
+ SimpleNotificationService.Listener notifListener = new SimpleNotificationService.Listener();
+
+ service.getManagementService().addNotificationListener(notifListener);
+
+ assertEquals(0, notifListener.getNotifications().size());
+
+ acceptorControl.stop();
+
+ assertEquals(1, notifListener.getNotifications().size());
+ Notification notif = notifListener.getNotifications().get(0);
+ assertEquals(NotificationType.ACCEPTOR_STOPPED, notif.getType());
+ assertEquals(InVMAcceptorFactory.class.getName(), (notif.getProperties().getProperty(new SimpleString("factory")).toString()));
+
+ acceptorControl.start();
+
+ assertEquals(2, notifListener.getNotifications().size());
+ notif = notifListener.getNotifications().get(1);
+ assertEquals(NotificationType.ACCEPTOR_STARTED, notif.getType());
+ assertEquals(InVMAcceptorFactory.class.getName(), (notif.getProperties().getProperty(new SimpleString("factory")).toString()));
+ }
+
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------
Modified: trunk/tests/src/org/hornetq/tests/integration/management/ClusterConnectionControlTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/management/ClusterConnectionControlTest.java 2009-10-20 12:33:47 UTC (rev 8129)
+++ trunk/tests/src/org/hornetq/tests/integration/management/ClusterConnectionControlTest.java 2009-10-20 12:43:46 UTC (rev 8130)
@@ -33,12 +33,16 @@
import org.hornetq.core.config.cluster.QueueConfiguration;
import org.hornetq.core.config.impl.ConfigurationImpl;
import org.hornetq.core.management.ClusterConnectionControl;
+import org.hornetq.core.management.Notification;
+import org.hornetq.core.management.NotificationType;
import org.hornetq.core.management.ObjectNameBuilder;
import org.hornetq.core.remoting.impl.invm.InVMAcceptorFactory;
import org.hornetq.core.remoting.impl.invm.InVMConnectorFactory;
import org.hornetq.core.server.HornetQ;
import org.hornetq.core.server.HornetQServer;
+import org.hornetq.tests.integration.SimpleNotificationService;
import org.hornetq.utils.Pair;
+import org.hornetq.utils.SimpleString;
import org.hornetq.utils.json.JSONArray;
import org.hornetq.utils.json.JSONObject;
@@ -144,6 +148,31 @@
assertTrue(clusterConnectionControl.isStarted());
}
+ public void testNotifications() throws Exception
+ {
+ SimpleNotificationService.Listener notifListener = new SimpleNotificationService.Listener();
+ checkResource(ObjectNameBuilder.DEFAULT.getClusterConnectionObjectName(clusterConnectionConfig1.getName()));
+ ClusterConnectionControl clusterConnectionControl = createManagementControl(clusterConnectionConfig1.getName());
+
+ server_0.getManagementService().addNotificationListener(notifListener);
+
+ assertEquals(0, notifListener.getNotifications().size());
+
+ clusterConnectionControl.stop();
+
+ assertTrue(notifListener.getNotifications().size() > 0);
+ Notification notif = notifListener.getNotifications().get(notifListener.getNotifications().size() - 1);
+ assertEquals(NotificationType.CLUSTER_CONNECTION_STOPPED, notif.getType());
+ assertEquals(clusterConnectionControl.getName(), (notif.getProperties().getProperty(new SimpleString("name")).toString()));
+
+ clusterConnectionControl.start();
+
+ assertTrue(notifListener.getNotifications().size() > 0);
+ notif = notifListener.getNotifications().get(notifListener.getNotifications().size() - 1);
+ assertEquals(NotificationType.CLUSTER_CONNECTION_STARTED, notif.getType());
+ assertEquals(clusterConnectionControl.getName(), (notif.getProperties().getProperty(new SimpleString("name")).toString()));
+ }
+
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------
14 years, 6 months
JBoss hornetq SVN: r8129 - trunk/src/main/org/hornetq/core/management.
by do-not-reply@jboss.org
Author: jmesnil
Date: 2009-10-20 08:33:47 -0400 (Tue, 20 Oct 2009)
New Revision: 8129
Removed:
trunk/src/main/org/hornetq/core/management/ReplicationOperationInvoker.java
Log:
removed unused interface from management API
Deleted: trunk/src/main/org/hornetq/core/management/ReplicationOperationInvoker.java
===================================================================
--- trunk/src/main/org/hornetq/core/management/ReplicationOperationInvoker.java 2009-10-19 17:36:43 UTC (rev 8128)
+++ trunk/src/main/org/hornetq/core/management/ReplicationOperationInvoker.java 2009-10-20 12:33:47 UTC (rev 8129)
@@ -1,27 +0,0 @@
-/*
- * Copyright 2009 Red Hat, Inc.
- * Red Hat licenses this file to you under the Apache License, version
- * 2.0 (the "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * http://www.apache.org/licenses/LICENSE-2.0
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
- * implied. See the License for the specific language governing
- * permissions and limitations under the License.
- */
-
-package org.hornetq.core.management;
-
-
-/**
- * A ReplicationOperationInvoker
- *
- * @author <a href="jmesnil(a)redhat.com">Jeff Mesnil</a>
- */
-public interface ReplicationOperationInvoker
-{
- Object invoke(String resourceName, String operationName, Object... parameters) throws Exception;
-
- void stop() throws Exception;
-}
\ No newline at end of file
14 years, 6 months
JBoss hornetq SVN: r8128 - trunk/tests/src/org/hornetq/tests/integration/jms/divert.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2009-10-19 13:36:43 -0400 (Mon, 19 Oct 2009)
New Revision: 8128
Modified:
trunk/tests/src/org/hornetq/tests/integration/jms/divert/DivertAndACKClientTest.java
Log:
just javadoc
Modified: trunk/tests/src/org/hornetq/tests/integration/jms/divert/DivertAndACKClientTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/jms/divert/DivertAndACKClientTest.java 2009-10-19 11:41:18 UTC (rev 8127)
+++ trunk/tests/src/org/hornetq/tests/integration/jms/divert/DivertAndACKClientTest.java 2009-10-19 17:36:43 UTC (rev 8128)
@@ -169,7 +169,7 @@
DEFAULT_CONSUMER_MAX_RATE,
DEFAULT_PRODUCER_WINDOW_SIZE,
DEFAULT_PRODUCER_MAX_RATE,
- true,
+ true, // this test needs to block on ACK
DEFAULT_BLOCK_ON_PERSISTENT_SEND,
DEFAULT_BLOCK_ON_NON_PERSISTENT_SEND,
DEFAULT_AUTO_GROUP,
14 years, 6 months
JBoss hornetq SVN: r8127 - branches/hornetq_grouping/src/main/org/hornetq/core/server/group.
by do-not-reply@jboss.org
Author: ataylor
Date: 2009-10-19 07:41:18 -0400 (Mon, 19 Oct 2009)
New Revision: 8127
Removed:
branches/hornetq_grouping/src/main/org/hornetq/core/server/group/ProposalHandler.java
Log:
removed unused class
Deleted: branches/hornetq_grouping/src/main/org/hornetq/core/server/group/ProposalHandler.java
===================================================================
--- branches/hornetq_grouping/src/main/org/hornetq/core/server/group/ProposalHandler.java 2009-10-19 11:30:49 UTC (rev 8126)
+++ branches/hornetq_grouping/src/main/org/hornetq/core/server/group/ProposalHandler.java 2009-10-19 11:41:18 UTC (rev 8127)
@@ -1,26 +0,0 @@
-/*
- * Copyright 2009 Red Hat, Inc.
- * Red Hat licenses this file to you under the Apache License, version
- * 2.0 (the "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * http://www.apache.org/licenses/LICENSE-2.0
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
- * implied. See the License for the specific language governing
- * permissions and limitations under the License.
- */
-package org.hornetq.core.server.group;
-
-import org.hornetq.utils.TypedProperties;
-import org.hornetq.core.server.group.impl.Proposal;
-
-/**
- * @author <a href="mailto:andy.taylor@jboss.org">Andy Taylor</a>
- */
-public interface ProposalHandler
-{
- public void handleSend(Proposal proposal, TypedProperties clientMessage);
-
- void handleReceive(Proposal proposal);
-}
14 years, 6 months
JBoss hornetq SVN: r8126 - in branches/hornetq_grouping: src/main/org/hornetq/core/config/impl and 15 other directories.
by do-not-reply@jboss.org
Author: ataylor
Date: 2009-10-19 07:30:49 -0400 (Mon, 19 Oct 2009)
New Revision: 8126
Added:
branches/hornetq_grouping/src/main/org/hornetq/core/persistence/GroupingInfo.java
branches/hornetq_grouping/src/main/org/hornetq/core/server/group/impl/GroupBinding.java
Modified:
branches/hornetq_grouping/src/main/org/hornetq/core/config/Configuration.java
branches/hornetq_grouping/src/main/org/hornetq/core/config/impl/ConfigurationImpl.java
branches/hornetq_grouping/src/main/org/hornetq/core/config/impl/FileConfiguration.java
branches/hornetq_grouping/src/main/org/hornetq/core/persistence/StorageManager.java
branches/hornetq_grouping/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
branches/hornetq_grouping/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java
branches/hornetq_grouping/src/main/org/hornetq/core/postoffice/PostOffice.java
branches/hornetq_grouping/src/main/org/hornetq/core/postoffice/impl/BindingsImpl.java
branches/hornetq_grouping/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java
branches/hornetq_grouping/src/main/org/hornetq/core/server/HornetQServer.java
branches/hornetq_grouping/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
branches/hornetq_grouping/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java
branches/hornetq_grouping/src/main/org/hornetq/core/server/group/GroupingHandler.java
branches/hornetq_grouping/src/main/org/hornetq/core/server/group/impl/LocalGroupingHandler.java
branches/hornetq_grouping/src/main/org/hornetq/core/server/group/impl/Proposal.java
branches/hornetq_grouping/src/main/org/hornetq/core/server/group/impl/RemoteGroupingHandler.java
branches/hornetq_grouping/src/main/org/hornetq/core/server/group/impl/Response.java
branches/hornetq_grouping/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
branches/hornetq_grouping/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java
branches/hornetq_grouping/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusteredGroupingTest.java
branches/hornetq_grouping/tests/src/org/hornetq/tests/integration/persistence/RestartSMTest.java
branches/hornetq_grouping/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java
branches/hornetq_grouping/tests/src/org/hornetq/tests/unit/core/postoffice/impl/DuplicateDetectionUnitTest.java
branches/hornetq_grouping/tests/src/org/hornetq/tests/unit/core/server/impl/fakes/FakePostOffice.java
Log:
some refactoring
Modified: branches/hornetq_grouping/src/main/org/hornetq/core/config/Configuration.java
===================================================================
--- branches/hornetq_grouping/src/main/org/hornetq/core/config/Configuration.java 2009-10-17 12:38:46 UTC (rev 8125)
+++ branches/hornetq_grouping/src/main/org/hornetq/core/config/Configuration.java 2009-10-19 11:30:49 UTC (rev 8126)
@@ -130,9 +130,9 @@
void setDiscoveryGroupConfigurations(Map<String, DiscoveryGroupConfiguration> configs);
- List<GroupingHandlerConfiguration> getGroupingHandlerConfigurations();
+ GroupingHandlerConfiguration getGroupingHandlerConfiguration();
- void setGroupingHandlerConfigurationConfigurations(List<GroupingHandlerConfiguration> groupingHandlerConfiguration);
+ void setGroupingHandlerConfiguration(GroupingHandlerConfiguration groupingHandlerConfiguration);
List<BridgeConfiguration> getBridgeConfigurations();
@@ -309,5 +309,4 @@
void setMessageExpiryThreadPriority(int messageExpiryThreadPriority);
-
}
Modified: branches/hornetq_grouping/src/main/org/hornetq/core/config/impl/ConfigurationImpl.java
===================================================================
--- branches/hornetq_grouping/src/main/org/hornetq/core/config/impl/ConfigurationImpl.java 2009-10-17 12:38:46 UTC (rev 8125)
+++ branches/hornetq_grouping/src/main/org/hornetq/core/config/impl/ConfigurationImpl.java 2009-10-19 11:30:49 UTC (rev 8126)
@@ -230,7 +230,7 @@
protected Map<String, DiscoveryGroupConfiguration> discoveryGroupConfigurations = new LinkedHashMap<String, DiscoveryGroupConfiguration>();
- protected List<GroupingHandlerConfiguration> groupingHandlerConfiguration = new ArrayList<GroupingHandlerConfiguration>();
+ protected GroupingHandlerConfiguration groupingHandlerConfiguration;
// Paging related attributes ------------------------------------------------------------
@@ -484,12 +484,12 @@
this.backupConnectorName = backupConnectorName;
}
- public List<GroupingHandlerConfiguration> getGroupingHandlerConfigurations()
+ public GroupingHandlerConfiguration getGroupingHandlerConfiguration()
{
return groupingHandlerConfiguration;
}
- public void setGroupingHandlerConfigurationConfigurations(List<GroupingHandlerConfiguration> groupingHandlerConfiguration)
+ public void setGroupingHandlerConfiguration(GroupingHandlerConfiguration groupingHandlerConfiguration)
{
this.groupingHandlerConfiguration = groupingHandlerConfiguration;
}
Modified: branches/hornetq_grouping/src/main/org/hornetq/core/config/impl/FileConfiguration.java
===================================================================
--- branches/hornetq_grouping/src/main/org/hornetq/core/config/impl/FileConfiguration.java 2009-10-17 12:38:46 UTC (rev 8125)
+++ branches/hornetq_grouping/src/main/org/hornetq/core/config/impl/FileConfiguration.java 2009-10-19 11:30:49 UTC (rev 8126)
@@ -577,19 +577,16 @@
}
private void parseGroupingHandlerConfiguration(final Element node)
- {
- String name = node.getAttribute("name");
- String type = getString(node, "type", null, NOT_NULL_OR_EMPTY);
- String address = getString(node, "address",null, NOT_NULL_OR_EMPTY);
- Integer timeout = getInteger(node, "timeout", GroupingHandlerConfiguration.DEFAULT_TIMEOUT, GT_ZERO);
- GroupingHandlerConfiguration arbitratorConfiguration =
- new GroupingHandlerConfiguration(new SimpleString(name),
- type.equals(GroupingHandlerConfiguration.TYPE.LOCAL.getType())? GroupingHandlerConfiguration.TYPE.LOCAL: GroupingHandlerConfiguration.TYPE.REMOTE,
- new SimpleString(address),
- timeout);
- System.out.println("arbitratorConfiguration = " + arbitratorConfiguration);
- groupingHandlerConfiguration.add(arbitratorConfiguration);
- }
+ {
+ String name = node.getAttribute("name");
+ String type = getString(node, "type", null, NOT_NULL_OR_EMPTY);
+ String address = getString(node, "address",null, NOT_NULL_OR_EMPTY);
+ Integer timeout = getInteger(node, "timeout", GroupingHandlerConfiguration.DEFAULT_TIMEOUT, GT_ZERO);
+ groupingHandlerConfiguration = new GroupingHandlerConfiguration(new SimpleString(name),
+ type.equals(GroupingHandlerConfiguration.TYPE.LOCAL.getType())? GroupingHandlerConfiguration.TYPE.LOCAL: GroupingHandlerConfiguration.TYPE.REMOTE,
+ new SimpleString(address),
+ timeout);
+ }
private void parseBridgeConfiguration(final Element brNode)
Added: branches/hornetq_grouping/src/main/org/hornetq/core/persistence/GroupingInfo.java
===================================================================
--- branches/hornetq_grouping/src/main/org/hornetq/core/persistence/GroupingInfo.java (rev 0)
+++ branches/hornetq_grouping/src/main/org/hornetq/core/persistence/GroupingInfo.java 2009-10-19 11:30:49 UTC (rev 8126)
@@ -0,0 +1,28 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.hornetq.core.persistence;
+
+import org.hornetq.utils.SimpleString;
+
+/**
+ * @author <a href="mailto:andy.taylor@jboss.org">Andy Taylor</a>
+ * Created Oct 18, 2009
+ */
+public interface GroupingInfo
+{
+ public SimpleString getClusterName();
+
+ public SimpleString getGroupId();
+
+ public long getId();
+}
Modified: branches/hornetq_grouping/src/main/org/hornetq/core/persistence/StorageManager.java
===================================================================
--- branches/hornetq_grouping/src/main/org/hornetq/core/persistence/StorageManager.java 2009-10-17 12:38:46 UTC (rev 8125)
+++ branches/hornetq_grouping/src/main/org/hornetq/core/persistence/StorageManager.java 2009-10-19 11:30:49 UTC (rev 8126)
@@ -26,6 +26,7 @@
import org.hornetq.core.server.MessageReference;
import org.hornetq.core.server.Queue;
import org.hornetq.core.server.ServerMessage;
+import org.hornetq.core.server.group.impl.GroupBinding;
import org.hornetq.core.transaction.ResourceManager;
import org.hornetq.utils.Pair;
import org.hornetq.utils.SimpleString;
@@ -111,5 +112,11 @@
void deleteQueueBinding(long queueBindingID) throws Exception;
- void loadBindingJournal(List<QueueBindingInfo> queueBindingInfos) throws Exception;
+ void loadBindingJournal(List<QueueBindingInfo> queueBindingInfos, List<GroupingInfo> groupingInfos) throws Exception;
+
+ //grouping related operations
+ void addGrouping(GroupBinding groupBinding) throws Exception;
+
+
+ void deleteGrouping(GroupBinding groupBinding) throws Exception;
}
Modified: branches/hornetq_grouping/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
--- branches/hornetq_grouping/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2009-10-17 12:38:46 UTC (rev 8125)
+++ branches/hornetq_grouping/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2009-10-19 11:30:49 UTC (rev 8126)
@@ -50,6 +50,7 @@
import org.hornetq.core.paging.impl.PageTransactionInfoImpl;
import org.hornetq.core.persistence.QueueBindingInfo;
import org.hornetq.core.persistence.StorageManager;
+import org.hornetq.core.persistence.GroupingInfo;
import org.hornetq.core.postoffice.Binding;
import org.hornetq.core.remoting.impl.wireformat.XidCodecSupport;
import org.hornetq.core.remoting.spi.HornetQBuffer;
@@ -58,6 +59,7 @@
import org.hornetq.core.server.MessageReference;
import org.hornetq.core.server.Queue;
import org.hornetq.core.server.ServerMessage;
+import org.hornetq.core.server.group.impl.GroupBinding;
import org.hornetq.core.server.impl.ServerMessageImpl;
import org.hornetq.core.transaction.ResourceManager;
import org.hornetq.core.transaction.Transaction;
@@ -85,6 +87,8 @@
private static final long CHECKPOINT_BATCH_SIZE = Integer.MAX_VALUE;
+ //grouping journal record type
+ public static final byte GROUP_RECORD = 41;
// Bindings journal record type
public static final byte QUEUE_BINDING_RECORD = 21;
@@ -988,7 +992,19 @@
resourceManager.putTransaction(xid, tx);
}
}
+ //grouping handler operations
+ public void addGrouping(GroupBinding groupBinding) throws Exception // appends the group binding to the bindings journal as a GROUP_RECORD
+ {
+ GroupingEncoding groupingEncoding = new GroupingEncoding(groupBinding.getId(), groupBinding.getGroupId(), groupBinding.getClusterName());
+ bindingsJournal.appendAddRecord(groupBinding.getId(), GROUP_RECORD, groupingEncoding, true); // record id is the binding's own id; the final 'true' is presumably the sync flag - TODO confirm
+ }
+
+ public void deleteGrouping(GroupBinding groupBinding) throws Exception // appends a delete record for the binding to the bindings journal
+ {
+ bindingsJournal.appendDeleteRecord(groupBinding.getId(), true); // keyed by the same id used in addGrouping; 'true' is presumably the sync flag - TODO confirm
+ }
+
// Bindings operations
public void addQueueBinding(final Binding binding) throws Exception
@@ -1011,7 +1027,7 @@
bindingsJournal.appendDeleteRecord(queueBindingID, true);
}
- public void loadBindingJournal(final List<QueueBindingInfo> queueBindingInfos) throws Exception
+ public void loadBindingJournal(final List<QueueBindingInfo> queueBindingInfos, final List<GroupingInfo> groupingInfos) throws Exception
{
List<RecordInfo> records = new ArrayList<RecordInfo>();
@@ -1045,6 +1061,13 @@
persistentID = encoding.uuid;
}
+ else if(rec == GROUP_RECORD)
+ {
+ GroupingEncoding encoding = new GroupingEncoding();
+ encoding.decode(buffer);
+ encoding.setId(id);
+ groupingInfos.add(encoding);
+ }
else if (rec == BatchingIDGenerator.ID_COUNTER_RECORD)
{
idGenerator.loadState(record.id, buffer);
@@ -1259,6 +1282,63 @@
}
}
+ private static class GroupingEncoding implements EncodingSupport, GroupingInfo // journal payload for a group binding; also handed to loaders as a GroupingInfo
+ {
+ long id; // not written by encode(): supplied separately as the journal record id
+
+ SimpleString groupId;
+
+ SimpleString clusterName;
+
+ public GroupingEncoding(long id, SimpleString groupId, SimpleString clusterName) // used when writing a record
+ {
+ this.id = id;
+ this.groupId = groupId;
+ this.clusterName = clusterName;
+ }
+
+ public GroupingEncoding() // used when reading a record: fields are filled by decode() and setId()
+ {
+ }
+
+ public int getEncodeSize()
+ {
+ return SimpleString.sizeofString(groupId) + SimpleString.sizeofString(clusterName); // covers exactly the two strings encode() writes
+ }
+
+ public void encode(HornetQBuffer buffer)
+ {
+ buffer.writeSimpleString(groupId); // write order must stay in step with decode()
+ buffer.writeSimpleString(clusterName);
+ }
+
+ public void decode(HornetQBuffer buffer)
+ {
+ groupId = buffer.readSimpleString(); // same order as encode(): groupId first, then clusterName
+ clusterName = buffer.readSimpleString();
+ }
+
+ public long getId()
+ {
+ return id;
+ }
+
+ public void setId(long id) // lets the loader attach the record id after decoding
+ {
+ this.id = id;
+ }
+
+ public SimpleString getGroupId()
+ {
+ return groupId;
+ }
+
+ public SimpleString getClusterName()
+ {
+ return clusterName;
+ }
+ }
+
private static class PersistentQueueBindingEncoding implements EncodingSupport, QueueBindingInfo
{
long id;
Modified: branches/hornetq_grouping/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java
===================================================================
--- branches/hornetq_grouping/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java 2009-10-17 12:38:46 UTC (rev 8125)
+++ branches/hornetq_grouping/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java 2009-10-19 11:30:49 UTC (rev 8126)
@@ -24,11 +24,13 @@
import org.hornetq.core.paging.PagingManager;
import org.hornetq.core.persistence.QueueBindingInfo;
import org.hornetq.core.persistence.StorageManager;
+import org.hornetq.core.persistence.GroupingInfo;
import org.hornetq.core.postoffice.Binding;
import org.hornetq.core.server.LargeServerMessage;
import org.hornetq.core.server.MessageReference;
import org.hornetq.core.server.Queue;
import org.hornetq.core.server.ServerMessage;
+import org.hornetq.core.server.group.impl.GroupBinding;
import org.hornetq.core.transaction.ResourceManager;
import org.hornetq.utils.Pair;
import org.hornetq.utils.SimpleString;
@@ -74,7 +76,7 @@
{
}
- public void loadBindingJournal(final List<QueueBindingInfo> queueBindingInfos) throws Exception
+ public void loadBindingJournal(List<QueueBindingInfo> queueBindingInfos, List<GroupingInfo> groupingInfos) throws Exception
{
}
@@ -247,4 +249,14 @@
{
}
+ public void addGrouping(GroupBinding groupBinding) throws Exception
+ {
+ // no-op: this null storage manager implementation does not record group bindings
+ }
+
+ public void deleteGrouping(GroupBinding groupBinding) throws Exception
+ {
+ // no-op: nothing was recorded by addGrouping, so there is nothing to delete
+ }
+
}
Modified: branches/hornetq_grouping/src/main/org/hornetq/core/postoffice/PostOffice.java
===================================================================
--- branches/hornetq_grouping/src/main/org/hornetq/core/postoffice/PostOffice.java 2009-10-17 12:38:46 UTC (rev 8125)
+++ branches/hornetq_grouping/src/main/org/hornetq/core/postoffice/PostOffice.java 2009-10-19 11:30:49 UTC (rev 8126)
@@ -64,8 +64,4 @@
void sendQueueInfoToQueue(SimpleString queueName, SimpleString address) throws Exception;
Object getNotificationLock();
-
- void setGroupingHandler(GroupingHandler groupingHandler);
-
- GroupingHandler getGroupingHandler();
}
Modified: branches/hornetq_grouping/src/main/org/hornetq/core/postoffice/impl/BindingsImpl.java
===================================================================
--- branches/hornetq_grouping/src/main/org/hornetq/core/postoffice/impl/BindingsImpl.java 2009-10-17 12:38:46 UTC (rev 8125)
+++ branches/hornetq_grouping/src/main/org/hornetq/core/postoffice/impl/BindingsImpl.java 2009-10-19 11:30:49 UTC (rev 8126)
@@ -28,7 +28,6 @@
import org.hornetq.core.message.impl.MessageImpl;
import org.hornetq.core.postoffice.Binding;
import org.hornetq.core.postoffice.Bindings;
-import org.hornetq.core.postoffice.PostOffice;
import org.hornetq.core.server.Bindable;
import org.hornetq.core.server.Queue;
import org.hornetq.core.server.ServerMessage;
@@ -60,11 +59,11 @@
private volatile boolean routeWhenNoConsumers;
- private final PostOffice postOffice;
+ private final GroupingHandler groupingHandler;
- public BindingsImpl(PostOffice postOffice)
+ public BindingsImpl(GroupingHandler groupingHandler)
{
- this.postOffice = postOffice;
+ this.groupingHandler = groupingHandler;
}
public void setRouteWhenNoConsumers(final boolean routeWhenNoConsumers)
@@ -278,15 +277,13 @@
if (!routed)
{
- GroupingHandler groupingGroupingHandler = postOffice.getGroupingHandler();
-
if (message.getProperty(MessageImpl.HDR_FROM_CLUSTER) != null)
{
routed = routeFromCluster(message, tx);
}
- else if (groupingGroupingHandler != null && message.getProperty(MessageImpl.HDR_GROUP_ID) != null)
+ else if (groupingHandler != null && message.getProperty(MessageImpl.HDR_GROUP_ID) != null)
{
- routeUsingStrictOrdering(message, tx, groupingGroupingHandler);
+ routeUsingStrictOrdering(message, tx, groupingHandler);
}
else
{
@@ -462,12 +459,12 @@
resp = groupingGroupingHandler.propose(new Proposal(fullID, chosen.getClusterName()));
- if (resp.getAlternative() != null)
+ if (resp.getAlternativeClusterName() != null)
{
chosen = null;
for (Binding binding : bindings)
{
- if (binding.getClusterName().equals(resp.getAlternative()))
+ if (binding.getClusterName().equals(resp.getAlternativeClusterName()))
{
chosen = binding;
break;
@@ -483,7 +480,7 @@
}
else
{
- throw new HornetQException(HornetQException.QUEUE_DOES_NOT_EXIST, "queue " + resp.getChosen() + " has been removed cannot deliver message, queues should not be removed when grouping is used");
+ throw new HornetQException(HornetQException.QUEUE_DOES_NOT_EXIST, "queue " + resp.getChosenClusterName() + " has been removed cannot deliver message, queues should not be removed when grouping is used");
}
}
else
@@ -491,7 +488,7 @@
Binding chosen = null;
for (Binding binding : bindings)
{
- if (binding.getClusterName().equals(resp.getChosen()))
+ if (binding.getClusterName().equals(resp.getChosenClusterName()))
{
chosen = binding;
break;
@@ -505,7 +502,7 @@
}
else
{
- throw new HornetQException(HornetQException.QUEUE_DOES_NOT_EXIST, "queue " + resp.getChosen() + " has been removed cannot deliver message, queues should not be removed when grouping is used");
+ throw new HornetQException(HornetQException.QUEUE_DOES_NOT_EXIST, "queue " + resp.getChosenClusterName() + " has been removed cannot deliver message, queues should not be removed when grouping is used");
}
}
Modified: branches/hornetq_grouping/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java
===================================================================
--- branches/hornetq_grouping/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java 2009-10-17 12:38:46 UTC (rev 8125)
+++ branches/hornetq_grouping/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java 2009-10-19 11:30:49 UTC (rev 8126)
@@ -111,7 +111,7 @@
private final HierarchicalRepository<AddressSettings> addressSettingsRepository;
- private GroupingHandler groupingGroupingHandler;
+ private final HornetQServer server;
public PostOfficeImpl(final HornetQServer server,
final StorageManager storageManager,
@@ -124,7 +124,7 @@
final int idCacheSize,
final boolean persistIDCache,
final ExecutorFactory orderedExecutorFactory,
- HierarchicalRepository<AddressSettings> addressSettingsRepository)
+ final HierarchicalRepository<AddressSettings> addressSettingsRepository)
{
this.storageManager = storageManager;
@@ -155,6 +155,8 @@
this.redistributorExecutorFactory = orderedExecutorFactory;
this.addressSettingsRepository = addressSettingsRepository;
+
+ this.server = server;
}
// HornetQComponent implementation ---------------------------------------
@@ -694,18 +696,6 @@
return notificationLock;
}
-
- public void setGroupingHandler(GroupingHandler groupingHandler)
- {
- groupingGroupingHandler = groupingHandler;
- managementService.addNotificationListener(groupingGroupingHandler);
- }
-
- public GroupingHandler getGroupingHandler()
- {
- return groupingGroupingHandler;
- }
-
public void sendQueueInfoToQueue(final SimpleString queueName, final SimpleString address) throws Exception
{
// We send direct to the queue so we can send it to the same queue that is bound to the notifications adress -
@@ -1047,6 +1037,6 @@
public Bindings createBindings()
{
- return new BindingsImpl(this);
+ return new BindingsImpl(server.getGroupingHandler());
}
}
Modified: branches/hornetq_grouping/src/main/org/hornetq/core/server/HornetQServer.java
===================================================================
--- branches/hornetq_grouping/src/main/org/hornetq/core/server/HornetQServer.java 2009-10-17 12:38:46 UTC (rev 8125)
+++ branches/hornetq_grouping/src/main/org/hornetq/core/server/HornetQServer.java 2009-10-19 11:30:49 UTC (rev 8126)
@@ -15,6 +15,7 @@
import java.util.List;
import java.util.Set;
+import java.nio.channels.DatagramChannel;
import javax.management.MBeanServer;
@@ -30,6 +31,7 @@
import org.hornetq.core.security.HornetQSecurityManager;
import org.hornetq.core.security.Role;
import org.hornetq.core.server.cluster.ClusterManager;
+import org.hornetq.core.server.group.GroupingHandler;
import org.hornetq.core.settings.HierarchicalRepository;
import org.hornetq.core.settings.impl.AddressSettings;
import org.hornetq.core.transaction.ResourceManager;
@@ -126,4 +128,8 @@
void destroyQueue(SimpleString queueName, ServerSession session) throws Exception;
ExecutorFactory getExecutorFactory();
+
+ void setGroupingHandler(GroupingHandler groupingHandler);
+
+ GroupingHandler getGroupingHandler();
}
Modified: branches/hornetq_grouping/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
===================================================================
--- branches/hornetq_grouping/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java 2009-10-17 12:38:46 UTC (rev 8125)
+++ branches/hornetq_grouping/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java 2009-10-19 11:30:49 UTC (rev 8126)
@@ -554,10 +554,10 @@
}
SimpleString val = (SimpleString) message.getProperty(ManagementHelper.HDR_PROPOSAL_VALUE);
Integer hops = (Integer) message.getProperty(ManagementHelper.HDR_DISTANCE);
- Response response = postOffice.getGroupingHandler().receive(new Proposal(type, val), hops + 1);
+ Response response = server.getGroupingHandler().receive(new Proposal(type, val), hops + 1);
if(response != null)
{
- postOffice.getGroupingHandler().send(response, 0);
+ server.getGroupingHandler().send(response, 0);
}
}
@@ -572,8 +572,8 @@
SimpleString alt = (SimpleString) message.getProperty(ManagementHelper.HDR_PROPOSAL_ALT_VALUE);
Integer hops = (Integer) message.getProperty(ManagementHelper.HDR_DISTANCE);
Response response = new Response(type, val, alt);
- postOffice.getGroupingHandler().proposed(response);
- postOffice.getGroupingHandler().send(response, hops + 1);
+ server.getGroupingHandler().proposed(response);
+ server.getGroupingHandler().send(response, hops + 1);
}
private synchronized void clearBindings() throws Exception
Modified: branches/hornetq_grouping/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java
===================================================================
--- branches/hornetq_grouping/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java 2009-10-17 12:38:46 UTC (rev 8125)
+++ branches/hornetq_grouping/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java 2009-10-19 11:30:49 UTC (rev 8126)
@@ -37,7 +37,6 @@
import org.hornetq.core.management.ManagementService;
import org.hornetq.core.postoffice.Binding;
import org.hornetq.core.postoffice.PostOffice;
-import org.hornetq.core.remoting.Channel;
import org.hornetq.core.server.HornetQServer;
import org.hornetq.core.server.Queue;
import org.hornetq.core.server.group.impl.GroupingHandlerConfiguration;
@@ -149,11 +148,6 @@
{
deployClusterConnection(config);
}
-
- for (GroupingHandlerConfiguration config : configuration.getGroupingHandlerConfigurations())
- {
- deployGroupingHandlerConfigurations(config);
- }
started = true;
}
@@ -486,21 +480,6 @@
bridge.start();
}
- private synchronized void deployGroupingHandlerConfigurations(final GroupingHandlerConfiguration config) throws Exception
- {
- GroupingHandler groupingHandler;
- if (config.getType() == GroupingHandlerConfiguration.TYPE.LOCAL)
- {
- groupingHandler = new LocalGroupingHandler(managementService, config.getName(), config.getAddress());
- }
- else
- {
- groupingHandler = new RemoteGroupingHandler(managementService, config.getName(), config.getAddress(), config.getTimeout());
- }
- log.info("deploying grouping handler: " + groupingHandler);
- postOffice.setGroupingHandler(groupingHandler);
- }
-
private synchronized void deployClusterConnection(final ClusterConnectionConfiguration config) throws Exception
{
if (config.getName() == null)
Modified: branches/hornetq_grouping/src/main/org/hornetq/core/server/group/GroupingHandler.java
===================================================================
--- branches/hornetq_grouping/src/main/org/hornetq/core/server/group/GroupingHandler.java 2009-10-17 12:38:46 UTC (rev 8125)
+++ branches/hornetq_grouping/src/main/org/hornetq/core/server/group/GroupingHandler.java 2009-10-19 11:30:49 UTC (rev 8126)
@@ -15,6 +15,7 @@
import org.hornetq.utils.SimpleString;
import org.hornetq.core.server.group.impl.Proposal;
import org.hornetq.core.server.group.impl.Response;
+import org.hornetq.core.server.group.impl.GroupBinding;
import org.hornetq.core.management.NotificationListener;
import org.hornetq.core.management.Notification;
@@ -33,5 +34,5 @@
Response receive(Proposal proposal, int distance) throws Exception;
- void onNotification(Notification notification);
+ void addGroupBinding(GroupBinding groupBinding);
}
Added: branches/hornetq_grouping/src/main/org/hornetq/core/server/group/impl/GroupBinding.java
===================================================================
--- branches/hornetq_grouping/src/main/org/hornetq/core/server/group/impl/GroupBinding.java (rev 0)
+++ branches/hornetq_grouping/src/main/org/hornetq/core/server/group/impl/GroupBinding.java 2009-10-19 11:30:49 UTC (rev 8126)
@@ -0,0 +1,61 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.hornetq.core.server.group.impl;
+
+import org.hornetq.utils.SimpleString;
+
+/**
+ * @author <a href="mailto:andy.taylor@jboss.org">Andy Taylor</a>
+ * Created Oct 19, 2009
+ */
+public class GroupBinding // holds a group id, the cluster name bound to it, and the id used for its persisted record
+{
+ private long id; // mutable: may be assigned after construction through setId
+
+ private final SimpleString groupId;
+
+ private final SimpleString clusterName;
+
+ public GroupBinding(SimpleString groupId, SimpleString clusterName) // id stays at its default (0) until setId is called
+ {
+ this.groupId = groupId;
+ this.clusterName = clusterName;
+ }
+
+ public GroupBinding(long id, SimpleString groupId, SimpleString clusterName) // for callers that already know the id, e.g. when rebuilding from a loaded GroupingInfo
+ {
+ this.id = id;
+ this.groupId = groupId;
+ this.clusterName = clusterName;
+ }
+
+ public long getId()
+ {
+ return id;
+ }
+
+ public void setId(long id)
+ {
+ this.id = id;
+ }
+
+ public SimpleString getGroupId()
+ {
+ return groupId;
+ }
+
+ public SimpleString getClusterName()
+ {
+ return clusterName;
+ }
+}
Modified: branches/hornetq_grouping/src/main/org/hornetq/core/server/group/impl/LocalGroupingHandler.java
===================================================================
--- branches/hornetq_grouping/src/main/org/hornetq/core/server/group/impl/LocalGroupingHandler.java 2009-10-17 12:38:46 UTC (rev 8125)
+++ branches/hornetq_grouping/src/main/org/hornetq/core/server/group/impl/LocalGroupingHandler.java 2009-10-19 11:30:49 UTC (rev 8126)
@@ -19,13 +19,11 @@
import org.hornetq.core.postoffice.BindingType;
import org.hornetq.core.logging.Logger;
import org.hornetq.core.server.group.GroupingHandler;
+import org.hornetq.core.persistence.StorageManager;
import org.hornetq.utils.SimpleString;
import org.hornetq.utils.TypedProperties;
-import org.hornetq.utils.ConcurrentHashSet;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
import java.util.HashMap;
/**
@@ -35,21 +33,23 @@
{
private static Logger log = Logger.getLogger(LocalGroupingHandler.class);
- private ConcurrentHashMap<SimpleString, SimpleString> map = new ConcurrentHashMap<SimpleString, SimpleString>();
+ private ConcurrentHashMap<SimpleString, GroupBinding> map = new ConcurrentHashMap<SimpleString, GroupBinding>();
- private HashMap<SimpleString, SimpleString> groupMap = new HashMap<SimpleString, SimpleString>();
+ private HashMap<SimpleString, GroupBinding> groupMap = new HashMap<SimpleString, GroupBinding>();
private final SimpleString name;
private final ManagementService managementService;
private SimpleString address;
+ private StorageManager storageManager;
- public LocalGroupingHandler(final ManagementService managementService, final SimpleString name, final SimpleString address)
+ public LocalGroupingHandler(final ManagementService managementService, final SimpleString name, final SimpleString address, StorageManager storageManager)
{
this.managementService = managementService;
this.name = name;
this.address = address;
+ this.storageManager = storageManager;
}
public SimpleString getName()
@@ -60,20 +60,23 @@
public Response propose(Proposal proposal) throws Exception
{
- if(proposal.getProposal() == null)
+ if(proposal.getClusterName() == null)
{
- SimpleString original = map.get(proposal.getProposalType());
- return original == null?null:new Response(proposal.getProposalType(), original);
+ GroupBinding original = map.get(proposal.getGroupId());
+ return original == null?null:new Response(proposal.getGroupId(), original.getClusterName());
}
- Response response = new Response(proposal.getProposalType(), proposal.getProposal());
- if (map.putIfAbsent(response.getResponseType(), response.getChosen()) == null)
+ GroupBinding groupBinding = new GroupBinding(proposal.getGroupId(), proposal.getClusterName());
+ if (map.putIfAbsent(groupBinding.getGroupId(), groupBinding) == null)
{
- groupMap.put(response.getChosen(), response.getResponseType());
- return response;
+ groupBinding.setId(storageManager.generateUniqueID());
+ groupMap.put(groupBinding.getClusterName(), groupBinding);
+ storageManager.addGrouping(groupBinding);
+ return new Response(groupBinding.getGroupId(), groupBinding.getClusterName());
}
else
{
- return new Response(proposal.getProposalType(), proposal.getProposal(), map.get(proposal.getProposalType()));
+ groupBinding = map.get(proposal.getGroupId());
+ return new Response(groupBinding.getGroupId(), proposal.getClusterName(), groupBinding.getClusterName());
}
}
@@ -84,9 +87,9 @@
public void send(Response response, int distance) throws Exception
{
TypedProperties props = new TypedProperties();
- props.putStringProperty(ManagementHelper.HDR_PROPOSAL_TYPE, response.getResponseType());
- props.putStringProperty(ManagementHelper.HDR_PROPOSAL_VALUE, response.getOriginal());
- props.putStringProperty(ManagementHelper.HDR_PROPOSAL_ALT_VALUE, response.getAlternative());
+ props.putStringProperty(ManagementHelper.HDR_PROPOSAL_TYPE, response.getGroupId());
+ props.putStringProperty(ManagementHelper.HDR_PROPOSAL_VALUE, response.getClusterName());
+ props.putStringProperty(ManagementHelper.HDR_PROPOSAL_ALT_VALUE, response.getAlternativeClusterName());
props.putIntProperty(ManagementHelper.HDR_BINDING_TYPE, BindingType.LOCAL_QUEUE_INDEX);
props.putStringProperty(ManagementHelper.HDR_ADDRESS, address);
props.putIntProperty(ManagementHelper.HDR_DISTANCE, distance);
@@ -99,16 +102,30 @@
return propose(proposal);
}
+ public void addGroupBinding(GroupBinding groupBinding) // registers an already-existing binding in both lookup maps; does not touch the storage manager
+ {
+ map.put(groupBinding.getGroupId(), groupBinding); // lookup by group id
+ groupMap.put(groupBinding.getClusterName(), groupBinding); // lookup by cluster name; NOTE(review): groupMap is a plain HashMap while map is concurrent - confirm access is serialized
+ }
+
public void onNotification(Notification notification)
{
if(notification.getType() == NotificationType.BINDING_REMOVED)
{
SimpleString clusterName = (SimpleString) notification.getProperties().getProperty(ManagementHelper.HDR_CLUSTER_NAME);
- SimpleString val = groupMap.get(clusterName);
+ GroupBinding val = groupMap.get(clusterName);
if(val != null)
{
groupMap.remove(clusterName);
- map.remove(val);
+ map.remove(val.getGroupId());
+ try
+ {
+ storageManager.deleteGrouping(val);
+ }
+ catch (Exception e)
+ {
+ log.warn("Unable to delete group binding info " + val.getGroupId(), e);
+ }
}
}
}
Modified: branches/hornetq_grouping/src/main/org/hornetq/core/server/group/impl/Proposal.java
===================================================================
--- branches/hornetq_grouping/src/main/org/hornetq/core/server/group/impl/Proposal.java 2009-10-17 12:38:46 UTC (rev 8125)
+++ branches/hornetq_grouping/src/main/org/hornetq/core/server/group/impl/Proposal.java 2009-10-19 11:30:49 UTC (rev 8126)
@@ -19,26 +19,26 @@
*/
public class Proposal
{
- private final SimpleString proposalType;
- private final SimpleString proposal;
+ private final SimpleString groupId;
+ private final SimpleString clusterName;
public static final String PROPOSAL_TYPE_HEADER = "_JBM_PROPOSAL_TYPE";
public static final String PROPOSAL_HEADER = "_JBM_PROPOSAL";
- public Proposal(SimpleString proposalType, SimpleString proposal)
+ public Proposal(SimpleString groupId, SimpleString clusterName)
{
- this.proposal = proposal;
- this.proposalType = proposalType;
+ this.clusterName = clusterName;
+ this.groupId = groupId;
}
- public SimpleString getProposalType()
+ public SimpleString getGroupId()
{
- return proposalType;
+ return groupId;
}
- public SimpleString getProposal()
+ public SimpleString getClusterName()
{
- return proposal;
+ return clusterName;
}
}
Modified: branches/hornetq_grouping/src/main/org/hornetq/core/server/group/impl/RemoteGroupingHandler.java
===================================================================
--- branches/hornetq_grouping/src/main/org/hornetq/core/server/group/impl/RemoteGroupingHandler.java 2009-10-17 12:38:46 UTC (rev 8125)
+++ branches/hornetq_grouping/src/main/org/hornetq/core/server/group/impl/RemoteGroupingHandler.java 2009-10-19 11:30:49 UTC (rev 8126)
@@ -67,12 +67,12 @@
public Response propose(final Proposal proposal) throws Exception
{
- Response response = responses.get(proposal.getProposalType());
+ Response response = responses.get(proposal.getGroupId());
if( response != null)
{
return response;
}
- if (proposal.getProposal() == null)
+ if (proposal.getClusterName() == null)
{
return null;
}
@@ -80,15 +80,15 @@
{
lock.lock();
TypedProperties props = new TypedProperties();
- props.putStringProperty(ManagementHelper.HDR_PROPOSAL_TYPE, proposal.getProposalType());
- props.putStringProperty(ManagementHelper.HDR_PROPOSAL_VALUE, proposal.getProposal());
+ props.putStringProperty(ManagementHelper.HDR_PROPOSAL_TYPE, proposal.getGroupId());
+ props.putStringProperty(ManagementHelper.HDR_PROPOSAL_VALUE, proposal.getClusterName());
props.putIntProperty(ManagementHelper.HDR_BINDING_TYPE, BindingType.LOCAL_QUEUE_INDEX);
props.putStringProperty(ManagementHelper.HDR_ADDRESS, address);
props.putIntProperty(ManagementHelper.HDR_DISTANCE, 0);
Notification notification = new Notification(null, NotificationType.PROPOSAL, props);
managementService.sendNotification(notification);
sendCondition.await(timeout, TimeUnit.MILLISECONDS);
- response = responses.get(proposal.getProposalType());
+ response = responses.get(proposal.getGroupId());
}
finally
{
@@ -96,7 +96,7 @@
}
if(response == null)
{
- throw new IllegalStateException("no response received from group handler for " + proposal.getProposalType());
+ throw new IllegalStateException("no response received from group handler for " + proposal.getGroupId());
}
return response;
}
@@ -106,8 +106,8 @@
try
{
lock.lock();
- responses.put(response.getResponseType(), response);
- groupMap.put(response.getChosen(), response.getResponseType());
+ responses.put(response.getGroupId(), response);
+ groupMap.put(response.getChosenClusterName(), response.getGroupId());
sendCondition.signal();
}
finally
@@ -119,8 +119,8 @@
public Response receive(Proposal proposal, int distance) throws Exception
{
TypedProperties props = new TypedProperties();
- props.putStringProperty(ManagementHelper.HDR_PROPOSAL_TYPE, proposal.getProposalType());
- props.putStringProperty(ManagementHelper.HDR_PROPOSAL_VALUE, proposal.getProposal());
+ props.putStringProperty(ManagementHelper.HDR_PROPOSAL_TYPE, proposal.getGroupId());
+ props.putStringProperty(ManagementHelper.HDR_PROPOSAL_VALUE, proposal.getClusterName());
props.putIntProperty(ManagementHelper.HDR_BINDING_TYPE, BindingType.LOCAL_QUEUE_INDEX);
props.putStringProperty(ManagementHelper.HDR_ADDRESS, address);
props.putIntProperty(ManagementHelper.HDR_DISTANCE, distance);
@@ -133,6 +133,11 @@
{
}
+ public void addGroupBinding(GroupBinding groupBinding)
+ {
+ // intentionally empty in the remote handler; presumably reloaded bindings only matter to the local handler - TODO confirm
+ }
+
public void onNotification(Notification notification)
{
if(notification.getType() == NotificationType.BINDING_REMOVED)
Modified: branches/hornetq_grouping/src/main/org/hornetq/core/server/group/impl/Response.java
===================================================================
--- branches/hornetq_grouping/src/main/org/hornetq/core/server/group/impl/Response.java 2009-10-17 12:38:46 UTC (rev 8125)
+++ branches/hornetq_grouping/src/main/org/hornetq/core/server/group/impl/Response.java 2009-10-19 11:30:49 UTC (rev 8126)
@@ -21,23 +21,23 @@
{
private final boolean accepted;
- private final SimpleString original;
+ private final SimpleString clusterName;
- private final SimpleString alternative;
+ private final SimpleString alternativeClusterName;
- private SimpleString responseType;
+ private SimpleString groupId;
- public Response(SimpleString responseType, SimpleString original)
+ public Response(SimpleString groupId, SimpleString clusterName)
{
- this(responseType, original, null);
+ this(groupId, clusterName, null);
}
- public Response(SimpleString responseType, SimpleString original, SimpleString alternative)
+ public Response(SimpleString groupId, SimpleString clusterName, SimpleString alternativeClusterName)
{
- this.responseType = responseType;
- this.accepted = alternative == null;
- this.original = original;
- this.alternative = alternative;
+ this.groupId = groupId;
+ this.accepted = alternativeClusterName == null;
+ this.clusterName = clusterName;
+ this.alternativeClusterName = alternativeClusterName;
}
public boolean isAccepted()
@@ -45,29 +45,29 @@
return accepted;
}
- public SimpleString getOriginal()
+ public SimpleString getClusterName()
{
- return original;
+ return clusterName;
}
- public SimpleString getAlternative()
+ public SimpleString getAlternativeClusterName()
{
- return alternative;
+ return alternativeClusterName;
}
- public SimpleString getChosen()
+ public SimpleString getChosenClusterName()
{
- return alternative != null?alternative:original;
+ return alternativeClusterName != null? alternativeClusterName : clusterName;
}
@Override
public String toString()
{
- return "accepted = " + accepted + " original = " + original + " alternative = " + alternative;
+ return "accepted = " + accepted + " clusterName = " + clusterName + " alternativeClusterName = " + alternativeClusterName;
}
- public SimpleString getResponseType()
+ public SimpleString getGroupId()
{
- return responseType;
+ return groupId;
}
}
Modified: branches/hornetq_grouping/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
===================================================================
--- branches/hornetq_grouping/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java 2009-10-17 12:38:46 UTC (rev 8125)
+++ branches/hornetq_grouping/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java 2009-10-19 11:30:49 UTC (rev 8126)
@@ -57,6 +57,7 @@
import org.hornetq.core.paging.impl.PagingStoreFactoryNIO;
import org.hornetq.core.persistence.QueueBindingInfo;
import org.hornetq.core.persistence.StorageManager;
+import org.hornetq.core.persistence.GroupingInfo;
import org.hornetq.core.persistence.impl.journal.JournalStorageManager;
import org.hornetq.core.persistence.impl.nullpm.NullStorageManager;
import org.hornetq.core.postoffice.Binding;
@@ -84,6 +85,8 @@
import org.hornetq.core.server.Queue;
import org.hornetq.core.server.QueueFactory;
import org.hornetq.core.server.ServerSession;
+import org.hornetq.core.server.group.GroupingHandler;
+import org.hornetq.core.server.group.impl.*;
import org.hornetq.core.server.cluster.ClusterManager;
import org.hornetq.core.server.cluster.Transformer;
import org.hornetq.core.server.cluster.impl.ClusterManagerImpl;
@@ -194,6 +197,8 @@
private final Set<ActivateCallback> activateCallbacks = new HashSet<ActivateCallback>();
+ private GroupingHandler groupingHandler;
+
// Constructors
// ---------------------------------------------------------------------------------
@@ -309,6 +314,11 @@
clusterManager.stop();
}
+ if(groupingHandler != null)
+ {
+ managementService.removeNotificationListener(groupingHandler);
+ groupingHandler = null;
+ }
// Need to flush all sessions to make sure all confirmations get sent back to client
for (ServerSession session : sessions.values())
@@ -864,6 +874,17 @@
return executorFactory;
}
+ public void setGroupingHandler(GroupingHandler groupingHandler)
+ {
+ this.groupingHandler = groupingHandler;
+ // managementService.addNotificationListener(groupingHandler);
+ }
+
+ public GroupingHandler getGroupingHandler()
+ {
+ return groupingHandler;
+ }
+
// Public
// ---------------------------------------------------------------------------------------
@@ -1114,6 +1135,8 @@
clusterManager.start();
}
+ deployGroupingHandlerConfiguration(configuration.getGroupingHandlerConfiguration());
+
if (deploymentManager != null)
{
deploymentManager.start();
@@ -1154,8 +1177,10 @@
{
List<QueueBindingInfo> queueBindingInfos = new ArrayList<QueueBindingInfo>();
- storageManager.loadBindingJournal(queueBindingInfos);
+ List<GroupingInfo> groupingInfos = new ArrayList<GroupingInfo>();
+ storageManager.loadBindingJournal(queueBindingInfos, groupingInfos);
+
// Set the node id - must be before we load the queues into the postoffice, but after we load the journal
setNodeID();
@@ -1187,6 +1212,14 @@
managementService.registerQueue(queue, queueBindingInfo.getAddress(), storageManager);
}
+ for (GroupingInfo groupingInfo : groupingInfos)
+ {
+ if(groupingHandler != null)
+ {
+ groupingHandler.addGroupBinding(new GroupBinding(groupingInfo.getId(), groupingInfo.getGroupId(), groupingInfo.getClusterName()));
+ }
+ }
+
Map<SimpleString, List<Pair<byte[], Long>>> duplicateIDMap = new HashMap<SimpleString, List<Pair<byte[], Long>>>();
storageManager.loadMessageJournal(pagingManager, resourceManager, queues, duplicateIDMap);
@@ -1348,6 +1381,25 @@
}
}
+ private synchronized void deployGroupingHandlerConfiguration(final GroupingHandlerConfiguration config) throws Exception
+ {
+ if (config != null)
+ {
+ GroupingHandler groupingHandler;
+ if (config.getType() == GroupingHandlerConfiguration.TYPE.LOCAL)
+ {
+ groupingHandler = new LocalGroupingHandler(managementService, config.getName(), config.getAddress(), getStorageManager());
+ }
+ else
+ {
+ groupingHandler = new RemoteGroupingHandler(managementService, config.getName(), config.getAddress(), config.getTimeout());
+ }
+ log.info("deploying grouping handler: " + groupingHandler);
+ this.groupingHandler = groupingHandler;
+ managementService.addNotificationListener(groupingHandler);
+ }
+ }
+
private Transformer instantiateTransformer(final String transformerClassName)
{
Transformer transformer = null;
Modified: branches/hornetq_grouping/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java
===================================================================
--- branches/hornetq_grouping/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java 2009-10-17 12:38:46 UTC (rev 8125)
+++ branches/hornetq_grouping/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java 2009-10-19 11:30:49 UTC (rev 8126)
@@ -45,8 +45,6 @@
import org.hornetq.core.server.HornetQServer;
import org.hornetq.core.server.JournalType;
import org.hornetq.core.server.group.impl.GroupingHandlerConfiguration;
-import org.hornetq.core.server.group.impl.LocalGroupingHandler;
-import org.hornetq.core.server.group.impl.RemoteGroupingHandler;
import org.hornetq.core.server.group.GroupingHandler;
import org.hornetq.core.server.cluster.ClusterConnection;
import org.hornetq.core.server.cluster.RemoteQueueBinding;
@@ -60,25 +58,23 @@
* A ClusterTestBase
*
* @author <a href="mailto:tim.fox@jboss.com">Tim Fox</a>
- *
- * Created 30 Jan 2009 11:29:43
- *
- *
+ * <p/>
+ * Created 30 Jan 2009 11:29:43
*/
public class ClusterTestBase extends ServiceTestBase
{
private static final Logger log = Logger.getLogger(ClusterTestBase.class);
private static final int[] PORTS = {TransportConstants.DEFAULT_PORT,
- TransportConstants.DEFAULT_PORT + 1,
- TransportConstants.DEFAULT_PORT + 2,
- TransportConstants.DEFAULT_PORT + 3,
- TransportConstants.DEFAULT_PORT + 4,
- TransportConstants.DEFAULT_PORT + 5,
- TransportConstants.DEFAULT_PORT + 6,
- TransportConstants.DEFAULT_PORT + 7,
- TransportConstants.DEFAULT_PORT + 8,
- TransportConstants.DEFAULT_PORT + 9,
+ TransportConstants.DEFAULT_PORT + 1,
+ TransportConstants.DEFAULT_PORT + 2,
+ TransportConstants.DEFAULT_PORT + 3,
+ TransportConstants.DEFAULT_PORT + 4,
+ TransportConstants.DEFAULT_PORT + 5,
+ TransportConstants.DEFAULT_PORT + 6,
+ TransportConstants.DEFAULT_PORT + 7,
+ TransportConstants.DEFAULT_PORT + 8,
+ TransportConstants.DEFAULT_PORT + 9,
};
private static final long WAIT_TIMEOUT = 10000;
@@ -89,21 +85,21 @@
super.setUp();
checkFreePort(PORTS);
-
+
clearData();
}
-
+
@Override
protected void tearDown() throws Exception
{
checkFreePort(PORTS);
-
+
servers = null;
sfs = null;
-
+
consumers = null;
-
+
consumers = new ConsumerHolder[MAX_CONSUMERS];
super.tearDown();
@@ -173,8 +169,8 @@
//System.out.println(threadDump(" - fired by ClusterTestBase::waitForBindings"));
throw new IllegalStateException("Timed out waiting for messages (messageCount = " + messageCount +
- ", expecting = " +
- count);
+ ", expecting = " +
+ count);
}
protected void waitForBindings(int node,
@@ -219,7 +215,7 @@
{
if ((binding instanceof LocalQueueBinding && local) || (binding instanceof RemoteQueueBinding && !local))
{
- QueueBinding qBinding = (QueueBinding)binding;
+ QueueBinding qBinding = (QueueBinding) binding;
bindingCount++;
@@ -242,8 +238,8 @@
// System.out.println(threadDump(" - fired by ClusterTestBase::waitForBindings"));
String msg = "Timed out waiting for bindings (bindingCount = " + bindingCount +
- ", totConsumers = " +
- totConsumers;
+ ", totConsumers = " +
+ totConsumers;
log.error(msg);
@@ -424,7 +420,7 @@
{
sendInRange(node, address, 0, numMessages, durable, key, val);
}
-
+
protected void sendInRange(int node, String address, int msgStart, int msgEnd, boolean durable, SimpleString key, SimpleString val) throws Exception
{
ClientSessionFactory sf = this.sfs[node];
@@ -450,28 +446,20 @@
session.close();
}
- protected void setUpGroupHandler(GroupingHandlerConfiguration.TYPE type, int node)
+ protected void setUpGroupHandler(GroupingHandlerConfiguration.TYPE type, int node)
{
setUpGroupHandler(type, node, 5000);
}
- protected void setUpGroupHandler(GroupingHandlerConfiguration.TYPE type, int node, int timeout)
+ protected void setUpGroupHandler(GroupingHandlerConfiguration.TYPE type, int node, int timeout)
{
- GroupingHandler groupingHandler;
- if(type == GroupingHandlerConfiguration.TYPE.LOCAL)
- {
- groupingHandler = new LocalGroupingHandler(servers[node].getManagementService(), new SimpleString("grouparbitrator"), new SimpleString("queues"));
- }
- else
- {
- groupingHandler = new RemoteGroupingHandler(servers[node].getManagementService(), new SimpleString("grouparbitrator"), new SimpleString("queues"), timeout);
- }
- this.servers[node].getPostOffice().setGroupingHandler(groupingHandler);
+ this.servers[node].getConfiguration().setGroupingHandlerConfiguration(
+ new GroupingHandlerConfiguration(new SimpleString("grouparbitrator"), type, new SimpleString("queues"), timeout));
}
- protected void setUpGroupHandler(GroupingHandler groupingHandler, int node)
+ protected void setUpGroupHandler(GroupingHandler groupingHandler, int node)
{
- this.servers[node].getPostOffice().setGroupingHandler(groupingHandler);
+ this.servers[node].setGroupingHandler(groupingHandler);
}
protected void send(int node, String address, int numMessages, boolean durable, String filterVal) throws Exception
@@ -490,25 +478,25 @@
}
protected void verifyReceiveAllWithGroupIDRoundRobin(
- int msgStart,
- int msgEnd,
- int... consumerIDs) throws Exception
+ int msgStart,
+ int msgEnd,
+ int... consumerIDs) throws Exception
{
- verifyReceiveAllWithGroupIDRoundRobin(true, -1, msgStart, msgEnd, consumerIDs);
+ verifyReceiveAllWithGroupIDRoundRobin(true, -1, msgStart, msgEnd, consumerIDs);
}
protected int verifyReceiveAllOnSingleConsumer(int msgStart,
- int msgEnd,
- int... consumerIDs) throws Exception
+ int msgEnd,
+ int... consumerIDs) throws Exception
{
- return verifyReceiveAllOnSingleConsumer(true, msgStart, msgEnd, consumerIDs);
+ return verifyReceiveAllOnSingleConsumer(true, msgStart, msgEnd, consumerIDs);
}
protected void verifyReceiveAllWithGroupIDRoundRobin(boolean ack,
- long firstReceiveTime,
- int msgStart,
- int msgEnd,
- int... consumerIDs) throws Exception
+ long firstReceiveTime,
+ int msgStart,
+ int msgEnd,
+ int... consumerIDs) throws Exception
{
HashMap<SimpleString, Integer> groupIdsReceived = new HashMap<SimpleString, Integer>();
for (int i = 0; i < consumerIDs.length; i++)
@@ -544,8 +532,8 @@
}
SimpleString id = (SimpleString) message.getProperty(MessageImpl.HDR_GROUP_ID);
- System.out.println("received " + id + " on consumer " + i);
- if(groupIdsReceived.get(id) == null)
+ System.out.println("received " + id + " on consumer " + i);
+ if (groupIdsReceived.get(id) == null)
{
groupIdsReceived.put(id, i);
}
@@ -562,9 +550,9 @@
}
protected int verifyReceiveAllOnSingleConsumer(boolean ack,
- int msgStart,
- int msgEnd,
- int... consumerIDs) throws Exception
+ int msgStart,
+ int msgEnd,
+ int... consumerIDs) throws Exception
{
int groupIdsReceived = -1;
for (int i = 0; i < consumerIDs.length; i++)
@@ -628,7 +616,7 @@
assertNotNull("consumer " + consumerIDs[i] + " did not receive message " + j, message);
}
-
+
if (ack)
{
message.acknowledge();
@@ -639,7 +627,7 @@
assertTrue("Message received too soon", System.currentTimeMillis() >= firstReceiveTime);
}
- if (j != (Integer)(message.getProperty(COUNT_PROP)))
+ if (j != (Integer) (message.getProperty(COUNT_PROP)))
{
outOfOrder = true;
System.out.println("Message j=" + j + " was received out of order = " + message.getProperty(COUNT_PROP));
@@ -697,8 +685,8 @@
if (message != null)
{
log.info("check receive Consumer " + consumerIDs[i] +
- " received message " +
- message.getProperty(COUNT_PROP));
+ " received message " +
+ message.getProperty(COUNT_PROP));
}
else
{
@@ -769,7 +757,7 @@
if (message != null)
{
- int count = (Integer)message.getProperty(COUNT_PROP);
+ int count = (Integer) message.getProperty(COUNT_PROP);
Integer prevCount = countMap.get(i);
@@ -793,7 +781,7 @@
}
else
{
- // log.info("consumer " + consumerIDs[i] +" returns null");
+ // log.info("consumer " + consumerIDs[i] +" returns null");
}
}
while (message != null);
@@ -831,7 +819,7 @@
if (message != null)
{
- int count = (Integer)message.getProperty(COUNT_PROP);
+ int count = (Integer) message.getProperty(COUNT_PROP);
// log.info("consumer " + consumerIDs[i] + " received message " + count);
@@ -879,7 +867,7 @@
assertNotNull(list);
- int elem = (Integer)list.poll();
+ int elem = (Integer) list.poll();
assertEquals(messageCounts[i], elem);
@@ -919,7 +907,7 @@
if (message != null)
{
- int count = (Integer)message.getProperty(COUNT_PROP);
+ int count = (Integer) message.getProperty(COUNT_PROP);
ints.add(count);
}
@@ -1015,7 +1003,7 @@
}
ClientSessionFactory sf = new ClientSessionFactoryImpl(serverTotc, serverBackuptc);
-
+
sf.setFailoverOnServerShutdown(false);
sf.setRetryInterval(100);
sf.setRetryIntervalMultiplier(1d);
@@ -1127,6 +1115,78 @@
servers[node] = server;
}
+ protected void setupSharedStorageServer(int node, boolean fileStorage, boolean netty, int backupNode)
+ {
+ if (servers[node] != null)
+ {
+ throw new IllegalArgumentException("Already a server at node " + node);
+ }
+
+ Configuration configuration = new ConfigurationImpl();
+
+ configuration.setSecurityEnabled(false);
+ configuration.setBindingsDirectory(getBindingsDir(backupNode, false));
+ configuration.setJournalMinFiles(2);
+ configuration.setJournalMaxAIO(1000);
+ configuration.setJournalDirectory(getJournalDir(backupNode, false));
+ configuration.setJournalFileSize(100 * 1024);
+ configuration.setJournalType(JournalType.ASYNCIO);
+ configuration.setJournalMaxAIO(1000);
+ configuration.setPagingDirectory(getPageDir(backupNode, false));
+ configuration.setLargeMessagesDirectory(getLargeMessagesDir(backupNode, false));
+ configuration.setClustered(true);
+ configuration.setJournalCompactMinFiles(0);
+ configuration.setBackup(true);
+ configuration.setSharedStore(true);
+
+ if (backupNode != -1)
+ {
+ Map<String, Object> backupParams = generateParams(backupNode, netty);
+
+ if (netty)
+ {
+ TransportConfiguration nettyBackuptc = new TransportConfiguration(NETTY_CONNECTOR_FACTORY, backupParams);
+
+ configuration.getConnectorConfigurations().put(nettyBackuptc.getName(), nettyBackuptc);
+
+ configuration.setBackupConnectorName(nettyBackuptc.getName());
+ }
+ else
+ {
+ TransportConfiguration invmBackuptc = new TransportConfiguration(INVM_CONNECTOR_FACTORY, backupParams);
+
+ configuration.getConnectorConfigurations().put(invmBackuptc.getName(), invmBackuptc);
+
+ configuration.setBackupConnectorName(invmBackuptc.getName());
+ }
+ }
+
+ configuration.getAcceptorConfigurations().clear();
+
+ Map<String, Object> params = generateParams(node, netty);
+
+ TransportConfiguration invmtc = new TransportConfiguration(INVM_ACCEPTOR_FACTORY, params);
+ configuration.getAcceptorConfigurations().add(invmtc);
+
+ if (netty)
+ {
+ TransportConfiguration nettytc = new TransportConfiguration(NETTY_ACCEPTOR_FACTORY, params);
+ configuration.getAcceptorConfigurations().add(nettytc);
+ }
+
+ HornetQServer server;
+
+ if (fileStorage)
+ {
+ server = HornetQ.newHornetQServer(configuration);
+ }
+ else
+ {
+ server = HornetQ.newHornetQServer(configuration, false);
+ }
+ servers[node] = server;
+ }
+
protected void setupServerWithDiscovery(int node,
String groupAddress,
int port,
@@ -1223,21 +1283,21 @@
configuration.getConnectorConfigurations().put(nettytc_c.getName(), nettytc_c);
connectorPairs.add(new Pair<String, String>(nettytc_c.getName(),
- nettyBackuptc == null ? null : nettyBackuptc.getName()));
+ nettyBackuptc == null ? null : nettyBackuptc.getName()));
}
else
{
connectorPairs.add(new Pair<String, String>(invmtc_c.getName(), invmBackuptc == null ? null
- : invmBackuptc.getName()));
+ : invmBackuptc.getName()));
}
BroadcastGroupConfiguration bcConfig = new BroadcastGroupConfiguration("bg1",
- null,
- -1,
- groupAddress,
- port,
- 250,
- connectorPairs);
+ null,
+ -1,
+ groupAddress,
+ port,
+ 250,
+ connectorPairs);
configuration.getBroadcastGroupConfigurations().add(bcConfig);
@@ -1266,7 +1326,7 @@
if (netty)
{
params.put(org.hornetq.integration.transports.netty.TransportConstants.PORT_PROP_NAME,
- org.hornetq.integration.transports.netty.TransportConstants.DEFAULT_PORT + node);
+ org.hornetq.integration.transports.netty.TransportConstants.DEFAULT_PORT + node);
}
return params;
@@ -1333,12 +1393,12 @@
pairs.add(connectorPair);
ClusterConnectionConfiguration clusterConf = new ClusterConnectionConfiguration(name,
- address,
- 100,
- true,
- forwardWhenNoConsumers,
- maxHops,
- pairs);
+ address,
+ 100,
+ true,
+ forwardWhenNoConsumers,
+ maxHops,
+ pairs);
serverFrom.getConfiguration().getClusterConfigurations().add(clusterConf);
}
@@ -1384,12 +1444,12 @@
}
ClusterConnectionConfiguration clusterConf = new ClusterConnectionConfiguration(name,
- address,
- 250,
- true,
- forwardWhenNoConsumers,
- maxHops,
- pairs);
+ address,
+ 250,
+ true,
+ forwardWhenNoConsumers,
+ maxHops,
+ pairs);
serverFrom.getConfiguration().getClusterConfigurations().add(clusterConf);
}
@@ -1454,12 +1514,12 @@
}
ClusterConnectionConfiguration clusterConf = new ClusterConnectionConfiguration(name,
- address,
- 250,
- true,
- forwardWhenNoConsumers,
- maxHops,
- pairs);
+ address,
+ 250,
+ true,
+ forwardWhenNoConsumers,
+ maxHops,
+ pairs);
serverFrom.getConfiguration().getClusterConfigurations().add(clusterConf);
}
@@ -1480,12 +1540,12 @@
}
ClusterConnectionConfiguration clusterConf = new ClusterConnectionConfiguration(name,
- address,
- 100,
- true,
- forwardWhenNoConsumers,
- maxHops,
- discoveryGroupName);
+ address,
+ 100,
+ true,
+ forwardWhenNoConsumers,
+ maxHops,
+ discoveryGroupName);
List<ClusterConnectionConfiguration> clusterConfs = server.getConfiguration().getClusterConfigurations();
clusterConfs.add(clusterConf);
Modified: branches/hornetq_grouping/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusteredGroupingTest.java
===================================================================
--- branches/hornetq_grouping/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusteredGroupingTest.java 2009-10-17 12:38:46 UTC (rev 8125)
+++ branches/hornetq_grouping/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusteredGroupingTest.java 2009-10-19 11:30:49 UTC (rev 8126)
@@ -16,6 +16,7 @@
import org.hornetq.core.server.group.impl.GroupingHandlerConfiguration;
import org.hornetq.core.server.group.impl.Response;
import org.hornetq.core.server.group.impl.Proposal;
+import org.hornetq.core.server.group.impl.GroupBinding;
import org.hornetq.core.server.group.GroupingHandler;
import org.hornetq.core.exception.HornetQException;
import org.hornetq.core.management.Notification;
@@ -42,13 +43,14 @@
setupClusterConnection("cluster2", "queues", false, 1, isNetty(), 2, 0, 1);
+ setUpGroupHandler(GroupingHandlerConfiguration.TYPE.LOCAL, 0);
+ setUpGroupHandler(GroupingHandlerConfiguration.TYPE.REMOTE, 1);
+ setUpGroupHandler(GroupingHandlerConfiguration.TYPE.REMOTE, 2);
+
startServers(0, 1, 2);
try
{
- setUpGroupHandler(GroupingHandlerConfiguration.TYPE.LOCAL, 0);
- setUpGroupHandler(GroupingHandlerConfiguration.TYPE.REMOTE, 1);
- setUpGroupHandler(GroupingHandlerConfiguration.TYPE.REMOTE, 2);
setupSessionFactory(0, isNetty());
setupSessionFactory(1, isNetty());
@@ -98,6 +100,10 @@
setupClusterConnection("cluster2", "queues", false, 1, isNetty(), 2, 0, 1);
+ setUpGroupHandler(GroupingHandlerConfiguration.TYPE.REMOTE, 1, 1);
+
+ setUpGroupHandler(GroupingHandlerConfiguration.TYPE.REMOTE, 2, 1);
+
startServers(0, 1, 2);
try
@@ -106,7 +112,7 @@
{
public SimpleString getName()
{
- return null;
+ return null;
}
public Response propose(Proposal proposal) throws Exception
@@ -116,12 +122,12 @@
public void proposed(Response response) throws Exception
{
-
+ System.out.println("ClusteredGroupingTest.proposed");
}
public void send(Response response, int distance) throws Exception
{
-
+ System.out.println("ClusteredGroupingTest.send");
}
public Response receive(Proposal proposal, int distance) throws Exception
@@ -131,11 +137,14 @@
public void onNotification(Notification notification)
{
+ System.out.println("ClusteredGroupingTest.onNotification");
+ }
+ public void addGroupBinding(GroupBinding groupBinding)
+ {
+ System.out.println("ClusteredGroupingTest.addGroupBinding");
}
}, 0);
- setUpGroupHandler(GroupingHandlerConfiguration.TYPE.REMOTE, 1, 1);
- setUpGroupHandler(GroupingHandlerConfiguration.TYPE.REMOTE, 2, 1);
setupSessionFactory(0, isNetty());
setupSessionFactory(1, isNetty());
@@ -192,14 +201,15 @@
setupClusterConnection("cluster2", "queues", false, 1, isNetty(), 2, 0, 1);
+ setUpGroupHandler(GroupingHandlerConfiguration.TYPE.LOCAL, 0);
+ setUpGroupHandler(GroupingHandlerConfiguration.TYPE.REMOTE, 1);
+ setUpGroupHandler(GroupingHandlerConfiguration.TYPE.REMOTE, 2);
+
+
startServers(0, 1, 2);
try
{
- setUpGroupHandler(GroupingHandlerConfiguration.TYPE.LOCAL, 0);
- setUpGroupHandler(GroupingHandlerConfiguration.TYPE.REMOTE, 1);
- setUpGroupHandler(GroupingHandlerConfiguration.TYPE.REMOTE, 2);
-
setupSessionFactory(0, isNetty());
setupSessionFactory(1, isNetty());
setupSessionFactory(2, isNetty());
@@ -251,14 +261,15 @@
setupClusterConnection("cluster2", "queues", false, 1, isNetty(), 2, 0, 1);
+ setUpGroupHandler(GroupingHandlerConfiguration.TYPE.LOCAL, 0);
+ setUpGroupHandler(GroupingHandlerConfiguration.TYPE.REMOTE, 1);
+ setUpGroupHandler(GroupingHandlerConfiguration.TYPE.REMOTE, 2);
+
+
startServers(0, 1, 2);
try
{
- setUpGroupHandler(GroupingHandlerConfiguration.TYPE.LOCAL, 0);
- setUpGroupHandler(GroupingHandlerConfiguration.TYPE.REMOTE, 1);
- setUpGroupHandler(GroupingHandlerConfiguration.TYPE.REMOTE, 2);
-
setupSessionFactory(0, isNetty());
setupSessionFactory(1, isNetty());
setupSessionFactory(2, isNetty());
@@ -313,14 +324,15 @@
setupClusterConnection("cluster2", "queues", false, 1, isNetty(), 2, 0, 1);
+ setUpGroupHandler(GroupingHandlerConfiguration.TYPE.LOCAL, 0);
+ setUpGroupHandler(GroupingHandlerConfiguration.TYPE.REMOTE, 1);
+ setUpGroupHandler(GroupingHandlerConfiguration.TYPE.REMOTE, 2);
+
+
startServers(0, 1, 2);
try
{
- setUpGroupHandler(GroupingHandlerConfiguration.TYPE.LOCAL, 0);
- setUpGroupHandler(GroupingHandlerConfiguration.TYPE.REMOTE, 1);
- setUpGroupHandler(GroupingHandlerConfiguration.TYPE.REMOTE, 2);
-
setupSessionFactory(0, isNetty());
setupSessionFactory(1, isNetty());
setupSessionFactory(2, isNetty());
@@ -373,14 +385,15 @@
setupClusterConnection("cluster2", "queues", false, 1, isNetty(), 2, 0, 1);
+ setUpGroupHandler(GroupingHandlerConfiguration.TYPE.LOCAL, 0);
+ setUpGroupHandler(GroupingHandlerConfiguration.TYPE.REMOTE, 1);
+ setUpGroupHandler(GroupingHandlerConfiguration.TYPE.REMOTE, 2);
+
+
startServers(0, 1, 2);
try
{
- setUpGroupHandler(GroupingHandlerConfiguration.TYPE.LOCAL, 0);
- setUpGroupHandler(GroupingHandlerConfiguration.TYPE.REMOTE, 1);
- setUpGroupHandler(GroupingHandlerConfiguration.TYPE.REMOTE, 2);
-
setupSessionFactory(0, isNetty());
setupSessionFactory(1, isNetty());
setupSessionFactory(2, isNetty());
@@ -436,14 +449,15 @@
setupClusterConnection("cluster2", "queues", false, 1, isNetty(), 2, 0, 1);
+ setUpGroupHandler(GroupingHandlerConfiguration.TYPE.LOCAL, 0);
+ setUpGroupHandler(GroupingHandlerConfiguration.TYPE.REMOTE, 1);
+ setUpGroupHandler(GroupingHandlerConfiguration.TYPE.REMOTE, 2);
+
+
startServers(0, 1, 2);
try
{
- setUpGroupHandler(GroupingHandlerConfiguration.TYPE.LOCAL, 0);
- setUpGroupHandler(GroupingHandlerConfiguration.TYPE.REMOTE, 1);
- setUpGroupHandler(GroupingHandlerConfiguration.TYPE.REMOTE, 2);
-
setupSessionFactory(0, isNetty());
setupSessionFactory(1, isNetty());
setupSessionFactory(2, isNetty());
@@ -496,14 +510,15 @@
setupClusterConnection("cluster2", "queues", false, 1, isNetty(), 2, 0, 1);
+ setUpGroupHandler(GroupingHandlerConfiguration.TYPE.LOCAL, 0);
+ setUpGroupHandler(GroupingHandlerConfiguration.TYPE.REMOTE, 1);
+ setUpGroupHandler(GroupingHandlerConfiguration.TYPE.REMOTE, 2);
+
+
startServers(0, 1, 2);
try
{
- setUpGroupHandler(GroupingHandlerConfiguration.TYPE.LOCAL, 0);
- setUpGroupHandler(GroupingHandlerConfiguration.TYPE.REMOTE, 1);
- setUpGroupHandler(GroupingHandlerConfiguration.TYPE.REMOTE, 2);
-
setupSessionFactory(0, isNetty());
setupSessionFactory(1, isNetty());
setupSessionFactory(2, isNetty());
@@ -547,11 +562,11 @@
waitForBindings(2, "queues.testaddress", 1, 1, false);
sendInRange(0, "queues.testaddress", 30, 40, false, MessageImpl.HDR_GROUP_ID, new SimpleString("id1"));
-
+ verifyReceiveAllInRange(30, 40, 3);
sendInRange(1, "queues.testaddress", 40, 50, false, MessageImpl.HDR_GROUP_ID, new SimpleString("id1"));
-
+ verifyReceiveAllInRange(40, 50, 3);
sendInRange(2, "queues.testaddress", 50, 60, false, MessageImpl.HDR_GROUP_ID, new SimpleString("id1"));
- verifyReceiveAllInRange(30, 50, 3);
+ verifyReceiveAllInRange(50, 60, 3);
System.out.println("*****************************************************************************");
}
finally
@@ -576,13 +591,14 @@
setupClusterConnection("cluster2", "queues", false, 1, isNetty(), 2, 0, 1);
+ setUpGroupHandler(GroupingHandlerConfiguration.TYPE.LOCAL, 0);
+ setUpGroupHandler(GroupingHandlerConfiguration.TYPE.REMOTE, 1);
+ setUpGroupHandler(GroupingHandlerConfiguration.TYPE.REMOTE, 2);
+
startServers(0, 1, 2);
try
{
- setUpGroupHandler(GroupingHandlerConfiguration.TYPE.LOCAL, 0);
- setUpGroupHandler(GroupingHandlerConfiguration.TYPE.REMOTE, 1);
- setUpGroupHandler(GroupingHandlerConfiguration.TYPE.REMOTE, 2);
setupSessionFactory(0, isNetty());
setupSessionFactory(1, isNetty());
@@ -642,13 +658,14 @@
setupClusterConnection("cluster2", "queues", false, 1, isNetty(), 2, 0, 1);
+ setUpGroupHandler(GroupingHandlerConfiguration.TYPE.LOCAL, 0);
+ setUpGroupHandler(GroupingHandlerConfiguration.TYPE.REMOTE, 1);
+ setUpGroupHandler(GroupingHandlerConfiguration.TYPE.REMOTE, 2);
+
startServers(0, 1, 2);
try
{
- setUpGroupHandler(GroupingHandlerConfiguration.TYPE.LOCAL, 0);
- setUpGroupHandler(GroupingHandlerConfiguration.TYPE.REMOTE, 1);
- setUpGroupHandler(GroupingHandlerConfiguration.TYPE.REMOTE, 2);
setupSessionFactory(0, isNetty());
setupSessionFactory(1, isNetty());
@@ -677,8 +694,6 @@
sendInRange(2, "queues.testaddress", 10, 20, true, MessageImpl.HDR_GROUP_ID, new SimpleString("id1"));
- sendInRange(0, "queues.testaddress", 20, 30, true, MessageImpl.HDR_GROUP_ID, new SimpleString("id1"));
-
stopServers(1);
startServers(1);
@@ -688,7 +703,7 @@
waitForBindings(1, "queues.testaddress", 1, 1, true);
- verifyReceiveAllInRange(10, 30, 1);
+ verifyReceiveAllInRange(10, 20, 1);
System.out.println("*****************************************************************************");
@@ -716,13 +731,14 @@
setupClusterConnection("cluster2", "queues", false, 1, isNetty(), 2, 0, 1);
+ setUpGroupHandler(GroupingHandlerConfiguration.TYPE.LOCAL, 0);
+ setUpGroupHandler(GroupingHandlerConfiguration.TYPE.REMOTE, 1);
+ setUpGroupHandler(GroupingHandlerConfiguration.TYPE.REMOTE, 2);
+
startServers(0, 1, 2);
try
{
- setUpGroupHandler(GroupingHandlerConfiguration.TYPE.LOCAL, 0);
- setUpGroupHandler(GroupingHandlerConfiguration.TYPE.REMOTE, 1);
- setUpGroupHandler(GroupingHandlerConfiguration.TYPE.REMOTE, 2);
setupSessionFactory(0, isNetty());
setupSessionFactory(1, isNetty());
@@ -788,13 +804,15 @@
setupClusterConnection("cluster2", "queues", false, 1, isNetty(), 2, 0, 1);
+
+ setUpGroupHandler(GroupingHandlerConfiguration.TYPE.LOCAL, 0);
+ setUpGroupHandler(GroupingHandlerConfiguration.TYPE.REMOTE, 1);
+ setUpGroupHandler(GroupingHandlerConfiguration.TYPE.REMOTE, 2);
+
startServers(0, 1, 2);
try
{
- setUpGroupHandler(GroupingHandlerConfiguration.TYPE.LOCAL, 0);
- setUpGroupHandler(GroupingHandlerConfiguration.TYPE.REMOTE, 1);
- setUpGroupHandler(GroupingHandlerConfiguration.TYPE.REMOTE, 2);
setupSessionFactory(0, isNetty());
setupSessionFactory(1, isNetty());
@@ -817,18 +835,19 @@
sendInRange(1, "queues.testaddress", 0, 10, false, MessageImpl.HDR_GROUP_ID, new SimpleString("id1"));
verifyReceiveAllInRange(0, 10, 0);
-
+ closeSessionFactory(0);
stopServers(0);
sendInRange(2, "queues.testaddress", 10, 20, false, MessageImpl.HDR_GROUP_ID, new SimpleString("id1"));
+ setUpGroupHandler(GroupingHandlerConfiguration.TYPE.LOCAL, 0);
startServers(0);
-
waitForBindings(0, "queues.testaddress", 1, 0, true);
+ setupSessionFactory(0, isNetty());
+ verifyReceiveAllInRange(10, 20, 0);
sendInRange(0, "queues.testaddress", 20, 30, false, MessageImpl.HDR_GROUP_ID, new SimpleString("id1"));
- verifyReceiveAllInRange(10, 20, 0);
verifyReceiveAllInRange(20, 30, 0);
System.out.println("*****************************************************************************");
@@ -854,14 +873,14 @@
setupClusterConnection("cluster1", "queues", false, 1, isNetty(), 1, 0, 2);
setupClusterConnection("cluster2", "queues", false, 1, isNetty(), 2, 0, 1);
+ setUpGroupHandler(GroupingHandlerConfiguration.TYPE.LOCAL, 0);
+ setUpGroupHandler(GroupingHandlerConfiguration.TYPE.REMOTE, 1);
+ setUpGroupHandler(GroupingHandlerConfiguration.TYPE.REMOTE, 2);
startServers(0, 1, 2);
try
{
- setUpGroupHandler(GroupingHandlerConfiguration.TYPE.LOCAL, 0);
- setUpGroupHandler(GroupingHandlerConfiguration.TYPE.REMOTE, 1);
- setUpGroupHandler(GroupingHandlerConfiguration.TYPE.REMOTE, 2);
setupSessionFactory(0, isNetty());
setupSessionFactory(1, isNetty());
@@ -919,14 +938,14 @@
setupClusterConnection("cluster1", "queues", false, 1, isNetty(), 1, 0, 2);
setupClusterConnection("cluster2", "queues", false, 1, isNetty(), 2, 0, 1);
+ setUpGroupHandler(GroupingHandlerConfiguration.TYPE.LOCAL, 0);
+ setUpGroupHandler(GroupingHandlerConfiguration.TYPE.REMOTE, 1);
+ setUpGroupHandler(GroupingHandlerConfiguration.TYPE.REMOTE, 2);
startServers(0, 1, 2);
try
{
- setUpGroupHandler(GroupingHandlerConfiguration.TYPE.LOCAL, 0);
- setUpGroupHandler(GroupingHandlerConfiguration.TYPE.REMOTE, 1);
- setUpGroupHandler(GroupingHandlerConfiguration.TYPE.REMOTE, 2);
setupSessionFactory(0, isNetty());
setupSessionFactory(1, isNetty());
@@ -951,9 +970,9 @@
CountDownLatch latch = new CountDownLatch(1);
Thread[] threads = new Thread[9];
int range = 0;
- for(int i = 0 ; i < 9; i++,range+=10)
+ for (int i = 0; i < 9; i++, range += 10)
{
- threads[i] = new Thread(new ThreadSender(range, range+10, 1, new SimpleString("id" + i), latch, i < 8));
+ threads[i] = new Thread(new ThreadSender(range, range + 10, 1, new SimpleString("id" + i), latch, i < 8));
}
for (Thread thread : threads)
{
@@ -975,6 +994,61 @@
}
+ public void testGroupingLocalHandlerFails() throws Exception
+ {
+ setupServer(0, isFileStorage(), isNetty());
+ setupServer(1, isFileStorage(), isNetty());
+ setupSharedStorageServer(2, isFileStorage(), isNetty(), 0);
+ setupClusterConnection("cluster0", "queues", false, 1, isNetty(), 0, 1);
+
+ setupClusterConnectionWithBackups("cluster1", "queues", false, 1, isNetty(), 1, new int[]{0}, new int[]{2});
+
+ setupClusterConnection("cluster0", "queues", false, 1, isNetty(), 2, 1);
+
+ setUpGroupHandler(GroupingHandlerConfiguration.TYPE.LOCAL, 0);
+ setUpGroupHandler(GroupingHandlerConfiguration.TYPE.REMOTE, 1);
+ setUpGroupHandler(GroupingHandlerConfiguration.TYPE.LOCAL, 2);
+
+
+ startServers(0, 1, 2);
+
+ try
+ {
+ setupSessionFactory(0, isNetty());
+ setupSessionFactory(1, isNetty());
+
+ createQueue(0, "queues.testaddress", "queue0", null, true);
+ createQueue(1, "queues.testaddress", "queue0", null, true);
+
+ addConsumer(0, 0, "queue0", null);
+ addConsumer(1, 1, "queue0", null);
+
+ waitForBindings(0, "queues.testaddress", 1, 1, true);
+ waitForBindings(1, "queues.testaddress", 1, 1, true);
+
+ waitForBindings(0, "queues.testaddress", 1, 1, false);
+ waitForBindings(1, "queues.testaddress", 1, 1, false);
+
+ sendWithProperty(0, "queues.testaddress", 10, false, MessageImpl.HDR_GROUP_ID, new SimpleString("id1"));
+
+ verifyReceiveAll(10, 0);
+ closeSessionFactory(0);
+ stopServers(0);
+ Thread.sleep(5000);
+ //setupSessionFactory(3, isNetty());
+ sendWithProperty(1, "queues.testaddress", 10, false, MessageImpl.HDR_GROUP_ID, new SimpleString("id1"));
+ System.out.println("*****************************************************************************");
+ }
+ finally
+ {
+ closeAllConsumers();
+
+ closeAllSessionFactories();
+
+ stopServers(0, 1, 2);
+ }
+ }
+
public boolean isNetty()
{
return true;
Modified: branches/hornetq_grouping/tests/src/org/hornetq/tests/integration/persistence/RestartSMTest.java
===================================================================
--- branches/hornetq_grouping/tests/src/org/hornetq/tests/integration/persistence/RestartSMTest.java 2009-10-17 12:38:46 UTC (rev 8125)
+++ branches/hornetq_grouping/tests/src/org/hornetq/tests/integration/persistence/RestartSMTest.java 2009-10-19 11:30:49 UTC (rev 8126)
@@ -23,6 +23,7 @@
import org.hornetq.core.config.Configuration;
import org.hornetq.core.logging.Logger;
import org.hornetq.core.persistence.QueueBindingInfo;
+import org.hornetq.core.persistence.GroupingInfo;
import org.hornetq.core.persistence.impl.journal.JournalStorageManager;
import org.hornetq.core.server.JournalType;
import org.hornetq.core.server.Queue;
@@ -70,7 +71,7 @@
List<QueueBindingInfo> queueBindingInfos = new ArrayList<QueueBindingInfo>();
- journal.loadBindingJournal(queueBindingInfos);
+ journal.loadBindingJournal(queueBindingInfos, new ArrayList<GroupingInfo>());
Map<Long, Queue> queues = new HashMap<Long, Queue>();
@@ -88,7 +89,7 @@
queueBindingInfos = new ArrayList<QueueBindingInfo>();
- journal.loadBindingJournal(queueBindingInfos);
+ journal.loadBindingJournal(queueBindingInfos, new ArrayList<GroupingInfo>());
journal.start();
}
Modified: branches/hornetq_grouping/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java
===================================================================
--- branches/hornetq_grouping/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java 2009-10-17 12:38:46 UTC (rev 8125)
+++ branches/hornetq_grouping/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java 2009-10-19 11:30:49 UTC (rev 8126)
@@ -43,6 +43,7 @@
import org.hornetq.core.paging.impl.TestSupportPageStore;
import org.hornetq.core.persistence.QueueBindingInfo;
import org.hornetq.core.persistence.StorageManager;
+import org.hornetq.core.persistence.GroupingInfo;
import org.hornetq.core.postoffice.Binding;
import org.hornetq.core.postoffice.PostOffice;
import org.hornetq.core.remoting.spi.HornetQBuffer;
@@ -50,6 +51,7 @@
import org.hornetq.core.server.MessageReference;
import org.hornetq.core.server.Queue;
import org.hornetq.core.server.ServerMessage;
+import org.hornetq.core.server.group.impl.GroupBinding;
import org.hornetq.core.server.impl.ServerMessageImpl;
import org.hornetq.core.settings.HierarchicalRepository;
import org.hornetq.core.settings.impl.AddressSettings;
@@ -842,7 +844,15 @@
{
}
- /* (non-Javadoc)
+ public void addGrouping(GroupBinding groupBinding) throws Exception
+ {
+ //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public void deleteGrouping(GroupBinding groupBinding) throws Exception
+ {
+ //To change body of implemented methods use File | Settings | File Templates.
+ }/* (non-Javadoc)
* @see org.hornetq.core.persistence.StorageManager#addQueueBinding(org.hornetq.core.postoffice.Binding)
*/
public void addQueueBinding(final Binding binding) throws Exception
@@ -933,13 +943,14 @@
/* (non-Javadoc)
* @see org.hornetq.core.persistence.StorageManager#loadBindingJournal(java.util.List)
*/
- public void loadBindingJournal(final List<QueueBindingInfo> queueBindingInfos) throws Exception
+ public void loadBindingJournal(List<QueueBindingInfo> queueBindingInfos, List<GroupingInfo> groupingInfos) throws Exception
{
+
}
/* (non-Javadoc)
- * @see org.hornetq.core.persistence.StorageManager#loadMessageJournal(org.hornetq.core.paging.PagingManager, java.util.Map, org.hornetq.core.transaction.ResourceManager, java.util.Map)
- */
+ * @see org.hornetq.core.persistence.StorageManager#loadMessageJournal(org.hornetq.core.paging.PagingManager, java.util.Map, org.hornetq.core.transaction.ResourceManager, java.util.Map)
+ */
public void loadMessageJournal(PagingManager pagingManager,
ResourceManager resourceManager,
Map<Long, Queue> queues,
Modified: branches/hornetq_grouping/tests/src/org/hornetq/tests/unit/core/postoffice/impl/DuplicateDetectionUnitTest.java
===================================================================
--- branches/hornetq_grouping/tests/src/org/hornetq/tests/unit/core/postoffice/impl/DuplicateDetectionUnitTest.java 2009-10-17 12:38:46 UTC (rev 8125)
+++ branches/hornetq_grouping/tests/src/org/hornetq/tests/unit/core/postoffice/impl/DuplicateDetectionUnitTest.java 2009-10-19 11:30:49 UTC (rev 8126)
@@ -26,6 +26,7 @@
import org.hornetq.core.paging.PagingManager;
import org.hornetq.core.paging.PagingStore;
import org.hornetq.core.persistence.QueueBindingInfo;
+import org.hornetq.core.persistence.GroupingInfo;
import org.hornetq.core.persistence.impl.journal.JournalStorageManager;
import org.hornetq.core.postoffice.PostOffice;
import org.hornetq.core.postoffice.impl.DuplicateIDCacheImpl;
@@ -86,7 +87,7 @@
journal = new JournalStorageManager(configuration, Executors.newCachedThreadPool());
journal.start();
- journal.loadBindingJournal(new ArrayList<QueueBindingInfo>());
+ journal.loadBindingJournal(new ArrayList<QueueBindingInfo>(), new ArrayList<GroupingInfo>());
HashMap<SimpleString, List<Pair<byte[], Long>>> mapDups = new HashMap<SimpleString, List<Pair<byte[], Long>>>();
@@ -108,7 +109,7 @@
journal = new JournalStorageManager(configuration, Executors.newCachedThreadPool());
journal.start();
- journal.loadBindingJournal(new ArrayList<QueueBindingInfo>());
+ journal.loadBindingJournal(new ArrayList<QueueBindingInfo>(), new ArrayList<GroupingInfo>());
journal.loadMessageJournal(new FakePagingManager(),
new ResourceManagerImpl(0, 0, scheduledThreadPool),
@@ -135,7 +136,7 @@
journal = new JournalStorageManager(configuration, Executors.newCachedThreadPool());
journal.start();
- journal.loadBindingJournal(new ArrayList<QueueBindingInfo>());
+ journal.loadBindingJournal(new ArrayList<QueueBindingInfo>(), new ArrayList<GroupingInfo>());
journal.loadMessageJournal(new FakePagingManager(),
new ResourceManagerImpl(0, 0, scheduledThreadPool),
Modified: branches/hornetq_grouping/tests/src/org/hornetq/tests/unit/core/server/impl/fakes/FakePostOffice.java
===================================================================
--- branches/hornetq_grouping/tests/src/org/hornetq/tests/unit/core/server/impl/fakes/FakePostOffice.java 2009-10-17 12:38:46 UTC (rev 8125)
+++ branches/hornetq_grouping/tests/src/org/hornetq/tests/unit/core/server/impl/fakes/FakePostOffice.java 2009-10-19 11:30:49 UTC (rev 8126)
@@ -150,13 +150,4 @@
{
}
-
- public void setGroupingHandler(GroupingHandler groupingHandler)
- {
- }
-
- public GroupingHandler getGroupingHandler()
- {
- return null;
- }
}
\ No newline at end of file
14 years, 6 months