[jboss-cvs] JBoss Messaging SVN: r6743 - in tags/JBossMessaging_1_4_0_SP3_CP03_1456: src/main/org/jboss/jms/client/container and 5 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Tue May 12 11:24:30 EDT 2009


Author: gaohoward
Date: 2009-05-12 11:24:30 -0400 (Tue, 12 May 2009)
New Revision: 6743

Modified:
   tags/JBossMessaging_1_4_0_SP3_CP03_1456/src/etc/remoting/remoting-bisocket-service.xml
   tags/JBossMessaging_1_4_0_SP3_CP03_1456/src/main/org/jboss/jms/client/container/StateCreationAspect.java
   tags/JBossMessaging_1_4_0_SP3_CP03_1456/src/main/org/jboss/jms/client/delegate/ClientConnectionFactoryDelegate.java
   tags/JBossMessaging_1_4_0_SP3_CP03_1456/src/main/org/jboss/jms/client/remoting/ConsolidatedRemotingConnectionListener.java
   tags/JBossMessaging_1_4_0_SP3_CP03_1456/src/main/org/jboss/jms/client/remoting/JMSRemotingConnection.java
   tags/JBossMessaging_1_4_0_SP3_CP03_1456/src/main/org/jboss/jms/server/connectionmanager/SimpleConnectionManager.java
   tags/JBossMessaging_1_4_0_SP3_CP03_1456/src/main/org/jboss/messaging/util/NamedThreadQueuedExecutor.java
   tags/JBossMessaging_1_4_0_SP3_CP03_1456/tests/src/org/jboss/test/messaging/jms/clustering/FailoverTest.java
Log:
check in Ron's change and fix OOM issue


Modified: tags/JBossMessaging_1_4_0_SP3_CP03_1456/src/etc/remoting/remoting-bisocket-service.xml
===================================================================
--- tags/JBossMessaging_1_4_0_SP3_CP03_1456/src/etc/remoting/remoting-bisocket-service.xml	2009-05-12 14:44:10 UTC (rev 6742)
+++ tags/JBossMessaging_1_4_0_SP3_CP03_1456/src/etc/remoting/remoting-bisocket-service.xml	2009-05-12 15:24:30 UTC (rev 6743)
@@ -38,7 +38,14 @@
                <attribute name="stopLeaseOnFailure" isParam="true">true</attribute>
                
                <!-- Periodicity of client pings. Server window by default is twice this figure -->                               
-               <attribute name="clientLeasePeriod" isParam="true">10000</attribute>
+               <attribute name="clientLeasePeriod" isParam="true">10000</attribute>
+               <attribute name="validatorPingPeriod" isParam="true">10000</attribute>
+               <attribute name="validatorPintTimeout" isParam="true">5000</attribute>
+
+               <attribute name="failureDisconnectTimeout" isParam="true">0</attribute>
+               <attribute name="callbackErrorsAllowed">1</attribute>
+               <attribute name="registerCallbackListener">false</attribute>
+               <attribute name="useClientConnectionIdentity" isParam="true">true</attribute>
 	       	       
 	       <attribute name="timeout" isParam="true">0</attribute>
 

Modified: tags/JBossMessaging_1_4_0_SP3_CP03_1456/src/main/org/jboss/jms/client/container/StateCreationAspect.java
===================================================================
--- tags/JBossMessaging_1_4_0_SP3_CP03_1456/src/main/org/jboss/jms/client/container/StateCreationAspect.java	2009-05-12 14:44:10 UTC (rev 6742)
+++ tags/JBossMessaging_1_4_0_SP3_CP03_1456/src/main/org/jboss/jms/client/container/StateCreationAspect.java	2009-05-12 15:24:30 UTC (rev 6743)
@@ -95,11 +95,11 @@
          // install the consolidated remoting connection listener; it will be de-installed on
          // connection closing by ConnectionAspect
 
-         ConsolidatedRemotingConnectionListener listener =
-            new ConsolidatedRemotingConnectionListener();
+//         ConsolidatedRemotingConnectionListener listener =
+//            new ConsolidatedRemotingConnectionListener();
+//
+//         remotingConnection.addConnectionListener(listener);
 
