[jboss-cvs] JBoss Messaging SVN: r7799 - in branches: Branch_JBMESSAGING_1732 and 8 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Thu Sep 10 03:52:34 EDT 2009


Author: gaohoward
Date: 2009-09-10 03:52:34 -0400 (Thu, 10 Sep 2009)
New Revision: 7799

Added:
   branches/Branch_JBMESSAGING_1732/
Modified:
   branches/Branch_JBMESSAGING_1732/.classpath
   branches/Branch_JBMESSAGING_1732/integration/EAP4/etc/server/default/deploy/messaging-service.xml
   branches/Branch_JBMESSAGING_1732/integration/EAP4/etc/xmdesc/ServerPeer-xmbean.xml
   branches/Branch_JBMESSAGING_1732/src/main/org/jboss/jms/server/ServerPeer.java
   branches/Branch_JBMESSAGING_1732/src/main/org/jboss/messaging/core/impl/clusterconnection/ClusterConnectionManager.java
   branches/Branch_JBMESSAGING_1732/src/main/org/jboss/messaging/core/impl/clusterconnection/MessageSucker.java
   branches/Branch_JBMESSAGING_1732/tests/build.properties
   branches/Branch_JBMESSAGING_1732/tests/etc/container.xml
   branches/Branch_JBMESSAGING_1732/tests/src/org/jboss/test/messaging/jms/server/ServerPeerConfigurationTest.java
   branches/Branch_JBMESSAGING_1732/tests/src/org/jboss/test/messaging/tools/container/ServiceContainerConfiguration.java
Log:
a temporary branch


Copied: branches/Branch_JBMESSAGING_1732 (from rev 7798, branches/Branch_1_4)

Modified: branches/Branch_JBMESSAGING_1732/.classpath
===================================================================
--- branches/Branch_1_4/.classpath	2009-09-04 03:51:04 UTC (rev 7798)
+++ branches/Branch_JBMESSAGING_1732/.classpath	2009-09-10 07:52:34 UTC (rev 7799)
@@ -22,7 +22,7 @@
 	<classpathentry kind="src" path="docs/examples/topic/src"/>
 	<classpathentry excluding="**/.svn/**/*" kind="src" path="src/main"/>
 	<classpathentry excluding="**/.svn/**/*" kind="src" path="tests/src"/>
-	<classpathentry kind="lib" path="thirdparty/oswego-concurrent/lib/concurrent.jar" sourcepath="/extra/work-clebert/concurrent/concurrent/src"/>
+	<classpathentry kind="lib" path="thirdparty/oswego-concurrent/lib/concurrent.jar" sourcepath="/home/howard/projects/jboss/messaging/1.4/e_cp09/thirdparty/oswego-concurrent/lib/concurrent-src.zip"/>
 	<classpathentry kind="lib" path="thirdparty/jgroups/lib/jgroups.jar"/>
 	<classpathentry kind="lib" path="thirdparty/apache-log4j/lib/log4j.jar"/>
 	<classpathentry kind="lib" path="thirdparty/junit/lib/junit.jar"/>
@@ -46,7 +46,7 @@
 	<classpathentry kind="lib" path="thirdparty/jboss/jbossts14/lib/jbossjta.jar"/>
 	<classpathentry kind="var" path="ANT_HOME/lib/ant.jar"/>
 	<classpathentry kind="var" path="ANT_HOME/lib/ant-junit.jar"/>
-	<classpathentry kind="lib" path="thirdparty/jboss/common/lib/jboss-common.jar"/>
+	<classpathentry kind="lib" path="thirdparty/jboss/common/lib/jboss-common.jar" sourcepath="/home/howard/projects/jboss/messaging/1.4/e_cp09/thirdparty/jboss/common/lib/jboss-common-sources.jar"/>
 	<classpathentry kind="lib" path="thirdparty/jbossas/core-libs/lib/jboss-j2ee.jar"/>
 	<classpathentry kind="lib" path="thirdparty/jboss/aop/lib/jboss-aop.jar"/>
 	<classpathentry kind="lib" path="thirdparty/jbossas/core-libs/lib/jboss-system.jar"/>

Modified: branches/Branch_JBMESSAGING_1732/integration/EAP4/etc/server/default/deploy/messaging-service.xml
===================================================================
--- branches/Branch_1_4/integration/EAP4/etc/server/default/deploy/messaging-service.xml	2009-09-04 03:51:04 UTC (rev 7798)
+++ branches/Branch_JBMESSAGING_1732/integration/EAP4/etc/server/default/deploy/messaging-service.xml	2009-09-10 07:52:34 UTC (rev 7799)
@@ -94,6 +94,10 @@
       
       <attribute name="EnableMessageCounters">false</attribute>
       
