[jboss-cvs] JBoss Messaging SVN: r7853 - in branches/Branch_JBossMessaging_1_4_0_SP3_CP07_JBMESSAGING_1732: integration/AS5/etc/xmdesc and 7 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Wed Oct 14 12:15:53 EDT 2009


Author: jbertram at redhat.com
Date: 2009-10-14 12:15:53 -0400 (Wed, 14 Oct 2009)
New Revision: 7853

Added:
   branches/Branch_JBossMessaging_1_4_0_SP3_CP07_JBMESSAGING_1732/tests/src/org/jboss/test/messaging/jms/clustering/FakeClusterConnectionManager.java
   branches/Branch_JBossMessaging_1_4_0_SP3_CP07_JBMESSAGING_1732/tests/src/org/jboss/test/messaging/jms/clustering/FakeMessageSucker.java
   branches/Branch_JBossMessaging_1_4_0_SP3_CP07_JBMESSAGING_1732/tests/src/org/jboss/test/messaging/jms/clustering/MessageSuckerTest.java
Modified:
   branches/Branch_JBossMessaging_1_4_0_SP3_CP07_JBMESSAGING_1732/docs/userguide/en/modules/configuration.xml
   branches/Branch_JBossMessaging_1_4_0_SP3_CP07_JBMESSAGING_1732/integration/AS5/etc/xmdesc/ServerPeer-xmbean.xml
   branches/Branch_JBossMessaging_1_4_0_SP3_CP07_JBMESSAGING_1732/integration/EAP4/etc/server/default/deploy/messaging-service.xml
   branches/Branch_JBossMessaging_1_4_0_SP3_CP07_JBMESSAGING_1732/integration/EAP4/etc/xmdesc/ServerPeer-xmbean.xml
   branches/Branch_JBossMessaging_1_4_0_SP3_CP07_JBMESSAGING_1732/src/main/org/jboss/jms/server/ServerPeer.java
   branches/Branch_JBossMessaging_1_4_0_SP3_CP07_JBMESSAGING_1732/src/main/org/jboss/messaging/core/impl/clusterconnection/ClusterConnectionManager.java
   branches/Branch_JBossMessaging_1_4_0_SP3_CP07_JBMESSAGING_1732/src/main/org/jboss/messaging/core/impl/clusterconnection/MessageSucker.java
   branches/Branch_JBossMessaging_1_4_0_SP3_CP07_JBMESSAGING_1732/tests/src/org/jboss/test/messaging/jms/server/ServerPeerConfigurationTest.java
   branches/Branch_JBossMessaging_1_4_0_SP3_CP07_JBMESSAGING_1732/tests/src/org/jboss/test/messaging/tools/container/ServiceContainerConfiguration.java
Log:
JBPAPP-2917

Modified: branches/Branch_JBossMessaging_1_4_0_SP3_CP07_JBMESSAGING_1732/docs/userguide/en/modules/configuration.xml
===================================================================
--- branches/Branch_JBossMessaging_1_4_0_SP3_CP07_JBMESSAGING_1732/docs/userguide/en/modules/configuration.xml	2009-10-14 14:36:35 UTC (rev 7852)
+++ branches/Branch_JBossMessaging_1_4_0_SP3_CP07_JBMESSAGING_1732/docs/userguide/en/modules/configuration.xml	2009-10-14 16:15:53 UTC (rev 7853)
@@ -315,6 +315,20 @@
           </warning></para>
       </section>
 
+      <section id="conf.serverpeer.attributes.suckerconnectionretryTimes">
+        <title>SuckerConnectionRetryTimes</title>
+
+        <para>Maximum times for a sucker's connection to retry in case of failure. 
+              Default is -1 (retry forever)</para>
+      </section>
+
+      <section id="conf.serverpeer.attributes.suckerconnectionretryinterval">
+        <title>SuckerConnectionRetryInterval</title>
+
+        <para>The interval in milliseconds between each retry of the failed sucker's 
+         connection. Default is 5000.</para>
+      </section>
+
       <section id="conf.serverpeer.attributes.stricttck">
         <title>StrictTCK</title>
 

Modified: branches/Branch_JBossMessaging_1_4_0_SP3_CP07_JBMESSAGING_1732/integration/AS5/etc/xmdesc/ServerPeer-xmbean.xml
===================================================================
--- branches/Branch_JBossMessaging_1_4_0_SP3_CP07_JBMESSAGING_1732/integration/AS5/etc/xmdesc/ServerPeer-xmbean.xml	2009-10-14 14:36:35 UTC (rev 7852)
+++ branches/Branch_JBossMessaging_1_4_0_SP3_CP07_JBMESSAGING_1732/integration/AS5/etc/xmdesc/ServerPeer-xmbean.xml	2009-10-14 16:15:53 UTC (rev 7853)
@@ -228,8 +228,19 @@
       <name>SuckerPassword</name>
       <type>java.lang.String</type>
    </attribute>   
+   
+   <attribute access="read-write" getMethod="getSuckerConnectionRetryTimes" setMethod="setSuckerConnectionRetryTimes">
+      <description>Max times for a sucker's connection to retry in case of failure. Default is -1 (retry forever)</description>
+      <name>SuckerConnectionRetryTimes</name>
+      <type>int</type>
+   </attribute>   
+   
+   <attribute access="read-write" getMethod="getSuckerConnectionRetryInterval" setMethod="setSuckerConnectionRetryInterval">
+      <description>The interval in milliseconds between each retry of the failed sucker's connection</description>
+      <name>SuckerConnectionRetryInterval</name>
+      <type>int</type>
+   </attribute>
 
-
    <!-- Managed operations -->
 
    <operation>

Modified: branches/Branch_JBossMessaging_1_4_0_SP3_CP07_JBMESSAGING_1732/integration/EAP4/etc/server/default/deploy/messaging-service.xml
===================================================================
--- branches/Branch_JBossMessaging_1_4_0_SP3_CP07_JBMESSAGING_1732/integration/EAP4/etc/server/default/deploy/messaging-service.xml	2009-10-14 14:36:35 UTC (rev 7852)
+++ branches/Branch_JBossMessaging_1_4_0_SP3_CP07_JBMESSAGING_1732/integration/EAP4/etc/server/default/deploy/messaging-service.xml	2009-10-14 16:15:53 UTC (rev 7853)
@@ -94,6 +94,10 @@
       
       <attribute name="EnableMessageCounters">false</attribute>
       
+      <attribute name="SuckerConnectionRetryTimes">-1</attribute>
+      
+      <attribute name="SuckerConnectionRetryInterval">5000</attribute>
+      
       <!-- The password used by the message sucker connections to create connections.
            THIS SHOULD ALWAYS BE CHANGED AT INSTALL TIME TO SECURE SYSTEM
       <attribute name="SuckerPassword"></attribute>