-         remotingConnection.addConnectionListener(listener);
-
          if (versionToUse == null)
          {
             throw new IllegalStateException("Connection version is null");
@@ -109,8 +109,10 @@
             new ConnectionState(serverID, connectionDelegate,
                                 remotingConnection, versionToUse);
 
-         listener.setConnectionState(connectionState);
-          
+//         listener.setConnectionState(connectionState);
+         remotingConnection.getConnectionListener().setConnectionState(connectionState);
+         remotingConnection.getConnectionListener().start();
+       
          connectionDelegate.setState(connectionState);
       }
 

Modified: tags/JBossMessaging_1_4_0_SP3_CP03_1456/src/main/org/jboss/jms/client/delegate/ClientConnectionFactoryDelegate.java
===================================================================
--- tags/JBossMessaging_1_4_0_SP3_CP03_1456/src/main/org/jboss/jms/client/delegate/ClientConnectionFactoryDelegate.java	2009-05-12 14:44:10 UTC (rev 6742)
+++ tags/JBossMessaging_1_4_0_SP3_CP03_1456/src/main/org/jboss/jms/client/delegate/ClientConnectionFactoryDelegate.java	2009-05-12 15:24:30 UTC (rev 6743)
@@ -30,6 +30,7 @@
 import javax.jms.JMSException;
 
 import org.jboss.jms.client.container.JMSClientVMIdentifier;
+import org.jboss.jms.client.remoting.ConsolidatedRemotingConnectionListener;
 import org.jboss.jms.client.remoting.JMSRemotingConnection;
 import org.jboss.jms.debug.JMSObjectTracker;
 import org.jboss.jms.debug.TrackerFactory;
@@ -151,7 +152,7 @@
       
       try
       {         
-         remotingConnection = new JMSRemotingConnection(serverLocatorURI, clientPing, strictTck);
+         remotingConnection = new JMSRemotingConnection(serverLocatorURI, clientPing, strictTck, new ConsolidatedRemotingConnectionListener());
          
          remotingConnection.start();
    

Modified: tags/JBossMessaging_1_4_0_SP3_CP03_1456/src/main/org/jboss/jms/client/remoting/ConsolidatedRemotingConnectionListener.java
===================================================================
--- tags/JBossMessaging_1_4_0_SP3_CP03_1456/src/main/org/jboss/jms/client/remoting/ConsolidatedRemotingConnectionListener.java	2009-05-12 14:44:10 UTC (rev 6742)
+++ tags/JBossMessaging_1_4_0_SP3_CP03_1456/src/main/org/jboss/jms/client/remoting/ConsolidatedRemotingConnectionListener.java	2009-05-12 15:24:30 UTC (rev 6743)
@@ -42,6 +42,8 @@
    private ExceptionListener jmsExceptionListener;
 
    private ConnectionFailureListener remotingListener;
+   
+   private boolean started;
 
    // Constructors ---------------------------------------------------------------------------------
 
@@ -53,6 +55,11 @@
 
    public void handleConnectionException(Throwable throwable, Client client)
    {
+      if (!started)
+      {
+         return;
+      }
+      
       // forward the exception to delegate listener and JMS ExceptionListeners; synchronize
       // to avoid race conditions
 
@@ -162,6 +169,11 @@
       }
       return state + ".ConsolidatedListener";
    }
+   
+   public void start()
+   {
+      started = true;
+   }
 
    // Package protected ----------------------------------------------------------------------------
 

Modified: tags/JBossMessaging_1_4_0_SP3_CP03_1456/src/main/org/jboss/jms/client/remoting/JMSRemotingConnection.java
===================================================================
--- tags/JBossMessaging_1_4_0_SP3_CP03_1456/src/main/org/jboss/jms/client/remoting/JMSRemotingConnection.java	2009-05-12 14:44:10 UTC (rev 6742)
+++ tags/JBossMessaging_1_4_0_SP3_CP03_1456/src/main/org/jboss/jms/client/remoting/JMSRemotingConnection.java	2009-05-12 15:24:30 UTC (rev 6743)
@@ -22,6 +22,7 @@
 package org.jboss.jms.client.remoting;
 
 import java.security.AccessController;
+import java.security.PrivilegedActionException;
 import java.security.PrivilegedExceptionAction;
 import java.util.HashMap;
 import java.util.Map;