+      <attribute name="SuckerConnectionRetryTimes">-1</attribute>
+      
+      <attribute name="SuckerConnectionRetryInterval">5000</attribute>
+      
       <!-- The password used by the message sucker connections to create connections.
            THIS SHOULD ALWAYS BE CHANGED AT INSTALL TIME TO SECURE SYSTEM
       <attribute name="SuckerPassword"></attribute>

Modified: branches/Branch_JBMESSAGING_1732/integration/EAP4/etc/xmdesc/ServerPeer-xmbean.xml
===================================================================
--- branches/Branch_1_4/integration/EAP4/etc/xmdesc/ServerPeer-xmbean.xml	2009-09-04 03:51:04 UTC (rev 7798)
+++ branches/Branch_JBMESSAGING_1732/integration/EAP4/etc/xmdesc/ServerPeer-xmbean.xml	2009-09-10 07:52:34 UTC (rev 7799)
@@ -223,6 +223,18 @@
       <name>SuckerPassword</name>
       <type>java.lang.String</type>
    </attribute>   
+   
+   <attribute access="read-write" getMethod="getSuckerConnectionRetryTimes" setMethod="setSuckerConnectionRetryTimes">
+      <description>Max times for a sucker's connection to retry in case of failure. Default is -1 (retry forever)</description>
+      <name>SuckerConnectionRetryTimes</name>
+      <type>int</type>
+   </attribute>   
+   
+   <attribute access="read-write" getMethod="getSuckerConnectionRetryInterval" setMethod="setSuckerConnectionRetryInterval">
+      <description>The interval in milliseconds between each retry of the failed sucker's connection</description>
+      <name>SuckerConnectionRetryInterval</name>
+      <type>int</type>
+   </attribute>   
 
 
    <!-- Managed operations -->

Modified: branches/Branch_JBMESSAGING_1732/src/main/org/jboss/jms/server/ServerPeer.java
===================================================================
--- branches/Branch_1_4/src/main/org/jboss/jms/server/ServerPeer.java	2009-09-04 03:51:04 UTC (rev 7798)
+++ branches/Branch_JBMESSAGING_1732/src/main/org/jboss/jms/server/ServerPeer.java	2009-09-10 07:52:34 UTC (rev 7799)
@@ -169,6 +169,10 @@
    // For generating unique Channel ID for cluster without a shared DB
    private long serverStartTime;
 
+   private int suckerConnectionRetryTimes = -1;
+   
+   private int suckerConnectionRetryInterval = 5000;
+   
    // wired components
 
    private DestinationJNDIMapper destinationJNDIMapper;
@@ -301,7 +305,7 @@
          {
 	         clusterConnectionManager = new ClusterConnectionManager(serverPeerID,
 	         		                                                  clusterPullConnectionFactoryName, defaultPreserveOrdering,
-	         		                                                  SecurityStore.SUCKER_USER, suckerPassword);
+	         		                                                  SecurityStore.SUCKER_USER, suckerPassword, suckerConnectionRetryTimes, suckerConnectionRetryInterval);
 	         clusterNotifier.registerListener(clusterConnectionManager);
          }
 
@@ -1714,7 +1718,27 @@
       return true;
    }
 
+   public void setSuckerConnectionRetryTimes(int suckerConnectionRetryTimes)
+   {
+      this.suckerConnectionRetryTimes = suckerConnectionRetryTimes;
+   }
 
+   public int getSuckerConnectionRetryTimes()
+   {
+      return suckerConnectionRetryTimes;
+   }
+
+   public void setSuckerConnectionRetryInterval(int suckerConnectionRetryInterval)
+   {
+      this.suckerConnectionRetryInterval = suckerConnectionRetryInterval;
+   }
+
+   public int getSuckerConnectionRetryInterval()
+   {
+      return suckerConnectionRetryInterval;
+   }
+
+
    // Inner classes --------------------------------------------------------------------------------
 
 

