[jboss-cvs] JBoss Messaging SVN: r1820 - trunk/tests/src/org/jboss/test/messaging/jms/clustering
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Tue Dec 19 01:23:06 EST 2006
Author: clebert.suconic at jboss.com
Date: 2006-12-19 01:23:05 -0500 (Tue, 19 Dec 2006)
New Revision: 1820
Modified:
trunk/tests/src/org/jboss/test/messaging/jms/clustering/HATest.java
Log:
adding testTopicSubscriber
Modified: trunk/tests/src/org/jboss/test/messaging/jms/clustering/HATest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/jms/clustering/HATest.java 2006-12-19 05:54:19 UTC (rev 1819)
+++ trunk/tests/src/org/jboss/test/messaging/jms/clustering/HATest.java 2006-12-19 06:23:05 UTC (rev 1820)
@@ -31,14 +31,23 @@
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
+import javax.jms.Destination;
+import javax.jms.Topic;
import org.jboss.jms.client.JBossConnection;
import org.jboss.jms.client.JBossConnectionFactory;
+import org.jboss.jms.client.JBossMessageConsumer;
+import org.jboss.jms.client.JBossSession;
+import org.jboss.jms.client.remoting.JMSRemotingConnection;
import org.jboss.jms.client.delegate.ClientConnectionDelegate;
import org.jboss.jms.client.delegate.ClientConnectionFactoryDelegate;
import org.jboss.jms.client.delegate.ClusteredClientConnectionFactoryDelegate;
import org.jboss.jms.client.delegate.DelegateSupport;
+import org.jboss.jms.client.delegate.ClientSessionDelegate;
import org.jboss.jms.client.state.ConnectionState;
+import org.jboss.jms.client.state.ConsumerState;
+import org.jboss.jms.client.state.SessionState;
+import org.jboss.jms.message.MessageProxy;
import org.jboss.test.messaging.jms.clustering.base.ClusteringTestBase;
import org.jboss.test.messaging.tools.ServerManagement;
@@ -194,6 +203,100 @@
}
+ public void testTopicSubscriber() throws Exception
+ {
+ log.info("++testTopicSubscriber");
+
+ log.info(">>Lookup Queue");
+ Destination destination = (Destination) topic[1];
+
+
+ JBossConnectionFactory factory = (JBossConnectionFactory) ic[1].lookup("/ConnectionFactory");
+ Connection conn1 = cf.createConnection();
+ Connection conn2 = cf.createConnection();
+ Connection conn3 = cf.createConnection();
+
+
+ this.checkConnectionsDifferentServers(new Connection[]{conn1, conn2, conn3});
+ JBossConnection conn = (JBossConnection) getConnection(new Connection[]{conn1, conn2, conn3}, 1);
+
+ conn.setClientID("testClient");
+ conn.start();
+
+ JBossSession session = (JBossSession) conn.createSession(true, Session.AUTO_ACKNOWLEDGE);
+ ClientSessionDelegate clientSessionDelegate = (ClientSessionDelegate) session.getDelegate();
+ SessionState sessionState = (SessionState) clientSessionDelegate.getState();
+
+ MessageConsumer consumerHA = session.createDurableSubscriber((Topic) destination, "T1");
+ JBossMessageConsumer jbossConsumerHA = (JBossMessageConsumer) consumerHA;
+
+ org.jboss.jms.client.delegate.ClientConsumerDelegate clientDelegate = (org.jboss.jms.client.delegate.ClientConsumerDelegate) jbossConsumerHA.getDelegate();
+ ConsumerState consumerState = (ConsumerState) clientDelegate.getState();
+
+ log.info("subscriptionName=" + consumerState.getSubscriptionName());
+
+
+ log.info(">>Creating Producer");
+ MessageProducer producer = session.createProducer(destination);
+ log.info(">>creating Message");
+ Message message = session.createTextMessage("Hello Before");
+ log.info(">>sending Message");
+ producer.send(message);
+ session.commit();
+
+ receiveMessage("consumerHA", consumerHA, true, false);
+
+ session.commit();
+ //if (true) return;
+
+ Object txID = sessionState.getCurrentTxId();
+
+ producer.send(session.createTextMessage("Hello again before failover"));
+
+ ClientConnectionDelegate delegate = (ClientConnectionDelegate) conn.getDelegate();
+
+ JMSRemotingConnection originalRemoting = delegate.getRemotingConnection();
+
+ ServerManagement.kill(1);
+
+ Thread.sleep(30000);
+ assertNotSame(originalRemoting, delegate.getRemotingConnection());
+
+ //System.out.println("Kill server1"); Thread.sleep(10000);
+
+ message = session.createTextMessage("Hello After");
+ log.info(">>Sending new message");
+ producer.send(message);
+
+ assertEquals(txID, sessionState.getCurrentTxId());
+ System.out.println("TransactionID on client = " + txID);
+ log.info(">>Final commit");
+
+ /* JBossConnection connSecondServer = (JBossConnection)this.factoryServer2.createConnection();
+ connSecondServer.start();
+ JBossSession sessionSecondServer = (JBossSession)connSecondServer.createSession(false,Session.AUTO_ACKNOWLEDGE);
+ MessageConsumer consumerSecondServer = sessionSecondServer.createConsumer(destination); */
+
+ session.commit();
+
+ /* receiveMessage("consumerSecondServer",consumerSecondServer,true,false);
+ receiveMessage("consumerSecondServer",consumerSecondServer,true,false);
+ receiveMessage("consumerSecondServer",consumerSecondServer,true,true); */
+
+ log.info("Calling alternate receiver");
+ receiveMessage("consumerHA", consumerHA, true, false);
+ receiveMessage("consumerHA", consumerHA, true, false);
+ receiveMessage("consumerHA", consumerHA, true, true);
+
+
+ session.commit();
+ conn1.close();
+ conn2.close();
+ conn3.close();
+
+ }
+
+
/*
* Test that the failover mapping is created correctly and updated properly when nodes leave
* or join
@@ -1190,30 +1293,30 @@
}
// Private -------------------------------------------------------
-
-// private void receiveMessage(String text, MessageConsumer consumer, boolean shouldAssert, boolean shouldBeNull) throws Exception
-// {
-// MessageProxy message = (MessageProxy) consumer.receive(3000);
-// TextMessage txtMessage = (TextMessage) message;
-// if (message != null)
-// {
-// log.info(text + ": messageID from messageReceived=" + message.getMessage().getMessageID() + " message = " + message + " content=" + txtMessage.getText());
-// } else
-// {
-// log.info(text + ": Message received was null");
-// }
-// if (shouldAssert)
-// {
-// if (shouldBeNull)
-// {
-// assertNull(message);
-// } else
-// {
-// assertNotNull(message);
-// }
-// }
-// }
-
+
+ private void receiveMessage(String text, MessageConsumer consumer, boolean shouldAssert, boolean shouldBeNull) throws Exception
+ {
+ MessageProxy message = (MessageProxy) consumer.receive(3000);
+ TextMessage txtMessage = (TextMessage) message;
+ if (message != null)
+ {
+ log.info(text + ": messageID from messageReceived=" + message.getMessage().getMessageID() + " message = " + message + " content=" + txtMessage.getText());
+ } else
+ {
+ log.info(text + ": Message received was null");
+ }
+ if (shouldAssert)
+ {
+ if (shouldBeNull)
+ {
+ assertNull(message);
+ } else
+ {
+ assertNotNull(message);
+ }
+ }
+ }
+
// Inner classes -------------------------------------------------
}
More information about the jboss-cvs-commits
mailing list