[jboss-cvs] JBoss Messaging SVN: r6641 - in trunk: src/main/org/jboss/messaging/core/remoting/impl and 6 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Thu Apr 30 13:22:52 EDT 2009
Author: jmesnil
Date: 2009-04-30 13:22:52 -0400 (Thu, 30 Apr 2009)
New Revision: 6641
Modified:
trunk/src/main/org/jboss/messaging/core/management/impl/MessagingServerControl.java
trunk/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java
trunk/src/main/org/jboss/messaging/core/remoting/server/RemotingService.java
trunk/src/main/org/jboss/messaging/core/remoting/server/impl/RemotingServiceImpl.java
trunk/tests/src/org/jboss/messaging/tests/integration/client/TemporaryQueueTest.java
trunk/tests/src/org/jboss/messaging/tests/integration/clientcrash/ClientCrashTest.java
trunk/tests/src/org/jboss/messaging/tests/integration/jms/server/management/JMSServerControl2Test.java
trunk/tests/src/org/jboss/messaging/tests/integration/jms/server/management/JMSUtil.java
trunk/tests/src/org/jboss/messaging/tests/integration/remoting/PingTest.java
Log:
JBMESSAGING-1421: Server resources are not cleaned up when the client crashes/exits without closing properly JBM resources
* if the server resources have not been properly closed before the connection is closed, we clean them up when the connection TTL is hit
* the only exception is when the connection is closed through the management API. In that case, the clean up is immediate
Modified: trunk/src/main/org/jboss/messaging/core/management/impl/MessagingServerControl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/management/impl/MessagingServerControl.java 2009-04-30 17:11:15 UTC (rev 6640)
+++ trunk/src/main/org/jboss/messaging/core/management/impl/MessagingServerControl.java 2009-04-30 17:22:52 UTC (rev 6641)
@@ -430,6 +430,7 @@
String remoteAddress = connection.getRemoteAddress();
if (remoteAddress.contains(ipAddress))
{
+ remotingService.removeConnection(connection.getID());
connection.fail(new MessagingException(MessagingException.INTERNAL_ERROR, "connections for " + ipAddress +
" closed by management"));
closed = true;
Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java 2009-04-30 17:11:15 UTC (rev 6640)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java 2009-04-30 17:22:52 UTC (rev 6641)
@@ -449,6 +449,8 @@
// Then call the listeners
callFailureListeners(me);
+ callClosingListeners();
+
internalClose();
for (ChannelImpl channel : channels.values())
Modified: trunk/src/main/org/jboss/messaging/core/remoting/server/RemotingService.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/server/RemotingService.java 2009-04-30 17:11:15 UTC (rev 6640)
+++ trunk/src/main/org/jboss/messaging/core/remoting/server/RemotingService.java 2009-04-30 17:22:52 UTC (rev 6641)
@@ -38,13 +38,22 @@
{
RemotingConnection getConnection(Object remotingConnectionID);
+ /**
+ * Remove a connection from the connections held by the remoting service.
+ * <strong>This method must be used only from the management API.
+ * RemotingConnections are removed from the remoting service when their connectionTTL is hit.</strong>
+ * @param remotingConnectionID the ID of the RemotingConnection to removed
+ * @return the removed RemotingConnection
+ */
+ RemotingConnection removeConnection(Object remotingConnectionID);
+
Set<RemotingConnection> getConnections();
-
+
void addInterceptor(Interceptor interceptor);
-
+
boolean removeInterceptor(Interceptor interceptor);
-
+
void freeze();
-
+
RemotingConnection getServerSideReplicatingConnection();
}
Modified: trunk/src/main/org/jboss/messaging/core/remoting/server/impl/RemotingServiceImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/server/impl/RemotingServiceImpl.java 2009-04-30 17:11:15 UTC (rev 6640)
+++ trunk/src/main/org/jboss/messaging/core/remoting/server/impl/RemotingServiceImpl.java 2009-04-30 17:22:52 UTC (rev 6641)
@@ -242,6 +242,11 @@
return connections.get(remotingConnectionID);
}
+ public RemotingConnection removeConnection(final Object remotingConnectionID)
+ {
+ return connections.remove(remotingConnectionID);
+ }
+
public synchronized Set<RemotingConnection> getConnections()
{
return new HashSet<RemotingConnection>(connections.values());
@@ -281,11 +286,9 @@
public void connectionDestroyed(final Object connectionID)
{
- RemotingConnection conn = connections.remove(connectionID);
- if (conn != null)
- {
- conn.destroy();
- }
+ // We DO NOT destroy the connection when this event is received.
+ // Instead, the connection will be cleaned up when the connection TTL
+ // is hit in FailedConnectionsTask.
}
public void connectionException(final Object connectionID, final MessagingException me)
@@ -346,6 +349,8 @@
for (RemotingConnection conn : failedConnections)
{
+
+ connections.remove(conn.getID());
MessagingException me = new MessagingException(MessagingException.CONNECTION_TIMEDOUT,
"Did not receive ping on connection. It is likely a client has exited or crashed without " + "closing its connection, or the network between the server and client has failed. The connection will now be closed.");
Modified: trunk/tests/src/org/jboss/messaging/tests/integration/client/TemporaryQueueTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/client/TemporaryQueueTest.java 2009-04-30 17:11:15 UTC (rev 6640)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/client/TemporaryQueueTest.java 2009-04-30 17:22:52 UTC (rev 6641)
@@ -28,6 +28,8 @@
import org.jboss.messaging.core.client.ClientSession;
import org.jboss.messaging.core.client.ClientSessionFactory;
import org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl;
+import org.jboss.messaging.core.client.impl.ClientSessionInternal;
+import org.jboss.messaging.core.config.Configuration;
import org.jboss.messaging.core.config.TransportConfiguration;
import org.jboss.messaging.core.exception.MessagingException;
import org.jboss.messaging.core.remoting.CloseListener;
@@ -47,18 +49,13 @@
*
* @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>
*/
-/**
- * A TemporaryQueueTest
- *
- * @author jmesnil
- *
- *
- */
public class TemporaryQueueTest extends ServiceTestBase
{
// Constants -----------------------------------------------------
+ private static final long CONNECTION_TTL = 2000;
+
// Attributes ----------------------------------------------------
private MessagingServer server;
@@ -134,7 +131,7 @@
});
session.close();
//wait for the closing listeners to be fired
- assertTrue("connection close listeners not fired", latch.await(1, TimeUnit.SECONDS));
+ assertTrue("connection close listeners not fired", latch.await(2 * CONNECTION_TTL, TimeUnit.MILLISECONDS));
session = sf.createSession(false, true, true);
session.start();
@@ -200,11 +197,12 @@
latch.countDown();
}
});
- remotingConnection.fail(new MessagingException(MessagingException.INTERNAL_ERROR, "simulate a client failure"));
+
+ ((ClientSessionInternal)session).getConnection().fail(new MessagingException(MessagingException.INTERNAL_ERROR, "simulate a client failure"));
// let some time for the server to clean the connections
- latch.await(1, TimeUnit.SECONDS);
+ latch.await(2 * CONNECTION_TTL, TimeUnit.MILLISECONDS);
assertEquals(0, server.getConnectionCount());
@@ -233,10 +231,34 @@
{
super.setUp();
- server = createServer(false);
+
+ Configuration configuration = createDefaultConfig();
+ configuration.setSecurityEnabled(false);
+ server = createServer(false, configuration );
server.start();
- sf = createInVMFactory();
+ sf = new ClientSessionFactoryImpl(new TransportConfiguration(INVM_CONNECTOR_FACTORY),
+ null,
+ ClientSessionFactoryImpl.DEFAULT_FAILOVER_ON_SERVER_SHUTDOWN,
+ ClientSessionFactoryImpl.DEFAULT_CONNECTION_LOAD_BALANCING_POLICY_CLASS_NAME,
+ ClientSessionFactoryImpl.DEFAULT_PING_PERIOD,
+ CONNECTION_TTL,
+ ClientSessionFactoryImpl.DEFAULT_CALL_TIMEOUT,
+ ClientSessionFactoryImpl.DEFAULT_CONSUMER_WINDOW_SIZE,
+ ClientSessionFactoryImpl.DEFAULT_CONSUMER_MAX_RATE,
+ ClientSessionFactoryImpl.DEFAULT_PRODUCER_WINDOW_SIZE,
+ ClientSessionFactoryImpl.DEFAULT_PRODUCER_MAX_RATE,
+ ClientSessionFactoryImpl.DEFAULT_MIN_LARGE_MESSAGE_SIZE,
+ ClientSessionFactoryImpl.DEFAULT_BLOCK_ON_ACKNOWLEDGE,
+ ClientSessionFactoryImpl.DEFAULT_BLOCK_ON_NON_PERSISTENT_SEND,
+ ClientSessionFactoryImpl.DEFAULT_BLOCK_ON_PERSISTENT_SEND,
+ ClientSessionFactoryImpl.DEFAULT_AUTO_GROUP,
+ ClientSessionFactoryImpl.DEFAULT_MAX_CONNECTIONS,
+ ClientSessionFactoryImpl.DEFAULT_PRE_ACKNOWLEDGE,
+ ClientSessionFactoryImpl.DEFAULT_ACK_BATCH_SIZE,
+ ClientSessionFactoryImpl.DEFAULT_RETRY_INTERVAL,
+ ClientSessionFactoryImpl.DEFAULT_RETRY_INTERVAL_MULTIPLIER,
+ ClientSessionFactoryImpl.DEFAULT_RECONNECT_ATTEMPTS);
session = sf.createSession(false, true, true);
}
Modified: trunk/tests/src/org/jboss/messaging/tests/integration/clientcrash/ClientCrashTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/clientcrash/ClientCrashTest.java 2009-04-30 17:11:15 UTC (rev 6640)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/clientcrash/ClientCrashTest.java 2009-04-30 17:22:52 UTC (rev 6641)
@@ -126,20 +126,20 @@
System.out.println("VM Exited");
- Thread.sleep(1000);
+ Thread.sleep(2 * CONNECTION_TTL);
assertActiveConnections(1);
// FIXME https://jira.jboss.org/jira/browse/JBMESSAGING-1421
- // assertActiveSession(1);
+ assertActiveSession(1);
session.close();
- Thread.sleep(1000);
+ Thread.sleep(2 * CONNECTION_TTL);
// the crash must have been detected and the resources cleaned up
assertActiveConnections(0);
// FIXME https://jira.jboss.org/jira/browse/JBMESSAGING-1421
- // assertActiveSession(0);
+ assertActiveSession(0);
}
// Package protected ---------------------------------------------
Modified: trunk/tests/src/org/jboss/messaging/tests/integration/jms/server/management/JMSServerControl2Test.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/jms/server/management/JMSServerControl2Test.java 2009-04-30 17:11:15 UTC (rev 6640)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/jms/server/management/JMSServerControl2Test.java 2009-04-30 17:22:52 UTC (rev 6641)
@@ -22,10 +22,17 @@
package org.jboss.messaging.tests.integration.jms.server.management;
-import org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl;
+import static org.jboss.messaging.tests.util.RandomUtil.randomString;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import javax.jms.Connection;
+import javax.jms.ExceptionListener;
+import javax.jms.JMSException;
+
import org.jboss.messaging.core.config.Configuration;
import org.jboss.messaging.core.config.TransportConfiguration;
-import org.jboss.messaging.core.config.cluster.DiscoveryGroupConfiguration;
import org.jboss.messaging.core.config.impl.ConfigurationImpl;
import org.jboss.messaging.core.logging.Logger;
import org.jboss.messaging.core.remoting.impl.invm.InVMAcceptorFactory;
@@ -40,17 +47,6 @@
import org.jboss.messaging.tests.integration.management.ManagementTestBase;
import org.jboss.messaging.tests.unit.util.InVMContext;
-import static org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl.DEFAULT_RECONNECT_ATTEMPTS;
-import static org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl.DEFAULT_FAILOVER_ON_SERVER_SHUTDOWN;
-import static org.jboss.messaging.tests.util.RandomUtil.randomString;
-
-import javax.jms.Connection;
-import javax.jms.ConnectionFactory;
-import javax.jms.ExceptionListener;
-import javax.jms.JMSException;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-
/**
* A QueueControlTest
*
@@ -66,6 +62,10 @@
private static final Logger log = Logger.getLogger(JMSServerControl2Test.class);
+ private static final long CONNECTION_TTL = 1000;
+
+ private static final long PING_PERIOD = CONNECTION_TTL / 2;
+
// Attributes ----------------------------------------------------
private InVMContext context;
@@ -166,21 +166,21 @@
assertEquals(0, control.listConnectionIDs().length);
- Connection connection = JMSUtil.createConnection(connectorFactory);
+ Connection connection = JMSUtil.createConnection(connectorFactory, CONNECTION_TTL, PING_PERIOD);
String[] connectionIDs = control.listConnectionIDs();
assertEquals(1, connectionIDs.length);
- Connection connection2 = JMSUtil.createConnection(connectorFactory);
+ Connection connection2 = JMSUtil.createConnection(connectorFactory, CONNECTION_TTL, PING_PERIOD);
assertEquals(2, control.listConnectionIDs().length);
connection.close();
- Thread.sleep(500);
+ Thread.sleep(2 * CONNECTION_TTL);
assertEquals(1, control.listConnectionIDs().length);
connection2.close();
- Thread.sleep(500);
+ Thread.sleep(2 * CONNECTION_TTL);
assertEquals(0, control.listConnectionIDs().length);
}
@@ -204,7 +204,7 @@
assertEquals(0, control.listConnectionIDs().length);
- Connection connection = JMSUtil.createConnection(connectorFactory);
+ Connection connection = JMSUtil.createConnection(connectorFactory, CONNECTION_TTL, PING_PERIOD);
String[] connectionIDs = control.listConnectionIDs();
assertEquals(1, connectionIDs.length);
@@ -218,7 +218,7 @@
connection.close();
- Thread.sleep(500);
+ Thread.sleep(2 * CONNECTION_TTL);
assertEquals(0, control.listConnectionIDs().length);
}
@@ -242,7 +242,7 @@
assertEquals(0, control.listRemoteAddresses().length);
- Connection connection = JMSUtil.createConnection(connectorFactory);
+ Connection connection = JMSUtil.createConnection(connectorFactory, CONNECTION_TTL, PING_PERIOD);
String[] remoteAddresses = control.listRemoteAddresses();
assertEquals(1, remoteAddresses.length);
@@ -256,8 +256,7 @@
connection.close();
- // FIXME: with Netty, the server is not notified immediately that the connection is closed
- Thread.sleep(1000);
+ Thread.sleep(2 * CONNECTION_TTL);
assertEquals(0, control.listRemoteAddresses().length);
}
@@ -283,7 +282,7 @@
assertEquals(0, server.getConnectionCount());
assertEquals(0, control.listRemoteAddresses().length);
- Connection connection = JMSUtil.createConnection(connectorFactory);
+ Connection connection = JMSUtil.createConnection(connectorFactory, CONNECTION_TTL, PING_PERIOD);
assertEquals(1, server.getConnectionCount());
@@ -302,8 +301,12 @@
assertTrue(control.closeConnectionsForAddress(remoteAddress));
- boolean gotException = exceptionLatch.await(5, TimeUnit.SECONDS);
+ boolean gotException = exceptionLatch.await(2 * CONNECTION_TTL, TimeUnit.MILLISECONDS);
assertTrue("did not received the expected JMSException", gotException);
+ for (String string : control.listRemoteAddresses())
+ {
+ System.out.println(string);
+ }
assertEquals(0, control.listRemoteAddresses().length);
assertEquals(0, server.getConnectionCount());
}
@@ -331,7 +334,7 @@
assertEquals(0, server.getConnectionCount());
assertEquals(0, control.listRemoteAddresses().length);
- Connection connection = JMSUtil.createConnection(connectorFactory);
+ Connection connection = JMSUtil.createConnection(connectorFactory, CONNECTION_TTL, PING_PERIOD);
assertEquals(1, server.getConnectionCount());
String[] remoteAddresses = control.listRemoteAddresses();
@@ -348,7 +351,7 @@
assertFalse(control.closeConnectionsForAddress(unknownAddress));
- boolean gotException = exceptionLatch.await(500, TimeUnit.MILLISECONDS);
+ boolean gotException = exceptionLatch.await(2 * CONNECTION_TTL, TimeUnit.MILLISECONDS);
assertFalse(gotException);
assertEquals(1, control.listRemoteAddresses().length);
Modified: trunk/tests/src/org/jboss/messaging/tests/integration/jms/server/management/JMSUtil.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/jms/server/management/JMSUtil.java 2009-04-30 17:11:15 UTC (rev 6640)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/jms/server/management/JMSUtil.java 2009-04-30 17:22:52 UTC (rev 6641)
@@ -79,11 +79,16 @@
public static Connection createConnection(String connectorFactory) throws JMSException
{
+ return createConnection(connectorFactory, DEFAULT_CONNECTION_TTL, DEFAULT_PING_PERIOD);
+ }
+
+ public static Connection createConnection(String connectorFactory, long connectionTTL, long pingPeriod) throws JMSException
+ {
JBossConnectionFactory cf = new JBossConnectionFactory(new TransportConfiguration(connectorFactory),
null,
DEFAULT_CONNECTION_LOAD_BALANCING_POLICY_CLASS_NAME,
- DEFAULT_PING_PERIOD,
- DEFAULT_CONNECTION_TTL,
+ pingPeriod,
+ connectionTTL,
DEFAULT_CALL_TIMEOUT,
null,
DEFAULT_ACK_BATCH_SIZE,
@@ -106,7 +111,7 @@
return cf.createConnection();
}
-
+
static MessageConsumer createConsumer(Connection connection, Destination destination, String connectorFactory) throws JMSException
{
Session s = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Modified: trunk/tests/src/org/jboss/messaging/tests/integration/remoting/PingTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/remoting/PingTest.java 2009-04-30 17:11:15 UTC (rev 6640)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/remoting/PingTest.java 2009-04-30 17:22:52 UTC (rev 6641)
@@ -344,6 +344,8 @@
Thread.sleep(PING_INTERVAL);
}
+
+ Thread.sleep(3 * PING_INTERVAL);
assertTrue(server.getRemotingService().getConnections().isEmpty());
@@ -364,13 +366,24 @@
{
Interceptor noPongInterceptor = new Interceptor()
{
+ boolean allowPing = true;
+
public boolean intercept(Packet packet, RemotingConnection conn) throws MessagingException
{
log.info("In interceptor, packet is " + packet.getType());
if (packet.getType() == PacketImpl.PING)
{
- log.info("Ignoring Ping packet.. it will be dropped");
- return false;
+ if (allowPing)
+ {
+ log.info("allow 1 ping");
+ allowPing = false;
+ return true;
+ }
+ else
+ {
+ log.info("Ignoring Ping packet.. it will be dropped");
+ return false;
+ }
}
else
{
@@ -434,23 +447,14 @@
serverConn.addFailureListener(serverListener);
- for (int i = 0; i < 40; i++)
- {
- // a few tries to avoid a possible race caused by GCs or similar issues
- if (server.getRemotingService().getConnections().isEmpty() && clientListener.getException() != null)
- {
- break;
- }
-
- Thread.sleep(PING_INTERVAL);
- }
-
+ Thread.sleep(3 * PING_INTERVAL);
+
assertNotNull(clientListener.getException());
- // We don't receive an exception on the server in this case
- assertNull(serverListener.getException());
+ // We receive an exception on the server in this case too
+ assertNotNull(serverListener.getException());
- assertTrue(server.getRemotingService().getConnections().isEmpty());
+ assertEquals(0, server.getRemotingService().getConnections().size());
server.getRemotingService().removeInterceptor(noPongInterceptor);
More information about the jboss-cvs-commits
mailing list