JBoss hornetq SVN: r9530 - trunk/tests/src/org/hornetq/tests/integration/ra.
by do-not-reply@jboss.org
Author: jmesnil
Date: 2010-08-12 09:54:32 -0400 (Thu, 12 Aug 2010)
New Revision: 9530
Modified:
trunk/tests/src/org/hornetq/tests/integration/ra/OutgoingConnectionTest.java
Log:
Resource Adapter test
* make sure to stop the RA in teardown()
Modified: trunk/tests/src/org/hornetq/tests/integration/ra/OutgoingConnectionTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/ra/OutgoingConnectionTest.java 2010-08-11 14:43:33 UTC (rev 9529)
+++ trunk/tests/src/org/hornetq/tests/integration/ra/OutgoingConnectionTest.java 2010-08-12 13:54:32 UTC (rev 9530)
@@ -12,27 +12,42 @@
*/
package org.hornetq.tests.integration.ra;
-import org.hornetq.api.core.client.HornetQClient;
+import java.util.HashSet;
+import java.util.Set;
+
+import javax.jms.IllegalStateException;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.QueueConnection;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import javax.jms.XAQueueConnection;
+import javax.jms.XASession;
+import javax.transaction.xa.XAResource;
+import javax.transaction.xa.Xid;
+
import org.hornetq.api.jms.HornetQJMSClient;
import org.hornetq.core.remoting.impl.invm.InVMConnectorFactory;
import org.hornetq.core.security.Role;
import org.hornetq.core.transaction.impl.XidImpl;
-import org.hornetq.ra.*;
+import org.hornetq.ra.HornetQRAConnectionFactory;
+import org.hornetq.ra.HornetQRAConnectionFactoryImpl;
+import org.hornetq.ra.HornetQRAConnectionManager;
+import org.hornetq.ra.HornetQRAManagedConnectionFactory;
+import org.hornetq.ra.HornetQResourceAdapter;
import org.hornetq.utils.UUIDGenerator;
-import javax.jms.*;
-import javax.jms.IllegalStateException;
-import javax.transaction.xa.XAResource;
-import javax.transaction.xa.Xid;
-import java.util.HashSet;
-import java.util.Set;
-
/**
* @author <a href="mailto:andy.taylor@jboss.org">Andy Taylor</a>
* Created Jul 7, 2010
*/
public class OutgoingConnectionTest extends HornetQRATestBase
{
+ private HornetQResourceAdapter resourceAdapter;
+
@Override
public boolean isSecure()
{
@@ -53,16 +68,25 @@
roles.add(role);
server.getSecurityRepository().addMatch(MDBQUEUEPREFIXED, roles);
}
+
+ @Override
+ protected void tearDown() throws Exception
+ {
+ if (resourceAdapter != null)
+ {
+ resourceAdapter.stop();
+ }
+ }
public void testSimpleMessageSendAndReceive() throws Exception
{
- HornetQResourceAdapter qResourceAdapter = new HornetQResourceAdapter();
- qResourceAdapter.setConnectorClassName(InVMConnectorFactory.class.getName());
+ resourceAdapter = new HornetQResourceAdapter();
+ resourceAdapter.setConnectorClassName(InVMConnectorFactory.class.getName());
MyBootstrapContext ctx = new MyBootstrapContext();
- qResourceAdapter.start(ctx);
+ resourceAdapter.start(ctx);
HornetQRAConnectionManager qraConnectionManager = new HornetQRAConnectionManager();
HornetQRAManagedConnectionFactory mcf = new HornetQRAManagedConnectionFactory();
- mcf.setResourceAdapter(qResourceAdapter);
+ mcf.setResourceAdapter(resourceAdapter);
HornetQRAConnectionFactory qraConnectionFactory = new HornetQRAConnectionFactoryImpl(mcf, qraConnectionManager);
QueueConnection queueConnection = qraConnectionFactory.createQueueConnection();
Session s = queueConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
@@ -81,13 +105,13 @@
public void testSimpleMessageSendAndReceiveXA() throws Exception
{
Xid xid = new XidImpl("xa1".getBytes(), 1, UUIDGenerator.getInstance().generateStringUUID().getBytes());
- HornetQResourceAdapter qResourceAdapter = new HornetQResourceAdapter();
- qResourceAdapter.setConnectorClassName(InVMConnectorFactory.class.getName());
+ resourceAdapter = new HornetQResourceAdapter();
+ resourceAdapter.setConnectorClassName(InVMConnectorFactory.class.getName());
MyBootstrapContext ctx = new MyBootstrapContext();
- qResourceAdapter.start(ctx);
+ resourceAdapter.start(ctx);
HornetQRAConnectionManager qraConnectionManager = new HornetQRAConnectionManager();
HornetQRAManagedConnectionFactory mcf = new HornetQRAManagedConnectionFactory();
- mcf.setResourceAdapter(qResourceAdapter);
+ mcf.setResourceAdapter(resourceAdapter);
HornetQRAConnectionFactory qraConnectionFactory = new HornetQRAConnectionFactoryImpl(mcf, qraConnectionManager);
XAQueueConnection queueConnection = qraConnectionFactory.createXAQueueConnection();
XASession s = queueConnection.createXASession();
@@ -111,18 +135,20 @@
assertNotNull(textMessage);
assertEquals(textMessage.getText(), "test");
s.close();
+
+ resourceAdapter.stop();
}
public void testSimpleMessageSendAndReceiveTransacted() throws Exception
{
- HornetQResourceAdapter qResourceAdapter = new HornetQResourceAdapter();
- qResourceAdapter.setConnectorClassName(InVMConnectorFactory.class.getName());
- qResourceAdapter.setUseLocalTx(true);
+ resourceAdapter = new HornetQResourceAdapter();
+ resourceAdapter.setConnectorClassName(InVMConnectorFactory.class.getName());
+ resourceAdapter.setUseLocalTx(true);
MyBootstrapContext ctx = new MyBootstrapContext();
- qResourceAdapter.start(ctx);
+ resourceAdapter.start(ctx);
HornetQRAConnectionManager qraConnectionManager = new HornetQRAConnectionManager();
HornetQRAManagedConnectionFactory mcf = new HornetQRAManagedConnectionFactory();
- mcf.setResourceAdapter(qResourceAdapter);
+ mcf.setResourceAdapter(resourceAdapter);
HornetQRAConnectionFactory qraConnectionFactory = new HornetQRAConnectionFactoryImpl(mcf, qraConnectionManager);
QueueConnection queueConnection = qraConnectionFactory.createQueueConnection();
Session s = queueConnection.createSession(true, Session.AUTO_ACKNOWLEDGE);
@@ -146,13 +172,13 @@
public void testMultipleSessionsThrowsException() throws Exception
{
- HornetQResourceAdapter qResourceAdapter = new HornetQResourceAdapter();
- qResourceAdapter.setConnectorClassName(InVMConnectorFactory.class.getName());
+ resourceAdapter = new HornetQResourceAdapter();
+ resourceAdapter.setConnectorClassName(InVMConnectorFactory.class.getName());
MyBootstrapContext ctx = new MyBootstrapContext();
- qResourceAdapter.start(ctx);
+ resourceAdapter.start(ctx);
HornetQRAConnectionManager qraConnectionManager = new HornetQRAConnectionManager();
HornetQRAManagedConnectionFactory mcf = new HornetQRAManagedConnectionFactory();
- mcf.setResourceAdapter(qResourceAdapter);
+ mcf.setResourceAdapter(resourceAdapter);
HornetQRAConnectionFactory qraConnectionFactory = new HornetQRAConnectionFactoryImpl(mcf, qraConnectionManager);
QueueConnection queueConnection = qraConnectionFactory.createQueueConnection();
Session s = queueConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
@@ -170,13 +196,13 @@
public void testConnectionCredentials() throws Exception
{
- HornetQResourceAdapter qResourceAdapter = new HornetQResourceAdapter();
- qResourceAdapter.setConnectorClassName(InVMConnectorFactory.class.getName());
+ resourceAdapter = new HornetQResourceAdapter();
+ resourceAdapter.setConnectorClassName(InVMConnectorFactory.class.getName());
MyBootstrapContext ctx = new MyBootstrapContext();
- qResourceAdapter.start(ctx);
+ resourceAdapter.start(ctx);
HornetQRAConnectionManager qraConnectionManager = new HornetQRAConnectionManager();
HornetQRAManagedConnectionFactory mcf = new HornetQRAManagedConnectionFactory();
- mcf.setResourceAdapter(qResourceAdapter);
+ mcf.setResourceAdapter(resourceAdapter);
HornetQRAConnectionFactory qraConnectionFactory = new HornetQRAConnectionFactoryImpl(mcf, qraConnectionManager);
QueueConnection queueConnection = qraConnectionFactory.createQueueConnection();
queueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE).close();
@@ -187,13 +213,13 @@
public void testConnectionCredentialsFail() throws Exception
{
- HornetQResourceAdapter qResourceAdapter = new HornetQResourceAdapter();
- qResourceAdapter.setConnectorClassName(InVMConnectorFactory.class.getName());
+ resourceAdapter = new HornetQResourceAdapter();
+ resourceAdapter.setConnectorClassName(InVMConnectorFactory.class.getName());
MyBootstrapContext ctx = new MyBootstrapContext();
- qResourceAdapter.start(ctx);
+ resourceAdapter.start(ctx);
HornetQRAConnectionManager qraConnectionManager = new HornetQRAConnectionManager();
HornetQRAManagedConnectionFactory mcf = new HornetQRAManagedConnectionFactory();
- mcf.setResourceAdapter(qResourceAdapter);
+ mcf.setResourceAdapter(resourceAdapter);
HornetQRAConnectionFactory qraConnectionFactory = new HornetQRAConnectionFactoryImpl(mcf, qraConnectionManager);
QueueConnection queueConnection = qraConnectionFactory.createQueueConnection();
queueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE).close();
15 years, 5 months
JBoss hornetq SVN: r9529 - trunk/tests/src/org/hornetq/tests/integration/journal.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2010-08-11 10:43:33 -0400 (Wed, 11 Aug 2010)
New Revision: 9529
Added:
trunk/tests/src/org/hornetq/tests/integration/journal/AIOImportExportTest.java
Log:
Adding test
Added: trunk/tests/src/org/hornetq/tests/integration/journal/AIOImportExportTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/journal/AIOImportExportTest.java (rev 0)
+++ trunk/tests/src/org/hornetq/tests/integration/journal/AIOImportExportTest.java 2010-08-11 14:43:33 UTC (rev 9529)
@@ -0,0 +1,53 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.tests.integration.journal;
+
+import java.io.File;
+
+import junit.framework.TestSuite;
+
+import org.hornetq.core.journal.SequentialFileFactory;
+import org.hornetq.core.journal.impl.AIOSequentialFileFactory;
+import org.hornetq.tests.util.UnitTestCase;
+
+/**
+ * A AIOImportExportTest
+ *
+ * @author <a href="mailto:clebert.suconic@jboss.org">Clebert Suconic</a>
+ *
+ *
+ */
+public class AIOImportExportTest extends NIOImportExportTest
+{
+ public static TestSuite suite()
+ {
+ // Ignore tests if AIO is not installed
+ return UnitTestCase.createAIOTestSuite(AIOImportExportTest.class);
+ }
+
+
+
+ @Override
+ protected SequentialFileFactory getFileFactory() throws Exception
+ {
+ File file = new File(getTestDir());
+
+ deleteDirectory(file);
+
+ file.mkdir();
+
+ return new AIOSequentialFileFactory(getTestDir());
+ }
+
+}
15 years, 5 months
JBoss hornetq SVN: r9528 - in branches/2_2_0_HA_Improvements/src/main/org/hornetq/core: protocol/core/impl and 3 other directories.
by do-not-reply@jboss.org
Author: jmesnil
Date: 2010-08-11 08:27:29 -0400 (Wed, 11 Aug 2010)
New Revision: 9528
Removed:
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/protocol/core/impl/wireformat/NodeAnnounceMessage.java
Modified:
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/protocol/core/impl/PacketDecoder.java
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/protocol/core/impl/PacketImpl.java
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/ClusterManager.java
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java
Log:
removed dead code
Modified: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java 2010-08-11 09:15:56 UTC (rev 9527)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java 2010-08-11 12:27:29 UTC (rev 9528)
@@ -44,7 +44,6 @@
import org.hornetq.core.cluster.DiscoveryListener;
import org.hornetq.core.cluster.impl.DiscoveryGroupImpl;
import org.hornetq.core.logging.Logger;
-import org.hornetq.core.protocol.core.impl.wireformat.NodeAnnounceMessage;
import org.hornetq.utils.HornetQThreadFactory;
import org.hornetq.utils.UUIDGenerator;
@@ -439,11 +438,6 @@
try
{
sf = createSessionFactory(connector);
- if (sf != null)
- {
- ClientSessionFactoryInternal internalSF = (ClientSessionFactoryInternal)sf;
- internalSF.getConnection().getChannel(0, -1).send(new NodeAnnounceMessage(nodeID, backup, transportConfig));
- }
}
catch (HornetQException e)
{
Modified: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java 2010-08-11 09:15:56 UTC (rev 9527)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java 2010-08-11 12:27:29 UTC (rev 9528)
@@ -30,7 +30,6 @@
import org.hornetq.core.protocol.core.Packet;
import org.hornetq.core.protocol.core.ServerSessionPacketHandler;
import org.hornetq.core.protocol.core.impl.wireformat.ClusterTopologyChangeMessage;
-import org.hornetq.core.protocol.core.impl.wireformat.NodeAnnounceMessage;
import org.hornetq.core.protocol.core.impl.wireformat.Ping;
import org.hornetq.core.protocol.core.impl.wireformat.SubscribeClusterTopologyUpdatesMessage;
import org.hornetq.core.remoting.CloseListener;
@@ -138,22 +137,6 @@
}
});
}
- else if (packet.getType() == PacketImpl.NODE_ANNOUNCE)
- {
- NodeAnnounceMessage msg = (NodeAnnounceMessage)packet;
- TransportConfiguration connector = msg.getConnector();
- boolean backup = msg.isBackup();
- Pair<TransportConfiguration, TransportConfiguration> pair;
- if (backup)
- {
- pair = new Pair<TransportConfiguration, TransportConfiguration>(null, connector);
- }
- else
- {
- pair = new Pair<TransportConfiguration, TransportConfiguration>(connector, null);
- }
- server.getClusterManager().announceNode(msg.getNodeID(), pair);
- }
}
});
Modified: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/protocol/core/impl/PacketDecoder.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/protocol/core/impl/PacketDecoder.java 2010-08-11 09:15:56 UTC (rev 9527)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/protocol/core/impl/PacketDecoder.java 2010-08-11 12:27:29 UTC (rev 9528)
@@ -21,7 +21,6 @@
import static org.hornetq.core.protocol.core.impl.PacketImpl.DELETE_QUEUE;
import static org.hornetq.core.protocol.core.impl.PacketImpl.DISCONNECT;
import static org.hornetq.core.protocol.core.impl.PacketImpl.EXCEPTION;
-import static org.hornetq.core.protocol.core.impl.PacketImpl.NODE_ANNOUNCE;
import static org.hornetq.core.protocol.core.impl.PacketImpl.NULL_RESPONSE;
import static org.hornetq.core.protocol.core.impl.PacketImpl.PACKETS_CONFIRMED;
import static org.hornetq.core.protocol.core.impl.PacketImpl.PING;
@@ -91,7 +90,6 @@
import org.hornetq.core.protocol.core.impl.wireformat.CreateSessionMessage;
import org.hornetq.core.protocol.core.impl.wireformat.CreateSessionResponseMessage;
import org.hornetq.core.protocol.core.impl.wireformat.HornetQExceptionMessage;
-import org.hornetq.core.protocol.core.impl.wireformat.NodeAnnounceMessage;
import org.hornetq.core.protocol.core.impl.wireformat.NullResponseMessage;
import org.hornetq.core.protocol.core.impl.wireformat.PacketsConfirmedMessage;
import org.hornetq.core.protocol.core.impl.wireformat.Ping;
@@ -497,11 +495,6 @@
packet = new ClusterTopologyChangeMessage();
break;
}
- case NODE_ANNOUNCE:
- {
- packet = new NodeAnnounceMessage();
- break;
- }
case SUBSCRIBE_TOPOLOGY:
{
packet = new SubscribeClusterTopologyUpdatesMessage();
Modified: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/protocol/core/impl/PacketImpl.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/protocol/core/impl/PacketImpl.java 2010-08-11 09:15:56 UTC (rev 9527)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/protocol/core/impl/PacketImpl.java 2010-08-11 12:27:29 UTC (rev 9528)
@@ -186,8 +186,6 @@
public static final byte CLUSTER_TOPOLOGY = 110;
- public static final byte NODE_ANNOUNCE = 111;
-
public static final byte SUBSCRIBE_TOPOLOGY = 112;
// Static --------------------------------------------------------
Deleted: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/protocol/core/impl/wireformat/NodeAnnounceMessage.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/protocol/core/impl/wireformat/NodeAnnounceMessage.java 2010-08-11 09:15:56 UTC (rev 9527)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/protocol/core/impl/wireformat/NodeAnnounceMessage.java 2010-08-11 12:27:29 UTC (rev 9528)
@@ -1,103 +0,0 @@
-/*
- * Copyright 2009 Red Hat, Inc.
- * Red Hat licenses this file to you under the Apache License, version
- * 2.0 (the "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * http://www.apache.org/licenses/LICENSE-2.0
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
- * implied. See the License for the specific language governing
- * permissions and limitations under the License.
- */
-
-package org.hornetq.core.protocol.core.impl.wireformat;
-
-import org.hornetq.api.core.HornetQBuffer;
-import org.hornetq.api.core.TransportConfiguration;
-import org.hornetq.core.logging.Logger;
-import org.hornetq.core.protocol.core.impl.PacketImpl;
-
-/**
- * @author <a href="mailto:tim.fox@jboss.com">Tim Fox</a>
- *
- */
-public class NodeAnnounceMessage extends PacketImpl
-{
- // Constants -----------------------------------------------------
-
- private static final Logger log = Logger.getLogger(NodeAnnounceMessage.class);
-
- // Attributes ----------------------------------------------------
-
- private String nodeID;
-
- private boolean backup;
-
- private TransportConfiguration connector;
-
- // Static --------------------------------------------------------
-
- // Constructors --------------------------------------------------
-
- public NodeAnnounceMessage(final String nodeID, final boolean backup, final TransportConfiguration tc)
- {
- super(PacketImpl.NODE_ANNOUNCE);
-
- this.nodeID = nodeID;
-
- this.backup = backup;
-
- this.connector = tc;
- }
-
- public NodeAnnounceMessage()
- {
- super(PacketImpl.NODE_ANNOUNCE);
- }
-
- // Public --------------------------------------------------------
-
-
- public String getNodeID()
- {
- return nodeID;
- }
-
- public boolean isBackup()
- {
- return backup;
- }
-
- public TransportConfiguration getConnector()
- {
- return connector;
- }
-
-
- @Override
- public void encodeRest(final HornetQBuffer buffer)
- {
- buffer.writeString(nodeID);
- buffer.writeBoolean(backup);
- connector.encode(buffer);
- }
-
- @Override
- public void decodeRest(final HornetQBuffer buffer)
- {
- this.nodeID = buffer.readString();
- this.backup = buffer.readBoolean();
- connector = new TransportConfiguration();
- connector.decode(buffer);
- }
-
-
- // Package protected ---------------------------------------------
-
- // Protected -----------------------------------------------------
-
- // Private -------------------------------------------------------
-
- // Inner classes -------------------------------------------------
-}
Modified: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/ClusterManager.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/ClusterManager.java 2010-08-11 09:15:56 UTC (rev 9527)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/ClusterManager.java 2010-08-11 12:27:29 UTC (rev 9528)
@@ -49,6 +49,4 @@
void notifyClientsNodeDown(String nodeID);
void notifyClientsNodeUp(String nodeID, Pair<TransportConfiguration, TransportConfiguration> connectorPair, boolean b);
-
- void announceNode(String nodeID, Pair<TransportConfiguration, TransportConfiguration> pair);
}
Modified: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java 2010-08-11 09:15:56 UTC (rev 9527)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java 2010-08-11 12:27:29 UTC (rev 9528)
@@ -234,18 +234,6 @@
}
}
- public void announceNode(String nodeID, Pair<TransportConfiguration, TransportConfiguration> pair)
- {
- /*
- System.out.println("node " + server.getNodeID() + " announces " + nodeID + " to its cluster connections " + clusterConnections.keySet());
- for (ClusterConnection clusterConnection : clusterConnections.values())
- {
- clusterConnection.announce(nodeID, pair, false);
- }
- */
-
- }
-
public boolean isStarted()
{
return started;
15 years, 5 months
JBoss hornetq SVN: r9527 - in branches/2_2_0_HA_Improvements/src/main/org/hornetq/core: protocol/core/impl and 3 other directories.
by do-not-reply@jboss.org
Author: jmesnil
Date: 2010-08-11 05:15:56 -0400 (Wed, 11 Aug 2010)
New Revision: 9527
Added:
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/protocol/core/impl/wireformat/NodeAnnounceMessage.java
Modified:
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ServerLocatorInternal.java
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/protocol/core/impl/PacketDecoder.java
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/protocol/core/impl/PacketImpl.java
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/ClusterConnection.java
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/ClusterManager.java
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionBridge.java
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java
Log:
HA refactoring
* various fixes
Modified: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java 2010-08-11 09:14:09 UTC (rev 9526)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java 2010-08-11 09:15:56 UTC (rev 9527)
@@ -44,6 +44,7 @@
import org.hornetq.core.cluster.DiscoveryListener;
import org.hornetq.core.cluster.impl.DiscoveryGroupImpl;
import org.hornetq.core.logging.Logger;
+import org.hornetq.core.protocol.core.impl.wireformat.NodeAnnounceMessage;
import org.hornetq.utils.HornetQThreadFactory;
import org.hornetq.utils.UUIDGenerator;
@@ -426,7 +427,7 @@
initialise();
}
- public void connect()
+ public void connect(boolean backup, TransportConfiguration transportConfig)
{
if (initialConnectors != null)
{
@@ -438,6 +439,11 @@
try
{
sf = createSessionFactory(connector);
+ if (sf != null)
+ {
+ ClientSessionFactoryInternal internalSF = (ClientSessionFactoryInternal)sf;
+ internalSF.getConnection().getChannel(0, -1).send(new NodeAnnounceMessage(nodeID, backup, transportConfig));
+ }
}
catch (HornetQException e)
{
Modified: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ServerLocatorInternal.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ServerLocatorInternal.java 2010-08-11 09:14:09 UTC (rev 9526)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ServerLocatorInternal.java 2010-08-11 09:15:56 UTC (rev 9527)
@@ -35,9 +35,9 @@
TransportConfiguration getBackup( TransportConfiguration live);
void setNodeID(String nodeID);
-
- void connect();
+ void connect(boolean backup, TransportConfiguration transportConfig);
+
void addClusterTopologyListener(ClusterTopologyListener listener);
void removeClusterTopologyListener(ClusterTopologyListener listener);
Modified: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java 2010-08-11 09:14:09 UTC (rev 9526)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java 2010-08-11 09:15:56 UTC (rev 9527)
@@ -30,6 +30,7 @@
import org.hornetq.core.protocol.core.Packet;
import org.hornetq.core.protocol.core.ServerSessionPacketHandler;
import org.hornetq.core.protocol.core.impl.wireformat.ClusterTopologyChangeMessage;
+import org.hornetq.core.protocol.core.impl.wireformat.NodeAnnounceMessage;
import org.hornetq.core.protocol.core.impl.wireformat.Ping;
import org.hornetq.core.protocol.core.impl.wireformat.SubscribeClusterTopologyUpdatesMessage;
import org.hornetq.core.remoting.CloseListener;
@@ -118,6 +119,11 @@
{
channel0.send(new ClusterTopologyChangeMessage(nodeID));
}
+
+ public String toString()
+ {
+ return "ClusterTopologyListener[address=" + connection.getRemoteAddress() + "]";
+ };
};
final boolean isCC = msg.isClusterConnection();
@@ -132,6 +138,22 @@
}
});
}
+ else if (packet.getType() == PacketImpl.NODE_ANNOUNCE)
+ {
+ NodeAnnounceMessage msg = (NodeAnnounceMessage)packet;
+ TransportConfiguration connector = msg.getConnector();
+ boolean backup = msg.isBackup();
+ Pair<TransportConfiguration, TransportConfiguration> pair;
+ if (backup)
+ {
+ pair = new Pair<TransportConfiguration, TransportConfiguration>(null, connector);
+ }
+ else
+ {
+ pair = new Pair<TransportConfiguration, TransportConfiguration>(connector, null);
+ }
+ server.getClusterManager().announceNode(msg.getNodeID(), pair);
+ }
}
});
Modified: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/protocol/core/impl/PacketDecoder.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/protocol/core/impl/PacketDecoder.java 2010-08-11 09:14:09 UTC (rev 9526)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/protocol/core/impl/PacketDecoder.java 2010-08-11 09:15:56 UTC (rev 9527)
@@ -21,6 +21,7 @@
import static org.hornetq.core.protocol.core.impl.PacketImpl.DELETE_QUEUE;
import static org.hornetq.core.protocol.core.impl.PacketImpl.DISCONNECT;
import static org.hornetq.core.protocol.core.impl.PacketImpl.EXCEPTION;
+import static org.hornetq.core.protocol.core.impl.PacketImpl.NODE_ANNOUNCE;
import static org.hornetq.core.protocol.core.impl.PacketImpl.NULL_RESPONSE;
import static org.hornetq.core.protocol.core.impl.PacketImpl.PACKETS_CONFIRMED;
import static org.hornetq.core.protocol.core.impl.PacketImpl.PING;
@@ -90,6 +91,7 @@
import org.hornetq.core.protocol.core.impl.wireformat.CreateSessionMessage;
import org.hornetq.core.protocol.core.impl.wireformat.CreateSessionResponseMessage;
import org.hornetq.core.protocol.core.impl.wireformat.HornetQExceptionMessage;
+import org.hornetq.core.protocol.core.impl.wireformat.NodeAnnounceMessage;
import org.hornetq.core.protocol.core.impl.wireformat.NullResponseMessage;
import org.hornetq.core.protocol.core.impl.wireformat.PacketsConfirmedMessage;
import org.hornetq.core.protocol.core.impl.wireformat.Ping;
@@ -495,6 +497,11 @@
packet = new ClusterTopologyChangeMessage();
break;
}
+ case NODE_ANNOUNCE:
+ {
+ packet = new NodeAnnounceMessage();
+ break;
+ }
case SUBSCRIBE_TOPOLOGY:
{
packet = new SubscribeClusterTopologyUpdatesMessage();
Modified: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/protocol/core/impl/PacketImpl.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/protocol/core/impl/PacketImpl.java 2010-08-11 09:14:09 UTC (rev 9526)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/protocol/core/impl/PacketImpl.java 2010-08-11 09:15:56 UTC (rev 9527)
@@ -185,7 +185,9 @@
// HA
public static final byte CLUSTER_TOPOLOGY = 110;
-
+
+ public static final byte NODE_ANNOUNCE = 111;
+
public static final byte SUBSCRIBE_TOPOLOGY = 112;
// Static --------------------------------------------------------
Added: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/protocol/core/impl/wireformat/NodeAnnounceMessage.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/protocol/core/impl/wireformat/NodeAnnounceMessage.java (rev 0)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/protocol/core/impl/wireformat/NodeAnnounceMessage.java 2010-08-11 09:15:56 UTC (rev 9527)
@@ -0,0 +1,103 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.protocol.core.impl.wireformat;
+
+import org.hornetq.api.core.HornetQBuffer;
+import org.hornetq.api.core.TransportConfiguration;
+import org.hornetq.core.logging.Logger;
+import org.hornetq.core.protocol.core.impl.PacketImpl;
+
+/**
+ * @author <a href="mailto:tim.fox@jboss.com">Tim Fox</a>
+ *
+ */
+public class NodeAnnounceMessage extends PacketImpl
+{
+ // Constants -----------------------------------------------------
+
+ private static final Logger log = Logger.getLogger(NodeAnnounceMessage.class);
+
+ // Attributes ----------------------------------------------------
+
+ private String nodeID;
+
+ private boolean backup;
+
+ private TransportConfiguration connector;
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ public NodeAnnounceMessage(final String nodeID, final boolean backup, final TransportConfiguration tc)
+ {
+ super(PacketImpl.NODE_ANNOUNCE);
+
+ this.nodeID = nodeID;
+
+ this.backup = backup;
+
+ this.connector = tc;
+ }
+
+ public NodeAnnounceMessage()
+ {
+ super(PacketImpl.NODE_ANNOUNCE);
+ }
+
+ // Public --------------------------------------------------------
+
+
+ public String getNodeID()
+ {
+ return nodeID;
+ }
+
+ public boolean isBackup()
+ {
+ return backup;
+ }
+
+ public TransportConfiguration getConnector()
+ {
+ return connector;
+ }
+
+
+ @Override
+ public void encodeRest(final HornetQBuffer buffer)
+ {
+ buffer.writeString(nodeID);
+ buffer.writeBoolean(backup);
+ connector.encode(buffer);
+ }
+
+ @Override
+ public void decodeRest(final HornetQBuffer buffer)
+ {
+ this.nodeID = buffer.readString();
+ this.backup = buffer.readBoolean();
+ connector = new TransportConfiguration();
+ connector.decode(buffer);
+ }
+
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+}
Modified: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/ClusterConnection.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/ClusterConnection.java 2010-08-11 09:14:09 UTC (rev 9526)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/ClusterConnection.java 2010-08-11 09:15:56 UTC (rev 9527)
@@ -18,6 +18,7 @@
import org.hornetq.api.core.Pair;
import org.hornetq.api.core.SimpleString;
import org.hornetq.api.core.TransportConfiguration;
+import org.hornetq.api.core.client.ClusterTopologyListener;
import org.hornetq.core.server.HornetQComponent;
/**
@@ -29,7 +30,7 @@
*
*
*/
-public interface ClusterConnection extends HornetQComponent
+public interface ClusterConnection extends HornetQComponent, ClusterTopologyListener
{
SimpleString getName();
@@ -56,4 +57,6 @@
// for debug
String description();
+
+ void announce(String nodeID, Pair<TransportConfiguration, TransportConfiguration> pair, boolean b);
}
Modified: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/ClusterManager.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/ClusterManager.java 2010-08-11 09:14:09 UTC (rev 9526)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/ClusterManager.java 2010-08-11 09:15:56 UTC (rev 9527)
@@ -16,7 +16,9 @@
import java.util.Map;
import java.util.Set;
+import org.hornetq.api.core.Pair;
import org.hornetq.api.core.SimpleString;
+import org.hornetq.api.core.TransportConfiguration;
import org.hornetq.api.core.client.ClusterTopologyListener;
import org.hornetq.core.server.HornetQComponent;
@@ -43,4 +45,10 @@
void removeClusterTopologyListener(ClusterTopologyListener listener, boolean clusterConnection);
void activate();
+
+ void notifyClientsNodeDown(String nodeID);
+
+ void notifyClientsNodeUp(String nodeID, Pair<TransportConfiguration, TransportConfiguration> connectorPair, boolean b);
+
+ void announceNode(String nodeID, Pair<TransportConfiguration, TransportConfiguration> pair);
}
Modified: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java 2010-08-11 09:14:09 UTC (rev 9526)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java 2010-08-11 09:15:56 UTC (rev 9527)
@@ -29,6 +29,7 @@
import org.hornetq.api.core.client.SessionFailureListener;
import org.hornetq.api.core.management.NotificationType;
import org.hornetq.core.client.impl.ClientSessionInternal;
+import org.hornetq.core.client.impl.ServerLocatorInternal;
import org.hornetq.core.filter.Filter;
import org.hornetq.core.filter.impl.FilterImpl;
import org.hornetq.core.logging.Logger;
@@ -65,7 +66,7 @@
// Attributes ----------------------------------------------------
- protected final ServerLocator serverLocator;
+ protected final ServerLocatorInternal serverLocator;
private final UUID nodeUUID;
@@ -109,7 +110,7 @@
// Public --------------------------------------------------------
- public BridgeImpl(final ServerLocator serverLocator,
+ public BridgeImpl(final ServerLocatorInternal serverLocator,
final UUID nodeUUID,
final SimpleString name,
final Queue queue,
@@ -589,6 +590,8 @@
{
queue.deliverAsync();
}
+
+ log.info("stopped bridge " + name);
}
catch (Exception e)
{
Modified: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionBridge.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionBridge.java 2010-08-11 09:14:09 UTC (rev 9526)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionBridge.java 2010-08-11 09:15:56 UTC (rev 9527)
@@ -29,6 +29,7 @@
import org.hornetq.api.core.management.ManagementHelper;
import org.hornetq.api.core.management.NotificationType;
import org.hornetq.api.core.management.ResourceNames;
+import org.hornetq.core.client.impl.ServerLocatorInternal;
import org.hornetq.core.logging.Logger;
import org.hornetq.core.message.impl.MessageImpl;
import org.hornetq.core.persistence.StorageManager;
@@ -63,8 +64,11 @@
private final TransportConfiguration connector;
- public ClusterConnectionBridge(final ServerLocator serverLocator,
+ private final String targetNodeID;
+
+ public ClusterConnectionBridge(final ServerLocatorInternal serverLocator,
final UUID nodeUUID,
+ final String targetNodeID,
final SimpleString name,
final Queue queue,
final Executor executor,
@@ -99,6 +103,7 @@
idsHeaderName = MessageImpl.HDR_ROUTE_TO_IDS.concat(name);
+ this.targetNodeID = targetNodeID;
this.managementAddress = managementAddress;
this.managementNotificationAddress = managementNotificationAddress;
this.flowRecord = flowRecord;
@@ -224,4 +229,20 @@
return serverLocator.createSessionFactory(connector);
}
+ @Override
+ public void connectionFailed(HornetQException me)
+ {
+ try
+ {
+ session.cleanUp();
+ }
+ catch (Exception e)
+ {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ }
+ serverLocator.notifyNodeDown(targetNodeID);
+ super.connectionFailed(me);
+ }
+
}
Modified: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java 2010-08-11 09:14:09 UTC (rev 9526)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java 2010-08-11 09:15:56 UTC (rev 9527)
@@ -26,7 +26,6 @@
import org.hornetq.api.core.SimpleString;
import org.hornetq.api.core.TransportConfiguration;
import org.hornetq.api.core.client.ClientMessage;
-import org.hornetq.api.core.client.ClusterTopologyListener;
import org.hornetq.api.core.management.ManagementHelper;
import org.hornetq.api.core.management.NotificationType;
import org.hornetq.core.client.impl.ServerLocatorInternal;
@@ -58,7 +57,7 @@
*
*
*/
-public class ClusterConnectionImpl implements ClusterConnection, ClusterTopologyListener
+public class ClusterConnectionImpl implements ClusterConnection
{
private static final Logger log = Logger.getLogger(ClusterConnectionImpl.class);
@@ -129,7 +128,10 @@
this.serverLocator = serverLocator;
- this.serverLocator.setClusterConnection(true);
+ if (this.serverLocator != null)
+ {
+ this.serverLocator.setClusterConnection(true);
+ }
this.connector = connector;
@@ -167,17 +169,20 @@
return;
}
- serverLocator.addClusterTopologyListener(this);
- serverLocator.start();
-
- // FIXME Ugly ugly code to connect to other nodes and form the cluster... :(
- server.getExecutorFactory().getExecutor().execute(new Runnable()
+ if (serverLocator != null)
{
- public void run()
+ serverLocator.addClusterTopologyListener(this);
+ serverLocator.start();
+
+ // FIXME Ugly ugly code to connect to other nodes and form the cluster... :(
+ server.getExecutorFactory().getExecutor().execute(new Runnable()
{
- serverLocator.connect();
- }
- });
+ public void run()
+ {
+ serverLocator.connect(server.getConfiguration().isBackup(), connector);
+ }
+ });
+ }
started = true;
@@ -198,9 +203,12 @@
{
return;
}
+
+ if (serverLocator != null)
+ {
+ serverLocator.removeClusterTopologyListener(this);
+ }
- serverLocator.removeClusterTopologyListener(this);
-
for (MessageFlowRecord record : records.values())
{
try
@@ -212,6 +220,12 @@
}
}
+ if (serverLocator != null)
+ {
+ serverLocator.close();
+ }
+
+
if (managementService != null)
{
TypedProperties props = new TypedProperties();
@@ -277,6 +291,13 @@
public synchronized void nodeDown(final String nodeID)
{
+ if (nodeID.equals(nodeUUID.toString()))
+ {
+ return;
+ }
+
+ server.getClusterManager().notifyClientsNodeDown(nodeID);
+
//Remove the flow record for that node
MessageFlowRecord record = records.remove(nodeID);
@@ -285,6 +306,7 @@
{
try
{
+ record.reset();
record.close();
}
catch (Exception e)
@@ -298,6 +320,13 @@
final Pair<TransportConfiguration, TransportConfiguration> connectorPair,
final boolean last)
{
+ if (nodeID.equals(nodeUUID.toString()))
+ {
+ return;
+ }
+
+ server.getClusterManager().notifyClientsNodeUp(nodeID, connectorPair, false);
+
try
{
MessageFlowRecord record = records.get(nodeID);
@@ -339,6 +368,21 @@
log.error("Failed to update topology", e);
}
}
+
+ public void announce(String nodeID, Pair<TransportConfiguration, TransportConfiguration> pair, boolean b)
+ {
+ TransportConfiguration connector = (backup) ? pair.b : pair.a;
+ if (serverLocator!= null && serverLocator.getStaticTransportConfigurations() != null)
+ {
+ for (TransportConfiguration staticConnector : serverLocator.getStaticTransportConfigurations())
+ {
+ if (connector.equals(staticConnector))
+ {
+ nodeUP(nodeID, pair, false);
+ }
+ }
+ }
+ }
private void createNewRecord(final String nodeID,
final TransportConfiguration connector,
@@ -350,6 +394,7 @@
Bridge bridge = new ClusterConnectionBridge(serverLocator,
nodeUUID,
+ nodeID,
queueName,
queue,
executorFactory.getExecutor(),
Modified: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java 2010-08-11 09:14:09 UTC (rev 9526)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java 2010-08-11 09:15:56 UTC (rev 9527)
@@ -30,7 +30,6 @@
import org.hornetq.api.core.TransportConfiguration;
import org.hornetq.api.core.client.ClusterTopologyListener;
import org.hornetq.api.core.client.HornetQClient;
-import org.hornetq.api.core.client.ServerLocator;
import org.hornetq.core.client.impl.ServerLocatorInternal;
import org.hornetq.core.config.BridgeConfiguration;
import org.hornetq.core.config.BroadcastGroupConfiguration;
@@ -69,8 +68,6 @@
private final Map<String, Bridge> bridges = new HashMap<String, Bridge>();
- private final Map<String, ClusterConnection> clusterConnections = new HashMap<String, ClusterConnection>();
-
private final ExecutorFactory executorFactory;
private final HornetQServer server;
@@ -91,11 +88,23 @@
private final boolean clustered;
- // FIXME why do we distinguish between client listeners and cluster connection listeners?
- // They are both notified at the same time...
+ // the cluster connections which link this node to other cluster nodes
+ private final Map<String, ClusterConnection> clusterConnections = new HashMap<String, ClusterConnection>();
+
+ // regular client listeners to be notified of cluster topology changes.
+ // they correspond to regular clients using a HA ServerLocator
private Set<ClusterTopologyListener> clientListeners = new ConcurrentHashSet<ClusterTopologyListener>();
+
+ // cluster connections listeners to be notified of cluster topology changes
+ // they correspond to cluster connections on *other nodes connected to this one*
private Set<ClusterTopologyListener> clusterConnectionListeners = new ConcurrentHashSet<ClusterTopologyListener>();
+ /*
+ * topology describes the other cluster nodes that this server knows about:
+ *
+ * keys are node IDs
+ * values are a pair of live/backup transport configurations
+ */
private Map<String, Pair<TransportConfiguration, TransportConfiguration>> topology = new HashMap<String, Pair<TransportConfiguration,TransportConfiguration>>();
public ClusterManagerImpl(final ExecutorFactory executorFactory,
@@ -203,6 +212,40 @@
started = false;
}
+ public void notifyClientsNodeDown(String nodeID)
+ {
+ topology.remove(nodeID);
+
+ for (ClusterTopologyListener listener : clientListeners)
+ {
+ listener.nodeDown(nodeID);
+ }
+ }
+
+ public void notifyClientsNodeUp(String nodeID,
+ Pair<TransportConfiguration, TransportConfiguration> connectorPair,
+ boolean last)
+ {
+ topology.put(nodeID, connectorPair);
+
+ for (ClusterTopologyListener listener : clientListeners)
+ {
+ listener.nodeUP(nodeID, connectorPair, last);
+ }
+ }
+
+ public void announceNode(String nodeID, Pair<TransportConfiguration, TransportConfiguration> pair)
+ {
+ /*
+ System.out.println("node " + server.getNodeID() + " announces " + nodeID + " to its cluster connections " + clusterConnections.keySet());
+ for (ClusterConnection clusterConnection : clusterConnections.values())
+ {
+ clusterConnection.announce(nodeID, pair, false);
+ }
+ */
+
+ }
+
public boolean isStarted()
{
return started;
@@ -231,6 +274,7 @@
public synchronized void addClusterTopologyListener(final ClusterTopologyListener listener,
final boolean clusterConnection)
{
+ System.out.println("ClusterManagerImpl.addClusterTopologyListener() on " + nodeUUID + " " + clusterConnection + " " + listener);
if (clusterConnection)
{
this.clusterConnectionListeners.add(listener);
@@ -241,10 +285,10 @@
}
// We now need to send the current topology to the client
-
int count = 0;
for (Map.Entry<String, Pair<TransportConfiguration, TransportConfiguration>> entry : topology.entrySet())
{
+ System.out.println("ClusterManagerImpl.addClusterTopologyListener() on " + nodeUUID + " -- " + entry);
listener.nodeUP(entry.getKey(), entry.getValue(), ++count == topology.size());
}
}
@@ -318,7 +362,7 @@
{
listener.nodeUP(nodeID, pair, false);
}
-
+
for (ClusterTopologyListener listener : clusterConnectionListeners)
{
listener.nodeUP(nodeID, pair, false);
@@ -454,7 +498,7 @@
Queue queue = (Queue)binding.getBindable();
- ServerLocator serverLocator;
+ ServerLocatorInternal serverLocator;
if (config.getDiscoveryGroupName() != null)
{
@@ -470,12 +514,12 @@
if (config.isHA())
{
- serverLocator = HornetQClient.createServerLocatorWithHA(discoveryGroupConfiguration.getGroupAddress(),
+ serverLocator = (ServerLocatorInternal)HornetQClient.createServerLocatorWithHA(discoveryGroupConfiguration.getGroupAddress(),
discoveryGroupConfiguration.getGroupPort());
}
else
{
- serverLocator = HornetQClient.createServerLocatorWithoutHA(discoveryGroupConfiguration.getGroupAddress(),
+ serverLocator = (ServerLocatorInternal)HornetQClient.createServerLocatorWithoutHA(discoveryGroupConfiguration.getGroupAddress(),
discoveryGroupConfiguration.getGroupPort());
}
@@ -491,11 +535,11 @@
if (config.isHA())
{
- serverLocator = HornetQClient.createServerLocatorWithHA(tcConfigs);
+ serverLocator = (ServerLocatorInternal)HornetQClient.createServerLocatorWithHA(tcConfigs);
}
else
{
- serverLocator = HornetQClient.createServerLocatorWithoutHA(tcConfigs);
+ serverLocator = (ServerLocatorInternal)HornetQClient.createServerLocatorWithoutHA(tcConfigs);
}
}
@@ -561,8 +605,9 @@
TransportConfiguration[] tcConfigs = connectorNameListToArray(config.getStaticConnectors());
serverLocator = (ServerLocatorInternal)HornetQClient.createServerLocatorWithHA(tcConfigs);
+ serverLocator.setNodeID(nodeUUID.toString());
}
- else
+ else if (config.getDiscoveryGroupName() != null)
{
DiscoveryGroupConfiguration dg = configuration.getDiscoveryGroupConfigurations()
.get(config.getDiscoveryGroupName());
@@ -574,9 +619,15 @@
}
serverLocator = (ServerLocatorInternal)HornetQClient.createServerLocatorWithHA(dg.getGroupAddress(), dg.getGroupPort());
+ serverLocator.setNodeID(nodeUUID.toString());
}
+ else
+ {
+ // no connector or discovery group is defined. The cluster connection will only be a target and will
+ // not connect to other nodes in the cluster
+ serverLocator = null;
+ }
- serverLocator.setNodeID(nodeUUID.toString());
ClusterConnection clusterConnection = new ClusterConnectionImpl(serverLocator,
connector,
new SimpleString(config.getName()),
15 years, 5 months
JBoss hornetq SVN: r9526 - branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/impl.
by do-not-reply@jboss.org
Author: jmesnil
Date: 2010-08-11 05:14:09 -0400 (Wed, 11 Aug 2010)
New Revision: 9526
Modified:
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
Log:
remove sysout call
Modified: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java 2010-08-11 09:13:49 UTC (rev 9525)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java 2010-08-11 09:14:09 UTC (rev 9526)
@@ -1560,7 +1560,6 @@
remotingService.start();
- System.out.println("remoting service is started");
clusterManager.start();
initialised = true;
15 years, 5 months
JBoss hornetq SVN: r9525 - branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/distribution.
by do-not-reply@jboss.org
Author: jmesnil
Date: 2010-08-11 05:13:49 -0400 (Wed, 11 Aug 2010)
New Revision: 9525
Added:
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/distribution/TwoWayTwoNodeClusterWithDiscoveryTest.java
Modified:
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterHeadersRemovedTest.java
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/distribution/MessageRedistributionTest.java
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/distribution/OneWayChainClusterTest.java
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/distribution/OnewayTwoNodeClusterTest.java
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/distribution/SymmetricClusterTest.java
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/distribution/TwoWayTwoNodeClusterTest.java
Log:
HA refactoring
* fix tests setup
* add tests for new HA use cases
Modified: branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterHeadersRemovedTest.java
===================================================================
--- branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterHeadersRemovedTest.java 2010-08-11 02:16:35 UTC (rev 9524)
+++ branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterHeadersRemovedTest.java 2010-08-11 09:13:49 UTC (rev 9525)
@@ -54,6 +54,7 @@
public void testHeadersRemoved() throws Exception
{
setupClusterConnection("cluster1", 0, 1, "queues", false, 1, isNetty());
+ setupClusterConnection("clusterX", 1, -1, "queues", false, 1, isNetty());
startServers(1, 0);
setupSessionFactory(0, isNetty());
Modified: branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java
===================================================================
--- branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java 2010-08-11 02:16:35 UTC (rev 9524)
+++ branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java 2010-08-11 09:13:49 UTC (rev 9525)
@@ -1251,20 +1251,8 @@
configuration.setBackup(backup);
configuration.getAcceptorConfigurations().clear();
-
- Map<String, Object> params = generateParams(node, netty);
-
- if (netty)
- {
- TransportConfiguration nettytc = new TransportConfiguration(ServiceTestBase.NETTY_ACCEPTOR_FACTORY, params);
- configuration.getAcceptorConfigurations().add(nettytc);
- }
- else
- {
- TransportConfiguration invmtc = new TransportConfiguration(ServiceTestBase.INVM_ACCEPTOR_FACTORY, params);
- configuration.getAcceptorConfigurations().add(invmtc);
- }
-
+ configuration.getAcceptorConfigurations().add(createTransportConfiguration(netty, true, generateParams(node, netty)));
+
HornetQServer server;
if (fileStorage)
@@ -1470,12 +1458,16 @@
TransportConfiguration connectorFrom = createTransportConfiguration(netty, false, generateParams(nodeFrom, netty));
serverFrom.getConfiguration().getConnectorConfigurations().put(name, connectorFrom);
- TransportConfiguration serverTotc = createTransportConfiguration(netty, false, generateParams(nodeTo, netty));
- serverFrom.getConfiguration().getConnectorConfigurations().put(serverTotc.getName(), serverTotc);
+ List<String> pairs = null;
+
+ if (nodeTo != -1)
+ {
+ TransportConfiguration serverTotc = createTransportConfiguration(netty, false, generateParams(nodeTo, netty));
+ serverFrom.getConfiguration().getConnectorConfigurations().put(serverTotc.getName(), serverTotc);
+ pairs = new ArrayList<String>();
+ pairs.add(serverTotc.getName());
+ }
- List<String> pairs = new ArrayList<String>();
- pairs.add(serverTotc.getName());
-
ClusterConnectionConfiguration clusterConf = new ClusterConnectionConfiguration(name,
address,
name,
@@ -1544,44 +1536,14 @@
throw new IllegalStateException("No server at node " + nodeFrom);
}
- Map<String, TransportConfiguration> connectors = serverFrom.getConfiguration().getConnectorConfigurations();
+ TransportConfiguration connectorFrom = createTransportConfiguration(netty, false, generateParams(nodeFrom, netty));
+ serverFrom.getConfiguration().getConnectorConfigurations().put(name, connectorFrom);
List<String> pairs = new ArrayList<String>();
-
- for (int i = 0; i < nodesTo.length; i++)
+ for (int element : nodesTo)
{
- Map<String, Object> params = generateParams(nodesTo[i], netty);
-
- TransportConfiguration serverTotc;
-
- if (netty)
- {
- serverTotc = new TransportConfiguration(ServiceTestBase.NETTY_CONNECTOR_FACTORY, params);
- }
- else
- {
- serverTotc = new TransportConfiguration(ServiceTestBase.INVM_CONNECTOR_FACTORY, params);
- }
-
- connectors.put(serverTotc.getName(), serverTotc);
-
- /*Map<String, Object> backupParams = generateParams(backupsTo[i], netty);
-
- TransportConfiguration serverBackupTotc;
-
- if (netty)
- {
- serverBackupTotc = new TransportConfiguration(ServiceTestBase.NETTY_CONNECTOR_FACTORY, backupParams);
- }
- else
- {
- serverBackupTotc = new TransportConfiguration(ServiceTestBase.INVM_CONNECTOR_FACTORY, backupParams);
- }
-
- connectors.put(serverBackupTotc.getName(), serverBackupTotc);
-
- Pair<String, String> connectorPair = new Pair<String, String>(serverTotc.getName(), serverBackupTotc.getName());*/
-
+ TransportConfiguration serverTotc = createTransportConfiguration(netty, false, generateParams(element, netty));
+ serverFrom.getConfiguration().getConnectorConfigurations().put(serverTotc.getName(), serverTotc);
pairs.add(serverTotc.getName());
}
Modified: branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/distribution/MessageRedistributionTest.java
===================================================================
--- branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/distribution/MessageRedistributionTest.java 2010-08-11 02:16:35 UTC (rev 9524)
+++ branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/distribution/MessageRedistributionTest.java 2010-08-11 09:13:49 UTC (rev 9525)
@@ -609,9 +609,6 @@
setupSessionFactory(0, isNetty());
- setupSessionFactory(1, isNetty());
- setupSessionFactory(2, isNetty());
-
createQueue(0, "queues.testaddress", "queue0", null, false);
waitForBindings(0, "queues.testaddress", 1, 0, true);
@@ -621,7 +618,9 @@
//Now bring up node 1
startServers(1);
-
+
+ setupSessionFactory(1, isNetty());
+
createQueue(1, "queues.testaddress", "queue0", null, false);
waitForBindings(1, "queues.testaddress", 1, 0, true);
Modified: branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/distribution/OneWayChainClusterTest.java
===================================================================
--- branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/distribution/OneWayChainClusterTest.java 2010-08-11 02:16:35 UTC (rev 9524)
+++ branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/distribution/OneWayChainClusterTest.java 2010-08-11 09:13:49 UTC (rev 9525)
@@ -63,6 +63,7 @@
setupClusterConnection("cluster1-2", 1, 2, "queues", false, 4, isNetty());
setupClusterConnection("cluster2-3", 2, 3, "queues", false, 4, isNetty());
setupClusterConnection("cluster3-4", 3, 4, "queues", false, 4, isNetty());
+ setupClusterConnection("cluster4-X", 4, -1, "queues", false, 4, isNetty());
startServers(0, 1, 2, 3, 4);
@@ -78,6 +79,14 @@
addConsumer(1, 4, "queue0", null);
waitForBindings(0, "queues.testaddress", 1, 1, true);
+
+ Thread.sleep(2000);
+ System.out.println(clusterDescription(servers[0]));
+ System.out.println(clusterDescription(servers[1]));
+ System.out.println(clusterDescription(servers[2]));
+ System.out.println(clusterDescription(servers[3]));
+ System.out.println(clusterDescription(servers[4]));
+
waitForBindings(0, "queues.testaddress", 1, 1, false);
send(0, "queues.testaddress", 10, false, null);
@@ -91,6 +100,7 @@
setupClusterConnection("cluster1-2", 1, 2, "queues", false, 4, isNetty());
setupClusterConnection("cluster2-3", 2, 3, "queues", false, 4, isNetty());
setupClusterConnection("cluster3-4", 3, 4, "queues", false, 4, isNetty());
+ setupClusterConnection("cluster4-X", 4, -1, "queues", false, 4, isNetty());
startServers(0, 1, 2, 3, 4);
@@ -123,6 +133,7 @@
setupClusterConnection("cluster1-2", 1, 2, "queues", true, 4, isNetty());
setupClusterConnection("cluster2-3", 2, 3, "queues", true, 4, isNetty());
setupClusterConnection("cluster3-4", 3, 4, "queues", true, 4, isNetty());
+ setupClusterConnection("cluster4-X", 4, -1, "queues", false, 4, isNetty());
startServers(0, 1, 2, 3, 4);
@@ -153,6 +164,7 @@
setupClusterConnection("cluster1-2", 1, 2, "queues", false, 4, isNetty());
setupClusterConnection("cluster2-3", 2, 3, "queues", false, 4, isNetty());
setupClusterConnection("cluster3-4", 3, 4, "queues", false, 4, isNetty());
+ setupClusterConnection("cluster4-X", 4, -1, "queues", false, 4, isNetty());
startServers(0, 1, 2, 3, 4);
@@ -178,6 +190,7 @@
setupClusterConnection("cluster1-2", 1, 2, "queues", false, 4, isNetty());
setupClusterConnection("cluster2-3", 2, 3, "queues", false, 4, isNetty());
setupClusterConnection("cluster3-4", 3, 4, "queues", false, 4, isNetty());
+ setupClusterConnection("cluster4-X", 4, -1, "queues", false, 4, isNetty());
startServers(0, 1, 2, 3, 4);
@@ -210,6 +223,7 @@
setupClusterConnection("cluster1-2", 1, 2, "queues", false, 4, isNetty());
setupClusterConnection("cluster2-3", 2, 3, "queues", false, 4, isNetty());
setupClusterConnection("cluster3-4", 3, 4, "queues", false, 4, isNetty());
+ setupClusterConnection("cluster4-X", 4, -1, "queues", false, 4, isNetty());
startServers(0, 1, 2, 3, 4);
@@ -242,6 +256,7 @@
setupClusterConnection("cluster1-2", 1, 2, "queues", false, 3, isNetty());
setupClusterConnection("cluster2-3", 2, 3, "queues", false, 3, isNetty());
setupClusterConnection("cluster3-4", 3, 4, "queues", false, 3, isNetty());
+ setupClusterConnection("cluster4-X", 4, -1, "queues", false, 4, isNetty());
startServers(0, 1, 2, 3, 4);
@@ -269,6 +284,7 @@
setupClusterConnection("cluster1-2", 1, 2, "queues", false, 4, isNetty());
setupClusterConnection("cluster2-3", 2, 3, "queues", false, 4, isNetty());
setupClusterConnection("cluster3-4", 3, 4, "queues", false, 4, isNetty());
+ setupClusterConnection("cluster4-X", 4, -1, "queues", false, 4, isNetty());
startServers(0, 1, 2, 3, 4);
@@ -290,12 +306,31 @@
verifyReceiveRoundRobin(10, 0, 1);
verifyNotReceive(0, 1);
+ System.out.println(clusterDescription(servers[0]));
+ System.out.println(clusterDescription(servers[1]));
+ System.out.println(clusterDescription(servers[2]));
+ System.out.println(clusterDescription(servers[3]));
+ System.out.println(clusterDescription(servers[4]));
+
stopServers(2);
+ Thread.sleep(2000);
+ System.out.println(clusterDescription(servers[0]));
+ System.out.println(clusterDescription(servers[1]));
+ System.out.println(clusterDescription(servers[3]));
+ System.out.println(clusterDescription(servers[4]));
+
startServers(2);
Thread.sleep(2000);
+ System.out.println(clusterDescription(servers[0]));
+ System.out.println(clusterDescription(servers[1]));
+ System.out.println(clusterDescription(servers[2]));
+ System.out.println(clusterDescription(servers[3]));
+ System.out.println(clusterDescription(servers[4]));
+
+
send(0, "queues.testaddress", 10, false, null);
verifyReceiveRoundRobin(10, 0, 1);
Modified: branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/distribution/OnewayTwoNodeClusterTest.java
===================================================================
--- branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/distribution/OnewayTwoNodeClusterTest.java 2010-08-11 02:16:35 UTC (rev 9524)
+++ branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/distribution/OnewayTwoNodeClusterTest.java 2010-08-11 09:13:49 UTC (rev 9525)
@@ -37,6 +37,11 @@
setupServer(0, isFileStorage(), isNetty());
setupServer(1, isFileStorage(), isNetty());
+
+ // server #0 is connected to server #1
+ setupClusterConnection("cluster1", 0, 1, "queues", false, 1, isNetty());
+ // server #1 is connected to nobody
+ setupClusterConnection("clusterX", 1, -1, "queues", false, 1, isNetty());
}
@Override
@@ -61,7 +66,6 @@
*/
public void testNeverStartTargetStartSourceThenStopSource() throws Exception
{
- setupClusterConnection("cluster1", 0, 1, "queues", false, 1, isNetty());
startServers(0);
// Give it a little time for the bridge to try to start
@@ -72,7 +76,6 @@
public void testStartTargetServerBeforeSourceServer() throws Exception
{
- setupClusterConnection("cluster1", 0, 1, "queues", false, 1, isNetty());
startServers(1, 0);
setupSessionFactory(0, isNetty());
@@ -95,7 +98,6 @@
public void testStartSourceServerBeforeTargetServer() throws Exception
{
- setupClusterConnection("cluster1", 0, 1, "queues", false, 1, isNetty());
startServers(0, 1);
setupSessionFactory(0, isNetty());
@@ -118,7 +120,6 @@
public void testStopAndStartTarget() throws Exception
{
- setupClusterConnection("cluster1", 0, 1, "queues", false, 1, isNetty());
startServers(0, 1);
setupSessionFactory(0, isNetty());
@@ -180,7 +181,6 @@
public void testBasicLocalReceive() throws Exception
{
- setupClusterConnection("cluster1", 0, 1, "queues", false, 1, isNetty());
startServers(1, 0);
setupSessionFactory(0, isNetty());
@@ -199,7 +199,6 @@
public void testBasicRoundRobin() throws Exception
{
- setupClusterConnection("cluster1", 0, 1, "queues", false, 1, isNetty());
startServers(1, 0);
setupSessionFactory(0, isNetty());
@@ -213,8 +212,14 @@
addConsumer(1, 1, "queue0", null);
+ System.out.println(clusterDescription(servers[0]));
+ System.out.println(clusterDescription(servers[1]));
+
waitForBindings(0, "queues.testaddress", 1, 1, true);
+ waitForBindings(1, "queues.testaddress", 1, 1, true);
+
waitForBindings(0, "queues.testaddress", 1, 1, false);
+ waitForBindings(1, "queues.testaddress", 0, 0, false);
send(0, "queues.testaddress", 10, false, null);
verifyReceiveRoundRobin(10, 0, 1);
@@ -223,7 +228,6 @@
public void testRoundRobinMultipleQueues() throws Exception
{
- setupClusterConnection("cluster1", 0, 1, "queues", false, 1, isNetty());
startServers(1, 0);
setupSessionFactory(0, isNetty());
@@ -263,7 +267,6 @@
public void testMultipleNonLoadBalancedQueues() throws Exception
{
- setupClusterConnection("cluster1", 0, 1, "queues", false, 1, isNetty());
startServers(1, 0);
setupSessionFactory(0, isNetty());
@@ -305,7 +308,6 @@
public void testMixtureLoadBalancedAndNonLoadBalancedQueues() throws Exception
{
- setupClusterConnection("cluster1", 0, 1, "queues", false, 1, isNetty());
startServers(1, 0);
setupSessionFactory(0, isNetty());
@@ -369,7 +371,6 @@
public void testMixtureLoadBalancedAndNonLoadBalancedQueuesRemoveSomeQueuesAndConsumers() throws Exception
{
- setupClusterConnection("cluster1", 0, 1, "queues", false, 1, isNetty());
startServers(1, 0);
setupSessionFactory(0, isNetty());
@@ -450,7 +451,6 @@
public void testMixtureLoadBalancedAndNonLoadBalancedQueuesAddQueuesOnTargetBeforeStartSource() throws Exception
{
- setupClusterConnection("cluster1", 0, 1, "queues", false, 1, isNetty());
startServers(1);
setupSessionFactory(1, isNetty());
@@ -514,7 +514,6 @@
public void testMixtureLoadBalancedAndNonLoadBalancedQueuesAddQueuesOnSourceBeforeStartTarget() throws Exception
{
- setupClusterConnection("cluster1", 0, 1, "queues", false, 1, isNetty());
startServers(0);
setupSessionFactory(0, isNetty());
@@ -578,7 +577,6 @@
public void testNotRouteToNonMatchingAddress() throws Exception
{
- setupClusterConnection("cluster1", 0, 1, "queues", false, 1, isNetty());
startServers(1, 0);
setupSessionFactory(0, isNetty());
@@ -614,7 +612,6 @@
public void testNonLoadBalancedQueuesWithFilters() throws Exception
{
- setupClusterConnection("cluster1", 0, 1, "queues", false, 1, isNetty());
startServers(1, 0);
setupSessionFactory(0, isNetty());
@@ -676,7 +673,6 @@
public void testRoundRobinMultipleQueuesWithFilters() throws Exception
{
- setupClusterConnection("cluster1", 0, 1, "queues", false, 1, isNetty());
startServers(1, 0);
setupSessionFactory(0, isNetty());
@@ -741,25 +737,23 @@
public void testRouteWhenNoConsumersFalseNonBalancedQueues() throws Exception
{
- setupClusterConnection("cluster2", 0, 1, "queues2", false, 1, isNetty());
-
startServers(1, 0);
setupSessionFactory(0, isNetty());
setupSessionFactory(1, isNetty());
- createQueue(0, "queues2.testaddress", "queue0", null, false);
- createQueue(0, "queues2.testaddress", "queue1", null, false);
- createQueue(0, "queues2.testaddress", "queue2", null, false);
+ createQueue(0, "queues.testaddress", "queue0", null, false);
+ createQueue(0, "queues.testaddress", "queue1", null, false);
+ createQueue(0, "queues.testaddress", "queue2", null, false);
- createQueue(1, "queues2.testaddress", "queue3", null, false);
- createQueue(1, "queues2.testaddress", "queue4", null, false);
- createQueue(1, "queues2.testaddress", "queue5", null, false);
+ createQueue(1, "queues.testaddress", "queue3", null, false);
+ createQueue(1, "queues.testaddress", "queue4", null, false);
+ createQueue(1, "queues.testaddress", "queue5", null, false);
- waitForBindings(0, "queues2.testaddress", 3, 0, true);
- waitForBindings(0, "queues2.testaddress", 3, 0, false);
+ waitForBindings(0, "queues.testaddress", 3, 0, true);
+ waitForBindings(0, "queues.testaddress", 3, 0, false);
- send(0, "queues2.testaddress", 10, false, null);
+ send(0, "queues.testaddress", 10, false, null);
addConsumer(0, 0, "queue0", null);
addConsumer(1, 0, "queue1", null);
@@ -776,25 +770,28 @@
public void testRouteWhenNoConsumersTrueNonBalancedQueues() throws Exception
{
- setupClusterConnection("cluster2", 0, 1, "queues2", true, 1, isNetty());
-
+ // server #0 is connected to server #1
+ setupClusterConnection("cluster1", 0, 1, "queues", true, 1, isNetty());
+ // server #1 is connected to nobody
+ setupClusterConnection("clusterX", 1, -1, "queues", false, 1, isNetty());
+
startServers(1, 0);
setupSessionFactory(0, isNetty());
setupSessionFactory(1, isNetty());
- createQueue(0, "queues2.testaddress", "queue0", null, false);
- createQueue(0, "queues2.testaddress", "queue1", null, false);
- createQueue(0, "queues2.testaddress", "queue2", null, false);
+ createQueue(0, "queues.testaddress", "queue0", null, false);
+ createQueue(0, "queues.testaddress", "queue1", null, false);
+ createQueue(0, "queues.testaddress", "queue2", null, false);
- createQueue(1, "queues2.testaddress", "queue3", null, false);
- createQueue(1, "queues2.testaddress", "queue4", null, false);
- createQueue(1, "queues2.testaddress", "queue5", null, false);
+ createQueue(1, "queues.testaddress", "queue3", null, false);
+ createQueue(1, "queues.testaddress", "queue4", null, false);
+ createQueue(1, "queues.testaddress", "queue5", null, false);
- waitForBindings(0, "queues2.testaddress", 3, 0, true);
- waitForBindings(0, "queues2.testaddress", 3, 0, false);
+ waitForBindings(0, "queues.testaddress", 3, 0, true);
+ waitForBindings(0, "queues.testaddress", 3, 0, false);
- send(0, "queues2.testaddress", 10, false, null);
+ send(0, "queues.testaddress", 10, false, null);
addConsumer(0, 0, "queue0", null);
addConsumer(1, 0, "queue1", null);
@@ -811,25 +808,23 @@
public void testRouteWhenNoConsumersFalseLoadBalancedQueues() throws Exception
{
- setupClusterConnection("cluster2", 0, 1, "queues2", false, 1, isNetty());
-
startServers(1, 0);
setupSessionFactory(0, isNetty());
setupSessionFactory(1, isNetty());
- createQueue(0, "queues2.testaddress", "queue0", null, false);
- createQueue(0, "queues2.testaddress", "queue1", null, false);
- createQueue(0, "queues2.testaddress", "queue2", null, false);
+ createQueue(0, "queues.testaddress", "queue0", null, false);
+ createQueue(0, "queues.testaddress", "queue1", null, false);
+ createQueue(0, "queues.testaddress", "queue2", null, false);
- createQueue(1, "queues2.testaddress", "queue0", null, false);
- createQueue(1, "queues2.testaddress", "queue1", null, false);
- createQueue(1, "queues2.testaddress", "queue2", null, false);
+ createQueue(1, "queues.testaddress", "queue0", null, false);
+ createQueue(1, "queues.testaddress", "queue1", null, false);
+ createQueue(1, "queues.testaddress", "queue2", null, false);
- waitForBindings(0, "queues2.testaddress", 3, 0, true);
- waitForBindings(0, "queues2.testaddress", 3, 0, false);
+ waitForBindings(0, "queues.testaddress", 3, 0, true);
+ waitForBindings(0, "queues.testaddress", 3, 0, false);
- send(0, "queues2.testaddress", 10, false, null);
+ send(0, "queues.testaddress", 10, false, null);
addConsumer(0, 0, "queue0", null);
addConsumer(1, 0, "queue1", null);
@@ -852,29 +847,27 @@
public void testRouteWhenNoConsumersFalseLoadBalancedQueuesLocalConsumer() throws Exception
{
- setupClusterConnection("cluster2", 0, 1, "queues2", false, 1, isNetty());
-
startServers(1, 0);
setupSessionFactory(0, isNetty());
setupSessionFactory(1, isNetty());
- createQueue(0, "queues2.testaddress", "queue0", null, false);
- createQueue(0, "queues2.testaddress", "queue1", null, false);
- createQueue(0, "queues2.testaddress", "queue2", null, false);
+ createQueue(0, "queues.testaddress", "queue0", null, false);
+ createQueue(0, "queues.testaddress", "queue1", null, false);
+ createQueue(0, "queues.testaddress", "queue2", null, false);
- createQueue(1, "queues2.testaddress", "queue0", null, false);
- createQueue(1, "queues2.testaddress", "queue1", null, false);
- createQueue(1, "queues2.testaddress", "queue2", null, false);
+ createQueue(1, "queues.testaddress", "queue0", null, false);
+ createQueue(1, "queues.testaddress", "queue1", null, false);
+ createQueue(1, "queues.testaddress", "queue2", null, false);
addConsumer(0, 0, "queue0", null);
addConsumer(1, 0, "queue1", null);
addConsumer(2, 0, "queue2", null);
- waitForBindings(0, "queues2.testaddress", 3, 3, true);
- waitForBindings(0, "queues2.testaddress", 3, 0, false);
+ waitForBindings(0, "queues.testaddress", 3, 3, true);
+ waitForBindings(0, "queues.testaddress", 3, 0, false);
- send(0, "queues2.testaddress", 10, false, null);
+ send(0, "queues.testaddress", 10, false, null);
addConsumer(3, 1, "queue0", null);
addConsumer(4, 1, "queue1", null);
@@ -889,23 +882,21 @@
public void testRouteWhenNoConsumersFalseLoadBalancedQueuesNoLocalQueue() throws Exception
{
- setupClusterConnection("cluster2", 0, 1, "queues2", false, 1, isNetty());
-
startServers(1, 0);
setupSessionFactory(0, isNetty());
setupSessionFactory(1, isNetty());
- createQueue(0, "queues2.testaddress", "queue0", null, false);
- createQueue(0, "queues2.testaddress", "queue1", null, false);
+ createQueue(0, "queues.testaddress", "queue0", null, false);
+ createQueue(0, "queues.testaddress", "queue1", null, false);
- createQueue(1, "queues2.testaddress", "queue0", null, false);
- createQueue(1, "queues2.testaddress", "queue1", null, false);
+ createQueue(1, "queues.testaddress", "queue0", null, false);
+ createQueue(1, "queues.testaddress", "queue1", null, false);
- waitForBindings(0, "queues2.testaddress", 2, 0, true);
- waitForBindings(0, "queues2.testaddress", 2, 0, false);
+ waitForBindings(0, "queues.testaddress", 2, 0, true);
+ waitForBindings(0, "queues.testaddress", 2, 0, false);
- send(0, "queues2.testaddress", 10, false, null);
+ send(0, "queues.testaddress", 10, false, null);
addConsumer(0, 0, "queue0", null);
addConsumer(1, 0, "queue1", null);
@@ -921,25 +912,23 @@
public void testRouteWhenNoConsumersTrueLoadBalancedQueues() throws Exception
{
- setupClusterConnection("cluster2", 0, 1, "queues2", true, 1, isNetty());
-
startServers(1, 0);
setupSessionFactory(0, isNetty());
setupSessionFactory(1, isNetty());
- createQueue(0, "queues2.testaddress", "queue0", null, false);
- createQueue(0, "queues2.testaddress", "queue1", null, false);
- createQueue(0, "queues2.testaddress", "queue2", null, false);
+ createQueue(0, "queues.testaddress", "queue0", null, false);
+ createQueue(0, "queues.testaddress", "queue1", null, false);
+ createQueue(0, "queues.testaddress", "queue2", null, false);
- createQueue(1, "queues2.testaddress", "queue0", null, false);
- createQueue(1, "queues2.testaddress", "queue1", null, false);
- createQueue(1, "queues2.testaddress", "queue2", null, false);
+ createQueue(1, "queues.testaddress", "queue0", null, false);
+ createQueue(1, "queues.testaddress", "queue1", null, false);
+ createQueue(1, "queues.testaddress", "queue2", null, false);
- waitForBindings(0, "queues2.testaddress", 3, 0, true);
- waitForBindings(0, "queues2.testaddress", 3, 0, false);
+ waitForBindings(0, "queues.testaddress", 3, 0, true);
+ waitForBindings(0, "queues.testaddress", 3, 0, false);
- send(0, "queues2.testaddress", 10, false, null);
+ send(0, "queues.testaddress", 10, false, null);
addConsumer(0, 0, "queue0", null);
addConsumer(1, 0, "queue1", null);
@@ -958,29 +947,31 @@
public void testRouteWhenNoConsumersTrueLoadBalancedQueuesLocalConsumer() throws Exception
{
- setupClusterConnection("cluster2", 0, 1, "queues2", true, 1, isNetty());
+ servers[0].getConfiguration().getClusterConfigurations().clear();
+ // server #0 is connected to server #1
+ setupClusterConnection("cluster1", 0, 1, "queues", true, 1, isNetty());
startServers(1, 0);
setupSessionFactory(0, isNetty());
setupSessionFactory(1, isNetty());
- createQueue(0, "queues2.testaddress", "queue0", null, false);
- createQueue(0, "queues2.testaddress", "queue1", null, false);
- createQueue(0, "queues2.testaddress", "queue2", null, false);
+ createQueue(0, "queues.testaddress", "queue0", null, false);
+ createQueue(0, "queues.testaddress", "queue1", null, false);
+ createQueue(0, "queues.testaddress", "queue2", null, false);
- createQueue(1, "queues2.testaddress", "queue0", null, false);
- createQueue(1, "queues2.testaddress", "queue1", null, false);
- createQueue(1, "queues2.testaddress", "queue2", null, false);
+ createQueue(1, "queues.testaddress", "queue0", null, false);
+ createQueue(1, "queues.testaddress", "queue1", null, false);
+ createQueue(1, "queues.testaddress", "queue2", null, false);
addConsumer(0, 0, "queue0", null);
addConsumer(1, 0, "queue1", null);
addConsumer(2, 0, "queue2", null);
- waitForBindings(0, "queues2.testaddress", 3, 3, true);
- waitForBindings(0, "queues2.testaddress", 3, 0, false);
+ waitForBindings(0, "queues.testaddress", 3, 3, true);
+ waitForBindings(0, "queues.testaddress", 3, 0, false);
- send(0, "queues2.testaddress", 10, false, null);
+ send(0, "queues.testaddress", 10, false, null);
addConsumer(3, 1, "queue0", null);
addConsumer(4, 1, "queue1", null);
@@ -995,23 +986,25 @@
public void testRouteWhenNoConsumersTrueLoadBalancedQueuesNoLocalQueue() throws Exception
{
- setupClusterConnection("cluster2", 0, 1, "queues2", true, 1, isNetty());
+ servers[0].getConfiguration().getClusterConfigurations().clear();
+ // server #0 is connected to server #1
+ setupClusterConnection("cluster1", 0, 1, "queues", true, 1, isNetty());
startServers(1, 0);
setupSessionFactory(0, isNetty());
setupSessionFactory(1, isNetty());
- createQueue(0, "queues2.testaddress", "queue0", null, false);
- createQueue(0, "queues2.testaddress", "queue1", null, false);
+ createQueue(0, "queues.testaddress", "queue0", null, false);
+ createQueue(0, "queues.testaddress", "queue1", null, false);
- createQueue(1, "queues2.testaddress", "queue0", null, false);
- createQueue(1, "queues2.testaddress", "queue1", null, false);
+ createQueue(1, "queues.testaddress", "queue0", null, false);
+ createQueue(1, "queues.testaddress", "queue1", null, false);
- waitForBindings(0, "queues2.testaddress", 2, 0, true);
- waitForBindings(0, "queues2.testaddress", 2, 0, false);
+ waitForBindings(0, "queues.testaddress", 2, 0, true);
+ waitForBindings(0, "queues.testaddress", 2, 0, false);
- send(0, "queues2.testaddress", 10, false, null);
+ send(0, "queues.testaddress", 10, false, null);
addConsumer(0, 0, "queue0", null);
addConsumer(1, 0, "queue1", null);
@@ -1027,7 +1020,6 @@
public void testNonLoadBalancedQueuesWithConsumersWithFilters() throws Exception
{
- setupClusterConnection("cluster1", 0, 1, "queues", false, 1, isNetty());
startServers(1, 0);
setupSessionFactory(0, isNetty());
@@ -1085,7 +1077,6 @@
public void testRoundRobinMultipleQueuesWithConsumersWithFilters() throws Exception
{
- setupClusterConnection("cluster1", 0, 1, "queues", false, 1, isNetty());
startServers(1, 0);
setupSessionFactory(0, isNetty());
@@ -1151,7 +1142,6 @@
public void testMultipleClusterConnections() throws Exception
{
- setupClusterConnection("cluster1", 0, 1, "queues1", false, 1, isNetty());
setupClusterConnection("cluster2", 0, 1, "queues2", false, 1, isNetty());
setupClusterConnection("cluster3", 0, 1, "queues3", false, 1, isNetty());
@@ -1162,15 +1152,15 @@
// Make sure the different connections don't conflict
- createQueue(0, "queues1.testaddress", "queue0", null, false);
- createQueue(0, "queues1.testaddress", "queue1", null, false);
+ createQueue(0, "queues.testaddress", "queue0", null, false);
+ createQueue(0, "queues.testaddress", "queue1", null, false);
createQueue(0, "queues2.testaddress", "queue2", null, false);
createQueue(0, "queues2.testaddress", "queue3", null, false);
createQueue(0, "queues3.testaddress", "queue4", null, false);
createQueue(0, "queues3.testaddress", "queue5", null, false);
- createQueue(1, "queues1.testaddress", "queue6", null, false);
- createQueue(1, "queues1.testaddress", "queue7", null, false);
+ createQueue(1, "queues.testaddress", "queue6", null, false);
+ createQueue(1, "queues.testaddress", "queue7", null, false);
createQueue(1, "queues2.testaddress", "queue8", null, false);
createQueue(1, "queues2.testaddress", "queue9", null, false);
createQueue(1, "queues3.testaddress", "queue10", null, false);
@@ -1190,8 +1180,8 @@
addConsumer(10, 1, "queue10", null);
addConsumer(11, 1, "queue11", null);
- waitForBindings(0, "queues1.testaddress", 2, 2, true);
- waitForBindings(0, "queues1.testaddress", 2, 2, false);
+ waitForBindings(0, "queues.testaddress", 2, 2, true);
+ waitForBindings(0, "queues.testaddress", 2, 2, false);
waitForBindings(0, "queues2.testaddress", 2, 2, true);
waitForBindings(0, "queues2.testaddress", 2, 2, false);
@@ -1199,7 +1189,7 @@
waitForBindings(0, "queues3.testaddress", 2, 2, true);
waitForBindings(0, "queues3.testaddress", 2, 2, false);
- send(0, "queues1.testaddress", 10, false, null);
+ send(0, "queues.testaddress", 10, false, null);
verifyReceiveAll(10, 0, 1, 6, 7);
Modified: branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/distribution/SymmetricClusterTest.java
===================================================================
--- branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/distribution/SymmetricClusterTest.java 2010-08-11 02:16:35 UTC (rev 9524)
+++ branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/distribution/SymmetricClusterTest.java 2010-08-11 09:13:49 UTC (rev 9525)
@@ -84,6 +84,12 @@
waitForBindings(3, "queues.testaddress", 1, 1, true);
waitForBindings(4, "queues.testaddress", 1, 1, true);
+ System.out.println(clusterDescription(servers[0]));
+ System.out.println(clusterDescription(servers[1]));
+ System.out.println(clusterDescription(servers[2]));
+ System.out.println(clusterDescription(servers[3]));
+ System.out.println(clusterDescription(servers[4]));
+
waitForBindings(0, "queues.testaddress", 4, 4, false);
waitForBindings(1, "queues.testaddress", 4, 4, false);
waitForBindings(2, "queues.testaddress", 4, 4, false);
@@ -1441,6 +1447,14 @@
stopServers(0, 3);
+ System.out.println("### after servers 0 & 3 stooped ###");
+ System.out.println(clusterDescription(servers[0]));
+ System.out.println(clusterDescription(servers[1]));
+ System.out.println(clusterDescription(servers[2]));
+ System.out.println(clusterDescription(servers[3]));
+ System.out.println(clusterDescription(servers[4]));
+ System.out.println("#####################################");
+
startServers(3, 0);
Thread.sleep(3000);
@@ -1489,6 +1503,14 @@
waitForBindings(2, "queues.testaddress", 5, 5, true);
waitForBindings(3, "queues.testaddress", 6, 6, true);
waitForBindings(4, "queues.testaddress", 7, 7, true);
+
+ System.out.println("### after servers 0 & 3 restarted ###");
+ System.out.println(clusterDescription(servers[0]));
+ System.out.println(clusterDescription(servers[1]));
+ System.out.println(clusterDescription(servers[2]));
+ System.out.println(clusterDescription(servers[3]));
+ System.out.println(clusterDescription(servers[4]));
+ System.out.println("#####################################");
waitForBindings(0, "queues.testaddress", 23, 23, false);
waitForBindings(1, "queues.testaddress", 23, 23, false);
@@ -1509,6 +1531,186 @@
verifyReceiveRoundRobinInSomeOrder(10, 26, 27);
}
+ public void testStopSuccessiveServers() throws Exception
+ {
+ setupCluster();
+
+ startServers();
+
+ setupSessionFactory(0, isNetty());
+ setupSessionFactory(1, isNetty());
+ setupSessionFactory(2, isNetty());
+ setupSessionFactory(3, isNetty());
+ setupSessionFactory(4, isNetty());
+
+ createQueue(0, "queues.testaddress", "queue0", null, false);
+ createQueue(1, "queues.testaddress", "queue1", null, false);
+ createQueue(2, "queues.testaddress", "queue2", null, false);
+ createQueue(3, "queues.testaddress", "queue3", null, false);
+ createQueue(4, "queues.testaddress", "queue4", null, false);
+
+ createQueue(0, "queues.testaddress", "queue5", null, false);
+ createQueue(1, "queues.testaddress", "queue6", null, false);
+ createQueue(2, "queues.testaddress", "queue7", null, false);
+ createQueue(3, "queues.testaddress", "queue8", null, false);
+ createQueue(4, "queues.testaddress", "queue9", null, false);
+
+ createQueue(0, "queues.testaddress", "queue10", null, false);
+ createQueue(1, "queues.testaddress", "queue11", null, false);
+ createQueue(2, "queues.testaddress", "queue12", null, false);
+ createQueue(3, "queues.testaddress", "queue13", null, false);
+ createQueue(4, "queues.testaddress", "queue14", null, false);
+
+ createQueue(0, "queues.testaddress", "queue15", null, false);
+ createQueue(1, "queues.testaddress", "queue15", null, false);
+ createQueue(2, "queues.testaddress", "queue15", null, false);
+ createQueue(3, "queues.testaddress", "queue15", null, false);
+ createQueue(4, "queues.testaddress", "queue15", null, false);
+
+ createQueue(2, "queues.testaddress", "queue16", null, false);
+ createQueue(3, "queues.testaddress", "queue16", null, false);
+ createQueue(4, "queues.testaddress", "queue16", null, false);
+
+ createQueue(0, "queues.testaddress", "queue17", null, false);
+ createQueue(1, "queues.testaddress", "queue17", null, false);
+ createQueue(4, "queues.testaddress", "queue17", null, false);
+
+ createQueue(3, "queues.testaddress", "queue18", null, false);
+ createQueue(4, "queues.testaddress", "queue18", null, false);
+
+ addConsumer(0, 0, "queue0", null);
+ addConsumer(1, 1, "queue1", null);
+ addConsumer(2, 2, "queue2", null);
+ addConsumer(3, 3, "queue3", null);
+ addConsumer(4, 4, "queue4", null);
+
+ addConsumer(5, 0, "queue5", null);
+ addConsumer(6, 1, "queue6", null);
+ addConsumer(7, 2, "queue7", null);
+ addConsumer(8, 3, "queue8", null);
+ addConsumer(9, 4, "queue9", null);
+
+ addConsumer(10, 0, "queue10", null);
+ addConsumer(11, 1, "queue11", null);
+ addConsumer(12, 2, "queue12", null);
+ addConsumer(13, 3, "queue13", null);
+ addConsumer(14, 4, "queue14", null);
+
+ addConsumer(15, 0, "queue15", null);
+ addConsumer(16, 1, "queue15", null);
+ addConsumer(17, 2, "queue15", null);
+ addConsumer(18, 3, "queue15", null);
+ addConsumer(19, 4, "queue15", null);
+
+ addConsumer(20, 2, "queue16", null);
+ addConsumer(21, 3, "queue16", null);
+ addConsumer(22, 4, "queue16", null);
+
+ addConsumer(23, 0, "queue17", null);
+ addConsumer(24, 1, "queue17", null);
+ addConsumer(25, 4, "queue17", null);
+
+ addConsumer(26, 3, "queue18", null);
+ addConsumer(27, 4, "queue18", null);
+
+ waitForBindings(0, "queues.testaddress", 5, 5, true);
+ waitForBindings(1, "queues.testaddress", 5, 5, true);
+ waitForBindings(2, "queues.testaddress", 5, 5, true);
+ waitForBindings(3, "queues.testaddress", 6, 6, true);
+ waitForBindings(4, "queues.testaddress", 7, 7, true);
+
+ waitForBindings(0, "queues.testaddress", 23, 23, false);
+ waitForBindings(1, "queues.testaddress", 23, 23, false);
+ waitForBindings(2, "queues.testaddress", 23, 23, false);
+ waitForBindings(3, "queues.testaddress", 22, 22, false);
+ waitForBindings(4, "queues.testaddress", 21, 21, false);
+
+ send(0, "queues.testaddress", 10, false, null);
+
+ verifyReceiveAll(10, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14);
+
+ verifyReceiveRoundRobinInSomeOrder(10, 15, 16, 17, 18, 19);
+
+ verifyReceiveRoundRobinInSomeOrder(10, 20, 21, 22);
+
+ verifyReceiveRoundRobinInSomeOrder(10, 23, 24, 25);
+
+ verifyReceiveRoundRobinInSomeOrder(10, 26, 27);
+
+ // stop server 0
+ removeConsumer(0);
+ removeConsumer(5);
+ removeConsumer(10);
+ removeConsumer(15);
+ removeConsumer(23);
+
+ closeSessionFactory(0);
+
+ stopServers(0);
+
+ waitForBindings(1, "queues.testaddress", 5, 5, true);
+ waitForBindings(2, "queues.testaddress", 5, 5, true);
+ waitForBindings(3, "queues.testaddress", 6, 6, true);
+ waitForBindings(4, "queues.testaddress", 7, 7, true);
+
+ waitForBindings(1, "queues.testaddress", 18, 18, false);
+ waitForBindings(2, "queues.testaddress", 18, 18, false);
+ waitForBindings(3, "queues.testaddress", 17, 17, false);
+ waitForBindings(4, "queues.testaddress", 16, 16, false);
+
+ // stop server 1
+ removeConsumer(1);
+ removeConsumer(6);
+ removeConsumer(11);
+ removeConsumer(16);
+ removeConsumer(24);
+
+ closeSessionFactory(1);
+
+ stopServers(1);
+
+ waitForBindings(2, "queues.testaddress", 5, 5, true);
+ waitForBindings(3, "queues.testaddress", 6, 6, true);
+ waitForBindings(4, "queues.testaddress", 7, 7, true);
+
+ waitForBindings(2, "queues.testaddress", 13, 13, false);
+ waitForBindings(3, "queues.testaddress", 12, 12, false);
+ waitForBindings(4, "queues.testaddress", 11, 11, false);
+
+ // stop server 2
+ removeConsumer(2);
+ removeConsumer(7);
+ removeConsumer(12);
+ removeConsumer(17);
+ removeConsumer(20);
+
+ closeSessionFactory(2);
+
+ stopServers(2);
+
+ waitForBindings(3, "queues.testaddress", 6, 6, true);
+ waitForBindings(4, "queues.testaddress", 7, 7, true);
+
+ waitForBindings(3, "queues.testaddress", 7, 7, false);
+ waitForBindings(4, "queues.testaddress", 6, 6, false);
+
+ // stop server 3
+ removeConsumer(3);
+ removeConsumer(8);
+ removeConsumer(13);
+ removeConsumer(18);
+ removeConsumer(21);
+ removeConsumer(26);
+
+ closeSessionFactory(3);
+
+ stopServers(3);
+
+ waitForBindings(4, "queues.testaddress", 7, 7, true);
+
+ waitForBindings(4, "queues.testaddress", 0, 0, false);
+ }
+
protected void setupCluster() throws Exception
{
setupCluster(false);
Modified: branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/distribution/TwoWayTwoNodeClusterTest.java
===================================================================
--- branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/distribution/TwoWayTwoNodeClusterTest.java 2010-08-11 02:16:35 UTC (rev 9524)
+++ branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/distribution/TwoWayTwoNodeClusterTest.java 2010-08-11 09:13:49 UTC (rev 9525)
@@ -33,9 +33,20 @@
{
super.setUp();
+ setupServers();
+ setupClusters();
+ }
+
+ protected void setupServers()
+ {
setupServer(0, isFileStorage(), isNetty());
+ setupServer(1, isFileStorage(), isNetty());
+ }
- setupServer(1, isFileStorage(), isNetty());
+ protected void setupClusters()
+ {
+ setupClusterConnection("cluster0", 0, 1, "queues", false, 1, isNetty());
+ setupClusterConnection("cluster1", 1, 0, "queues", false, 1, isNetty());
}
@Override
@@ -57,16 +68,85 @@
public void testStartStop() throws Exception
{
- setupClusterConnection("cluster0", 0, 1, "queues", false, 1, isNetty());
- setupClusterConnection("cluster1", 1, 0, "queues", false, 1, isNetty());
-
startServers(0, 1);
- // Give it a little time for the bridge to try to start
- Thread.sleep(2000);
+ setupSessionFactory(0, isNetty());
+ setupSessionFactory(1, isNetty());
+ createQueue(0, "queues", "queue0", null, false);
+ createQueue(1, "queues", "queue0", null, false);
+
+ addConsumer(0, 0, "queue0", null);
+ addConsumer(1, 1, "queue0", null);
+
+ waitForBindings(0, "queues", 1, 1, true);
+ waitForBindings(1, "queues", 1, 1, true);
+
+ waitForBindings(0, "queues", 1, 1, false);
+ waitForBindings(1, "queues", 1, 1, false);
+
+ send(0, "queues", 10, false, null);
+ verifyReceiveRoundRobin(10, 0, 1);
+ verifyNotReceive(0, 1);
+
stopServers(0, 1);
}
+ public void testStopStart() throws Exception
+ {
+ startServers(0, 1);
+
+ setupSessionFactory(0, isNetty());
+ setupSessionFactory(1, isNetty());
+
+ createQueue(0, "queues", "queue0", null, false);
+ createQueue(1, "queues", "queue0", null, false);
+
+ addConsumer(0, 0, "queue0", null);
+ addConsumer(1, 1, "queue0", null);
+
+ waitForBindings(0, "queues", 1, 1, true);
+ waitForBindings(1, "queues", 1, 1, true);
+
+ waitForBindings(0, "queues", 1, 1, false);
+ waitForBindings(1, "queues", 1, 1, false);
+
+ send(0, "queues", 10, false, null);
+ verifyReceiveRoundRobin(10, 0, 1);
+ verifyNotReceive(0, 1);
+
+ removeConsumer(1);
+
+ closeSessionFactory(1);
+
+ stopServers(1);
+
+ Thread.sleep(12000);
+
+ System.out.println(clusterDescription(servers[0]));
+
+ startServers(1);
+
+ setupSessionFactory(1, isNetty());
+
+ createQueue(1, "queues", "queue0", null, false);
+
+ addConsumer(1, 1, "queue0", null);
+
+ waitForBindings(0, "queues", 1, 1, true);
+ waitForBindings(1, "queues", 1, 1, true);
+
+ System.out.println(clusterDescription(servers[0]));
+ System.out.println(clusterDescription(servers[1]));
+
+ waitForBindings(1, "queues", 1, 1, false);
+ waitForBindings(0, "queues", 1, 1, false);
+
+ send(0, "queues", 10, false, null);
+ verifyReceiveRoundRobin(10, 0, 1);
+ verifyNotReceive(0, 1);
+
+ stopServers(0, 1);
+ }
}
Added: branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/distribution/TwoWayTwoNodeClusterWithDiscoveryTest.java
===================================================================
--- branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/distribution/TwoWayTwoNodeClusterWithDiscoveryTest.java (rev 0)
+++ branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/distribution/TwoWayTwoNodeClusterWithDiscoveryTest.java 2010-08-11 09:13:49 UTC (rev 9525)
@@ -0,0 +1,62 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.tests.integration.cluster.distribution;
+
+/**
+ * A TwoWayTwoNodeClusterWithDiscoveryTest
+ *
+ * @author jmesnil
+ *
+ *
+ */
+public class TwoWayTwoNodeClusterWithDiscoveryTest extends TwoWayTwoNodeClusterTest
+{
+
+ // Constants -----------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ protected static final String groupAddress = "230.1.2.3";
+
+ protected static final int groupPort = 6745;
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ // Public --------------------------------------------------------
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ @Override
+ protected void setupClusters()
+ {
+ setupDiscoveryClusterConnection("cluster0", 0, "dg1", "queues", false, 1, isNetty());
+ setupDiscoveryClusterConnection("cluster1", 1, "dg1", "queues", false, 1, isNetty());
+ }
+
+ @Override
+ protected void setupServers()
+ {
+ setupServerWithDiscovery(0, groupAddress, groupPort, isFileStorage(), isNetty(), false);
+ setupServerWithDiscovery(1, groupAddress, groupPort, isFileStorage(), isNetty(), false);
+ }
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+
+}
15 years, 5 months
JBoss hornetq SVN: r9524 - in trunk: tests/src/org/hornetq/tests/integration/persistence and 1 other directory.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2010-08-10 22:16:35 -0400 (Tue, 10 Aug 2010)
New Revision: 9524
Added:
trunk/tests/src/org/hornetq/tests/integration/persistence/ExportFormatTest.java
Modified:
trunk/src/main/org/hornetq/core/journal/impl/ExportJournal.java
trunk/src/main/org/hornetq/core/journal/impl/ImportJournal.java
Log:
Adding test to validate export/import format between versions
Modified: trunk/src/main/org/hornetq/core/journal/impl/ExportJournal.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/impl/ExportJournal.java 2010-08-10 21:05:17 UTC (rev 9523)
+++ trunk/src/main/org/hornetq/core/journal/impl/ExportJournal.java 2010-08-11 02:16:35 UTC (rev 9524)
@@ -72,18 +72,32 @@
String journalSuffix,
int minFiles,
int fileSize,
- String fileOutpu) throws Exception
+ String fileOutput) throws Exception
{
- NIOSequentialFileFactory nio = new NIOSequentialFileFactory(directory);
-
- JournalImpl journal = new JournalImpl(fileSize, minFiles, 0, 0, nio, journalPrefix, journalSuffix, 1);
- FileOutputStream fileOut = new FileOutputStream(new File(fileOutpu));
+ FileOutputStream fileOut = new FileOutputStream(new File(fileOutput));
BufferedOutputStream buffOut = new BufferedOutputStream(fileOut);
PrintStream out = new PrintStream(buffOut);
+
+ exportJournal(directory, journalPrefix, journalSuffix, minFiles, fileSize, out);
+
+ out.close();
+ }
+
+ public static void exportJournal(String directory,
+ String journalPrefix,
+ String journalSuffix,
+ int minFiles,
+ int fileSize,
+ PrintStream out) throws Exception
+ {
+ NIOSequentialFileFactory nio = new NIOSequentialFileFactory(directory);
+
+ JournalImpl journal = new JournalImpl(fileSize, minFiles, 0, 0, nio, journalPrefix, journalSuffix, 1);
+
List<JournalFile> files = journal.orderFiles();
for (JournalFile file : files)
@@ -92,8 +106,6 @@
exportJournalFile(out, nio, file);
}
-
- out.close();
}
/**
Modified: trunk/src/main/org/hornetq/core/journal/impl/ImportJournal.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/impl/ImportJournal.java 2010-08-10 21:05:17 UTC (rev 9523)
+++ trunk/src/main/org/hornetq/core/journal/impl/ImportJournal.java 2010-08-11 02:16:35 UTC (rev 9524)
@@ -16,7 +16,9 @@
import java.io.BufferedReader;
import java.io.File;
import java.io.FileInputStream;
+import java.io.InputStream;
import java.io.InputStreamReader;
+import java.io.Reader;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
@@ -76,7 +78,29 @@
int fileSize,
String fileInput) throws Exception
{
+ FileInputStream fileInputStream = new FileInputStream(new File(fileInput));
+ importJournal(directory, journalPrefix, journalSuffix, minFiles, fileSize, fileInputStream);
+ }
+ public static void importJournal(String directory,
+ String journalPrefix,
+ String journalSuffix,
+ int minFiles,
+ int fileSize,
+ InputStream stream) throws Exception
+ {
+ Reader reader = new InputStreamReader(stream);
+ importJournal(directory, journalPrefix, journalSuffix, minFiles, fileSize, reader);
+ }
+
+ public static void importJournal(String directory,
+ String journalPrefix,
+ String journalSuffix,
+ int minFiles,
+ int fileSize,
+ Reader reader) throws Exception
+ {
+
File journalDir = new File(directory);
journalDir.mkdirs();
@@ -95,19 +119,17 @@
// The journal is empty, as we checked already. Calling load just to initialize the internal data
journal.loadInternalOnly();
- FileInputStream fileInputStream = new FileInputStream(new File(fileInput));
+ BufferedReader buffReader = new BufferedReader(reader);
- BufferedReader reader = new BufferedReader(new InputStreamReader(fileInputStream));
-
String line;
HashMap<Long, AtomicInteger> txCounters = new HashMap<Long, AtomicInteger>();
long lineNumber = 0;
-
+
Map<Long, JournalRecord> journalRecords = journal.getRecords();
- while ((line = reader.readLine()) != null)
+ while ((line = buffReader.readLine()) != null)
{
lineNumber++;
String splitLine[] = line.split(",");
@@ -161,7 +183,7 @@
else if (operation.equals("DeleteRecord"))
{
long id = parseLong("id", lineProperties);
-
+
// If not found it means the append/update records were reclaimed already
if (journalRecords.get((Long)id) != null)
{
@@ -238,7 +260,7 @@
System.err.println("Error at line " + lineNumber + ", operation=" + operation + " msg = " + ex.getMessage());
}
}
-
+
journal.stop();
}
Added: trunk/tests/src/org/hornetq/tests/integration/persistence/ExportFormatTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/persistence/ExportFormatTest.java (rev 0)
+++ trunk/tests/src/org/hornetq/tests/integration/persistence/ExportFormatTest.java 2010-08-11 02:16:35 UTC (rev 9524)
@@ -0,0 +1,171 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.tests.integration.persistence;
+
+import java.io.StringReader;
+
+import org.hornetq.api.core.client.ClientConsumer;
+import org.hornetq.api.core.client.ClientMessage;
+import org.hornetq.api.core.client.ClientProducer;
+import org.hornetq.api.core.client.ClientSession;
+import org.hornetq.api.core.client.ClientSessionFactory;
+import org.hornetq.core.journal.impl.ExportJournal;
+import org.hornetq.core.journal.impl.ImportJournal;
+import org.hornetq.core.server.HornetQServer;
+import org.hornetq.tests.util.ServiceTestBase;
+
+/**
+ * A ExportFormatTest
+ *
+ * @author <a href="mailto:clebert.suconic@jboss.org">Clebert Suconic</a>
+ *
+ *
+ */
+public class ExportFormatTest extends ServiceTestBase
+{
+
+ // Constants -----------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ // Case the format was changed, and the change was agreed, use _testCreateFormat to recreate this field
+ String bindingsFile = "#File,JournalFileImpl: (hornetq-bindings-1.bindings id = 1, recordID = 1)\n" + "operation@AddRecord,id@2,userRecordType@24,length@8,isUpdate@false,data@AAAAAH____8=\n"
+ + "operation@AddRecord,id@0,userRecordType@23,length@16,isUpdate@false,data@jhbeVKTqEd-gYwAi-v8IyA==\n"
+ + "operation@AddRecord,id@4,userRecordType@21,length@17,isUpdate@false,data@AAAABEEAMQAAAAAEQQAxAAA=\n"
+ + "operation@AddRecord,id@109,userRecordType@24,length@8,isUpdate@false,data@AAAAAAAAAG0=\n"
+ + "#File,JournalFileImpl: (hornetq-bindings-2.bindings id = 2, recordID = 2)\n";
+
+ // Case the format was changed, and the change was agreed, use _testCreateFormat to recreate this field
+ String journalFile = "#File,JournalFileImpl: (hornetq-data-1.hq id = 1, recordID = 1)\n"
+ + "operation@AddRecordTX,txID@3,id@6,userRecordType@31,length@65,isUpdate@false,data@AAAAEQAAAE4AAAAAAAAABgEAAAAEQQAxAAAA_wAAAAAAAAAAAAABKl7rANMEAQAAAAEAAAAGawBlAHkABgAAAAA=\n"
+ + "operation@UpdateTX,txID@3,id@6,userRecordType@32,length@8,isUpdate@true,data@AAAAAAAAAAQ=\n"
+ + "operation@AddRecordTX,txID@3,id@7,userRecordType@31,length@65,isUpdate@false,data@AAAAEQAAAE4AAAAAAAAABwEAAAAEQQAxAAAA_wAAAAAAAAAAAAABKl7rANcEAQAAAAEAAAAGawBlAHkABgAAAAE=\n"
+ + "operation@UpdateTX,txID@3,id@7,userRecordType@32,length@8,isUpdate@true,data@AAAAAAAAAAQ=\n"
+ + "operation@AddRecordTX,txID@3,id@8,userRecordType@31,length@65,isUpdate@false,data@AAAAEQAAAE4AAAAAAAAACAEAAAAEQQAxAAAA_wAAAAAAAAAAAAABKl7rANcEAQAAAAEAAAAGawBlAHkABgAAAAI=\n"
+ + "operation@UpdateTX,txID@3,id@8,userRecordType@32,length@8,isUpdate@true,data@AAAAAAAAAAQ=\n"
+ + "operation@AddRecordTX,txID@3,id@9,userRecordType@31,length@65,isUpdate@false,data@AAAAEQAAAE4AAAAAAAAACQEAAAAEQQAxAAAA_wAAAAAAAAAAAAABKl7rANcEAQAAAAEAAAAGawBlAHkABgAAAAM=\n"
+ + "operation@UpdateTX,txID@3,id@9,userRecordType@32,length@8,isUpdate@true,data@AAAAAAAAAAQ=\n"
+ + "operation@AddRecordTX,txID@3,id@10,userRecordType@31,length@65,isUpdate@false,data@AAAAEQAAAE4AAAAAAAAACgEAAAAEQQAxAAAA_wAAAAAAAAAAAAABKl7rANcEAQAAAAEAAAAGawBlAHkABgAAAAQ=\n"
+ + "operation@UpdateTX,txID@3,id@10,userRecordType@32,length@8,isUpdate@true,data@AAAAAAAAAAQ=\n"
+ + "operation@Commit,txID@3,numberOfRecords@10\n"
+ + "operation@AddRecord,id@14,userRecordType@31,length@65,isUpdate@false,data@AAAAEQAAAE4AAAAAAAAADgEAAAAEQQAxAAAA_wAAAAAAAAAAAAABKl7rAOEEAQAAAAEAAAAGawBlAHkABgAAAAU=\n"
+ + "operation@Update,id@14,userRecordType@32,length@8,isUpdate@true,data@AAAAAAAAAAQ=\n"
+ + "operation@AddRecord,id@15,userRecordType@31,length@65,isUpdate@false,data@AAAAEQAAAE4AAAAAAAAADwEAAAAEQQAxAAAA_wAAAAAAAAAAAAABKl7rAOQEAQAAAAEAAAAGawBlAHkABgAAAAY=\n"
+ + "operation@Update,id@15,userRecordType@32,length@8,isUpdate@true,data@AAAAAAAAAAQ=\n"
+ + "operation@AddRecord,id@16,userRecordType@31,length@65,isUpdate@false,data@AAAAEQAAAE4AAAAAAAAAEAEAAAAEQQAxAAAA_wAAAAAAAAAAAAABKl7rAOUEAQAAAAEAAAAGawBlAHkABgAAAAc=\n"
+ + "operation@Update,id@16,userRecordType@32,length@8,isUpdate@true,data@AAAAAAAAAAQ=\n"
+ + "operation@AddRecord,id@17,userRecordType@31,length@65,isUpdate@false,data@AAAAEQAAAE4AAAAAAAAAEQEAAAAEQQAxAAAA_wAAAAAAAAAAAAABKl7rAOcEAQAAAAEAAAAGawBlAHkABgAAAAg=\n"
+ + "operation@Update,id@17,userRecordType@32,length@8,isUpdate@true,data@AAAAAAAAAAQ=\n"
+ + "operation@AddRecord,id@18,userRecordType@31,length@65,isUpdate@false,data@AAAAEQAAAE4AAAAAAAAAEgEAAAAEQQAxAAAA_wAAAAAAAAAAAAABKl7rAOgEAQAAAAEAAAAGawBlAHkABgAAAAk=\n"
+ + "operation@Update,id@18,userRecordType@32,length@8,isUpdate@true,data@AAAAAAAAAAQ=\n"
+ + "#File,JournalFileImpl: (hornetq-data-2.hq id = 2, recordID = 2)\n";
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ // Public --------------------------------------------------------
+
+ protected void tearDown() throws Exception
+ {
+
+ }
+
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+ }
+
+ public void _testCreateFormat() throws Exception
+ {
+ HornetQServer server = createServer(true);
+ server.start();
+
+ ClientSessionFactory factory = createInVMFactory();
+ ClientSession session = factory.createSession(false, false, false);
+ session.createQueue("A1", "A1");
+
+ ClientProducer producer = session.createProducer("A1");
+ for (int i = 0; i < 5; i++)
+ {
+ ClientMessage msg = session.createMessage(true);
+ msg.putIntProperty("key", i);
+ producer.send(msg);
+ }
+ session.commit();
+
+ session.close();
+
+ session = factory.createSession(false, true, true);
+
+ producer = session.createProducer("A1");
+
+ for (int i = 5; i < 10; i++)
+ {
+ ClientMessage msg = session.createMessage(true);
+ msg.putIntProperty("key", i);
+ producer.send(msg);
+ }
+
+ server.stop();
+
+ System.out.println("copy & paste the following as bindingsFile:");
+
+ ExportJournal.exportJournal(getBindingsDir(), "hornetq-bindings", "bindings", 2, 1048576, System.out);
+
+ System.out.println("copy & paste the following as dataFile:");
+
+ ExportJournal.exportJournal(getJournalDir(), "hornetq-data", "hq", 2, 102400, System.out);
+ }
+
+ public void testConsumeFromFormat() throws Exception
+ {
+ ImportJournal.importJournal(getJournalDir(), "hornetq-data", "hq", 2, 102400, new StringReader(journalFile));
+ ImportJournal.importJournal(getBindingsDir(),
+ "hornetq-bindings",
+ "bindings",
+ 2,
+ 1048576,
+ new StringReader(bindingsFile));
+
+ HornetQServer server = createServer(true);
+ server.start();
+
+ ClientSessionFactory factory = createInVMFactory();
+ ClientSession session = factory.createSession();
+ session.start();
+
+ ClientConsumer consumer = session.createConsumer("A1");
+ for (int i = 0; i < 10; i++)
+ {
+ ClientMessage msg = consumer.receive(5000);
+ assertNotNull(msg);
+ msg.acknowledge();
+ assertEquals(i, msg.getIntProperty("key").intValue());
+ }
+
+ session.commit();
+
+ server.stop();
+ }
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+
+}
15 years, 5 months
JBoss hornetq SVN: r9523 - trunk/tests/src/org/hornetq/tests/integration/cluster/reattach.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2010-08-10 17:05:17 -0400 (Tue, 10 Aug 2010)
New Revision: 9523
Modified:
trunk/tests/src/org/hornetq/tests/integration/cluster/reattach/RandomReattachTest.java
Log:
Fixing test (it was ignoring assertions on consumers, and it was duplicating messages because the rollback was being ignored, since autoSend was true)
Modified: trunk/tests/src/org/hornetq/tests/integration/cluster/reattach/RandomReattachTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/cluster/reattach/RandomReattachTest.java 2010-08-10 02:27:36 UTC (rev 9522)
+++ trunk/tests/src/org/hornetq/tests/integration/cluster/reattach/RandomReattachTest.java 2010-08-10 21:05:17 UTC (rev 9523)
@@ -13,6 +13,7 @@
package org.hornetq.tests.integration.cluster.reattach;
+import java.util.ArrayList;
import java.util.HashSet;
import java.util.Set;
import java.util.Timer;
@@ -21,6 +22,7 @@
import java.util.concurrent.TimeUnit;
import junit.framework.Assert;
+import junit.framework.AssertionFailedError;
import org.hornetq.api.core.HornetQException;
import org.hornetq.api.core.SimpleString;
@@ -303,13 +305,13 @@
producer.send(message);
}
- class MyHandler implements MessageHandler
+ class MyHandler extends AssertionCheckMessageHandler
{
final CountDownLatch latch = new CountDownLatch(1);
volatile int count;
- public void onMessage(final ClientMessage message)
+ public void onMessageAssert(final ClientMessage message)
{
if (count == numMessages)
{
@@ -350,6 +352,8 @@
for (MyHandler handler : handlers)
{
boolean ok = handler.latch.await(5000, TimeUnit.MILLISECONDS);
+
+ handler.checkAssertions();
Assert.assertTrue("Didn't receive all messages", ok);
}
@@ -422,13 +426,13 @@
session.start();
}
- class MyHandler implements MessageHandler
+ class MyHandler extends AssertionCheckMessageHandler
{
final CountDownLatch latch = new CountDownLatch(1);
volatile int count;
- public void onMessage(final ClientMessage message)
+ public void onMessageAssert(final ClientMessage message)
{
if (count == numMessages)
{
@@ -460,6 +464,8 @@
for (MyHandler handler : handlers)
{
boolean ok = handler.latch.await(10000, TimeUnit.MILLISECONDS);
+
+ handler.checkAssertions();
Assert.assertTrue(ok);
}
@@ -494,7 +500,7 @@
final int numMessages = 100;
- final int numSessions = 10;
+ final int numSessions = 1;
Set<ClientConsumer> consumers = new HashSet<ClientConsumer>();
Set<ClientSession> sessions = new HashSet<ClientSession>();
@@ -516,7 +522,7 @@
sessions.add(sessConsume);
}
- ClientSession sessSend = sf.createSession(false, true, true);
+ ClientSession sessSend = sf.createSession(false, false, true);
ClientProducer producer = sessSend.createProducer(RandomReattachTest.ADDRESS);
@@ -546,25 +552,36 @@
sessSend.commit();
- class MyHandler implements MessageHandler
+ class MyHandler extends AssertionCheckMessageHandler
{
final CountDownLatch latch = new CountDownLatch(1);
volatile int count;
- public void onMessage(final ClientMessage message)
+ public void onMessageAssert(final ClientMessage message)
{
if (count == numMessages)
{
- Assert.fail("Too many messages");
+ Assert.fail("Too many messages, expected " + count);
}
Assert.assertEquals(count, message.getObjectProperty(new SimpleString("count")));
count++;
+
+ try
+ {
+ message.acknowledge();
+ }
+ catch (HornetQException e)
+ {
+ e.printStackTrace();
+ throw new RuntimeException (e.getMessage(), e);
+ }
if (count == numMessages)
{
+ System.out.println("Latch released");
latch.countDown();
}
}
@@ -586,6 +603,8 @@
boolean ok = handler.latch.await(10000, TimeUnit.MILLISECONDS);
Assert.assertTrue(ok);
+
+ handler.checkAssertions();
}
handlers.clear();
@@ -610,6 +629,8 @@
boolean ok = handler.latch.await(10000, TimeUnit.MILLISECONDS);
Assert.assertTrue(ok);
+
+ handler.checkAssertions();
}
for (ClientSession session : sessions)
@@ -665,7 +686,7 @@
sessions.add(sessConsume);
}
- ClientSession sessSend = sf.createSession(false, true, true);
+ ClientSession sessSend = sf.createSession(false, false, true);
ClientProducer producer = sessSend.createProducer(RandomReattachTest.ADDRESS);
@@ -700,17 +721,17 @@
session.start();
}
- class MyHandler implements MessageHandler
+ class MyHandler extends AssertionCheckMessageHandler
{
final CountDownLatch latch = new CountDownLatch(1);
volatile int count;
- public void onMessage(final ClientMessage message)
+ public void onMessageAssert(final ClientMessage message)
{
if (count == numMessages)
{
- Assert.fail("Too many messages");
+ Assert.fail("Too many messages, " + count);
}
Assert.assertEquals(count, message.getObjectProperty(new SimpleString("count")));
@@ -740,6 +761,8 @@
boolean ok = handler.latch.await(20000, TimeUnit.MILLISECONDS);
Assert.assertTrue(ok);
+
+ handler.checkAssertions();
}
handlers.clear();
@@ -764,6 +787,8 @@
boolean ok = handler.latch.await(10000, TimeUnit.MILLISECONDS);
Assert.assertTrue(ok);
+
+ handler.checkAssertions();
}
for (ClientSession session : sessions)
@@ -1408,7 +1433,7 @@
{
return 2;
}
-
+
@Override
protected void setUp() throws Exception
{
@@ -1494,4 +1519,41 @@
{
abstract void run(final ClientSessionFactory sf) throws Exception;
}
+
+ static abstract class AssertionCheckMessageHandler implements MessageHandler
+ {
+
+
+ public void checkAssertions()
+ {
+ for (AssertionFailedError e: errors)
+ {
+ // it will throw the first error
+ throw e;
+ }
+ }
+
+ private ArrayList<AssertionFailedError> errors = new ArrayList<AssertionFailedError>();
+
+ /* (non-Javadoc)
+ * @see org.hornetq.api.core.client.MessageHandler#onMessage(org.hornetq.api.core.client.ClientMessage)
+ */
+ public void onMessage(ClientMessage message)
+ {
+ try
+ {
+ onMessageAssert(message);
+ }
+ catch (AssertionFailedError e)
+ {
+ e.printStackTrace(); // System.out -> junit reports
+ errors.add(e);
+ }
+ }
+
+ public abstract void onMessageAssert(ClientMessage message);
+
+ }
+
+
}
15 years, 5 months
JBoss hornetq SVN: r9522 - trunk.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2010-08-09 22:27:36 -0400 (Mon, 09 Aug 2010)
New Revision: 9522
Modified:
trunk/build-hornetq.properties
Log:
increase timeout
Modified: trunk/build-hornetq.properties
===================================================================
--- trunk/build-hornetq.properties 2010-08-10 01:11:14 UTC (rev 9521)
+++ trunk/build-hornetq.properties 2010-08-10 02:27:36 UTC (rev 9522)
@@ -28,6 +28,4 @@
# 150 mins
clustering.junit.timeout=9000000
# 90 mins
-clustering.stress.junit.timeout=5400000
-# 90 mins
-stress.junit.timeout=7400000
+stress.junit.timeout=9500000
15 years, 5 months
JBoss hornetq SVN: r9521 - in trunk: tests/src/org/hornetq/tests/stress/journal and 1 other directory.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2010-08-09 21:11:14 -0400 (Mon, 09 Aug 2010)
New Revision: 9521
Modified:
trunk/src/main/org/hornetq/core/journal/impl/JournalImpl.java
trunk/tests/src/org/hornetq/tests/stress/journal/JournalCleanupCompactStressTest.java
Log:
HORNETQ-476 moving pending close to avoid unnecessary locks before close
Modified: trunk/src/main/org/hornetq/core/journal/impl/JournalImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/impl/JournalImpl.java 2010-08-10 00:02:55 UTC (rev 9520)
+++ trunk/src/main/org/hornetq/core/journal/impl/JournalImpl.java 2010-08-10 01:11:14 UTC (rev 9521)
@@ -1551,6 +1551,8 @@
JournalImpl.trace("Starting compacting operation on journal");
}
JournalImpl.log.debug("Starting compacting operation on journal");
+
+ onCompactStart();
// We need to guarantee that the journal is frozen for this short time
// We don't freeze the journal as we compact, only for the short time where we replace records
@@ -1571,16 +1573,10 @@
dataFilesToProcess.addAll(dataFiles);
- for (JournalFile file : pendingCloseFiles)
- {
- file.getFile().close();
- }
+ dataFiles.clear();
- dataFilesToProcess.addAll(pendingCloseFiles);
- pendingCloseFiles.clear();
+ drainClosedFiles();
- dataFiles.clear();
-
if (dataFilesToProcess.size() == 0)
{
return;
@@ -2294,7 +2290,7 @@
{
return;
}
-
+
compactingLock.readLock().lock();
try
@@ -2808,10 +2804,15 @@
}
/** This is an interception point for testcases, when the compacted files are written, before replacing the data structures */
+ protected void onCompactStart() throws Exception
+ {
+ }
+
+ /** This is an interception point for testcases, when the compacted files are written, before replacing the data structures */
protected void onCompactDone()
{
}
-
+
// Private
// -----------------------------------------------------------------------------
@@ -3314,6 +3315,8 @@
{
try
{
+ drainClosedFiles();
+
if (!checkReclaimStatus())
{
checkCompact();
@@ -3384,33 +3387,13 @@
{
fileFactory.deactivateBuffer();
pendingCloseFiles.add(file);
+ dataFiles.add(file);
Runnable run = new Runnable()
{
public void run()
{
- compactingLock.readLock().lock();
- try
- {
- // The file could be closed by compacting. On this case we need to check if the close still pending
- // before we add it to dataFiles
- if (pendingCloseFiles.remove(file))
- {
- dataFiles.add(file);
- if (file.getFile().isOpen())
- {
- file.getFile().close();
- }
- }
- }
- catch (Exception e)
- {
- JournalImpl.log.warn(e.getMessage(), e);
- }
- finally
- {
- compactingLock.readLock().unlock();
- }
+ drainClosedFiles();
}
};
@@ -3424,7 +3407,24 @@
}
}
+
+ private void drainClosedFiles()
+ {
+ JournalFile file;
+ try
+ {
+ while ((file = pendingCloseFiles.poll()) != null)
+ {
+ file.getFile().close();
+ }
+ }
+ catch (Exception e)
+ {
+ JournalImpl.log.warn(e.getMessage(), e);
+ }
+ }
+
private JournalTransaction getTransactionInfo(final long txID)
{
JournalTransaction tx = transactions.get(txID);
Modified: trunk/tests/src/org/hornetq/tests/stress/journal/JournalCleanupCompactStressTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/stress/journal/JournalCleanupCompactStressTest.java 2010-08-10 00:02:55 UTC (rev 9520)
+++ trunk/tests/src/org/hornetq/tests/stress/journal/JournalCleanupCompactStressTest.java 2010-08-10 01:11:14 UTC (rev 9521)
@@ -16,6 +16,8 @@
import java.io.File;
import java.util.ArrayList;
import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
@@ -23,6 +25,8 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
+import javax.swing.plaf.basic.BasicInternalFrameTitlePane.MoveAction;
+
import org.hornetq.core.asyncio.impl.AsynchronousFileImpl;
import org.hornetq.core.config.impl.ConfigurationImpl;
import org.hornetq.core.journal.IOAsyncTask;
@@ -52,9 +56,9 @@
{
public static SimpleIDGenerator idGen = new SimpleIDGenerator(1);
-
-
+
private static final int MAX_WRITES = 20000;
+
// We want to maximize the difference between appends and deles, or we could get out of memory
public Semaphore maxRecords;
@@ -74,17 +78,23 @@
false,
JournalCleanupCompactStressTest.class.getClassLoader());
- private final ExecutorService threadPool = Executors.newFixedThreadPool(20, tFactory);
+ private ExecutorService threadPool;
- OrderedExecutorFactory executorFactory = new OrderedExecutorFactory(threadPool);
+ private OrderedExecutorFactory executorFactory = new OrderedExecutorFactory(threadPool);
+ Executor testExecutor;
+
@Override
public void setUp() throws Exception
{
super.setUp();
+ threadPool = Executors.newFixedThreadPool(20, tFactory);
+ executorFactory = new OrderedExecutorFactory(threadPool);
+ testExecutor = executorFactory.getExecutor();
+
maxRecords = new Semaphore(MAX_WRITES);
-
+
errors.set(0);
File dir = new File(getTemporaryDir());
@@ -111,8 +121,38 @@
factory,
"hornetq-data",
"hq",
- maxAIO);
+ maxAIO)
+ {
+ protected void onCompactStart() throws Exception
+ {
+ testExecutor.execute(new Runnable()
+ {
+ public void run()
+ {
+ try
+ {
+ System.out.println("OnCompactSTart enter");
+ for (int i = 0; i < 20; i++)
+ {
+ long id = idGen.generateID();
+ journal.appendAddRecord(id, (byte)0, new byte[] { 1, 2, 3 }, false);
+ journal.forceMoveNextFile();
+ journal.appendDeleteRecord(id, id == 20);
+ }
+ System.out.println("OnCompactSTart leave");
+ }
+ catch (Exception e)
+ {
+ e.printStackTrace();
+ errors.incrementAndGet();
+ }
+ }
+ });
+ }
+
+ };
+
journal.start();
journal.loadInternalOnly();
@@ -132,8 +172,10 @@
{
// don't care :-)
}
+
+ threadPool.shutdown();
}
-
+
protected long getTotalTimeMilliseconds()
{
return TimeUnit.MINUTES.toMillis(10);
@@ -185,7 +227,6 @@
// Release Semaphore after setting running to false or the threads may never finish
maxRecords.release(MAX_WRITES - maxRecords.availablePermits());
-
for (Thread t : appenders)
{
t.join();
@@ -198,6 +239,17 @@
t1.join();
+ final CountDownLatch latchExecutorDone = new CountDownLatch(1);
+ testExecutor.execute(new Runnable()
+ {
+ public void run()
+ {
+ latchExecutorDone.countDown();
+ }
+ });
+
+ latchExecutorDone.await();
+
assertEquals(0, errors.get());
journal.stop();
@@ -247,7 +299,7 @@
LinkedBlockingDeque<Long> queue = new LinkedBlockingDeque<Long>();
OperationContextImpl ctx = new OperationContextImpl(executorFactory.getExecutor());
-
+
public FastAppenderTx()
{
super("FastAppenderTX");
@@ -392,12 +444,12 @@
*/
class SlowAppenderNoTX extends Thread
{
-
+
public SlowAppenderNoTX()
{
super("SlowAppender");
}
-
+
@Override
public void run()
{
15 years, 5 months