@@ -258,9 +259,15 @@
 
    public JMSRemotingConnection(String serverLocatorURI, boolean clientPing, boolean strictTck) throws Exception
    {
+       this(serverLocatorURI, clientPing, strictTck, null);
+   }
+   
+   public JMSRemotingConnection(String serverLocatorURI, boolean clientPing, boolean strictTck, ConsolidatedRemotingConnectionListener listener) throws Exception
+   {
       serverLocator = new InvokerLocator(serverLocatorURI);
       this.clientPing = clientPing;
       this.strictTck = strictTck;
+      this.remotingConnectionListener = listener;
 
       log.trace(this + " created");
    }
@@ -318,7 +325,14 @@
       {
          public Object run() throws Exception
          {
-            client.connect();
+            if (remotingConnectionListener != null)
+            {
+                client.connect(remotingConnectionListener, serverLocator.getParameters());
+            }
+            else
+            {
+                client.connect();
+            }
             onewayClient.connect();
             return null;
          }
@@ -435,7 +449,13 @@
 	  tracker.report("call setFail on : " + this, null, false);
 	  
       failed = true;
-
+      
+      if (client == null) 
+      {
+    	  tracker.report("client already null " + this, null, false);
+    	  return;
+      }
+      
       // Remoting has the bad habit of letting the job of cleaning after a failed connection up to
       // the application. Here, we take care of that, by disconnecting the remoting client, and
       // thus silencing both the connection validator and the lease pinger, and also locally
@@ -487,7 +507,7 @@
       return true;
    }
 
-   public synchronized void addPlainConnectionListener(ConnectionListener listener)
+   public synchronized void addPlainConnectionListener(final ConnectionListener listener)
    {
       client.addConnectionListener(listener, serverLocator.getParameters());
    }

