[jboss-cvs] JBoss Messaging SVN: r6422 - in branches/Branch_Temp_Clebert_LargeMessage: src/main/org/jboss/messaging/core/client/impl and 2 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Tue Apr 14 12:28:02 EDT 2009


Author: clebert.suconic at jboss.com
Date: 2009-04-14 12:28:02 -0400 (Tue, 14 Apr 2009)
New Revision: 6422

Modified:
   branches/Branch_Temp_Clebert_LargeMessage/.classpath
   branches/Branch_Temp_Clebert_LargeMessage/src/main/org/jboss/messaging/core/client/impl/ConnectionManagerImpl.java
   branches/Branch_Temp_Clebert_LargeMessage/src/main/org/jboss/messaging/core/remoting/impl/invm/InVMConnection.java
   branches/Branch_Temp_Clebert_LargeMessage/tests/src/org/jboss/messaging/tests/integration/cluster/failover/LargeMessageMultiThreadFailoverTest.java
   branches/Branch_Temp_Clebert_LargeMessage/tests/src/org/jboss/messaging/tests/integration/cluster/failover/MultiThreadFailoverSupport.java
   branches/Branch_Temp_Clebert_LargeMessage/tests/src/org/jboss/messaging/tests/integration/cluster/failover/MultiThreadRandomFailoverTestBase.java
Log:
Changes before merge (double checks on Memory-leaks)

Modified: branches/Branch_Temp_Clebert_LargeMessage/.classpath
===================================================================
--- branches/Branch_Temp_Clebert_LargeMessage/.classpath	2009-04-14 16:09:22 UTC (rev 6421)
+++ branches/Branch_Temp_Clebert_LargeMessage/.classpath	2009-04-14 16:28:02 UTC (rev 6422)
@@ -43,7 +43,6 @@
 	<classpathentry kind="src" path="examples/javaee/mdb/src"/>
 	<classpathentry kind="lib" path="thirdparty/apache-log4j/lib/log4j.jar"/>
 	<classpathentry kind="lib" path="thirdparty/junit/lib/junit.jar"/>
-	<classpathentry kind="lib" path="thirdparty/jboss/profiler/jvmti/lib/jboss-profiler-jvmti.jar"/>
 	<classpathentry kind="lib" path="thirdparty/apache-logging/lib/commons-logging.jar"/>
 	<classpathentry kind="lib" path="thirdparty/sun-javacc/lib/javacc.jar"/>
 	<classpathentry kind="lib" path="thirdparty/apache-xerces/lib/xercesImpl.jar"/>
@@ -74,5 +73,6 @@
 	<classpathentry kind="lib" path="thirdparty/jboss/jboss-reflect/lib/jboss-reflect.jar"/>
 	<classpathentry kind="lib" path="thirdparty/jboss/jboss-mdr/lib/jboss-mdr.jar"/>
 	<classpathentry kind="lib" path="thirdparty/jboss/aop/lib/jboss-aop.jar"/>
+	<classpathentry combineaccessrules="false" kind="src" path="/jboss-profiler"/>
 	<classpathentry kind="output" path="eclipse-output"/>
 </classpath>

Modified: branches/Branch_Temp_Clebert_LargeMessage/src/main/org/jboss/messaging/core/client/impl/ConnectionManagerImpl.java
===================================================================
--- branches/Branch_Temp_Clebert_LargeMessage/src/main/org/jboss/messaging/core/client/impl/ConnectionManagerImpl.java	2009-04-14 16:09:22 UTC (rev 6421)
+++ branches/Branch_Temp_Clebert_LargeMessage/src/main/org/jboss/messaging/core/client/impl/ConnectionManagerImpl.java	2009-04-14 16:28:02 UTC (rev 6422)
@@ -110,10 +110,26 @@
 
    private final Object failoverLock = new Object();
 
-   // TODO - allow this to be configurable
-   private static final ScheduledThreadPoolExecutor pingExecutor = new ScheduledThreadPoolExecutor(5,
-                                                                                                   new org.jboss.messaging.utils.JBMThreadFactory("jbm-pinger-threads"));
+   private static ScheduledThreadPoolExecutor pingExecutor;
 
+   static
+   {
+      recreatePingExecutor();
+   }
+
+   public static void recreatePingExecutor()
+   {
+      if (pingExecutor != null)
+      {
+         pingExecutor.shutdown();
+      }
+
+      // TODO - allow this to be configurable
+      pingExecutor = new ScheduledThreadPoolExecutor(5,
+                                                     new org.jboss.messaging.utils.JBMThreadFactory("jbm-pinger-threads"));
+
+   }
+
    private final Map<Object, ConnectionEntry> connections = Collections.synchronizedMap(new LinkedHashMap<Object, ConnectionEntry>());
 
    private int refCount;