Modified: branches/Branch_JBMESSAGING_1732/src/main/org/jboss/messaging/core/impl/clusterconnection/ClusterConnectionManager.java
===================================================================
--- branches/Branch_1_4/src/main/org/jboss/messaging/core/impl/clusterconnection/ClusterConnectionManager.java	2009-09-04 03:51:04 UTC (rev 7798)
+++ branches/Branch_JBMESSAGING_1732/src/main/org/jboss/messaging/core/impl/clusterconnection/ClusterConnectionManager.java	2009-09-10 07:52:34 UTC (rev 7799)
@@ -29,6 +29,7 @@
 import java.util.List;
 import java.util.Map;
 
+import javax.jms.ExceptionListener;
 import javax.jms.JMSException;
 import javax.jms.Session;
 
@@ -81,11 +82,17 @@
 	private String suckerUser;
 	
 	private String suckerPassword;
+   
+   private int maxRetry; //retry infinitely
+   
+   private int retryInterval; //5 sec
 	
 	public ClusterConnectionManager(int nodeID,
 			                          String connectionFactoryUniqueName, boolean preserveOrdering,
 			                          String suckerUser,
-			                          String suckerPassword)
+			                          String suckerPassword,
+			                          int maxRetry,
+			                          int retryInterval)
 	{
 		connections = new HashMap();
 		
@@ -99,6 +106,10 @@
 		
 		this.suckerPassword = suckerPassword;
 		
+		this.maxRetry = maxRetry;
+		
+		this.retryInterval = retryInterval;
+		
 		if (trace) { log.trace("Created " + this); }
 	}
 	
@@ -400,7 +411,7 @@
 			{
 				try
 				{
-   				ConnectionInfo info = new ConnectionInfo(new JBossConnectionFactory(delegate), suckerUser, suckerPassword);
+   				ConnectionInfo info = new ConnectionInfo(new JBossConnectionFactory(delegate), suckerUser, suckerPassword, nid == this.nodeID);
    				
    				log.trace(this + " created connection info " + info);
    				
@@ -607,7 +618,7 @@
 		}
 	}
 				
-	class ConnectionInfo
+	class ConnectionInfo implements ExceptionListener
 	{
 		private JBossConnectionFactory connectionFactory;
 		
@@ -623,7 +634,9 @@
 		
 		private String suckerPassword;
 		
-		ConnectionInfo(JBossConnectionFactory connectionFactory, String suckerUser, String suckerPassword) throws Exception
+		private boolean isLocal;
+		
+		ConnectionInfo(JBossConnectionFactory connectionFactory, String suckerUser, String suckerPassword, boolean isLocal) throws Exception
 		{
 			this.connectionFactory = connectionFactory;
 			
@@ -632,6 +645,8 @@
 			this.suckerUser = suckerUser;
 			
 			this.suckerPassword = suckerPassword;
+			
+			this.isLocal = isLocal;
 		}
 		
 		synchronized void start() throws Exception
@@ -645,6 +660,12 @@
 		   {
 				connection = (JBossConnection)connectionFactory.createConnection(suckerUser, suckerPassword);
 				
+				//local connection doesn't need listener.
+				if (!isLocal)
+				{
+				  connection.setExceptionListener(this);
+				}
+				
 				session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
 		   }
 			
@@ -693,7 +714,7 @@
 		
 		synchronized void close()
 		{
-			closeAllSuckers();			
+			closeAllSuckers();
 			
 			//Note we use a timed callable since remoting has a habit of hanging on attempting to close
 			//We do not want this to hang the system - especially failover
@@ -746,6 +767,112 @@
 			MessageSucker sucker = (MessageSucker)suckers.remove(queueName);
 			
 			return sucker;
-		}		
-	}	
+		}
+
+		//https://jira.jboss.org/jira/browse/JBMESSAGING-1732
+		//on exception, try to recreate all suckers.
+      public void onException(JMSException e)
+      {
+         cleanupConnection();
+         retryConnection();
+      }
+      
+      //first stop all the suckers
+      //then try to close the connection
+      private synchronized void cleanupConnection()
+      {
+         if (!started)
+         {
+            return;
+         }
+         
+         Iterator iter = suckers.values().iterator();
+         
+         while (iter.hasNext())
+         {
+            MessageSucker sucker = (MessageSucker)iter.next();
+            
+            sucker.suspend();
+         }
+
+         Callable callable = new Callable() { public Object call()
+         {
+            try
+            {
+               connection.close();
+            }
+            catch (JMSException ignore)
+            {              
+            }
+            return null;
+          } };
+         
+         Callable timedCallable = new TimedCallable(callable, CLOSE_TIMEOUT);
+         
+         try
+         {
+            timedCallable.call();
+         }
+         catch (Throwable t)
+         {
+            //Ignore - the server might have already closed - so this is ok
+         }
+         
+         connection = null;
+         
+         started = false;
+      }
+      
+      private synchronized void retryConnection()
+      {         
+         int retryCount = 0;
+         
+         while ((maxRetry == -1) || (retryCount < maxRetry))
+         {
+            try
+            {
+               start();
+               break;
+            }
+            catch (Exception e)
+            {
+               retryCount++;
+               if (trace) 
+               { 
+                  log.trace("Retrying ConnectionInfo " + this + " failed, retry count: " + retryCount, e); 
+               }
+               try
+               {
+                  Thread.sleep(retryInterval);
+               }
+               catch(InterruptedException ite)
+               {
+               }
+            }
+         }
+         
+         if (!started)
+         {
+            log.error("Retrying ConnectionInfo " + this + " failed after maxmum retry: " + retryCount);
+            return;
+         }
+         
+         //now resume the suckers
+         Iterator iter = suckers.values().iterator();
+         
+         while (iter.hasNext())
+         {
+            MessageSucker sucker = (MessageSucker)iter.next();
+            
+            try
+            {
+               sucker.resume(session);
+            }
+            catch (JMSException e)
+            {
+               log.warn("Error resuming sucker " + sucker, e);
+            }
+         }
+      }
+	}
 }

