[jboss-cvs] JBoss Messaging SVN: r3435 - in branches/Branch_Stable: tests/src/org/jboss/test/messaging/jms/clustering and 1 other directory.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Thu Dec 6 14:00:46 EST 2007


Author: timfox
Date: 2007-12-06 14:00:46 -0500 (Thu, 06 Dec 2007)
New Revision: 3435

Modified:
   branches/Branch_Stable/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java
   branches/Branch_Stable/tests/src/org/jboss/test/messaging/jms/clustering/FailoverTest.java
Log:
Durable sub failover with same client id


Modified: branches/Branch_Stable/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java
===================================================================
--- branches/Branch_Stable/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java	2007-12-06 18:32:26 UTC (rev 3434)
+++ branches/Branch_Stable/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java	2007-12-06 19:00:46 UTC (rev 3435)
@@ -1966,9 +1966,11 @@
                if (trace) { log.trace(this + " subscription " + subscriptionName + " already exists"); }
                
             	//Check if it is already has a subscriber
-            	//We can't have more than one subscriber at a time on the durable sub
+            	//We can't have more than one subscriber at a time on the durable sub unless it is clustered
+               //we need this for clustered since otherwise we wouldn't be able to fail over subscribers for the same durable
+               //sub onto a node which already has one
                
-               if (queue.getLocalDistributor().getNumberOfReceivers() > 0)
+               if (queue.getLocalDistributor().getNumberOfReceivers() > 0 && !mDest.isClustered())
                {
                	throw new IllegalStateException("Cannot create a subscriber on the durable subscription since it already has subscriber(s)");
                }

Modified: branches/Branch_Stable/tests/src/org/jboss/test/messaging/jms/clustering/FailoverTest.java
===================================================================
--- branches/Branch_Stable/tests/src/org/jboss/test/messaging/jms/clustering/FailoverTest.java	2007-12-06 18:32:26 UTC (rev 3434)
+++ branches/Branch_Stable/tests/src/org/jboss/test/messaging/jms/clustering/FailoverTest.java	2007-12-06 19:00:46 UTC (rev 3435)
@@ -24,6 +24,7 @@
 import org.jboss.jms.client.JBossConnection;
 import org.jboss.jms.client.delegate.ClientConnectionDelegate;
 import org.jboss.jms.client.remoting.JMSRemotingConnection;
+import org.jboss.test.messaging.jms.clustering.ClusteringTestBase.SimpleFailoverListener;
 import org.jboss.test.messaging.tools.ServerManagement;
 import org.jboss.test.messaging.tools.aop.PoisonInterceptor;
 
@@ -1721,8 +1722,173 @@
          }
       }
    }
+   // Kills node 1 while a durable subscriber is attached to it, waits for failover to node 0, then checks all ten messages arrive in order.
+   public void testDurableSubscriptionFailover() throws Exception
+   {
+      Connection conn = null;
+
+      try
+      {
+         conn = createConnectionOnServer(cf, 1);
+         
+         conn.setClientID("myclientid1");
+         
+         conn.start();
+
+         Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+         // register a failover listener so the test can block below until failover has finished
+         SimpleFailoverListener failoverListener = new SimpleFailoverListener();
+         ((JBossConnection)conn).registerFailoverListener(failoverListener);
+         
+         MessageConsumer cons = session.createDurableSubscriber(topic[1], "mysub1");
+         
+         MessageProducer prod = session.createProducer(topic[1]);
+         // first batch: message0..message4 are sent before node 1 is killed
+         for (int i = 0; i < 5; i++)
+         {
+            TextMessage tm = session.createTextMessage("message" + i);
+            
+            prod.send(tm);
+         }
+         // kill the node this connection was created on
+         ServerManagement.kill(1);
+
+         // wait for the client-side failover to complete; a null event (presumably a timeout of the 30000 passed in) fails the test
+
+         while(true)
+         {
+            FailoverEvent event = failoverListener.getEvent(30000);
+            if (event != null && FailoverEvent.FAILOVER_COMPLETED == event.getType())
+            {
+               break;
+            }
+            if (event == null)
+            {
+               fail("Did not get expected FAILOVER_COMPLETED event");
+            }
+         }
+
+         // failover complete: the connection is expected to report server 0 now
+         assertEquals(0, getServerId(conn));
+         // second batch: message5..message9 are sent through the same producer after failover
+         for (int i = 5; i < 10; i++)
+         {
+            TextMessage tm = session.createTextMessage("message" + i);
+            
+            prod.send(tm);
+         }
+         // the original consumer must deliver both batches, in the order they were sent
+         for (int i = 0; i < 10; i++)
+         {
+            TextMessage tm = (TextMessage)cons.receive(30000);
+            
+            assertNotNull(tm);
+            
+            assertEquals("message" + i, tm.getText());
+         }
+      }
+      finally
+      {
+         if (conn != null)
+         {
+            conn.close();
+         }
+      }
+   }
     