Modified: branches/Branch_JBossMessaging_1_4_0_SP3_CP07_JBMESSAGING_1732/integration/EAP4/etc/xmdesc/ServerPeer-xmbean.xml
===================================================================
--- branches/Branch_JBossMessaging_1_4_0_SP3_CP07_JBMESSAGING_1732/integration/EAP4/etc/xmdesc/ServerPeer-xmbean.xml	2009-10-14 14:36:35 UTC (rev 7852)
+++ branches/Branch_JBossMessaging_1_4_0_SP3_CP07_JBMESSAGING_1732/integration/EAP4/etc/xmdesc/ServerPeer-xmbean.xml	2009-10-14 16:15:53 UTC (rev 7853)
@@ -223,6 +223,18 @@
       <name>SuckerPassword</name>
       <type>java.lang.String</type>
    </attribute>   
+   
+   <attribute access="read-write" getMethod="getSuckerConnectionRetryTimes" setMethod="setSuckerConnectionRetryTimes">
+      <description>Max times for a sucker's connection to retry in case of failure. Default is -1 (retry forever)</description>
+      <name>SuckerConnectionRetryTimes</name>
+      <type>int</type>
+   </attribute>   
+   
+   <attribute access="read-write" getMethod="getSuckerConnectionRetryInterval" setMethod="setSuckerConnectionRetryInterval">
+      <description>The interval in milliseconds between each retry of the failed sucker's connection</description>
+      <name>SuckerConnectionRetryInterval</name>
+      <type>int</type>
+   </attribute>   
 
 
    <!-- Managed operations -->

Modified: branches/Branch_JBossMessaging_1_4_0_SP3_CP07_JBMESSAGING_1732/src/main/org/jboss/jms/server/ServerPeer.java
===================================================================
--- branches/Branch_JBossMessaging_1_4_0_SP3_CP07_JBMESSAGING_1732/src/main/org/jboss/jms/server/ServerPeer.java	2009-10-14 14:36:35 UTC (rev 7852)
+++ branches/Branch_JBossMessaging_1_4_0_SP3_CP07_JBMESSAGING_1732/src/main/org/jboss/jms/server/ServerPeer.java	2009-10-14 16:15:53 UTC (rev 7853)
@@ -169,6 +169,10 @@
    // For generating unique Channel ID for cluster without a shared DB
    private long serverStartTime;
 
+   private int suckerConnectionRetryTimes = -1;
+   
+   private int suckerConnectionRetryInterval = 5000;
+   
    // wired components
 
    private DestinationJNDIMapper destinationJNDIMapper;
@@ -301,7 +305,7 @@
          {
 	         clusterConnectionManager = new ClusterConnectionManager(serverPeerID,
 	         		                                                  clusterPullConnectionFactoryName, defaultPreserveOrdering,
-	         		                                                  SecurityStore.SUCKER_USER, suckerPassword);
+	         		                                                  SecurityStore.SUCKER_USER, suckerPassword, suckerConnectionRetryTimes, suckerConnectionRetryInterval);
 	         clusterNotifier.registerListener(clusterConnectionManager);
          }
 
@@ -1711,7 +1715,27 @@
       return true;
    }
 
+   public void setSuckerConnectionRetryTimes(int suckerConnectionRetryTimes)
+   {
+      this.suckerConnectionRetryTimes = suckerConnectionRetryTimes;
+   }
 
+   public int getSuckerConnectionRetryTimes()
+   {
+      return suckerConnectionRetryTimes;
+   }
+
+   public void setSuckerConnectionRetryInterval(int suckerConnectionRetryInterval)
+   {
+      this.suckerConnectionRetryInterval = suckerConnectionRetryInterval;
+   }
+
+   public int getSuckerConnectionRetryInterval()
+   {
+      return suckerConnectionRetryInterval;
+   }
+
+
    // Inner classes --------------------------------------------------------------------------------
 
 

Modified: branches/Branch_JBossMessaging_1_4_0_SP3_CP07_JBMESSAGING_1732/src/main/org/jboss/messaging/core/impl/clusterconnection/ClusterConnectionManager.java
===================================================================
--- branches/Branch_JBossMessaging_1_4_0_SP3_CP07_JBMESSAGING_1732/src/main/org/jboss/messaging/core/impl/clusterconnection/ClusterConnectionManager.java	2009-10-14 14:36:35 UTC (rev 7852)
+++ branches/Branch_JBossMessaging_1_4_0_SP3_CP07_JBMESSAGING_1732/src/main/org/jboss/messaging/core/impl/clusterconnection/ClusterConnectionManager.java	2009-10-14 16:15:53 UTC (rev 7853)
@@ -29,6 +29,7 @@
 import java.util.List;
 import java.util.Map;
 
+import javax.jms.ExceptionListener;
 import javax.jms.JMSException;
 import javax.jms.Session;
 
@@ -62,7 +63,7 @@
    
    private static final long CLOSE_TIMEOUT = 2000;
 		
-   private boolean trace = log.isTraceEnabled();	
+   private static boolean trace = log.isTraceEnabled();	
    
 	private Map connections;
 
@@ -81,11 +82,17 @@
 	private String suckerUser;
 	
 	private String suckerPassword;
+   
+   private int maxRetry;
+   
+   private int retryInterval; //5 sec
 	
 	public ClusterConnectionManager(int nodeID,
 			                          String connectionFactoryUniqueName, boolean preserveOrdering,
 			                          String suckerUser,
-			                          String suckerPassword)
+			                          String suckerPassword,
+			                          int maxRetry,
+			                          int retryInterval)
 	{
 		connections = new HashMap();
 		
@@ -99,6 +106,10 @@
 		
 		this.suckerPassword = suckerPassword;
 		
+		this.maxRetry = maxRetry;
+		
+		this.retryInterval = retryInterval;
+		
 		if (trace) { log.trace("Created " + this); }
 	}
 	
@@ -347,7 +358,7 @@
 				}
 			}
 			else if (notification.type == ClusterNotification.TYPE_UNBIND)
-			{		
+			{		      
 				String queueName = (String)notification.data;
 				
 				if (notification.nodeID == this.nodeID)
@@ -355,7 +366,6 @@
 					//Local unbind
 					
 					//We need to remove any suckers corresponding to remote nodes
-					
 					removeAllSuckers(queueName);
 				}
 				else
@@ -363,7 +373,6 @@
 					//Remote unbind
 					
 					//We need to remove the sucker corresponding to the remote queue
-					
 					removeSucker(queueName, notification.nodeID);					
 				}
 			}
@@ -400,7 +409,8 @@
 			{
 				try
 				{
-   				ConnectionInfo info = new ConnectionInfo(new JBossConnectionFactory(delegate), suckerUser, suckerPassword);
+   				ConnectionInfo info = new ConnectionInfo(new JBossConnectionFactory(delegate), suckerUser, 
+   				                                         suckerPassword, nid == this.nodeID, maxRetry, retryInterval);
    				
    				log.trace(this + " created connection info " + info);
    				
@@ -607,23 +617,30 @@
 		}
 	}
 				