@@ -165,7 +181,7 @@
                                 final long retryInterval,
                                 final double retryIntervalMultiplier,
                                 final int reconnectAttempts)
-   {      
+   {
       this.connectorConfig = connectorConfig;
 
       this.backupConfig = backupConfig;
@@ -259,17 +275,17 @@
                synchronized (failoverLock)
                {
                   connection = getConnectionWithRetry(1, reconnectAttempts);
-                  
+
                   if (connection == null)
-                  {                     
+                  {
                      if (!failureSignalled)
                      {
                         // This can happen if the connection manager gets closed - e.g. the server gets shut down
-                        
+
                         throw new MessagingException(MessagingException.NOT_CONNECTED, "Unable to connect to server");
                      }
                      else
-                     {                        
+                     {
                         // This means an async failure came in while getConnectionForCreateSession was executing, we
                         // need
                         // to allow the failover/reconnection to occur and let the create session retry after
@@ -465,7 +481,7 @@
       {
          return false;
       }
-      
+
       if (connectionID != null && !connections.containsKey(connectionID))
       {
          // We already failed over/reconnected - probably the first failure came in, all the connections were failed
@@ -508,9 +524,9 @@
          // It should then return its connections, with channel 1 lock still held
          // It can then release the channel 1 lock, and retry (which will cause locking on failoverLock
          // until failover is complete
-         
+
          boolean attemptFailover = (backupConnectorFactory) != null && (failoverOnServerShutdown || me.getCode() != MessagingException.SERVER_DISCONNECTED);
-         
+
          boolean done = false;
 
          if (attemptFailover || reconnectAttempts != 0)
@@ -571,7 +587,7 @@
             if (attemptFailover)
             {
                // Now try failing over to backup
-               
+
                connectorFactory = backupConnectorFactory;
 
                transportParams = backupTransportParams;
@@ -579,11 +595,11 @@
                backupConnectorFactory = null;
 
                backupTransportParams = null;
-               
+
                done = reattachSessions(reconnectAttempts == -1 ? -1 : reconnectAttempts + 1);
             }
             else if (reconnectAttempts != 0)
-            {              
+            {
                done = reattachSessions(reconnectAttempts);
             }
 
@@ -709,7 +725,7 @@
       long interval = retryInterval;
 
       int count = 0;
-      
+
       while (true)
       {
          if (closed || failureSignalled)
@@ -718,7 +734,7 @@
          }
 
          RemotingConnection connection = getConnection(initialRefCount);
-         
+
          if (connection == null)
          {
             // Failed to get connection
@@ -733,7 +749,7 @@
 
                   return null;
                }
-               
+
                try
                {
                   Thread.sleep(interval);
@@ -765,12 +781,12 @@
 
          Set<ConnectionEntry> copy = new HashSet<ConnectionEntry>(connections.values());
 
-         connections.clear();                 
+         connections.clear();
 
          for (ConnectionEntry entry : copy)
          {
             try
-            {               
+            {
                entry.connection.destroy();
             }
             catch (Throwable ignore)
@@ -908,7 +924,7 @@
       {
          refCount--;
       }
-      
+
       if (entry != null)
       {
          checkCloseConnections();
@@ -981,19 +997,19 @@
          channel1.returnBlocking();
       }
    }
-   
+
    private void failConnection(final Object connectionID, final MessagingException me)
    {
       ConnectionEntry entry = connections.get(connectionID);
-      
+
       if (entry != null)
       {
          RemotingConnection conn = entry.connection;
-         
+
          conn.fail(me);
-      }     
+      }
    }
-   
+
    private static class ConnectionEntry
    {
       ConnectionEntry(final RemotingConnection connection, final Connector connector)

Modified: branches/Branch_Temp_Clebert_LargeMessage/src/main/org/jboss/messaging/core/remoting/impl/invm/InVMConnection.java
===================================================================
--- branches/Branch_Temp_Clebert_LargeMessage/src/main/org/jboss/messaging/core/remoting/impl/invm/InVMConnection.java	2009-04-14 16:09:22 UTC (rev 6421)
+++ branches/Branch_Temp_Clebert_LargeMessage/src/main/org/jboss/messaging/core/remoting/impl/invm/InVMConnection.java	2009-04-14 16:28:02 UTC (rev 6422)
@@ -22,6 +22,7 @@
 package org.jboss.messaging.core.remoting.impl.invm;
 
 import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 
 import org.jboss.messaging.core.buffers.ChannelBuffers;
@@ -57,8 +58,16 @@
    
    private final int serverID;
 
-   private static final ExecutorFactory factory =
-      new OrderedExecutorFactory(Executors.newCachedThreadPool(new JBMThreadFactory("JBM-InVM-Transport-Threads")));
+   
+   private static ExecutorService serviceFactory = Executors.newCachedThreadPool(new JBMThreadFactory("JBM-InVM-Transport-Threads")); 
+   private static ExecutorFactory factory = new OrderedExecutorFactory(serviceFactory);
+   
+   public static void recreateFactory()
+   {
+      serviceFactory.shutdown();
+      serviceFactory = Executors.newCachedThreadPool(new JBMThreadFactory("JBM-InVM-Transport-Threads"));
+      factory = new OrderedExecutorFactory(serviceFactory);
+   }
 
    private final Executor executor;
 

Modified: branches/Branch_Temp_Clebert_LargeMessage/tests/src/org/jboss/messaging/tests/integration/cluster/failover/LargeMessageMultiThreadFailoverTest.java
===================================================================
--- branches/Branch_Temp_Clebert_LargeMessage/tests/src/org/jboss/messaging/tests/integration/cluster/failover/LargeMessageMultiThreadFailoverTest.java	2009-04-14 16:09:22 UTC (rev 6421)
+++ branches/Branch_Temp_Clebert_LargeMessage/tests/src/org/jboss/messaging/tests/integration/cluster/failover/LargeMessageMultiThreadFailoverTest.java	2009-04-14 16:28:02 UTC (rev 6422)
@@ -32,6 +32,8 @@
 import org.jboss.messaging.core.config.TransportConfiguration;
 import org.jboss.messaging.core.config.impl.ConfigurationImpl;
 import org.jboss.messaging.core.remoting.impl.invm.InVMAcceptorFactory;
+import org.jboss.messaging.core.remoting.impl.invm.InVMConnection;
+import org.jboss.messaging.core.remoting.impl.invm.InVMRegistry;
 import org.jboss.messaging.core.remoting.impl.invm.TransportConstants;
 import org.jboss.messaging.core.server.JournalType;
 import org.jboss.messaging.core.server.Messaging;
@@ -79,10 +81,19 @@
       return sf;
 
    }
+   
+   protected int getNumIterations()
+   {
+      return 50;
+   }
 
+
    
    protected void start() throws Exception
    {
+      InVMRegistry.instance.clear();
+      InVMConnection.recreateFactory();
+
       startNullPersistence();
       //startJournal();
    }
@@ -152,6 +163,7 @@
                 .add(new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMAcceptorFactory",
                                                 backupParams));
       backupConf.setBackup(true);
+      backupConf.setJMXManagementEnabled(false);
       backupServer = Messaging.newMessagingServer(backupConf, false);
       backupServer.start();
 
@@ -166,6 +178,7 @@
       connectors.put(backupTC.getName(), backupTC);
       liveConf.setConnectorConfigurations(connectors);
       liveConf.setBackupConnectorName(backupTC.getName());
+      liveConf.setJMXManagementEnabled(false);
       liveServer = Messaging.newMessagingServer(liveConf, false);
       liveServer.start();
    }

Modified: branches/Branch_Temp_Clebert_LargeMessage/tests/src/org/jboss/messaging/tests/integration/cluster/failover/MultiThreadFailoverSupport.java
===================================================================
--- branches/Branch_Temp_Clebert_LargeMessage/tests/src/org/jboss/messaging/tests/integration/cluster/failover/MultiThreadFailoverSupport.java	2009-04-14 16:09:22 UTC (rev 6421)
+++ branches/Branch_Temp_Clebert_LargeMessage/tests/src/org/jboss/messaging/tests/integration/cluster/failover/MultiThreadFailoverSupport.java	2009-04-14 16:28:02 UTC (rev 6422)
@@ -23,8 +23,10 @@
 
 package org.jboss.messaging.tests.integration.cluster.failover;
 
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Map;
 import java.util.Timer;
 import java.util.TimerTask;
 
@@ -34,9 +36,12 @@
 import org.jboss.messaging.core.client.impl.ClientSessionImpl;
 import org.jboss.messaging.core.exception.MessagingException;
 import org.jboss.messaging.core.logging.Logger;
+import org.jboss.messaging.core.management.impl.QueueControl;
 import org.jboss.messaging.core.remoting.impl.RemotingConnectionImpl;
 import org.jboss.messaging.core.remoting.impl.invm.InVMConnector;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionReceiveContinuationMessage;
 import org.jboss.messaging.tests.util.ServiceTestBase;
+import org.jboss.profiler.jvmti.JVMTIInterface;
 
 /**
  * A MultiThreadFailoverSupport
@@ -100,6 +105,8 @@
                                        final boolean failOnCreateConnection,
                                        final long failDelay) throws Exception
    {
+      JVMTIInterface jvmti = new JVMTIInterface();
+
       for (int its = 0; its < numIts; its++)
       {
          log.info("Beginning iteration " + its);
@@ -184,10 +191,62 @@
          assertEquals(0, sf.numConnections());
 
          stop();
+         
+         
+         
+         {
+            jvmti.forceGC();
+            
+            Object[] instances = jvmti.getAllObjects(SessionReceiveContinuationMessage.class);
+            
+            System.out.println("************* Containing " + instances.length + " of SessionReceiveContinuationMessage");
+            
+            if (instances.length > 10)
+            {
+               instances = null;
+               System.out.println(threadDump("Leak detection"));
+               
+               printReferences(jvmti, SessionReceiveContinuationMessage.class);
+               
+            }
+            
+            
+            
+         }
+         
       }
+      
+      
    }
 
+   /**
+    * @param jvmti
+    * @param instances
+    * @throws Exception
+    * @throws IOException
+    */
+   private void printReferences(JVMTIInterface jvmti, Class<?> clazz) throws Exception, IOException
+   {
+      Object instances[] = jvmti.getAllObjects(clazz);
+      
+      if (instances.length > 0)
+      {
+         Object obj = instances[0];
+         instances = null;
+         
+         System.out.println("Inventory:\n" + jvmti.inventoryReport());
+         
+         Map map = jvmti.createIndexMatrix();
+         
+         System.out.println("References of " + clazz.getCanonicalName() + ": \n" + 
+                            jvmti.exploreObjectReferences(map, obj, 10, false));
+         
+         
+         jvmti.releaseTags();
+      }
+   }
 
+
    // Private -------------------------------------------------------
 
    private Failer startFailer(final long time, final ClientSession session, final boolean failOnCreateConnection)

Modified: branches/Branch_Temp_Clebert_LargeMessage/tests/src/org/jboss/messaging/tests/integration/cluster/failover/MultiThreadRandomFailoverTestBase.java
===================================================================
--- branches/Branch_Temp_Clebert_LargeMessage/tests/src/org/jboss/messaging/tests/integration/cluster/failover/MultiThreadRandomFailoverTestBase.java	2009-04-14 16:09:22 UTC (rev 6421)
+++ branches/Branch_Temp_Clebert_LargeMessage/tests/src/org/jboss/messaging/tests/integration/cluster/failover/MultiThreadRandomFailoverTestBase.java	2009-04-14 16:28:02 UTC (rev 6422)
@@ -37,14 +37,18 @@
 import org.jboss.messaging.core.client.MessageHandler;
 import org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl;
 import org.jboss.messaging.core.client.impl.ClientSessionFactoryInternal;
+import org.jboss.messaging.core.client.impl.ConnectionManagerImpl;
 import org.jboss.messaging.core.config.TransportConfiguration;
 import org.jboss.messaging.core.exception.MessagingException;
 import org.jboss.messaging.core.logging.Logger;
+import org.jboss.messaging.core.remoting.impl.invm.InVMConnection;
 import org.jboss.messaging.core.remoting.impl.invm.InVMRegistry;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionReceiveContinuationMessage;
 import org.jboss.messaging.core.server.MessagingServer;
 import org.jboss.messaging.jms.client.JBossBytesMessage;
 import org.jboss.messaging.jms.client.JBossTextMessage;
 import org.jboss.messaging.utils.SimpleString;
+import org.jboss.profiler.jvmti.JVMTIInterface;
 
 /**
  * A MultiThreadRandomFailoverTestBase
@@ -84,6 +88,8 @@
 
    public void testA() throws Exception
    {
+      
+      
       runTestMultipleThreads(new RunnableT()
       {
          @Override
@@ -1287,7 +1293,7 @@
 
    protected int getNumIterations()
    {
-      return 500;
+      return 50;
    }
 
    @Override
@@ -1357,6 +1363,10 @@
       liveServer.stop();
 
       assertEquals(0, InVMRegistry.instance.size());
+      
+      InVMRegistry.instance.clear();
+      InVMConnection.recreateFactory();
+      ConnectionManagerImpl.recreatePingExecutor();
    }
 
    private void sendMessages(final ClientSession sessSend,




More information about the jboss-cvs-commits mailing list