[jboss-cvs] JBoss Messaging SVN: r1820 - trunk/tests/src/org/jboss/test/messaging/jms/clustering

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Tue Dec 19 01:23:06 EST 2006


Author: clebert.suconic at jboss.com
Date: 2006-12-19 01:23:05 -0500 (Tue, 19 Dec 2006)
New Revision: 1820

Modified:
   trunk/tests/src/org/jboss/test/messaging/jms/clustering/HATest.java
Log:
adding testTopicSubscriber

Modified: trunk/tests/src/org/jboss/test/messaging/jms/clustering/HATest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/jms/clustering/HATest.java	2006-12-19 05:54:19 UTC (rev 1819)
+++ trunk/tests/src/org/jboss/test/messaging/jms/clustering/HATest.java	2006-12-19 06:23:05 UTC (rev 1820)
@@ -31,14 +31,23 @@
 import javax.jms.MessageProducer;
 import javax.jms.Session;
 import javax.jms.TextMessage;
+import javax.jms.Destination;
+import javax.jms.Topic;
 
 import org.jboss.jms.client.JBossConnection;
 import org.jboss.jms.client.JBossConnectionFactory;
+import org.jboss.jms.client.JBossMessageConsumer;
+import org.jboss.jms.client.JBossSession;
+import org.jboss.jms.client.remoting.JMSRemotingConnection;
 import org.jboss.jms.client.delegate.ClientConnectionDelegate;
 import org.jboss.jms.client.delegate.ClientConnectionFactoryDelegate;
 import org.jboss.jms.client.delegate.ClusteredClientConnectionFactoryDelegate;
 import org.jboss.jms.client.delegate.DelegateSupport;
+import org.jboss.jms.client.delegate.ClientSessionDelegate;
 import org.jboss.jms.client.state.ConnectionState;
+import org.jboss.jms.client.state.ConsumerState;
+import org.jboss.jms.client.state.SessionState;
+import org.jboss.jms.message.MessageProxy;
 import org.jboss.test.messaging.jms.clustering.base.ClusteringTestBase;
 import org.jboss.test.messaging.tools.ServerManagement;
 
@@ -194,6 +203,100 @@
 
    }
 