-	class ConnectionInfo
-	{
-		private JBossConnectionFactory connectionFactory;
+	public static class ConnectionInfo implements ExceptionListener
+	{	   
+		protected JBossConnectionFactory connectionFactory;
 		
-		private JBossConnection connection;
+		protected JBossConnection connection;
 		
-		private Session session;
+		protected Session session;
 		
-		private Map suckers;
+		protected Map suckers;
 		
-		private boolean started;
+		protected boolean started;
 		
 		private String suckerUser;
 		
 		private String suckerPassword;
 		
-		ConnectionInfo(JBossConnectionFactory connectionFactory, String suckerUser, String suckerPassword) throws Exception
+		protected boolean isLocal;
+		
+		private int maxRetry;
+		
+		private int retryInterval;
+		
+		public ConnectionInfo(JBossConnectionFactory connectionFactory, String suckerUser, 
+		               String suckerPassword, boolean isLocal, int maxRetry, int retryInterval) throws Exception
 		{
 			this.connectionFactory = connectionFactory;
 			
@@ -632,9 +649,15 @@
 			this.suckerUser = suckerUser;
 			
 			this.suckerPassword = suckerPassword;
+			
+			this.isLocal = isLocal;
+			
+			this.maxRetry = maxRetry;
+			
+			this.retryInterval = retryInterval;
 		}
 		
-		synchronized void start() throws Exception
+		protected synchronized void start() throws Exception
 		{			
 			if (started)			
 			{
@@ -645,11 +668,17 @@
 		   {
 				connection = (JBossConnection)connectionFactory.createConnection(suckerUser, suckerPassword);
 				
+				//local connection doesn't need listener.
+				if (!isLocal)
+				{
+				  connection.setExceptionListener(this);
+				}
+				
 				session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
 		   }
 			
-			connection.start();
-			
+         connection.start();
+
 			started = true;
 		}
 		
@@ -691,7 +720,7 @@
 			suckers.clear();
 		}
 		
-		synchronized void close()
+		protected synchronized void close()
 		{
 			closeAllSuckers();			
 			
@@ -726,12 +755,12 @@
 			started = false;
 		}
 		
-		synchronized boolean hasSucker(String queueName)
+		protected synchronized boolean hasSucker(String queueName)
 		{
 			return suckers.containsKey(queueName);
 		}
 		
-		synchronized void addSucker(MessageSucker sucker)
+		protected synchronized void addSucker(MessageSucker sucker)
 		{
 			if (suckers.containsKey(sucker.getQueueName()))
 			{
@@ -746,6 +775,115 @@
 			MessageSucker sucker = (MessageSucker)suckers.remove(queueName);
 			
 			return sucker;
-		}		
-	}	
+		}
+
+		//https://jira.jboss.org/jira/browse/JBMESSAGING-1732
+		//on exception, try to recreate all suckers.
+      public void onException(JMSException e)
+      {
+         log.warn("Connection failure detected. Clean up and retry connection. maxRetry: " + maxRetry + " retryInterval: " + retryInterval);
+         cleanupConnection();
+         retryConnection();
+      }
+      
+      //first stop all the suckers
+      //then try to close the connection
+      protected synchronized void cleanupConnection()
+      {
+         if (!started)
+         {
+            return;
+         }
+         
+         Iterator iter = suckers.values().iterator();
+         
+         while (iter.hasNext())
+         {
+            MessageSucker sucker = (MessageSucker)iter.next();
+            
+            sucker.suspend();
+         }
+
+         Callable callable = new Callable() { public Object call()
+         {
+            try
+            {
+               connection.close();
+            }
+            catch (JMSException ignore)
+            {              
+            }
+            return null;
+          } };
+         
+         Callable timedCallable = new TimedCallable(callable, CLOSE_TIMEOUT);
+         
+         try
+         {
+            timedCallable.call();
+         }
+         catch (Throwable t)
+         {
+            //Ignore - the server might have already closed - so this is ok
+         }
+         
+         connection = null;
+         
+         started = false;
+      }
+      
+      protected synchronized int retryConnection()
+      {         
+         int retryCount = 0;
+         
+         while (((maxRetry == -1) || (retryCount < maxRetry)) && (suckers.size() > 0))
+         {
+            try
+            {
+               start();
+               break;
+            }
+            catch (Exception e)
+            {
+               retryCount++;
+               if (trace) 
+               { 
+                  log.trace("Retrying ConnectionInfo " + this + " failed, retry count: " + retryCount, e); 
+               }
+               try
+               {
+                  this.wait(retryInterval);
+               }
+               catch(InterruptedException ite)
+               {
+               }
+            }
+         }
+         
+         if (!started)
+         {
+            log.error("Retrying ConnectionInfo " + this + " failed after maxmum retry: " + retryCount);
+            return retryCount;
+         }
+         
+         //now resume the suckers
+         Iterator iter = suckers.values().iterator();
+         
+         while (iter.hasNext())
+         {
+            MessageSucker sucker = (MessageSucker)iter.next();
+            
+            try
+            {
+               sucker.resume(session);
+            }
+            catch (JMSException e)
+            {
+               log.warn("Error resuming sucker " + sucker, e);
+            }
+         }
+         
+         return retryCount;
+      }
+	}
 }

Modified: branches/Branch_JBossMessaging_1_4_0_SP3_CP07_JBMESSAGING_1732/src/main/org/jboss/messaging/core/impl/clusterconnection/MessageSucker.java
===================================================================
--- branches/Branch_JBossMessaging_1_4_0_SP3_CP07_JBMESSAGING_1732/src/main/org/jboss/messaging/core/impl/clusterconnection/MessageSucker.java	2009-10-14 14:36:35 UTC (rev 7852)
+++ branches/Branch_JBossMessaging_1_4_0_SP3_CP07_JBMESSAGING_1732/src/main/org/jboss/messaging/core/impl/clusterconnection/MessageSucker.java	2009-10-14 16:15:53 UTC (rev 7853)
@@ -23,6 +23,7 @@
 package org.jboss.messaging.core.impl.clusterconnection;
 
 import javax.jms.Destination;
+import javax.jms.JMSException;
 import javax.jms.Message;
 import javax.jms.MessageListener;
 import javax.jms.Session;
@@ -77,14 +78,16 @@
 	
 	private long sourceChannelID;
 	
-	private JBossQueue jbq;
+	protected JBossQueue jbq;
 	
+	private boolean suspended = false;
+	
 	public String toString()
 	{
 		return "MessageSucker:" + System.identityHashCode(this) + " queue:" + localQueue.getName();
 	}
 				
