[jboss-cvs] JBoss Messaging SVN: r1619 - in branches/Branch_Client_Failover_Experiment: src/etc/xmdesc src/main/org/jboss/messaging/core/plugin/postoffice src/main/org/jboss/messaging/core/plugin/postoffice/cluster tests/src/org/jboss/test/messaging/jms/clustering tests/src/org/jboss/test/messaging/tools tests/src/org/jboss/test/messaging/tools/jmx/rmi
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Wed Nov 22 18:26:44 EST 2006
Author: clebert.suconic at jboss.com
Date: 2006-11-22 18:26:37 -0500 (Wed, 22 Nov 2006)
New Revision: 1619
Added:
branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/LeaveClusterRequest.java
branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/jms/clustering/ClusteringBase.java
branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/jms/clustering/HATest.java
Modified:
branches/Branch_Client_Failover_Experiment/src/etc/xmdesc/ClusteredPostOffice-xmbean.xml
branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/DefaultPostOffice.java
branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/ClusterRequest.java
branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOffice.java
branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/PostOfficeInternal.java
branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/jms/clustering/ManualClusteringTest.java
branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/tools/ServerManagement.java
branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/tools/jmx/rmi/LocalTestServer.java
branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/tools/jmx/rmi/RMITestServer.java
branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/tools/jmx/rmi/Server.java
Log:
Adding new HA automated Tests + LeaveClusterRequest message
Modified: branches/Branch_Client_Failover_Experiment/src/etc/xmdesc/ClusteredPostOffice-xmbean.xml
===================================================================
--- branches/Branch_Client_Failover_Experiment/src/etc/xmdesc/ClusteredPostOffice-xmbean.xml 2006-11-22 03:46:36 UTC (rev 1618)
+++ branches/Branch_Client_Failover_Experiment/src/etc/xmdesc/ClusteredPostOffice-xmbean.xml 2006-11-22 23:26:37 UTC (rev 1619)
@@ -142,6 +142,16 @@
</operation>
<operation>
+ <description></description>
+ <name>stop</name>
+ <parameter>
+ <description>Should we send a notification about leaving cluster</description>
+ <name>sendNotification</name>
+ <type>boolean</type>
+ </parameter>
+ </operation>
+
+ <operation>
<description>JBoss Service lifecycle operation</description>
<name>destroy</name>
</operation>
Modified: branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/DefaultPostOffice.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/DefaultPostOffice.java 2006-11-22 03:46:36 UTC (rev 1618)
+++ branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/DefaultPostOffice.java 2006-11-22 23:26:37 UTC (rev 1619)
@@ -21,30 +21,21 @@
*/
package org.jboss.messaging.core.plugin.postoffice;
+import EDU.oswego.cs.dl.util.concurrent.QueuedExecutor;
+import EDU.oswego.cs.dl.util.concurrent.ReadWriteLock;
+import EDU.oswego.cs.dl.util.concurrent.WriterPreferenceReadWriteLock;
+import java.io.PrintWriter;
+import java.io.StringWriter;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.Types;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Iterator;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-import java.io.PrintWriter;
-import java.io.StringWriter;
-
+import java.util.*;
import javax.sql.DataSource;
import javax.transaction.TransactionManager;
-
import org.jboss.jms.server.QueuedExecutorPool;
import org.jboss.logging.Logger;
-import org.jboss.messaging.core.Delivery;
-import org.jboss.messaging.core.Filter;
-import org.jboss.messaging.core.FilterFactory;
-import org.jboss.messaging.core.MessageReference;
+import org.jboss.messaging.core.*;
import org.jboss.messaging.core.Queue;
import org.jboss.messaging.core.local.PagingFilteredQueue;
import org.jboss.messaging.core.plugin.JDBCSupport;
@@ -54,10 +45,6 @@
import org.jboss.messaging.core.tx.Transaction;
import org.jboss.messaging.core.tx.TransactionRepository;
-import EDU.oswego.cs.dl.util.concurrent.QueuedExecutor;
-import EDU.oswego.cs.dl.util.concurrent.ReadWriteLock;
-import EDU.oswego.cs.dl.util.concurrent.WriterPreferenceReadWriteLock;
-
/**
*
* A DefaultPostOffice
@@ -143,9 +130,14 @@
if (trace) { log.trace(this + " started"); }
}
-
+
public void stop() throws Exception
{
+ stop(true);
+ }
+
+ public void stop(boolean sendNotification) throws Exception
+ {
if (trace) { log.trace(this + " stopping"); }
super.stop();
Modified: branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/ClusterRequest.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/ClusterRequest.java 2006-11-22 03:46:36 UTC (rev 1618)
+++ branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/ClusterRequest.java 2006-11-22 23:26:37 UTC (rev 1619)
@@ -23,7 +23,6 @@
import java.io.DataInputStream;
import java.io.DataOutputStream;
-
import org.jboss.messaging.util.Streamable;
/**
@@ -99,6 +98,9 @@
request = new RollbackPullRequest();
break;
}
+ case LeaveClusterRequest.TYPE:
+ request = new LeaveClusterRequest();
+ break;
default:
{
throw new IllegalArgumentException("Invalid type: " + type);
Modified: branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOffice.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOffice.java 2006-11-22 03:46:36 UTC (rev 1618)
+++ branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOffice.java 2006-11-22 23:26:37 UTC (rev 1619)
@@ -296,16 +296,17 @@
if (trace) { log.trace("Started " + this + " with sync address " + syncAddress +
" async address " + asyncAddress); }
}
-
- public synchronized void stop() throws Exception
+
+ public synchronized void stop(boolean sendNotification) throws Exception
{
if (!started)
{
log.warn("Attempt to stop() but " + this + " is not started");
}
+
+ syncSendRequest(new LeaveClusterRequest(this.getNodeId()));
+ super.stop(sendNotification);
- super.stop();
-
statsSender.stop();
syncChannel.close();
@@ -1322,7 +1323,7 @@
NodeAddressInfo info[] = getClusterNodes();
- out.println("<table><tr><td>Node</td><td>AsyncChannel</td><td>SyncChannel</td></tr>");
+ out.println("<table border=1><tr><td>Node</td><td>AsyncChannel</td><td>SyncChannel</td></tr>");
for (int i = 0; i < info.length; i++)
{
out.println("<tr><td>" + info[i].getNodeId() + "</td><td>" + info[i].getAsyncChannelAddress() + "</td><td>" + info[i].getSyncChannelAddress() + "</td>");
@@ -1539,9 +1540,12 @@
lock.readLock().release();
}
}
-
- private void removeBindingsForAddress(Integer nodeId) throws Exception
+
+
+ public void removeBindingsForAddress(int parameterNodeId) throws Exception
{
+ log.info("Node " + parameterNodeId + " requested to leave cluster");
+ Integer nodeId = new Integer(parameterNodeId);
lock.writeLock().acquire();
try
@@ -1838,22 +1842,23 @@
try
{
Integer nodeId = getNodeIdForSyncAddress(address);
-
- if (nodeId == null)
+
+ // nodeId==null means the server left the cluster without a problem. It sent a message before
+ // leaving the cluster
+ if (nodeId != null)
{
- throw new IllegalStateException("Cannot find node id for address: " + address);
+ if (trace) { log.trace(DefaultClusteredPostOffice.this.nodeId + " Performing cleanup for node " + nodeId); }
+
+ //Perform a check - the member might have crashed and left uncommitted transactions
+ //we need to resolve this
+ check(nodeId);
+
+ //removeBindingsForAddress(nodeId);
+ failOver(nodeId.intValue());
+
+ if (trace) { log.trace(DefaultClusteredPostOffice.this.nodeId + " cleanup complete"); }
}
-
- if (trace) { log.trace(DefaultClusteredPostOffice.this.nodeId + " Performing cleanup for node " + nodeId); }
-
- //Perform a check - the member might have crashed and left uncommitted transactions
- //we need to resolve this
- check(nodeId);
-
- removeBindingsForAddress(nodeId);
-
- if (trace) { log.trace(DefaultClusteredPostOffice.this.nodeId + " cleanup complete"); }
- }
+ }
catch (Throwable e)
{
log.error("Caught Exception in MembershipListener", e);
Added: branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/LeaveClusterRequest.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/LeaveClusterRequest.java 2006-11-22 03:46:36 UTC (rev 1618)
+++ branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/LeaveClusterRequest.java 2006-11-22 23:26:37 UTC (rev 1619)
@@ -0,0 +1,51 @@
+package org.jboss.messaging.core.plugin.postoffice.cluster;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+
+/**
+ * @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
+ * @version <tt>$Revision:$</tt>
+ * <p/>
+ * $Id:$
+ */
+public class LeaveClusterRequest extends ClusterRequest
+{
+ static final int TYPE = 11;
+
+ private int nodeId;
+
+ public LeaveClusterRequest(int nodeId)
+ {
+ this.nodeId=nodeId;
+ }
+
+ /**
+ * This constructor only exist because it's an Streamable requirement.
+ * @see ClusterRequest#createFromStream(java.io.DataInputStream)
+ */
+ public LeaveClusterRequest()
+ {
+ }
+
+ Object execute(PostOfficeInternal office) throws Throwable
+ {
+ office.removeBindingsForAddress(nodeId);
+ return null;
+ }
+
+ byte getType()
+ {
+ return TYPE;
+ }
+
+ public void write(DataOutputStream out) throws Exception
+ {
+ out.writeInt(nodeId);
+ }
+
+ public void read(DataInputStream in) throws Exception
+ {
+ nodeId = in.readInt();
+ }
+}
Modified: branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/PostOfficeInternal.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/PostOfficeInternal.java 2006-11-22 03:46:36 UTC (rev 1618)
+++ branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/PostOfficeInternal.java 2006-11-22 23:26:37 UTC (rev 1619)
@@ -23,7 +23,6 @@
import java.util.List;
import java.util.Map;
-
import org.jboss.messaging.core.Message;
import org.jboss.messaging.core.plugin.contract.ClusteredPostOffice;
@@ -48,6 +47,8 @@
void removeBindingFromCluster(int nodeId, String queueName)
throws Exception;
+
+ void removeBindingsForAddress(int nodeId) throws Exception;
void handleAddressNodeMapping(NodeAddressInfo info, int nodeId)
throws Exception;
Added: branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/jms/clustering/ClusteringBase.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/jms/clustering/ClusteringBase.java 2006-11-22 03:46:36 UTC (rev 1618)
+++ branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/jms/clustering/ClusteringBase.java 2006-11-22 23:26:37 UTC (rev 1619)
@@ -0,0 +1,196 @@
+package org.jboss.test.messaging.jms.clustering;
+
+import javax.jms.*;
+import javax.naming.Context;
+import javax.naming.InitialContext;
+import org.jboss.test.messaging.MessagingTestCase;
+import org.jboss.test.messaging.tools.ServerManagement;
+
+/**
+ * @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
+ * @version <tt>$Revision:$</tt>
+ * <p/>
+ * $Id:$
+ */
+public class ClusteringBase extends MessagingTestCase
+{
+
+ protected Context ic1;
+
+ protected Context ic2;
+
+ protected Context ic3;
+
+ protected Queue queue1;
+
+ protected Topic topic1;
+
+ protected Queue queue2;
+
+ protected Topic topic2;
+
+ protected Queue queue3;
+
+ protected Topic topic3;
+
+ protected ConnectionFactory cf1;
+
+ protected ConnectionFactory cf2;
+
+ protected ConnectionFactory cf3;
+
+ public ClusteringBase(String name)
+ {
+ super(name);
+ }
+
+
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+
+ try
+ {
+
+ ServerManagement.start("all", 0, true);
+
+ ServerManagement.start("all", 1, true);
+
+ ServerManagement.start("all", 2, true);
+
+ ServerManagement.deployClusteredQueue("testDistributedQueue", 0);
+ ServerManagement.deployClusteredTopic("testDistributedTopic", 0);
+
+ ServerManagement.deployClusteredQueue("testDistributedQueue", 1);
+ ServerManagement.deployClusteredTopic("testDistributedTopic", 1);
+
+ ServerManagement.deployClusteredQueue("testDistributedQueue", 2);
+ ServerManagement.deployClusteredTopic("testDistributedTopic", 2);
+
+ ic1 = new InitialContext(ServerManagement.getJNDIEnvironment(0));
+
+ ic2 = new InitialContext(ServerManagement.getJNDIEnvironment(1));
+
+ ic3 = new InitialContext(ServerManagement.getJNDIEnvironment(2));
+
+ queue1 = (Queue)ic1.lookup("queue/testDistributedQueue");
+
+ queue2 = (Queue)ic2.lookup("queue/testDistributedQueue");
+
+ queue3 = (Queue)ic3.lookup("queue/testDistributedQueue");
+
+ topic1 = (Topic)ic1.lookup("topic/testDistributedTopic");
+
+ topic2 = (Topic)ic2.lookup("topic/testDistributedTopic");
+
+ topic3 = (Topic)ic3.lookup("topic/testDistributedTopic");
+
+ cf1 = (ConnectionFactory)ic1.lookup("/ConnectionFactory");
+
+ cf2 = (ConnectionFactory)ic2.lookup("/ConnectionFactory");
+
+ cf3 = (ConnectionFactory)ic3.lookup("/ConnectionFactory");
+
+ drainQueues();
+ }
+ catch (Exception e)
+ {
+ e.printStackTrace();
+ throw e;
+ }
+ }
+
+ protected void tearDown() throws Exception
+ {
+ try
+ {
+ super.tearDown();
+
+ ServerManagement.undeployQueue("testDistributedQueue", 0);
+ ServerManagement.undeployTopic("testDistributedTopic", 0);
+
+ ServerManagement.undeployQueue("testDistributedQueue", 1);
+ ServerManagement.undeployTopic("testDistributedTopic", 1);
+
+ ServerManagement.undeployQueue("testDistributedQueue", 2);
+ ServerManagement.undeployTopic("testDistributedTopic", 2);
+
+ ic1.close();
+
+ ic2.close();
+
+ ic3.close();
+ }
+ catch (Exception e)
+ {
+ e.printStackTrace();
+ throw e;
+ }
+ }
+
+ protected void drainQueues() throws Exception
+ {
+ Connection conn1 = null;
+
+ Connection conn2 = null;
+
+ Connection conn3 = null;
+
+ try
+ {
+ conn1 = cf1.createConnection();
+
+ conn2 = cf2.createConnection();
+
+ conn3 = cf3.createConnection();
+
+ Session sess1 = conn1.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ Session sess2 = conn2.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ Session sess3 = conn3.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ MessageConsumer cons1 = sess1.createConsumer(queue1);
+
+ MessageConsumer cons2 = sess2.createConsumer(queue2);
+
+ MessageConsumer cons3 = sess3.createConsumer(queue3);
+
+ conn1.start();
+
+ conn2.start();
+
+ conn3.start();
+
+ Message msg = null;
+
+ do
+ {
+ msg = cons1.receive(1000);
+ }
+ while (msg != null);
+
+ do
+ {
+ msg = cons2.receive(1000);
+ }
+ while (msg != null);
+
+ do
+ {
+ msg = cons3.receive(1000);
+ }
+ while (msg != null);
+ }
+ finally
+ {
+ if (conn1 != null) conn1.close();
+
+ if (conn2 != null) conn2.close();
+
+ if (conn3 != null) conn3.close();
+ }
+ }
+
+
+}
Added: branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/jms/clustering/HATest.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/jms/clustering/HATest.java 2006-11-22 03:46:36 UTC (rev 1618)
+++ branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/jms/clustering/HATest.java 2006-11-22 23:26:37 UTC (rev 1619)
@@ -0,0 +1,282 @@
+package org.jboss.test.messaging.jms.clustering;
+
+import javax.jms.*;
+import org.jboss.jms.client.JBossConnection;
+import org.jboss.jms.client.JBossMessageConsumer;
+import org.jboss.jms.client.JBossSession;
+import org.jboss.jms.client.delegate.ClientConnectionDelegate;
+import org.jboss.jms.client.delegate.ClientSessionDelegate;
+import org.jboss.jms.client.remoting.JMSRemotingConnection;
+import org.jboss.jms.client.state.ConsumerState;
+import org.jboss.jms.client.state.SessionState;
+import org.jboss.jms.message.MessageProxy;
+import org.jboss.logging.Logger;
+import org.jboss.test.messaging.tools.ServerManagement;
+
+/**
+ * @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
+ * @version <tt>$Revision:$</tt>
+ * <p/>
+ * $Id:$
+ */
+public class HATest extends ClusteringBase
+{
+ protected Logger log = Logger.getLogger(getClass());
+
+ public HATest(String name)
+ {
+ super(name);
+ }
+
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+ }
+
+ protected void tearDown() throws Exception
+ {
+ super.tearDown();
+ }
+
+ public void testLeaveFailover() throws Exception
+ {
+ try
+ {
+ System.out.println("Number of nodes on Cluster = " + ServerManagement.getServer(0).getNumberOfNodesOnCluster());
+ assertEquals(3,ServerManagement.getServer(0).getNumberOfNodesOnCluster());
+ assertEquals(3,ServerManagement.getServer(1).getNumberOfNodesOnCluster());
+ ServerManagement.stop(0,true);
+ System.out.println("Number of nodes on Cluster = " + ServerManagement.getServer(1).getNumberOfNodesOnCluster());
+ assertEquals(2,ServerManagement.getServer(0).getNumberOfNodesOnCluster());
+ }
+ finally
+ {
+ ServerManagement.start("all", 0, true); // tear down needs the server up
+ }
+ }
+
+
+ public void testTopicSubscriber() throws Exception
+ {
+ try
+ {
+ log.info("++testTopicSubscriber");
+
+ log.info(">>Lookup Queue");
+ Destination destination = (Destination) ic1.lookup("topic/testDistributedTopic");
+
+ log.info("Creating connection server1");
+ JBossConnection conn = (JBossConnection) cf1.createConnection();
+ conn.setClientID("testClient");
+ conn.start();
+
+ JBossSession session = (JBossSession) conn.createSession(true, Session.AUTO_ACKNOWLEDGE);
+ ClientSessionDelegate clientSessionDelegate = (ClientSessionDelegate) session.getDelegate();
+ SessionState sessionState = (SessionState) clientSessionDelegate.getState();
+
+ MessageConsumer consumerHA = session.createDurableSubscriber((Topic) destination, "T1");
+ JBossMessageConsumer jbossConsumerHA = (JBossMessageConsumer) consumerHA;
+
+ org.jboss.jms.client.delegate.ClientConsumerDelegate clientDelegate = (org.jboss.jms.client.delegate.ClientConsumerDelegate) jbossConsumerHA.getDelegate();
+ ConsumerState consumerState = (ConsumerState) clientDelegate.getState();
+
+ log.info("subscriptionName=" + consumerState.getSubscriptionName());
+
+
+ log.info(">>Creating Producer");
+ MessageProducer producer = session.createProducer(destination);
+ log.info(">>creating Message");
+ Message message = session.createTextMessage("Hello Before");
+ log.info(">>sending Message");
+ producer.send(message);
+ session.commit();
+
+ receiveMessage("consumerHA", consumerHA, true, false);
+
+ session.commit();
+ //if (true) return;
+
+ Object txID = sessionState.getCurrentTxId();
+
+ producer.send(session.createTextMessage("Hello again before failover"));
+
+ ClientConnectionDelegate delegate = (ClientConnectionDelegate) conn.getDelegate();
+
+ JMSRemotingConnection originalRemoting = delegate.getRemotingConnection();
+
+ log.info(">>Creating alternate connection");
+ JBossConnection conn2 = (JBossConnection) cf2.createConnection();
+ log.info("NewConnectionCreated=" + conn2);
+
+ log.info(">>Failling over");
+ assertSame(originalRemoting, delegate.getRemotingConnection());
+ conn.getDelegate().failOver(conn2.getDelegate());
+
+
+
+ try
+ {
+ originalRemoting.stop();
+ } catch (Throwable throwable)
+ {
+ throwable.printStackTrace();
+ }
+
+ ServerManagement.stop(0, false);
+
+ assertNotSame(originalRemoting, delegate.getRemotingConnection());
+
+ //System.out.println("Kill server1"); Thread.sleep(10000);
+
+ message = session.createTextMessage("Hello After");
+ log.info(">>Sending new message");
+ producer.send(message);
+
+ assertEquals(txID, sessionState.getCurrentTxId());
+ System.out.println("TransactionID on client = " + txID);
+ log.info(">>Final commit");
+
+ /* JBossConnection connSecondServer = (JBossConnection)this.factoryServer2.createConnection();
+ connSecondServer.start();
+ JBossSession sessionSecondServer = (JBossSession)connSecondServer.createSession(false,Session.AUTO_ACKNOWLEDGE);
+ MessageConsumer consumerSecondServer = sessionSecondServer.createConsumer(destination); */
+
+ session.commit();
+
+ /* receiveMessage("consumerSecondServer",consumerSecondServer,true,false);
+ receiveMessage("consumerSecondServer",consumerSecondServer,true,false);
+ receiveMessage("consumerSecondServer",consumerSecondServer,true,true); */
+
+ log.info("Calling alternate receiver");
+ receiveMessage("consumerHA", consumerHA, true, false);
+ receiveMessage("consumerHA", consumerHA, true, false);
+ receiveMessage("consumerHA", consumerHA, true, true);
+
+
+ session.commit();
+
+ }
+ finally
+ {
+ // restart the server as it was probably stopped (tearDown will need that)
+ ServerManagement.start("all", 0, true);
+ }
+ }
+
+ public void testQueueHA() throws Exception
+ {
+ log.info("++testTopicSubscriber");
+
+ log.info(">>Lookup Queue");
+ Destination destination = (Destination) ic1.lookup("queue/testDistributedQueue");
+
+ log.info("Creating connection server1");
+ JBossConnection conn = (JBossConnection) cf1.createConnection();
+ conn.setClientID("testClient");
+ conn.start();
+
+ JBossSession session = (JBossSession) conn.createSession(true, Session.AUTO_ACKNOWLEDGE);
+ ClientSessionDelegate clientSessionDelegate = (ClientSessionDelegate) session.getDelegate();
+ SessionState sessionState = (SessionState) clientSessionDelegate.getState();
+
+ MessageConsumer consumerHA = session.createConsumer(destination);
+ JBossMessageConsumer jbossConsumerHA = (JBossMessageConsumer) consumerHA;
+
+ org.jboss.jms.client.delegate.ClientConsumerDelegate clientDelegate = (org.jboss.jms.client.delegate.ClientConsumerDelegate) jbossConsumerHA.getDelegate();
+ ConsumerState consumerState = (ConsumerState) clientDelegate.getState();
+
+ log.info("subscriptionName=" + consumerState.getSubscriptionName());
+
+
+ log.info(">>Creating Producer");
+ MessageProducer producer = session.createProducer(destination);
+ log.info(">>creating Message");
+ Message message = session.createTextMessage("Hello Before");
+ log.info(">>sending Message");
+ producer.send(message);
+ session.commit();
+
+ session.commit();
+ //if (true) return;
+
+ Object txID = sessionState.getCurrentTxId();
+
+ ClientConnectionDelegate delegate = (ClientConnectionDelegate) conn.getDelegate();
+
+ JMSRemotingConnection originalRemoting = delegate.getRemotingConnection();
+
+ log.info(">>Creating alternate connection");
+ JBossConnection conn2 = (JBossConnection) cf2.createConnection();
+ log.info("NewConnectionCreated=" + conn2);
+
+ log.info(">>Failling over");
+ assertSame(originalRemoting, delegate.getRemotingConnection());
+ conn.getDelegate().failOver(conn2.getDelegate());
+
+ try
+ {
+ originalRemoting.stop();
+ } catch (Throwable throwable)
+ {
+ throwable.printStackTrace();
+ }
+
+
+ assertNotSame(originalRemoting, delegate.getRemotingConnection());
+
+ //System.out.println("Kill server1"); Thread.sleep(10000);
+ assertEquals(txID, sessionState.getCurrentTxId());
+ System.out.println("TransactionID on client = " + txID);
+ log.info(">>Final commit");
+
+ session.commit();
+
+ log.info("Calling alternate receiver");
+ receiveMessage("consumerHA", consumerHA, true, false);
+ receiveMessage("consumerHA", consumerHA, true, true);
+
+ session.commit();
+
+ for (int i = 0; i < 30; i++)
+ {
+ log.info("Message Sent " + i);
+ producer.send(session.createTextMessage("Message " + i));
+ }
+ session.commit();
+
+ Thread.sleep(5000);
+
+ TextMessage messageLoop = null;
+ while (!((messageLoop = (TextMessage) consumerHA.receive(5000)) == null))
+ {
+ log.info("Message received = " + messageLoop.getText());
+ }
+
+ }
+
+
+ private void receiveMessage(String text, MessageConsumer consumer, boolean shouldAssert, boolean shouldBeNull) throws Exception
+ {
+ MessageProxy message = (MessageProxy) consumer.receive(3000);
+ TextMessage txtMessage = (TextMessage) message;
+ if (message != null)
+ {
+ log.info(text + ": messageID from messageReceived=" + message.getMessage().getMessageID() + " message = " + message + " content=" + txtMessage.getText());
+ } else
+ {
+ log.info(text + ": Message received was null");
+ }
+ if (shouldAssert)
+ {
+ if (shouldBeNull)
+ {
+ assertNull(message);
+ } else
+ {
+ assertNotNull(message);
+ }
+ }
+ }
+
+
+}
Modified: branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/jms/clustering/ManualClusteringTest.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/jms/clustering/ManualClusteringTest.java 2006-11-22 03:46:36 UTC (rev 1618)
+++ branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/jms/clustering/ManualClusteringTest.java 2006-11-22 23:26:37 UTC (rev 1619)
@@ -21,23 +21,8 @@
*/
package org.jboss.test.messaging.jms.clustering;
-import javax.jms.Connection;
-import javax.jms.ConnectionFactory;
-import javax.jms.DeliveryMode;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageListener;
-import javax.jms.MessageProducer;
-import javax.jms.Queue;
-import javax.jms.Session;
-import javax.jms.TextMessage;
-import javax.jms.Topic;
-import javax.naming.Context;
-import javax.naming.InitialContext;
+import javax.jms.*;
-import org.jboss.test.messaging.MessagingTestCase;
-import org.jboss.test.messaging.tools.ServerManagement;
-
/**
*
* A ManualClusteringTest
@@ -50,32 +35,8 @@
* $Id$
*
*/
-public class ManualClusteringTest extends MessagingTestCase
+public class ManualClusteringTest extends ClusteringBase
{
- protected Context ic1;
-
- protected Context ic2;
-
- protected Context ic3;
-
- protected Queue queue1;
-
- protected Topic topic1;
-
- protected Queue queue2;
-
- protected Topic topic2;
-
- protected Queue queue3;
-
- protected Topic topic3;
-
- protected ConnectionFactory cf1;
-
- protected ConnectionFactory cf2;
-
- protected ConnectionFactory cf3;
-
public ManualClusteringTest(String name)
{
super(name);
@@ -85,151 +46,13 @@
protected void setUp() throws Exception
{
super.setUp();
-
- try
- {
-
- ServerManagement.start("all", 0, true);
-
- ServerManagement.start("all", 1, true);
-
- ServerManagement.start("all", 2, true);
-
- ServerManagement.deployClusteredQueue("testDistributedQueue", 0);
- ServerManagement.deployClusteredTopic("testDistributedTopic", 0);
-
- ServerManagement.deployClusteredQueue("testDistributedQueue", 1);
- ServerManagement.deployClusteredTopic("testDistributedTopic", 1);
-
- ServerManagement.deployClusteredQueue("testDistributedQueue", 2);
- ServerManagement.deployClusteredTopic("testDistributedTopic", 2);
-
- ic1 = new InitialContext(ServerManagement.getJNDIEnvironment(0));
-
- ic2 = new InitialContext(ServerManagement.getJNDIEnvironment(1));
-
- ic3 = new InitialContext(ServerManagement.getJNDIEnvironment(2));
-
- queue1 = (Queue)ic1.lookup("queue/testDistributedQueue");
-
- queue2 = (Queue)ic2.lookup("queue/testDistributedQueue");
-
- queue3 = (Queue)ic3.lookup("queue/testDistributedQueue");
-
- topic1 = (Topic)ic1.lookup("topic/testDistributedTopic");
-
- topic2 = (Topic)ic2.lookup("topic/testDistributedTopic");
-
- topic3 = (Topic)ic3.lookup("topic/testDistributedTopic");
-
- cf1 = (ConnectionFactory)ic1.lookup("/ConnectionFactory");
-
- cf2 = (ConnectionFactory)ic2.lookup("/ConnectionFactory");
-
- cf3 = (ConnectionFactory)ic3.lookup("/ConnectionFactory");
-
- drainQueues();
- }
- catch (Exception e)
- {
- e.printStackTrace();
- throw e;
- }
}
protected void tearDown() throws Exception
{
- try
- {
super.tearDown();
-
- ServerManagement.undeployQueue("testDistributedQueue", 0);
- ServerManagement.undeployTopic("testDistributedTopic", 0);
-
- ServerManagement.undeployQueue("testDistributedQueue", 1);
- ServerManagement.undeployTopic("testDistributedTopic", 1);
-
- ServerManagement.undeployQueue("testDistributedQueue", 2);
- ServerManagement.undeployTopic("testDistributedTopic", 2);
-
- ic1.close();
-
- ic2.close();
-
- ic3.close();
- }
- catch (Exception e)
- {
- e.printStackTrace();
- throw e;
- }
}
- protected void drainQueues() throws Exception
- {
- Connection conn1 = null;
-
- Connection conn2 = null;
-
- Connection conn3 = null;
-
- try
- {
- conn1 = cf1.createConnection();
-
- conn2 = cf2.createConnection();
-
- conn3 = cf3.createConnection();
-
- Session sess1 = conn1.createSession(false, Session.AUTO_ACKNOWLEDGE);
-
- Session sess2 = conn2.createSession(false, Session.AUTO_ACKNOWLEDGE);
-
- Session sess3 = conn3.createSession(false, Session.AUTO_ACKNOWLEDGE);
-
- MessageConsumer cons1 = sess1.createConsumer(queue1);
-
- MessageConsumer cons2 = sess2.createConsumer(queue2);
-
- MessageConsumer cons3 = sess3.createConsumer(queue3);
-
- conn1.start();
-
- conn2.start();
-
- conn3.start();
-
- Message msg = null;
-
- do
- {
- msg = cons1.receive(1000);
- }
- while (msg != null);
-
- do
- {
- msg = cons2.receive(1000);
- }
- while (msg != null);
-
- do
- {
- msg = cons3.receive(1000);
- }
- while (msg != null);
- }
- finally
- {
- if (conn1 != null) conn1.close();
-
- if (conn2 != null) conn2.close();
-
- if (conn3 != null) conn3.close();
- }
- }
-
-
public void testClusteredQueueLocalConsumerNonPersistent() throws Exception
{
clusteredQueueLocalConsumer(false);
Modified: branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/tools/ServerManagement.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/tools/ServerManagement.java 2006-11-22 03:46:36 UTC (rev 1618)
+++ branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/tools/ServerManagement.java 2006-11-22 23:26:37 UTC (rev 1619)
@@ -165,10 +165,17 @@
public static synchronized void stop() throws Exception
{
insureStarted();
-
- servers[0].stop();
+
+ servers[0].stop();
}
+ public static synchronized void stop(int index, boolean sendNotification) throws Exception
+ {
+ insureStarted();
+
+ servers[index].stop(sendNotification);
+ }
+
public static synchronized void destroy() throws Exception
{
stop();
Modified: branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/tools/jmx/rmi/LocalTestServer.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/tools/jmx/rmi/LocalTestServer.java 2006-11-22 03:46:36 UTC (rev 1618)
+++ branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/tools/jmx/rmi/LocalTestServer.java 2006-11-22 23:26:37 UTC (rev 1619)
@@ -38,6 +38,7 @@
import org.jboss.messaging.core.plugin.contract.MessageStore;
import org.jboss.messaging.core.plugin.contract.PersistenceManager;
import org.jboss.messaging.core.plugin.contract.PostOffice;
+import org.jboss.messaging.core.plugin.postoffice.cluster.DefaultClusteredPostOffice;
import org.jboss.remoting.ServerInvocationHandler;
import org.jboss.test.messaging.tools.ServerManagement;
import org.jboss.test.messaging.tools.jboss.MBeanConfigurationElement;
@@ -129,12 +130,17 @@
public synchronized void stop() throws Exception
{
+ stop(true);
+ }
+
+ public synchronized void stop(boolean sendNotification) throws Exception
+ {
if (!isStarted())
{
return;
}
- stopServerPeer();
+ stopServerPeer(sendNotification);
log.debug("stopping service container");
@@ -360,6 +366,11 @@
public void stopServerPeer() throws Exception
{
+ stopServerPeer(true);
+ }
+
+ public void stopServerPeer(boolean sendNotification) throws Exception
+ {
try
{
// if we don't find a ServerPeer instance registered under the serverPeerObjectName
@@ -421,7 +432,7 @@
try
{
- sc.invoke(serverPeerObjectName, "stop", new Object[0], new String[0]);
+ sc.invoke(serverPeerObjectName, "stop", new Object[]{new Boolean(sendNotification)}, new String[]{"boolean"});
sc.invoke(serverPeerObjectName, "destroy", new Object[0], new String[0]);
sc.unregisterService(serverPeerObjectName);
}
@@ -566,7 +577,13 @@
return (PostOffice)sc.
getAttribute(queuePostOfficeObjectName, "Instance");
}
-
+
+ public PostOffice internalGetQueuePostOffice() throws Exception
+ {
+ return (PostOffice)sc.
+ getAttribute(queuePostOfficeObjectName, "Instance");
+ }
+
public PostOffice getTopicPostOffice() throws Exception
{
return (PostOffice)sc.
@@ -810,6 +827,15 @@
return sc.getUserTransaction();
}
+
+ public int getNumberOfNodesOnCluster() throws Exception
+ {
+ log.info("getNumberOfNodesOnCluster being called:: sc=" + sc);
+ DefaultClusteredPostOffice postOffice = (DefaultClusteredPostOffice)
+ sc.getAttribute(queuePostOfficeObjectName, "Instance");
+ return postOffice.getClusterNodes().length;
+ }
+
// Public --------------------------------------------------------
// Package protected ---------------------------------------------
Modified: branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/tools/jmx/rmi/RMITestServer.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/tools/jmx/rmi/RMITestServer.java 2006-11-22 03:46:36 UTC (rev 1618)
+++ branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/tools/jmx/rmi/RMITestServer.java 2006-11-22 23:26:37 UTC (rev 1619)
@@ -330,6 +330,12 @@
server.startServerPeer(serverPeerID, defaultQueueJNDIContext, defaultTopicJNDIContext, clustered);
}
+ public void stop(boolean sendNotification) throws Exception
+ {
+ server.stop(sendNotification);
+ namingDelegate.reset();
+ }
+
public void stop() throws Exception
{
server.stop();
@@ -372,8 +378,15 @@
return server.getUserTransaction();
}
+
+ public int getNumberOfNodesOnCluster() throws Exception
+ {
+ return server.getNumberOfNodesOnCluster();
+ }
+
private RMINamingDelegate getNamingDelegate()
{
return namingDelegate;
}
+
}
Modified: branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/tools/jmx/rmi/Server.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/tools/jmx/rmi/Server.java 2006-11-22 03:46:36 UTC (rev 1618)
+++ branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/tools/jmx/rmi/Server.java 2006-11-22 23:26:37 UTC (rev 1619)
@@ -43,6 +43,7 @@
public interface Server extends Remote
{
void start(String containerConfig, boolean clustered) throws Exception;
+ void stop(boolean sendNotification) throws Exception;
void stop() throws Exception;
void destroy() throws Exception;
@@ -217,4 +218,7 @@
UserTransaction getUserTransaction() throws Exception;
+ /** use only on Clustering tests */
+ int getNumberOfNodesOnCluster() throws Exception;
+
}
More information about the jboss-cvs-commits
mailing list