+   public void testTopicSubscriber() throws Exception // Durable-subscriber test: send and receive in a transacted session, kill server 1, then send, commit and receive again on the same session.
+   {
+      log.info("++testTopicSubscriber");
+
+      log.info(">>Lookup Queue"); // NOTE(review): the text says "Queue" but the destination used below comes from topic[1]
+      Destination destination = (Destination) topic[1];
+
+
+      JBossConnectionFactory factory = (JBossConnectionFactory) ic[1].lookup("/ConnectionFactory"); // NOTE(review): 'factory' is never used below; the connections are created from 'cf'
+      Connection conn1 = cf.createConnection();
+      Connection conn2 = cf.createConnection();
+      Connection conn3 = cf.createConnection();
+
+
+      this.checkConnectionsDifferentServers(new Connection[]{conn1, conn2, conn3}); // helper defined outside this hunk; presumably checks that each connection landed on a different node - confirm
+      JBossConnection conn = (JBossConnection) getConnection(new Connection[]{conn1, conn2, conn3}, 1); // presumably picks the connection attached to server 1 (the node killed below) - confirm against the helper
+
+      conn.setClientID("testClient"); // set before the durable subscriber is created on this connection
+      conn.start();
+
+      JBossSession session = (JBossSession) conn.createSession(true, Session.AUTO_ACKNOWLEDGE); // first argument true: transacted session, work is committed explicitly via session.commit()
+      ClientSessionDelegate clientSessionDelegate = (ClientSessionDelegate) session.getDelegate();
+      SessionState sessionState = (SessionState) clientSessionDelegate.getState(); // kept so the current transaction id can be read before and after the kill
+
+      MessageConsumer consumerHA = session.createDurableSubscriber((Topic) destination, "T1"); // durable subscription named "T1"
+      JBossMessageConsumer jbossConsumerHA = (JBossMessageConsumer) consumerHA;
+
+      org.jboss.jms.client.delegate.ClientConsumerDelegate clientDelegate = (org.jboss.jms.client.delegate.ClientConsumerDelegate) jbossConsumerHA.getDelegate();
+      ConsumerState consumerState = (ConsumerState) clientDelegate.getState(); // only used for the subscription-name log line below
+
+      log.info("subscriptionName=" + consumerState.getSubscriptionName());
+
+
+      log.info(">>Creating Producer");
+      MessageProducer producer = session.createProducer(destination); // producer and consumer share the same transacted session
+      log.info(">>creating Message");
+      Message message = session.createTextMessage("Hello Before");
+      log.info(">>sending Message");
+      producer.send(message);
+      session.commit(); // commits the send of "Hello Before"
+
+      receiveMessage("consumerHA", consumerHA, true, false); // asserts that a message is received (shouldBeNull = false)
+
+      session.commit(); // commits the receive above
+      //if (true) return;
+
+      Object txID = sessionState.getCurrentTxId(); // remembered so it can be compared after the server kill
+
+      producer.send(session.createTextMessage("Hello again before failover")); // sent but not committed at the time the server is killed
+
+      ClientConnectionDelegate delegate = (ClientConnectionDelegate) conn.getDelegate();
+
+      JMSRemotingConnection originalRemoting = delegate.getRemotingConnection(); // remoting connection in use before the kill
+
+      ServerManagement.kill(1); // kills server 1
+
+      Thread.sleep(30000); // fixed 30 s wait; presumably gives client-side failover time to complete - confirm
+      assertNotSame(originalRemoting, delegate.getRemotingConnection()); // the delegate must now hold a different remoting connection object
+
+      //System.out.println("Kill server1"); Thread.sleep(10000);
+
+      message = session.createTextMessage("Hello After");
+      log.info(">>Sending new message");
+      producer.send(message); // sent on the same session object after the kill
+
+      assertEquals(txID, sessionState.getCurrentTxId()); // the session's current transaction id must be unchanged after the kill
+      System.out.println("TransactionID on client = " + txID);
+      log.info(">>Final commit");
+
+      /* JBossConnection connSecondServer = (JBossConnection)this.factoryServer2.createConnection();
+       connSecondServer.start();
+       JBossSession sessionSecondServer = (JBossSession)connSecondServer.createSession(false,Session.AUTO_ACKNOWLEDGE);
+       MessageConsumer consumerSecondServer = sessionSecondServer.createConsumer(destination); */
+
+      session.commit(); // commits both pending sends: "Hello again before failover" and "Hello After"
+
+      /* receiveMessage("consumerSecondServer",consumerSecondServer,true,false);
+     receiveMessage("consumerSecondServer",consumerSecondServer,true,false);
+     receiveMessage("consumerSecondServer",consumerSecondServer,true,true); */
+
+      log.info("Calling alternate receiver");
+      receiveMessage("consumerHA", consumerHA, true, false); // first of the two messages committed above must arrive
+      receiveMessage("consumerHA", consumerHA, true, false); // second message must arrive
+      receiveMessage("consumerHA", consumerHA, true, true); // no third message is expected (shouldBeNull = true)
+
+
+      session.commit(); // commits the receives above
+      conn1.close(); // NOTE(review): the closes are not in a finally block, so the connections stay open if anything above throws
+      conn2.close();
+      conn3.close();
+
+   }
+
+
    /*
     * Test that the failover mapping is created correctly and updated properly when nodes leave
     * or join
@@ -1190,30 +1293,30 @@
    }
    
    // Private -------------------------------------------------------
-   
-//   private void receiveMessage(String text, MessageConsumer consumer, boolean shouldAssert, boolean shouldBeNull) throws Exception
-//   {
-//      MessageProxy message = (MessageProxy) consumer.receive(3000);
-//      TextMessage txtMessage = (TextMessage) message;
-//      if (message != null)
-//      {
-//         log.info(text + ": messageID from messageReceived=" + message.getMessage().getMessageID() + " message = " + message + " content=" + txtMessage.getText());
-//      } else
-//      {
-//         log.info(text + ": Message received was null");
-//      }
-//      if (shouldAssert)
-//      {
-//         if (shouldBeNull)
-//         {
-//            assertNull(message);
-//         } else
-//         {
-//            assertNotNull(message);
-//         }
-//      }
-//   }
-   
+
+   private void receiveMessage(String text, MessageConsumer consumer, boolean shouldAssert, boolean shouldBeNull) throws Exception // Receives at most one message, logs it under the label 'text', and optionally asserts its presence or absence.
+   {
+      MessageProxy message = (MessageProxy) consumer.receive(3000); // waits up to 3000 ms; the null branch below handles the case where nothing was returned
+      TextMessage txtMessage = (TextMessage) message; // casting null is harmless; a non-text message would fail this cast
+      if (message != null)
+      {
+         log.info(text + ": messageID from messageReceived=" + message.getMessage().getMessageID() + " message = " + message + " content=" + txtMessage.getText());
+      } else
+      {
+         log.info(text + ": Message received was null");
+      }
+      if (shouldAssert) // when false the outcome is only logged, never asserted
+      {
+         if (shouldBeNull) // true: expect no message; false: expect a message
+         {
+            assertNull(message);
+         } else
+         {
+            assertNotNull(message);
+         }
+      }
+   }
+
    // Inner classes -------------------------------------------------
    
 }




More information about the jboss-cvs-commits mailing list