-	MessageSucker(Queue localQueue, Session sourceSession, Session localSession,
+	protected MessageSucker(Queue localQueue, Session sourceSession, Session localSession,
 	              boolean preserveOrdering, long sourceChannelID)
    {	
       if (trace) { log.trace("Creating message sucker, localQueue:" + localQueue + " preserveOrdering:" + preserveOrdering); }
@@ -102,7 +105,7 @@
       this.sourceChannelID = sourceChannelID;
    }
 	
-	synchronized void start() throws Exception
+	protected synchronized void start() throws Exception
 	{
 		if (started)
 		{
@@ -140,56 +143,142 @@
 		if (trace) { log.trace(this + " Registered sucker"); }
 	}
 	
-	synchronized void stop()
-	{
-		if (!started)
-		{
-			return;
-		}
-		
-		setConsuming(false);
-				
-		localQueue.unregisterSucker(this);
-		
-		try
-		{
-			consumer.closing(-1);
-		}
-		catch (Throwable t)
-		{
-			// Ignore
-		}
-		try
-		{
-			consumer.close();
-		}
-		catch (Throwable t)
-		{
-			//Ignore
-		}
-		
-		try
-		{
-			producer.close();
-		}
-		catch (Throwable t)
-		{
-			//Ignore
-		}
+   protected void stop()
+   {
+      localQueue.unregisterSucker(this);
 
-		sourceSession = null;
-		
-		localSession = null;
-		
-		consumer = null;
-		
-		clientConsumer = null;
-		
-		producer = null;
-		
-		started = false;
-	}
-	
+      synchronized (this)
+      {
+         if (!started)
+         {
+            return;
+         }
+
+         setConsuming(false);
+
+         try
+         {
+            consumer.closing(-1);
+         }
+         catch (Throwable t)
+         {
+            // Ignore
+         }
+         try
+         {
+            consumer.close();
+         }
+         catch (Throwable t)
+         {
+            // Ignore
+         }
+
+         try
+         {
+            producer.close();
+         }
+         catch (Throwable t)
+         {
+            // Ignore
+         }
+
+         sourceSession = null;
+
+         localSession = null;
+
+         consumer = null;
+
+         clientConsumer = null;
+
+         producer = null;
+
+         started = false;
+      }
+   }
+ 
+   //the suspend stops the sucker's receiving end but doesn't unregister the sucker.
+   //we only suspend the consumer side.
+   public synchronized void suspend()
+   {
+      if (!started || suspended)
+      {
+         return;
+      }
+
+      boolean oldConsuming = consuming;
+      
+      setConsuming(false);
+      
+      consuming = oldConsuming;
+      
+      suspended = true;
+
+      try
+      {
+         consumer.closing(-1);
+      }
+      catch (Throwable t)
+      {
+         // Ignore
+      }
+      try
+      {
+         consumer.close();
+      }
+      catch (Throwable t)
+      {
+         // Ignore
+      }
+
+      sourceSession = null;
+
+      consumer = null;
+
+      clientConsumer = null;
+   }
+   
+   
+   public synchronized void resume(Session srcSession) throws JMSException
+   {
+      if (!suspended)
+      {
+         return;
+      }
+
+      sourceSession = srcSession;
+      
+      SessionDelegate sourcedel = ((JBossSession)sourceSession).getDelegate();
+      
+      consumer = (ClientConsumerDelegate)sourcedel.createConsumerDelegate(jbq, null, false, null, false, false);
+      
+      clientConsumer = ((ConsumerState)consumer.getState()).getClientConsumer();
+
+      try
+      {
+         if (consuming)
+         {
+            if (trace) { log.trace(this + " resuming client consumer"); }
+            
+            clientConsumer.resume();
+         }
+         else
+         {
+            if (trace) { log.trace(this + " pausing client consumer"); }
+            
+            clientConsumer.pause();
+         }
+      }
+      catch (Exception e)
+      {
+         e.printStackTrace();
+         //We ignore the exception - we might fail to change rate when stoping a sucker for a dead server
+      }
+      
+      consumer.setMessageListener(this);
+
+      suspended = false;
+   }
+
 	public String getQueueName()
 	{
 		return this.localQueue.getName();
@@ -199,6 +288,14 @@
 	{
 		if (trace) { log.trace(this + " setConsuming " + consume); }
 		
+		//for supended, we set the consuming flag and do nothing.
+		//later on resume, we force the sucker to be the last set consuming state.
+		if (suspended)
+		{
+		   consuming = consume;
+		   return;
+		}
+		
 		try
 		{
 			if (consume && !consuming)

Copied: branches/Branch_JBossMessaging_1_4_0_SP3_CP07_JBMESSAGING_1732/tests/src/org/jboss/test/messaging/jms/clustering/FakeClusterConnectionManager.java (from rev 7808, branches/Branch_1_4/tests/src/org/jboss/test/messaging/jms/clustering/FakeClusterConnectionManager.java)
===================================================================
--- branches/Branch_JBossMessaging_1_4_0_SP3_CP07_JBMESSAGING_1732/tests/src/org/jboss/test/messaging/jms/clustering/FakeClusterConnectionManager.java	                        (rev 0)
+++ branches/Branch_JBossMessaging_1_4_0_SP3_CP07_JBMESSAGING_1732/tests/src/org/jboss/test/messaging/jms/clustering/FakeClusterConnectionManager.java	2009-10-14 16:15:53 UTC (rev 7853)
@@ -0,0 +1,625 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2009, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.test.messaging.jms.clustering;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import javax.jms.JMSException;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+import org.jboss.jms.client.JBossConnectionFactory;
+import org.jboss.jms.destination.JBossQueue;
+import org.jboss.logging.Logger;
+import org.jboss.messaging.core.contract.Delivery;
+import org.jboss.messaging.core.contract.DeliveryObserver;
+import org.jboss.messaging.core.contract.Distributor;
+import org.jboss.messaging.core.contract.Filter;
+import org.jboss.messaging.core.contract.MessageReference;
+import org.jboss.messaging.core.impl.clusterconnection.MessageSucker;
+import org.jboss.messaging.core.impl.clusterconnection.ClusterConnectionManager.ConnectionInfo;
+import org.jboss.messaging.core.impl.tx.Transaction;
+
+/**
+ * A FakeClusterConnectionManager
+ * 
+ * Used to test Message Sucker. We use this one to get rid of the links 
+ * between ClusterConnectionManager and the ServerPeer. We only need to 
+ * test the 'Client' aspect of message sucker, to see if it can correctly
+ * 'suck' messages. We don't care about the sending aspect of the suckers.
+ * That makes the test easier.
+ *
+ * @author howard
+ * 
+ * Created Sep 14, 2009 1:38:09 PM
+ *
+ */
+public class FakeClusterConnectionManager
+{
+
+   public static final long CLOSE_TIMEOUT = 2000;
+
+   private Map connections;
+
+   private boolean started;
+   
+   private int remoteID;
+   
+   private int thisID;
+   
+   private JBossConnectionFactory remoteFactory;
+   
+   private String suckerUser;
+   
+   private String suckerPassword;
+   
+   private int maxRetry;
+   
+   private int retryInterval;
+   
+   public FakeClusterConnectionManager(int remoteID,
+                                   JBossConnectionFactory theFactory,
+                                   String suckerUser,
+                                   String suckerPassword,
+                                   int maxRetry,
+                                   int retryInterval,
+                                   int thisID)
+   {
+      connections = new HashMap();
+      
+      this.remoteID = remoteID;
+      
+      this.remoteFactory = theFactory;
+      
+      this.suckerUser = suckerUser;
+      
+      this.suckerPassword = suckerPassword;
+      
+      this.maxRetry = maxRetry;
+      
+      this.retryInterval = retryInterval;
+      
+      this.thisID = thisID;
+   }
+
+   public synchronized void start() throws Exception
+   {
+      if (started)
+      {
+         return;
+      }
+      
+      started = true;
+   }
+   
+   public void createConnectionInfo(boolean updateJMSObject) throws Exception
+   {
+      FakeConnectionInfo info = new FakeConnectionInfo(remoteFactory, suckerUser, suckerPassword, remoteID == thisID, maxRetry, retryInterval, updateJMSObject);
+      connections.put(remoteID, info);
+      info.start();
+   }
+   
+   public int getRetryTimes(int node)
+   {
+      FakeConnectionInfo info = (FakeConnectionInfo)connections.get(new Integer(node));
+      return info.getRetryTimes();    
+   }
+
+   public String waitForReconnectionOK(int node)
+   {
+      FakeConnectionInfo info = (FakeConnectionInfo)connections.get(new Integer(node));
+      return info.waitForReconnectionOK();      
+   }
+   
+   public void resetFactory(int node, JBossConnectionFactory fact)
+   {
+      FakeConnectionInfo info = (FakeConnectionInfo)connections.get(new Integer(node));
+      info.resetFactory(fact);      
+   }
+
+   public void updateQueueInSucker(int node, Queue queue) throws JMSException
+   {
+      FakeConnectionInfo info = (FakeConnectionInfo)connections.get(new Integer(node));
+      info.updateQueueInSucker(queue);      
+   }
+
+   public String checkMessageSucked(int node, TextMessage[] messages) throws JMSException
+   {
+      FakeConnectionInfo info = (FakeConnectionInfo)connections.get(new Integer(node));
+      return info.checkMessageSucked(messages);
+   }
+
+   public String checkMessageNotSucked(int node) throws JMSException
+   {
+      FakeConnectionInfo info = (FakeConnectionInfo)connections.get(new Integer(node));
+      return info.checkMessageNotSucked();
+   }
+   
+   public String checkConnectionFailureDetected(int node)
+   {
+      FakeConnectionInfo info = (FakeConnectionInfo)connections.get(new Integer(node));
+      return info.checkConnectionFailureDetected();      
+   }
+
+   public void createSucker(Queue queue, int nid, boolean beginSuck) throws Exception
+   {
+      FakeConnectionInfo info = (FakeConnectionInfo)connections.get(new Integer(nid));
+
+      if (!info.hasSucker(queue.getQueueName()))
+      {
+            FakeMessageSucker sucker = new FakeMessageSucker(new FakeCoreQueue(queue), info.getSession(), 0, info.suckBuffer);
+
+            info.addSucker(sucker);
+            
+            sucker.start();
+            
+            sucker.setConsuming(beginSuck);
+      }
+   }
+
+   class FakeCoreQueue implements org.jboss.messaging.core.contract.Queue
+   {
+
+      private String queueName;
+
+      public FakeCoreQueue(Queue queue) throws JMSException
+      {
+         queueName = queue.getQueueName();
+      }
+
+      public String getName()
+      {
+         return queueName;
+      }
+
+      public void addAllToRecoveryArea(int nodeID, Map ids)
+      {
+      }
+
+      public void addToRecoveryArea(int nodeID, long messageID, String sessionID)
+      {
+      }
+
+      public int getDownCacheSize()
+      {
+         return 0;
+      }
+
+      public Filter getFilter()
+      {
+         return null;
+      }
+
+      public int getFullSize()
+      {
+         return 0;
+      }
+
+      public Distributor getLocalDistributor()
+      {
+         return null;
+      }
+
+      public int getNodeID()
+      {
+         return 0;
+      }
+
+      public int getPageSize()
+      {
+         return 0;
+      }
+
+      public long getRecoverDeliveriesTimeout()
+      {
+         return 0;
+      }
+
+      public Map getRecoveryArea()
+      {
+         return null;
+      }
+
+      public int getRecoveryMapSize()
+      {
+         return 0;
+      }
+
+      public Distributor getRemoteDistributor()
+      {
+         return null;
+      }
+
+      public Delivery handleMove(MessageReference ref, long sourceChannelID)
+      {
+         return null;
+      }
+
+      public boolean isClustered()
+      {
+         return false;
+      }
+
+      public void mergeIn(long channelID, int nodeID) throws Exception
+      {
+      }
+
+      public List recoverDeliveries(List messageIds)
+      {
+         return null;
+      }
+
+      public void registerSucker(MessageSucker sucker)
+      {
+      }
+
+      public void removeAllFromRecoveryArea(int nodeID)
+      {
+      }
+
+      public void removeFromRecoveryArea(int nodeID, long messageID)
+      {
+      }
+
+      public void removeStrandedReferences(String sessionID)
+      {
+      }
+
+      public void setPagingParams(int fullSize, int pageSize, int downCacheSize)
+      {
+      }
+
+      public boolean unregisterSucker(MessageSucker sucker)
+      {
+         return false;
+      }
+
+      public void activate()
+      {
+      }
+
+      public List browse(Filter filter)
+      {
+         return null;
+      }
+
+      public void close()
+      {
+      }
+
+      public void deactivate()
+      {
+      }
+
+      public void deliver()
+      {
+      }
+
+      public long getChannelID()
+      {
+         return 0;
+      }
+
+      public int getDeliveringCount()
+      {
+         return 0;
+      }
+
+      public int getMaxSize()
+      {
+         return 0;
+      }
+
+      public int getMessageCount()
+      {
+         return 0;
+      }
+
+      public int getMessagesAdded()
+      {
+         return 0;
+      }
+
+      public int getScheduledCount()
+      {
+         return 0;
+      }
+
+      public boolean isActive()
+      {
+         return false;
+      }
+
+      public boolean isRecoverable()
+      {
+         return false;
+      }
+
+      public void load() throws Exception
+      {
+      }
+
+      public void removeAllReferences() throws Throwable
+      {
+      }
+
+      public void setMaxSize(int newSize)
+      {
+      }
+
+      public void unload() throws Exception
+      {
+      }
+
+      public void acknowledge(Delivery d, Transaction tx) throws Throwable
+      {
+      }
+
+      public void acknowledgeNoPersist(Delivery d) throws Throwable
+      {
+      }
+
+      public void cancel(Delivery d) throws Throwable
+      {
+      }
+
+      public Delivery handle(DeliveryObserver observer, MessageReference reference, Transaction tx)
+      {
+         return null;
+      }
+   }
+
+   // Inner classes -------------------------------------------------
+   
+   //Note: This currently is suitable for test of only one sucker!
+   class FakeConnectionInfo extends ConnectionInfo
+   {
+      private ArrayList<TextMessage> suckBuffer = new ArrayList<TextMessage>();
+      
+      private Object connFailureLock = new Object();
+      
+      private boolean connFailed = false;
+      
+      private Object reconnLock = new Object();
+      
+      private boolean reconnOK = false;
+      
+      private boolean updateJMSObjects = true;
+      
+      private int lastRetryTimes;
+
+      FakeConnectionInfo(JBossConnectionFactory factory, String suckerUser, String suckerPassword, 
+                         boolean isLocal, int maxRetry, int retryInterval, boolean updateJMS) throws Exception
+      {
+         super(factory, suckerUser, suckerPassword, isLocal, maxRetry, retryInterval);
+         updateJMSObjects = updateJMS;
+      }
+      
+      public int getRetryTimes()
+      {
+         return lastRetryTimes;
+      }
+
+      public Session getSession()
+      {
+         return super.session;
+      }
+
+      public void updateQueueInSucker(Queue queue) throws JMSException
+      {
+         Iterator iter = suckers.values().iterator();
+         
+         while (iter.hasNext())
+         {
+            FakeMessageSucker sucker = (FakeMessageSucker)iter.next();
+            sucker.updateQueue(new JBossQueue(queue.getQueueName()));
+         }
+
+      }
+
+      public String waitForReconnectionOK()
+      {
+         synchronized(reconnLock)
+         {
+            if (!reconnOK)
+            {
+               try
+               {
+                  reconnLock.wait(20000);
+               }
+               catch (InterruptedException e)
+               {
+               }
+            }
+            if (reconnOK)
+            {
+               reconnOK = false;
+               return null;
+            }
+            return "Reconnection failed after 10 seconds";
+         }
+      }
+
+      public String checkConnectionFailureDetected()
+      {
+         synchronized (connFailureLock)
+         {
+            if (connFailed)
+            {
+               connFailed = false;
+               return null;
+            }
+            try
+            {
+               connFailureLock.wait(10000);
+            }
+            catch (InterruptedException e)
+            {
+            }
+            if (connFailed)
+            {
+               return null;
+            }
+            else
+            {
+               return "Connection Failure not detected with in 10 sec";
+            }
+         }
+      }
+
+      public String checkMessageSucked(TextMessage[] messages) throws JMSException
+      {
+         String result = null;
+         if (messages.length != suckBuffer.size())
+         {
+            result = "Number of sucked messages not right, expected: " + messages.length + " but was: " + suckBuffer.size();
+            return result;
+         }
+         for (int i = 0; i < messages.length; i++)
+         {
+            TextMessage msg = suckBuffer.get(i);
+            if (!messages[i].getText().equals(msg.getText()))
+            {
+               result = "Message sucked not right, expected: " + messages[i].getText() + " but was: " + msg.getText();
+               break;
+            }
+         }
+         suckBuffer.clear();
+         return result;
+      }
+
+      public String checkMessageNotSucked() throws JMSException
+      {
+         String result = null;
+         if (suckBuffer.size() > 0)
+         {
+            result = "Number of sucked messages not right, expected: 0 but was: " + suckBuffer.size();
+            return result;
+         }
+         suckBuffer.clear();
+         return result;
+      }
+
+      //https://jira.jboss.org/jira/browse/JBMESSAGING-1732
+      //on exception, try to recreate all suckers.
+      public void onException(JMSException e)
+      {
+         synchronized(connFailureLock)
+         {
+            this.connFailed = true;
+            connFailureLock.notify();
+         }
+         super.onException(e);
+      }
+      
+      public synchronized void resetFactory(JBossConnectionFactory newFact)
+      {
+         connectionFactory = newFact;
+         this.notify();
+      }
+      
+      protected synchronized void cleanupConnection()
+      {
+         if (updateJMSObjects)
+         {
+            connectionFactory = null;
+         }
+         super.cleanupConnection();
+      }
+      
+      protected synchronized int retryConnection()
+      {         
+         //regain factory: this is not the true behavior of reconnection.
+         //in real case, the factory should be expected to be accessible. If not, that means the node is crashed or shutdown
+         //in which case reconnection no longer apply. New connection info will be created instead.
+         while (connectionFactory == null)
+         {
+            try
+            {
+               this.wait();
+            }
+            catch (InterruptedException e)
+            {
+            }
+         }
+         
+         lastRetryTimes = super.retryConnection();
+         
+         synchronized(reconnLock)
+         {
+            reconnOK = super.started;
+            reconnLock.notify();
+         }
+         
+         return lastRetryTimes;
+      }
+      
+      public void close()
+      {
+         super.close();
+      }
+      
+      public void start() throws Exception
+      {
+         super.start();
+      }
+      
+      public boolean hasSucker(String queueName)
+      {
+         return super.hasSucker(queueName);
+      }
+      
+      public void addSucker(MessageSucker sucker)
+      {
+         super.addSucker(sucker);
+      }
+   }
+
+   public void stop()
+   {
+      if (!started)
+      {
+         return;
+      }
+      
+      Iterator iter = connections.values().iterator();
+      
+      while (iter.hasNext())
+      {
+         FakeConnectionInfo info = (FakeConnectionInfo)iter.next();
+         
+         info.close();
+      }
+      
+      connections.clear();
+      
+      started = false;
+   }
+
+
+}

Copied: branches/Branch_JBossMessaging_1_4_0_SP3_CP07_JBMESSAGING_1732/tests/src/org/jboss/test/messaging/jms/clustering/FakeMessageSucker.java (from rev 7808, branches/Branch_1_4/tests/src/org/jboss/test/messaging/jms/clustering/FakeMessageSucker.java)
===================================================================
--- branches/Branch_JBossMessaging_1_4_0_SP3_CP07_JBMESSAGING_1732/tests/src/org/jboss/test/messaging/jms/clustering/FakeMessageSucker.java	                        (rev 0)
+++ branches/Branch_JBossMessaging_1_4_0_SP3_CP07_JBMESSAGING_1732/tests/src/org/jboss/test/messaging/jms/clustering/FakeMessageSucker.java	2009-10-14 16:15:53 UTC (rev 7853)
@@ -0,0 +1,116 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2009, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.test.messaging.jms.clustering;
+
+import java.util.List;
+
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+import org.jboss.jms.destination.JBossQueue;
+import org.jboss.messaging.core.contract.Queue;
+import org.jboss.messaging.core.impl.clusterconnection.MessageSucker;
+
+/**
+ * A FakeMessageSucker
+ *
+ * @author howard
+ * 
+ * Created Sep 14, 2009 2:31:27 PM
+ *
+ *
+ */
+public class FakeMessageSucker extends MessageSucker
+{
+   private List<TextMessage> buffer;
+
+   private Object queueUpdateLock = new Object();
+
+   private boolean queueNotUpdated = true;
+
+   FakeMessageSucker(Queue localQueue, Session sourceSession, long sourceChannelID, List<TextMessage> buffer)
+   {
+      super(localQueue, sourceSession, sourceSession, true, sourceChannelID);
+      this.buffer = buffer;
+   }
+
+   // Public --------------------------------------------------------
+   public void start() throws Exception
+   {
+      super.start();
+   }
+
+   public void stop()
+   {
+      super.stop();
+   }
+
+   public void updateQueue(JBossQueue q)
+   {
+      synchronized (queueUpdateLock)
+      {
+         this.jbq = q;
+         queueNotUpdated = false;
+         queueUpdateLock.notify();
+      }
+   }
+
+   public void resume(Session session) throws JMSException
+   {
+      synchronized (queueUpdateLock)
+      {
+         if (queueNotUpdated)
+         {
+            try
+            {
+               queueUpdateLock.wait(20000);
+            }
+            catch (InterruptedException e)
+            {
+            }
+         }
+      }
+      super.resume(session);
+   }
+
+   public void onMessage(Message msg)
+   {
+
+      try
+      {
+         buffer.add((TextMessage)msg);
+         msg.acknowledge();
+      }
+      catch (JMSException e)
+      {
+      }
+   }
+
+   public void resetQueue(JBossQueue newQueue)
+   {
+      jbq = newQueue;
+   }
+
+}

Copied: branches/Branch_JBossMessaging_1_4_0_SP3_CP07_JBMESSAGING_1732/tests/src/org/jboss/test/messaging/jms/clustering/MessageSuckerTest.java (from rev 7808, branches/Branch_1_4/tests/src/org/jboss/test/messaging/jms/clustering/MessageSuckerTest.java)
===================================================================
--- branches/Branch_JBossMessaging_1_4_0_SP3_CP07_JBMESSAGING_1732/tests/src/org/jboss/test/messaging/jms/clustering/MessageSuckerTest.java	                        (rev 0)
+++ branches/Branch_JBossMessaging_1_4_0_SP3_CP07_JBMESSAGING_1732/tests/src/org/jboss/test/messaging/jms/clustering/MessageSuckerTest.java	2009-10-14 16:15:53 UTC (rev 7853)
@@ -0,0 +1,305 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2009, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.test.messaging.jms.clustering;
+
+import javax.jms.Connection;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+import org.jboss.jms.client.JBossConnectionFactory;
+import org.jboss.jms.server.SecurityStore;
+import org.jboss.jms.server.security.SecurityMetadataStore;
+import org.jboss.test.messaging.tools.ServerManagement;
+
+/**
+ * A MessageSuckerTest
+ *
+ * @author howard
+ * 
+ * Created Sep 14, 2009 12:23:30 PM
+ *
+ *
+ */
+public class MessageSuckerTest extends ClusteringTestBase
+{
+
+   public MessageSuckerTest(String name)
+   {
+      super(name);
+   }
+
+   // https://jira.jboss.org/jira/browse/JBMESSAGING-1732
+   // Initiate a Fake ClusterConnectionManager to connection to a node.
+   // send messages to the node and check if messages are sucked.
+   // then kill the node and restart the node, check if the messages still
+   // can be sucked.
+   public void testMessageSuckerReconnection() throws Exception
+   {
+      Connection conn0 = null;
+      Connection conn1 = null;
+      Connection conn2 = null;
+
+      try
+      {
+         JBossConnectionFactory factory = (JBossConnectionFactory)ic[0].lookup("/ConnectionFactory");
+
+         FakeClusterConnectionManager clusterConnMgr = new FakeClusterConnectionManager(0,
+                                                                                        factory,
+                                                                                        SecurityStore.SUCKER_USER,
+                                                                                        SecurityMetadataStore.DEFAULT_SUCKER_USER_PASSWORD,
+                                                                                        -1,
+                                                                                        2000,
+                                                                                        5);
+         clusterConnMgr.start();
+
+         clusterConnMgr.createConnectionInfo(true);
+
+         clusterConnMgr.createSucker(queue[0], 0, true);
+
+         conn1 = createConnectionOnServer(factory, 0);
+
+         Session sess1 = conn1.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+         MessageProducer producer1 = sess1.createProducer(queue[0]);
+
+         TextMessage[] messages = new TextMessage[1];
+
+         for (int i = 0; i < messages.length; i++)
+         {
+            messages[i] = sess1.createTextMessage("suck-msg" + i);
+            producer1.send(messages[i]);
+         }
+
+         try
+         {
+            Thread.sleep(2000);
+         }
+         catch (InterruptedException e)
+         {
+            // ignore.
+         }
+
+         String result = clusterConnMgr.checkMessageSucked(0, messages);
+         assertNull(result, result);
+
+         // Now kill Node 0
+         ServerManagement.stop(0);
+
+         // Sucker connection should receive notification
+         clusterConnMgr.checkConnectionFailureDetected(0);
+         assertNull(result, result);
+         
+         //sleep for 10 sec
+         try
+         {
+            Thread.sleep(4000);
+         }
+         catch(InterruptedException e)
+         {
+            //ignore
+         }
+
+         // Now startup Node 0 again, here we clean up the DB as the
+         // message last sent won't be removed because it is sucked, not really received.
+         ServerManagement.start(0, "all", true);
+         ServerManagement.deployQueue("testDistributedQueue", 0);
+
+         queue[0] = (Queue)ic[0].lookup("queue/testDistributedQueue");
+
+         factory = (JBossConnectionFactory)ic[0].lookup("/ConnectionFactory");
+
+         // to simulate the real case, we need to restore the connection factory and the queue
+         // in reality, they don't need to update as the node aren't really dead, so those
+         // objects are supposed to be valid.
+         clusterConnMgr.resetFactory(0, factory);
+         clusterConnMgr.updateQueueInSucker(0, queue[0]);
+
+         result = clusterConnMgr.waitForReconnectionOK(0);
+         assertNull(result, result);
+
+         // now send 1 more messages
+         conn2 = createConnectionOnServer(factory, 0);
+
+         Session sess2 = conn2.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+         MessageProducer producer2 = sess2.createProducer(queue[0]);
+
+         for (int i = 0; i < messages.length; i++)
+         {
+            messages[i] = sess2.createTextMessage("new-suck-msg" + i);
+            producer2.send(messages[i]);
+         }
+
+         Thread.sleep(2000);
+
+         // should be sucked.
+         result = clusterConnMgr.checkMessageSucked(0, messages);
+         assertNull(result, result);
+
+         clusterConnMgr.stop();
+      }
+      finally
+      {
+         if (conn0 != null)
+         {
+            conn0.close();
+         }
+         if (conn1 != null)
+         {
+            conn1.close();
+         }
+         if (conn2 != null)
+         {
+            conn2.close();
+         }
+      }
+   }
+
+   // https://jira.jboss.org/jira/browse/JBMESSAGING-1732
+   // Initiate a Fake ClusterConnectionManager to connection to a node.
+   // send messages to the node and check if messages are sucked. Set retry times to 2 and retryInterval 1000.
+   // then kill the node and restart the node, check if the messages cannot be sucked 
+   public void testMessageSuckerReconnection2() throws Exception
+   {
+      Connection conn0 = null;
+      Connection conn1 = null;
+      Connection conn2 = null;
+
+      try
+      {
+         JBossConnectionFactory factory = (JBossConnectionFactory)ic[0].lookup("/ConnectionFactory");
+
+         FakeClusterConnectionManager clusterConnMgr = new FakeClusterConnectionManager(0,
+                                                                                        factory,
+                                                                                        SecurityStore.SUCKER_USER,
+                                                                                        SecurityMetadataStore.DEFAULT_SUCKER_USER_PASSWORD,
+                                                                                        2,
+                                                                                        1000,
+                                                                                        5);
+         clusterConnMgr.start();
+
+         clusterConnMgr.createConnectionInfo(false);
+
+         clusterConnMgr.createSucker(queue[0], 0, true);
+
+         conn1 = createConnectionOnServer(factory, 0);
+
+         Session sess1 = conn1.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+         MessageProducer producer1 = sess1.createProducer(queue[0]);
+
+         TextMessage[] messages = new TextMessage[1];
+
+         for (int i = 0; i < messages.length; i++)
+         {
+            messages[i] = sess1.createTextMessage("suck-msg" + i);
+            producer1.send(messages[i]);
+         }
+
+         try
+         {
+            Thread.sleep(2000);
+         }
+         catch (InterruptedException e)
+         {
+            // ignore.
+         }
+
+         String result = clusterConnMgr.checkMessageSucked(0, messages);
+         assertNull(result, result);
+
+         // Now kill Node 0
+         ServerManagement.stop(0);
+
+         // Sucker connection should receive notification
+         clusterConnMgr.checkConnectionFailureDetected(0);
+         assertNull(result, result);
+         
+         //sleep for 4 sec to let the retry fail
+         try
+         {
+            Thread.sleep(4000);
+         }
+         catch(InterruptedException e)
+         {
+            //ignore
+         }
+
+         // Now startup Node 0 again, here we clean up the DB as the
+         // message last sent won't be removed because it is sucked, not really received.
+         ServerManagement.start(0, "all", true);
+         ServerManagement.deployQueue("testDistributedQueue", 0);
+
+         queue[0] = (Queue)ic[0].lookup("queue/testDistributedQueue");
+
+         factory = (JBossConnectionFactory)ic[0].lookup("/ConnectionFactory");
+
+         result = clusterConnMgr.waitForReconnectionOK(0);
+         assertNotNull(result, result);
+         
+         int n = clusterConnMgr.getRetryTimes(0);
+         assertEquals(2, n);
+
+         // now send 1 more messages
+         conn2 = createConnectionOnServer(factory, 0);
+
+         Session sess2 = conn2.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+         MessageProducer producer2 = sess2.createProducer(queue[0]);
+
+         for (int i = 0; i < messages.length; i++)
+         {
+            messages[i] = sess2.createTextMessage("new-suck-msg" + i);
+            producer2.send(messages[i]);
+         }
+
+         Thread.sleep(2000);
+
+         // should be sucked.
+         result = clusterConnMgr.checkMessageNotSucked(0);
+         assertNull(result, result);
+
+         removeAllMessages(queue[0].getQueueName(), true, 0);
+
+         clusterConnMgr.stop();
+      }
+      finally
+      {
+         if (conn0 != null)
+         {
+            conn0.close();
+         }
+         if (conn1 != null)
+         {
+            conn1.close();
+         }
+         if (conn2 != null)
+         {
+            conn2.close();
+         }
+      }
+   }
+
+}

Modified: branches/Branch_JBossMessaging_1_4_0_SP3_CP07_JBMESSAGING_1732/tests/src/org/jboss/test/messaging/jms/server/ServerPeerConfigurationTest.java
===================================================================
--- branches/Branch_JBossMessaging_1_4_0_SP3_CP07_JBMESSAGING_1732/tests/src/org/jboss/test/messaging/jms/server/ServerPeerConfigurationTest.java	2009-10-14 14:36:35 UTC (rev 7852)
+++ branches/Branch_JBossMessaging_1_4_0_SP3_CP07_JBMESSAGING_1732/tests/src/org/jboss/test/messaging/jms/server/ServerPeerConfigurationTest.java	2009-10-14 16:15:53 UTC (rev 7853)
@@ -24,6 +24,7 @@
 import javax.management.ObjectName;
 import javax.management.RuntimeMBeanException;
 
+import org.jboss.jms.server.ServerPeer;
 import org.jboss.test.messaging.MessagingTestCase;
 import org.jboss.test.messaging.tools.ServerManagement;
 import org.jboss.test.messaging.tools.container.LocalTestServer;
@@ -101,6 +102,27 @@
       }
    }
    
+   //https://jira.jboss.org/jira/browse/JBMESSAGING-1732
+   //make sure the defaults are correct
+   public void testServerPeerAttributeDefaults() throws Exception
+   {
+      LocalTestServer server = new LocalTestServer();
+      
+      try
+      {
+         server.start("all", null, false, true);
+         ServerPeer sp = server.getServerPeer();
+         int interval = sp.getSuckerConnectionRetryInterval();
+         assertEquals(5000, interval);
+         int maxRetry = sp.getSuckerConnectionRetryTimes();
+         assertEquals(-1, maxRetry);
+      }
+      finally
+      {
+         server.stop();
+      }
+   }
+   
    // Package protected ---------------------------------------------
 
    // Protected -----------------------------------------------------

Modified: branches/Branch_JBossMessaging_1_4_0_SP3_CP07_JBMESSAGING_1732/tests/src/org/jboss/test/messaging/tools/container/ServiceContainerConfiguration.java
===================================================================
--- branches/Branch_JBossMessaging_1_4_0_SP3_CP07_JBMESSAGING_1732/tests/src/org/jboss/test/messaging/tools/container/ServiceContainerConfiguration.java	2009-10-14 14:36:35 UTC (rev 7852)
+++ branches/Branch_JBossMessaging_1_4_0_SP3_CP07_JBMESSAGING_1732/tests/src/org/jboss/test/messaging/tools/container/ServiceContainerConfiguration.java	2009-10-14 16:15:53 UTC (rev 7853)
@@ -30,7 +30,6 @@
    // Constants -----------------------------------------------------
 
    public static final boolean DEFAULT_CLUSTERED_MODE = false;
-
    // Static --------------------------------------------------------
 
    public static String getHypersonicDatabase(String connectionURL)




More information about the jboss-cvs-commits mailing list