Modified: branches/Branch_JBMESSAGING_1732/src/main/org/jboss/messaging/core/impl/clusterconnection/MessageSucker.java
===================================================================
--- branches/Branch_1_4/src/main/org/jboss/messaging/core/impl/clusterconnection/MessageSucker.java	2009-09-04 03:51:04 UTC (rev 7798)
+++ branches/Branch_JBMESSAGING_1732/src/main/org/jboss/messaging/core/impl/clusterconnection/MessageSucker.java	2009-09-10 07:52:34 UTC (rev 7799)
@@ -23,6 +23,7 @@
 package org.jboss.messaging.core.impl.clusterconnection;
 
 import javax.jms.Destination;
+import javax.jms.JMSException;
 import javax.jms.Message;
 import javax.jms.MessageListener;
 import javax.jms.Session;
@@ -79,6 +80,8 @@
 	
 	private JBossQueue jbq;
 	
+	private boolean suspended = false;
+	
 	public String toString()
 	{
 		return "MessageSucker:" + System.identityHashCode(this) + " queue:" + localQueue.getName();
@@ -191,12 +194,76 @@
          started = false;
       }
    }
-	
+   
+   //the suspend stops the sucker's receiving end but doesn't unregister the sucker.
+   //we only suspend the consumer side.
+   public synchronized void suspend()
+   {
+      if (!started || suspended)
+      {
+         return;
+      }
+
+      setConsuming(false);
+      
+      suspended = true;
+
+      try
+      {
+         consumer.closing(-1);
+      }
+      catch (Throwable t)
+      {
+         // Ignore
+      }
+      try
+      {
+         consumer.close();
+      }
+      catch (Throwable t)
+      {
+         // Ignore
+      }
+
+      sourceSession = null;
+
+      consumer = null;
+
+      clientConsumer = null;
+   }
+   
+   
+   public synchronized void resume(Session srcSession) throws JMSException
+   {
+      if (!started)
+      {
+         return;
+      }
+      
+      sourceSession = srcSession;
+      
+      SessionDelegate sourcedel = ((JBossSession)sourceSession).getDelegate();
+      
+      consumer = (ClientConsumerDelegate)sourcedel.createConsumerDelegate(jbq, null, false, null, false, false);
+      
+      clientConsumer = ((ConsumerState)consumer.getState()).getClientConsumer();
+                        
+      consumer.setMessageListener(this);     
+
+      suspended = false;
+
+      boolean forcedState = consuming;
+      
+      consuming = !consuming;
+      
+      setConsuming(forcedState);
+   }
+
 	public String getQueueName()
 	{
 		return this.localQueue.getName();
 	}
-	
+
 	public synchronized void setConsuming(boolean consume)
 	{
 		if (trace) { log.trace(this + " setConsuming " + consume); }
@@ -206,6 +273,14 @@
          return;
       }
 		