+   public void testDurableSubscriptionFailoverTwosubscribers() throws Exception
+   {
+      Connection conn = null;
+      // conn0 is a second connection, on node 0, holding a durable subscriber with the same client id and name when node 1 is killed
+      Connection conn0 = null;
 
+      try
+      {
+         conn = createConnectionOnServer(cf, 1);
+         
+         conn.setClientID("myclientid1");
+         
+         conn.start();
+         
+         conn0 = this.createConnectionOnServer(cf, 0);
+         
+         //same client id as conn, but on node 0
+         conn0.setClientID("myclientid1");
+
+         Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         
+         Session session0 = conn0.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+         // register a failover listener on conn only; conn is the connection whose node gets killed
+         SimpleFailoverListener failoverListener = new SimpleFailoverListener();
+         ((JBossConnection)conn).registerFailoverListener(failoverListener);
+         
+         MessageConsumer cons = session.createDurableSubscriber(topic[1], "mysub1");
+         
+         //Durable sub on different node with same client id and sub name
+         MessageConsumer cons0 = session0.createDurableSubscriber(topic[0], "mysub1");
+         // NOTE(review): conn0 is never started and cons0 never receives; presumably it only has to exist as a receiver on node 0 - confirm
+         MessageProducer prod = session.createProducer(topic[1]);
+         // first batch: message0..message4 are sent before node 1 is killed
+         for (int i = 0; i < 5; i++)
+         {
+            TextMessage tm = session.createTextMessage("message" + i);
+            
+            prod.send(tm);
+         }
+         // kill the node that conn was created on
+         ServerManagement.kill(1);
+
+         // wait for the client-side failover to complete; a null event (presumably a timeout of the 30000 passed in) fails the test
+
+         while(true)
+         {
+            FailoverEvent event = failoverListener.getEvent(30000);
+            if (event != null && FailoverEvent.FAILOVER_COMPLETED == event.getType())
+            {
+               break;
+            }
+            if (event == null)
+            {
+               fail("Did not get expected FAILOVER_COMPLETED event");
+            }
+         }
+
+         // failover complete: conn is expected to report server 0, the node where cons0 already subscribes
+         assertEquals(0, getServerId(conn));
+         // second batch: message5..message9 are sent through the same producer after failover
+         for (int i = 5; i < 10; i++)
+         {
+            TextMessage tm = session.createTextMessage("message" + i);
+            
+            prod.send(tm);
+         }
+         // only the failed-over consumer (cons) is checked: it must deliver both batches in the order sent
+         for (int i = 0; i < 10; i++)
+         {
+            TextMessage tm = (TextMessage)cons.receive(30000);
+            
+            assertNotNull(tm);
+            
+            assertEquals("message" + i, tm.getText());
+         }
+      }
+      finally
+      {
+         if (conn != null)
+         {
+            conn.close();
+         }
+         // NOTE(review): if conn.close() above throws, conn0 is never closed
+         if (conn0 != null)
+         {
+            conn0.close();
+         }
+      }
+   }
+    
+
    // Package protected ----------------------------------------------------------------------------
 
    // Protected ------------------------------------------------------------------------------------




More information about the jboss-cvs-commits mailing list