[jboss-cvs] JBoss Messaging SVN: r6422 - in branches/Branch_Temp_Clebert_LargeMessage: src/main/org/jboss/messaging/core/client/impl and 2 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Tue Apr 14 12:28:02 EDT 2009
Author: clebert.suconic at jboss.com
Date: 2009-04-14 12:28:02 -0400 (Tue, 14 Apr 2009)
New Revision: 6422
Modified:
branches/Branch_Temp_Clebert_LargeMessage/.classpath
branches/Branch_Temp_Clebert_LargeMessage/src/main/org/jboss/messaging/core/client/impl/ConnectionManagerImpl.java
branches/Branch_Temp_Clebert_LargeMessage/src/main/org/jboss/messaging/core/remoting/impl/invm/InVMConnection.java
branches/Branch_Temp_Clebert_LargeMessage/tests/src/org/jboss/messaging/tests/integration/cluster/failover/LargeMessageMultiThreadFailoverTest.java
branches/Branch_Temp_Clebert_LargeMessage/tests/src/org/jboss/messaging/tests/integration/cluster/failover/MultiThreadFailoverSupport.java
branches/Branch_Temp_Clebert_LargeMessage/tests/src/org/jboss/messaging/tests/integration/cluster/failover/MultiThreadRandomFailoverTestBase.java
Log:
Changes before merge (double checks on Memory-leaks)
Modified: branches/Branch_Temp_Clebert_LargeMessage/.classpath
===================================================================
--- branches/Branch_Temp_Clebert_LargeMessage/.classpath 2009-04-14 16:09:22 UTC (rev 6421)
+++ branches/Branch_Temp_Clebert_LargeMessage/.classpath 2009-04-14 16:28:02 UTC (rev 6422)
@@ -43,7 +43,6 @@
<classpathentry kind="src" path="examples/javaee/mdb/src"/>
<classpathentry kind="lib" path="thirdparty/apache-log4j/lib/log4j.jar"/>
<classpathentry kind="lib" path="thirdparty/junit/lib/junit.jar"/>
- <classpathentry kind="lib" path="thirdparty/jboss/profiler/jvmti/lib/jboss-profiler-jvmti.jar"/>
<classpathentry kind="lib" path="thirdparty/apache-logging/lib/commons-logging.jar"/>
<classpathentry kind="lib" path="thirdparty/sun-javacc/lib/javacc.jar"/>
<classpathentry kind="lib" path="thirdparty/apache-xerces/lib/xercesImpl.jar"/>
@@ -74,5 +73,6 @@
<classpathentry kind="lib" path="thirdparty/jboss/jboss-reflect/lib/jboss-reflect.jar"/>
<classpathentry kind="lib" path="thirdparty/jboss/jboss-mdr/lib/jboss-mdr.jar"/>
<classpathentry kind="lib" path="thirdparty/jboss/aop/lib/jboss-aop.jar"/>
+ <classpathentry combineaccessrules="false" kind="src" path="/jboss-profiler"/>
<classpathentry kind="output" path="eclipse-output"/>
</classpath>
Modified: branches/Branch_Temp_Clebert_LargeMessage/src/main/org/jboss/messaging/core/client/impl/ConnectionManagerImpl.java
===================================================================
--- branches/Branch_Temp_Clebert_LargeMessage/src/main/org/jboss/messaging/core/client/impl/ConnectionManagerImpl.java 2009-04-14 16:09:22 UTC (rev 6421)
+++ branches/Branch_Temp_Clebert_LargeMessage/src/main/org/jboss/messaging/core/client/impl/ConnectionManagerImpl.java 2009-04-14 16:28:02 UTC (rev 6422)
@@ -110,10 +110,26 @@
private final Object failoverLock = new Object();
- // TODO - allow this to be configurable
- private static final ScheduledThreadPoolExecutor pingExecutor = new ScheduledThreadPoolExecutor(5,
- new org.jboss.messaging.utils.JBMThreadFactory("jbm-pinger-threads"));
+ private static ScheduledThreadPoolExecutor pingExecutor;
+ static
+ {
+ recreatePingExecutor();
+ }
+
+ public static void recreatePingExecutor()
+ {
+ if (pingExecutor != null)
+ {
+ pingExecutor.shutdown();
+ }
+
+ // TODO - allow this to be configurable
+ pingExecutor = new ScheduledThreadPoolExecutor(5,
+ new org.jboss.messaging.utils.JBMThreadFactory("jbm-pinger-threads"));
+
+ }
+
private final Map<Object, ConnectionEntry> connections = Collections.synchronizedMap(new LinkedHashMap<Object, ConnectionEntry>());
private int refCount;
@@ -165,7 +181,7 @@
final long retryInterval,
final double retryIntervalMultiplier,
final int reconnectAttempts)
- {
+ {
this.connectorConfig = connectorConfig;
this.backupConfig = backupConfig;
@@ -259,17 +275,17 @@
synchronized (failoverLock)
{
connection = getConnectionWithRetry(1, reconnectAttempts);
-
+
if (connection == null)
- {
+ {
if (!failureSignalled)
{
// This can happen if the connection manager gets closed - e.g. the server gets shut down
-
+
throw new MessagingException(MessagingException.NOT_CONNECTED, "Unable to connect to server");
}
else
- {
+ {
// This means an async failure came in while getConnectionForCreateSession was executing, we
// need
// to allow the failover/reconnection to occur and let the create session retry after
@@ -465,7 +481,7 @@
{
return false;
}
-
+
if (connectionID != null && !connections.containsKey(connectionID))
{
// We already failed over/reconnected - probably the first failure came in, all the connections were failed
@@ -508,9 +524,9 @@
// It should then return its connections, with channel 1 lock still held
// It can then release the channel 1 lock, and retry (which will cause locking on failoverLock
// until failover is complete
-
+
boolean attemptFailover = (backupConnectorFactory) != null && (failoverOnServerShutdown || me.getCode() != MessagingException.SERVER_DISCONNECTED);
-
+
boolean done = false;
if (attemptFailover || reconnectAttempts != 0)
@@ -571,7 +587,7 @@
if (attemptFailover)
{
// Now try failing over to backup
-
+
connectorFactory = backupConnectorFactory;
transportParams = backupTransportParams;
@@ -579,11 +595,11 @@
backupConnectorFactory = null;
backupTransportParams = null;
-
+
done = reattachSessions(reconnectAttempts == -1 ? -1 : reconnectAttempts + 1);
}
else if (reconnectAttempts != 0)
- {
+ {
done = reattachSessions(reconnectAttempts);
}
@@ -709,7 +725,7 @@
long interval = retryInterval;
int count = 0;
-
+
while (true)
{
if (closed || failureSignalled)
@@ -718,7 +734,7 @@
}
RemotingConnection connection = getConnection(initialRefCount);
-
+
if (connection == null)
{
// Failed to get connection
@@ -733,7 +749,7 @@
return null;
}
-
+
try
{
Thread.sleep(interval);
@@ -765,12 +781,12 @@
Set<ConnectionEntry> copy = new HashSet<ConnectionEntry>(connections.values());
- connections.clear();
+ connections.clear();
for (ConnectionEntry entry : copy)
{
try
- {
+ {
entry.connection.destroy();
}
catch (Throwable ignore)
@@ -908,7 +924,7 @@
{
refCount--;
}
-
+
if (entry != null)
{
checkCloseConnections();
@@ -981,19 +997,19 @@
channel1.returnBlocking();
}
}
-
+
private void failConnection(final Object connectionID, final MessagingException me)
{
ConnectionEntry entry = connections.get(connectionID);
-
+
if (entry != null)
{
RemotingConnection conn = entry.connection;
-
+
conn.fail(me);
- }
+ }
}
-
+
private static class ConnectionEntry
{
ConnectionEntry(final RemotingConnection connection, final Connector connector)
Modified: branches/Branch_Temp_Clebert_LargeMessage/src/main/org/jboss/messaging/core/remoting/impl/invm/InVMConnection.java
===================================================================
--- branches/Branch_Temp_Clebert_LargeMessage/src/main/org/jboss/messaging/core/remoting/impl/invm/InVMConnection.java 2009-04-14 16:09:22 UTC (rev 6421)
+++ branches/Branch_Temp_Clebert_LargeMessage/src/main/org/jboss/messaging/core/remoting/impl/invm/InVMConnection.java 2009-04-14 16:28:02 UTC (rev 6422)
@@ -22,6 +22,7 @@
package org.jboss.messaging.core.remoting.impl.invm;
import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.jboss.messaging.core.buffers.ChannelBuffers;
@@ -57,8 +58,16 @@
private final int serverID;
- private static final ExecutorFactory factory =
- new OrderedExecutorFactory(Executors.newCachedThreadPool(new JBMThreadFactory("JBM-InVM-Transport-Threads")));
+
+ private static ExecutorService serviceFactory = Executors.newCachedThreadPool(new JBMThreadFactory("JBM-InVM-Transport-Threads"));
+ private static ExecutorFactory factory = new OrderedExecutorFactory(serviceFactory);
+
+ public static void recreateFactory()
+ {
+ serviceFactory.shutdown();
+ serviceFactory = Executors.newCachedThreadPool(new JBMThreadFactory("JBM-InVM-Transport-Threads"));
+ factory = new OrderedExecutorFactory(serviceFactory);
+ }
private final Executor executor;
Modified: branches/Branch_Temp_Clebert_LargeMessage/tests/src/org/jboss/messaging/tests/integration/cluster/failover/LargeMessageMultiThreadFailoverTest.java
===================================================================
--- branches/Branch_Temp_Clebert_LargeMessage/tests/src/org/jboss/messaging/tests/integration/cluster/failover/LargeMessageMultiThreadFailoverTest.java 2009-04-14 16:09:22 UTC (rev 6421)
+++ branches/Branch_Temp_Clebert_LargeMessage/tests/src/org/jboss/messaging/tests/integration/cluster/failover/LargeMessageMultiThreadFailoverTest.java 2009-04-14 16:28:02 UTC (rev 6422)
@@ -32,6 +32,8 @@
import org.jboss.messaging.core.config.TransportConfiguration;
import org.jboss.messaging.core.config.impl.ConfigurationImpl;
import org.jboss.messaging.core.remoting.impl.invm.InVMAcceptorFactory;
+import org.jboss.messaging.core.remoting.impl.invm.InVMConnection;
+import org.jboss.messaging.core.remoting.impl.invm.InVMRegistry;
import org.jboss.messaging.core.remoting.impl.invm.TransportConstants;
import org.jboss.messaging.core.server.JournalType;
import org.jboss.messaging.core.server.Messaging;
@@ -79,10 +81,19 @@
return sf;
}
+
+ protected int getNumIterations()
+ {
+ return 50;
+ }
+
protected void start() throws Exception
{
+ InVMRegistry.instance.clear();
+ InVMConnection.recreateFactory();
+
startNullPersistence();
//startJournal();
}
@@ -152,6 +163,7 @@
.add(new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMAcceptorFactory",
backupParams));
backupConf.setBackup(true);
+ backupConf.setJMXManagementEnabled(false);
backupServer = Messaging.newMessagingServer(backupConf, false);
backupServer.start();
@@ -166,6 +178,7 @@
connectors.put(backupTC.getName(), backupTC);
liveConf.setConnectorConfigurations(connectors);
liveConf.setBackupConnectorName(backupTC.getName());
+ liveConf.setJMXManagementEnabled(false);
liveServer = Messaging.newMessagingServer(liveConf, false);
liveServer.start();
}
Modified: branches/Branch_Temp_Clebert_LargeMessage/tests/src/org/jboss/messaging/tests/integration/cluster/failover/MultiThreadFailoverSupport.java
===================================================================
--- branches/Branch_Temp_Clebert_LargeMessage/tests/src/org/jboss/messaging/tests/integration/cluster/failover/MultiThreadFailoverSupport.java 2009-04-14 16:09:22 UTC (rev 6421)
+++ branches/Branch_Temp_Clebert_LargeMessage/tests/src/org/jboss/messaging/tests/integration/cluster/failover/MultiThreadFailoverSupport.java 2009-04-14 16:28:02 UTC (rev 6422)
@@ -23,8 +23,10 @@
package org.jboss.messaging.tests.integration.cluster.failover;
+import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
+import java.util.Map;
import java.util.Timer;
import java.util.TimerTask;
@@ -34,9 +36,12 @@
import org.jboss.messaging.core.client.impl.ClientSessionImpl;
import org.jboss.messaging.core.exception.MessagingException;
import org.jboss.messaging.core.logging.Logger;
+import org.jboss.messaging.core.management.impl.QueueControl;
import org.jboss.messaging.core.remoting.impl.RemotingConnectionImpl;
import org.jboss.messaging.core.remoting.impl.invm.InVMConnector;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionReceiveContinuationMessage;
import org.jboss.messaging.tests.util.ServiceTestBase;
+import org.jboss.profiler.jvmti.JVMTIInterface;
/**
* A MultiThreadFailoverSupport
@@ -100,6 +105,8 @@
final boolean failOnCreateConnection,
final long failDelay) throws Exception
{
+ JVMTIInterface jvmti = new JVMTIInterface();
+
for (int its = 0; its < numIts; its++)
{
log.info("Beginning iteration " + its);
@@ -184,10 +191,62 @@
assertEquals(0, sf.numConnections());
stop();
+
+
+
+ {
+ jvmti.forceGC();
+
+ Object[] instances = jvmti.getAllObjects(SessionReceiveContinuationMessage.class);
+
+ System.out.println("************* Containing " + instances.length + " of SessionReceiveContinuationMessage");
+
+ if (instances.length > 10)
+ {
+ instances = null;
+ System.out.println(threadDump("Leak detection"));
+
+ printReferences(jvmti, SessionReceiveContinuationMessage.class);
+
+ }
+
+
+
+ }
+
}
+
+
}
+ /**
+ * @param jvmti
+ * @param instances
+ * @throws Exception
+ * @throws IOException
+ */
+ private void printReferences(JVMTIInterface jvmti, Class<?> clazz) throws Exception, IOException
+ {
+ Object instances[] = jvmti.getAllObjects(clazz);
+
+ if (instances.length > 0)
+ {
+ Object obj = instances[0];
+ instances = null;
+
+ System.out.println("Inventory:\n" + jvmti.inventoryReport());
+
+ Map map = jvmti.createIndexMatrix();
+
+ System.out.println("References of " + clazz.getCanonicalName() + ": \n" +
+ jvmti.exploreObjectReferences(map, obj, 10, false));
+
+
+ jvmti.releaseTags();
+ }
+ }
+
// Private -------------------------------------------------------
private Failer startFailer(final long time, final ClientSession session, final boolean failOnCreateConnection)
Modified: branches/Branch_Temp_Clebert_LargeMessage/tests/src/org/jboss/messaging/tests/integration/cluster/failover/MultiThreadRandomFailoverTestBase.java
===================================================================
--- branches/Branch_Temp_Clebert_LargeMessage/tests/src/org/jboss/messaging/tests/integration/cluster/failover/MultiThreadRandomFailoverTestBase.java 2009-04-14 16:09:22 UTC (rev 6421)
+++ branches/Branch_Temp_Clebert_LargeMessage/tests/src/org/jboss/messaging/tests/integration/cluster/failover/MultiThreadRandomFailoverTestBase.java 2009-04-14 16:28:02 UTC (rev 6422)
@@ -37,14 +37,18 @@
import org.jboss.messaging.core.client.MessageHandler;
import org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl;
import org.jboss.messaging.core.client.impl.ClientSessionFactoryInternal;
+import org.jboss.messaging.core.client.impl.ConnectionManagerImpl;
import org.jboss.messaging.core.config.TransportConfiguration;
import org.jboss.messaging.core.exception.MessagingException;
import org.jboss.messaging.core.logging.Logger;
+import org.jboss.messaging.core.remoting.impl.invm.InVMConnection;
import org.jboss.messaging.core.remoting.impl.invm.InVMRegistry;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionReceiveContinuationMessage;
import org.jboss.messaging.core.server.MessagingServer;
import org.jboss.messaging.jms.client.JBossBytesMessage;
import org.jboss.messaging.jms.client.JBossTextMessage;
import org.jboss.messaging.utils.SimpleString;
+import org.jboss.profiler.jvmti.JVMTIInterface;
/**
* A MultiThreadRandomFailoverTestBase
@@ -84,6 +88,8 @@
public void testA() throws Exception
{
+
+
runTestMultipleThreads(new RunnableT()
{
@Override
@@ -1287,7 +1293,7 @@
protected int getNumIterations()
{
- return 500;
+ return 50;
}
@Override
@@ -1357,6 +1363,10 @@
liveServer.stop();
assertEquals(0, InVMRegistry.instance.size());
+
+ InVMRegistry.instance.clear();
+ InVMConnection.recreateFactory();
+ ConnectionManagerImpl.recreatePingExecutor();
}
private void sendMessages(final ClientSession sessSend,
More information about the jboss-cvs-commits mailing list