+		//for supended, we set the consuming flag and do nothing.
+		//later on resume, we force the sucker to be the last set consuming state.
+		if (suspended)
+		{
+		   consuming = consume;
+		   return;
+		}
+		
 		try
 		{
 			if (consume && !consuming)

Modified: branches/Branch_JBMESSAGING_1732/tests/build.properties
===================================================================
--- branches/Branch_1_4/tests/build.properties	2009-09-04 03:51:04 UTC (rev 7798)
+++ branches/Branch_JBMESSAGING_1732/tests/build.properties	2009-09-10 07:52:34 UTC (rev 7799)
@@ -2,5 +2,7 @@
 # Local overrides for the builds initiated by build.sh
 #
 
-test.bind.address=192.168.1.101
+test.bind.address=localhost
+jgroups.bind_addr=localhost
+
 default.database=mysql

Modified: branches/Branch_JBMESSAGING_1732/tests/etc/container.xml
===================================================================
--- branches/Branch_1_4/tests/etc/container.xml	2009-09-04 03:51:04 UTC (rev 7798)
+++ branches/Branch_JBMESSAGING_1732/tests/etc/container.xml	2009-09-10 07:52:34 UTC (rev 7799)
@@ -35,7 +35,7 @@
          <url>jdbc:mysql://localhost/messaging</url>
          <driver>com.mysql.jdbc.Driver</driver>
          <isolation>TRANSACTION_READ_COMMITTED</isolation>
-         <username>root</username>
+         <username>sa</username>
       </database-configuration>
 
 

Modified: branches/Branch_JBMESSAGING_1732/tests/src/org/jboss/test/messaging/jms/server/ServerPeerConfigurationTest.java
===================================================================
--- branches/Branch_1_4/tests/src/org/jboss/test/messaging/jms/server/ServerPeerConfigurationTest.java	2009-09-04 03:51:04 UTC (rev 7798)
+++ branches/Branch_JBMESSAGING_1732/tests/src/org/jboss/test/messaging/jms/server/ServerPeerConfigurationTest.java	2009-09-10 07:52:34 UTC (rev 7799)
@@ -24,6 +24,7 @@
 import javax.management.ObjectName;
 import javax.management.RuntimeMBeanException;
 
+import org.jboss.jms.server.ServerPeer;
 import org.jboss.test.messaging.MessagingTestCase;
 import org.jboss.test.messaging.tools.ServerManagement;
 import org.jboss.test.messaging.tools.container.LocalTestServer;
@@ -143,6 +144,27 @@
          server.stop();
       }
    }
+
+   //https://jira.jboss.org/jira/browse/JBMESSAGING-1732
+   //make sure the defaults are correct
+   public void testServerPeerAttributeDefaults() throws Exception
+   {
+      LocalTestServer server = new LocalTestServer();
+      
+      try
+      {
+         server.start("all", null, false, true);
+         ServerPeer sp = server.getServerPeer();
+         int interval = sp.getSuckerConnectionRetryInterval();
+         assertEquals(5000, interval);
+         int maxRetry = sp.getSuckerConnectionRetryTimes();
+         assertEquals(-1, maxRetry);
+      }
+      finally
+      {
+         server.stop();
+      }
+   }
    
    // Package protected ---------------------------------------------
 

Modified: branches/Branch_JBMESSAGING_1732/tests/src/org/jboss/test/messaging/tools/container/ServiceContainerConfiguration.java
===================================================================
--- branches/Branch_1_4/tests/src/org/jboss/test/messaging/tools/container/ServiceContainerConfiguration.java	2009-09-04 03:51:04 UTC (rev 7798)
+++ branches/Branch_JBMESSAGING_1732/tests/src/org/jboss/test/messaging/tools/container/ServiceContainerConfiguration.java	2009-09-10 07:52:34 UTC (rev 7799)
@@ -13,6 +13,7 @@
 import java.util.Map;
 import java.util.StringTokenizer;
 
+import org.jboss.logging.Logger;
 import org.jboss.messaging.util.XMLUtil;
 import org.w3c.dom.Element;
 import org.w3c.dom.NamedNodeMap;
@@ -30,7 +31,7 @@
    // Constants -----------------------------------------------------
 
    public static final boolean DEFAULT_CLUSTERED_MODE = false;
-
+   private static final DebugLogger log = new DebugLogger("hudson.log");
    // Static --------------------------------------------------------
 
    public static String getHypersonicDatabase(String connectionURL)
@@ -219,9 +220,12 @@
    private void setCurrentDatabase(String xmlConfigDatabase)
    {
       database = System.getProperty("test.database");
+      
+      log.log("xxx123--- database got from system property: " + database);
       if (database == null)
       {
          database = xmlConfigDatabase;
+         log.log("xxx123--- database set to: " + database);
       }
    }
 




More information about the jboss-cvs-commits mailing list