Modified: tags/JBossMessaging_1_4_0_SP3_CP03_1456/src/main/org/jboss/jms/server/connectionmanager/SimpleConnectionManager.java
===================================================================
--- tags/JBossMessaging_1_4_0_SP3_CP03_1456/src/main/org/jboss/jms/server/connectionmanager/SimpleConnectionManager.java	2009-05-12 14:44:10 UTC (rev 6742)
+++ tags/JBossMessaging_1_4_0_SP3_CP03_1456/src/main/org/jboss/jms/server/connectionmanager/SimpleConnectionManager.java	2009-05-12 15:24:30 UTC (rev 6743)
@@ -427,7 +427,7 @@
 
             try
             {
-               ((ServerInvokerCallbackHandler)entry.getValue()).destroy();
+               ((ServerInvokerCallbackHandler)entry.getValue()).shutdown();
             }
             catch (Throwable ignore)
             {
@@ -536,7 +536,7 @@
                try
                {
             	   tracker.report("try destroy cb for " + jmsSessionID, null, false);
-                  ((ServerInvokerCallbackHandler)entry.getValue()).destroy();
+                  ((ServerInvokerCallbackHandler)entry.getValue()).shutdown();
                }
                catch (Throwable ignore)
                {

Modified: tags/JBossMessaging_1_4_0_SP3_CP03_1456/src/main/org/jboss/messaging/util/NamedThreadQueuedExecutor.java
===================================================================
--- tags/JBossMessaging_1_4_0_SP3_CP03_1456/src/main/org/jboss/messaging/util/NamedThreadQueuedExecutor.java	2009-05-12 14:44:10 UTC (rev 6742)
+++ tags/JBossMessaging_1_4_0_SP3_CP03_1456/src/main/org/jboss/messaging/util/NamedThreadQueuedExecutor.java	2009-05-12 15:24:30 UTC (rev 6743)
@@ -35,9 +35,7 @@
  *
  */
 public class NamedThreadQueuedExecutor extends QueuedExecutor
-{
-	private static final Logger log = Logger.getLogger(NamedThreadQueuedExecutor.class);
-	  	  
+{	  	  
 	private final String name;
 	
 	private static final ThreadGroup jbmGroup = new ThreadGroup("JBM-threads");
@@ -49,10 +47,6 @@
 		this.name = name;
 		
 		setThreadFactory(new Factory());
-		
-		clearThread();
-		
-		restart();
 	}
 	
 	private class Factory implements ThreadFactory

Modified: tags/JBossMessaging_1_4_0_SP3_CP03_1456/tests/src/org/jboss/test/messaging/jms/clustering/FailoverTest.java
===================================================================
--- tags/JBossMessaging_1_4_0_SP3_CP03_1456/tests/src/org/jboss/test/messaging/jms/clustering/FailoverTest.java	2009-05-12 14:44:10 UTC (rev 6742)
+++ tags/JBossMessaging_1_4_0_SP3_CP03_1456/tests/src/org/jboss/test/messaging/jms/clustering/FailoverTest.java	2009-05-12 15:24:30 UTC (rev 6743)
@@ -6,6 +6,8 @@
  */
 package org.jboss.test.messaging.jms.clustering;
 
+import java.lang.management.ThreadInfo;
+import java.lang.management.ThreadMXBean;
 import java.util.Enumeration;
 import java.util.HashSet;
 import java.util.Set;
@@ -23,11 +25,17 @@
 
 import org.jboss.jms.client.FailoverEvent;
 import org.jboss.jms.client.JBossConnection;
+import org.jboss.jms.client.JBossSession;
 import org.jboss.jms.client.delegate.ClientConnectionDelegate;
+import org.jboss.jms.client.delegate.ClientSessionDelegate;
 import org.jboss.jms.client.remoting.JMSRemotingConnection;
+import org.jboss.jms.client.state.SessionState;
+import org.jboss.messaging.util.JBMExecutor;
 import org.jboss.test.messaging.tools.ServerManagement;
 import org.jboss.test.messaging.tools.aop.PoisonInterceptor;
 
+import sun.management.ManagementFactory;
+
 /**
  * @author <a href="mailto:ovidiu at feodorov.com">Ovidiu Feodorov</a>
  * @version <tt>$Revision$</tt>
@@ -2050,6 +2058,75 @@
       }
    }
 
+   public void testThreadLeakOnSessionFailover() throws Exception
+   {
+      Connection conn = null;
+
+      try
+      {
+         conn = createConnectionOnServer(cf, 1);
+         conn.start();
+
+         JBossSession session = (JBossSession)conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+         MessageProducer prod = session.createProducer(queue[1]);
+
+         MessageConsumer cons = session.createConsumer(queue[1]);
+         
+         Message m = session.createTextMessage("clik");
+         prod.send(m);
+
+         TextMessage tm = (TextMessage)cons.receive(2000);
+
+         assertNotNull(tm);
+         assertEquals("clik", tm.getText());
+
+         checkJBMSessionThread(1);
+
+         // register a failover listener
+         SimpleFailoverListener failoverListener = new SimpleFailoverListener();
+         ((JBossConnection)conn).registerFailoverListener(failoverListener);
+
+         ServerManagement.kill(1);
+
+         // wait for the client-side failover to complete
+
+         while(true)
+         {
+            FailoverEvent event = failoverListener.getEvent(30000);
+            if (event != null && FailoverEvent.FAILOVER_COMPLETED == event.getType())
+            {
+               break;
+            }
+            if (event == null)
+            {
+               fail("Did not get expected FAILOVER_COMPLETED event");
+            }
+         }
+
+         // failover complete
+         m = session.createTextMessage("clik");
+         prod.send(m);
+
+         tm = (TextMessage)cons.receive(2000);
+
+         assertNotNull(tm);
+         assertEquals("clik", tm.getText());
+         
+         checkJBMSessionThread(1);
+         
+      }
+      finally
+      {
+         if (conn != null)
+         {
+        	 System.err.println("---------------------------------------closeing connection-----------------------");
+            conn.close();
+         }
+         checkJBMSessionThread(0);
+      }
+   }
+
    // Package protected ----------------------------------------------------------------------------
 
    // Protected ------------------------------------------------------------------------------------
@@ -2189,6 +2266,24 @@
       }
    }
 
+   private void checkJBMSessionThread(int num) throws Exception
+   {
+		ThreadMXBean bean = ManagementFactory.getThreadMXBean();
+		long[] ids = bean.getAllThreadIds();
+		ThreadInfo[] infos = bean.getThreadInfo(ids);
+		
+		int thrCount = 0;
+		for (int i = 0; i < infos.length; i++)
+		{
+			String name = infos[i].getThreadName();
+			log.info("Thread: " + name);
+			if (name.contains("jbm-client-session-"))
+			{
+				thrCount++;
+			}
+		}
+		assertEquals(num, thrCount);
+   }
 
 
    // Inner classes --------------------------------------------------------------------------------




More information about the jboss-cvs-commits mailing list