[hornetq-commits] JBoss hornetq SVN: r7943 - in trunk: src/main/org/hornetq/core/remoting/server/impl and 2 other directories.
do-not-reply at jboss.org
do-not-reply at jboss.org
Tue Sep 8 12:41:34 EDT 2009
Author: timfox
Date: 2009-09-08 12:41:34 -0400 (Tue, 08 Sep 2009)
New Revision: 7943
Added:
trunk/tests/src/org/hornetq/tests/integration/jms/connection/CloseDestroyedConnectionTest.java
Modified:
trunk/src/main/org/hornetq/core/client/impl/ClientConsumerImpl.java
trunk/src/main/org/hornetq/core/remoting/server/impl/RemotingServiceImpl.java
trunk/tests/src/org/hornetq/tests/integration/client/SessionTest.java
trunk/tests/src/org/hornetq/tests/integration/jms/connection/CloseConnectionOnGCTest.java
Log:
https://jira.jboss.org/jira/browse/HORNETQ-121
Modified: trunk/src/main/org/hornetq/core/client/impl/ClientConsumerImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/ClientConsumerImpl.java 2009-09-08 14:28:34 UTC (rev 7942)
+++ trunk/src/main/org/hornetq/core/client/impl/ClientConsumerImpl.java 2009-09-08 16:41:34 UTC (rev 7943)
@@ -149,7 +149,7 @@
if (handler != null)
{
throw new HornetQException(HornetQException.ILLEGAL_STATE,
- "Cannot call receive(...) - a MessageHandler is set");
+ "Cannot call receive(...) - a MessageHandler is set");
}
if (clientWindowSize == 0)
@@ -210,7 +210,7 @@
{
// if we have already pre acked we cant expire
boolean expired = m.isExpired();
-
+
flowControlBeforeConsumption(m);
if (expired)
@@ -279,7 +279,7 @@
if (receiverThread != null)
{
throw new HornetQException(HornetQException.ILLEGAL_STATE,
- "Cannot set MessageHandler - consumer is in receive(...)");
+ "Cannot set MessageHandler - consumer is in receive(...)");
}
boolean noPreviousHandler = handler == null;
@@ -400,7 +400,7 @@
// Flow control for the first packet, we will have others
flowControl(packet.getPacketSize(), false);
-
+
ClientMessageInternal currentChunkMessage = new ClientMessageImpl(packet.getDeliveryCount());
currentChunkMessage.decodeProperties(ChannelBuffers.wrappedBuffer(packet.getLargeMessageHeader()));
@@ -439,18 +439,18 @@
{
synchronized (this)
{
- //Need to send credits for the messages in the buffer
-
- for (ClientMessageInternal message: this.buffer)
+ // Need to send credits for the messages in the buffer
+
+ for (ClientMessageInternal message : this.buffer)
{
flowControlBeforeConsumption(message);
}
buffer.clear();
}
-
- //Need to send credits for the messages in the buffer
+ // Need to send credits for the messages in the buffer
+
waitForOnMessageToComplete();
}
@@ -742,17 +742,19 @@
flushAcks();
+ clearBuffer();
+
if (sendCloseMessage)
{
channel.sendBlocking(new SessionConsumerCloseMessage(id));
}
-
- clearBuffer();
}
- finally
+ catch (Throwable t)
{
- session.removeConsumer(this);
+ // Consumer close should always return without exception
}
+
+ session.removeConsumer(this);
}
private void clearBuffer()
Modified: trunk/src/main/org/hornetq/core/remoting/server/impl/RemotingServiceImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/remoting/server/impl/RemotingServiceImpl.java 2009-09-08 14:28:34 UTC (rev 7942)
+++ trunk/src/main/org/hornetq/core/remoting/server/impl/RemotingServiceImpl.java 2009-09-08 16:41:34 UTC (rev 7943)
@@ -461,7 +461,7 @@
}
public void run()
- {
+ {
while (!closed)
{
long now = System.currentTimeMillis();
Modified: trunk/tests/src/org/hornetq/tests/integration/client/SessionTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/client/SessionTest.java 2009-09-08 14:28:34 UTC (rev 7942)
+++ trunk/tests/src/org/hornetq/tests/integration/client/SessionTest.java 2009-09-08 16:41:34 UTC (rev 7943)
@@ -21,8 +21,10 @@
import org.hornetq.core.client.ClientProducer;
import org.hornetq.core.client.ClientSession;
import org.hornetq.core.client.ClientSessionFactory;
+import org.hornetq.core.client.impl.ClientSessionInternal;
import org.hornetq.core.exception.HornetQException;
import org.hornetq.core.remoting.FailureListener;
+import org.hornetq.core.remoting.RemotingConnection;
import org.hornetq.core.remoting.impl.wireformat.SessionBindingQueryResponseMessage;
import org.hornetq.core.remoting.impl.wireformat.SessionQueueQueryResponseMessage;
import org.hornetq.core.server.HornetQServer;
@@ -107,6 +109,61 @@
}
}
}
+
+ //Closing a session if the underlying remoting connection is dead should cleanly
+ //release all resources
+ public void testCloseSessionOnDestroyedConnection() throws Exception
+ {
+ HornetQServer server = createServer(false);
+ try
+ {
+ //Make sure we have a short connection TTL so sessions will be quickly closed on the server
+ long ttl = 500;
+ server.getConfiguration().setConnectionTTLOverride(ttl);
+ server.start();
+ ClientSessionFactory cf = createInVMFactory();
+ ClientSessionInternal clientSession = (ClientSessionInternal)cf.createSession(false, true, true);
+ clientSession.createQueue(queueName, queueName, false);
+ ClientProducer producer = clientSession.createProducer();
+ ClientConsumer consumer = clientSession.createConsumer(queueName);
+
+ assertEquals(1, server.getRemotingService().getConnections().size());
+
+ RemotingConnection rc = clientSession.getConnection();
+
+ rc.fail(new HornetQException(HornetQException.INTERNAL_ERROR));
+
+ clientSession.close();
+
+ long start = System.currentTimeMillis();
+
+ while (true)
+ {
+ int cons = server.getRemotingService().getConnections().size();
+
+ if (cons == 0)
+ {
+ break;
+ }
+
+ long now = System.currentTimeMillis();
+
+ if (now - start > 10000)
+ {
+ throw new Exception("Timed out waiting for connections to close");
+ }
+
+ Thread.sleep(50);
+ }
+ }
+ finally
+ {
+ if (server.isStarted())
+ {
+ server.stop();
+ }
+ }
+ }
public void testBindingQuery() throws Exception
{
Modified: trunk/tests/src/org/hornetq/tests/integration/jms/connection/CloseConnectionOnGCTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/jms/connection/CloseConnectionOnGCTest.java 2009-09-08 14:28:34 UTC (rev 7942)
+++ trunk/tests/src/org/hornetq/tests/integration/jms/connection/CloseConnectionOnGCTest.java 2009-09-08 16:41:34 UTC (rev 7943)
@@ -16,16 +16,9 @@
import javax.jms.Session;
import org.hornetq.core.client.impl.ClientSessionFactoryImpl;
-import org.hornetq.core.config.Configuration;
import org.hornetq.core.config.TransportConfiguration;
-import org.hornetq.core.config.impl.ConfigurationImpl;
-import org.hornetq.core.server.HornetQ;
-import org.hornetq.core.server.HornetQServer;
import org.hornetq.jms.client.HornetQConnectionFactory;
-import org.hornetq.jms.server.impl.JMSServerManagerImpl;
-import org.hornetq.tests.integration.jms.server.management.NullInitialContext;
import org.hornetq.tests.util.JMSTestBase;
-import org.hornetq.tests.util.UnitTestCase;
/**
*
Added: trunk/tests/src/org/hornetq/tests/integration/jms/connection/CloseDestroyedConnectionTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/jms/connection/CloseDestroyedConnectionTest.java (rev 0)
+++ trunk/tests/src/org/hornetq/tests/integration/jms/connection/CloseDestroyedConnectionTest.java 2009-09-08 16:41:34 UTC (rev 7943)
@@ -0,0 +1,128 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.hornetq.tests.integration.jms.connection;
+
+import javax.jms.Connection;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.QueueBrowser;
+import javax.jms.Session;
+
+import org.hornetq.core.client.impl.ClientSessionInternal;
+import org.hornetq.core.config.TransportConfiguration;
+import org.hornetq.core.exception.HornetQException;
+import org.hornetq.core.logging.Logger;
+import org.hornetq.core.remoting.RemotingConnection;
+import org.hornetq.jms.HornetQQueue;
+import org.hornetq.jms.client.HornetQConnectionFactory;
+import org.hornetq.jms.client.HornetQSession;
+import org.hornetq.tests.util.JMSTestBase;
+
+/**
+ *
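+ * Tests that closing a JMS connection whose underlying remoting connection has already failed completes cleanly and that the server then drops the connection.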
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ *
+ *
+ */
+public class CloseDestroyedConnectionTest extends JMSTestBase
+{
+ private static final Logger log = Logger.getLogger(CloseDestroyedConnectionTest.class);
+
+ private HornetQConnectionFactory cf;
+
+ @Override
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+
+ cf = new HornetQConnectionFactory(new TransportConfiguration("org.hornetq.core.remoting.impl.invm.InVMConnectorFactory"));
+ cf.setBlockOnPersistentSend(true);
+ cf.setPreAcknowledge(true);
+ }
+
+ @Override
+ protected void tearDown() throws Exception
+ {
+ cf = null;
+
+ super.tearDown();
+ }
+
+ /*
+ * Closing a connection that is destroyed should cleanly close everything without throwing exceptions
+ */
+ public void testCloseDestroyedConnection() throws Exception
+ {
+ long connectionTTL = 500;
+ cf.setClientFailureCheckPeriod(connectionTTL / 2);
+ // Need to set connection ttl to a low figure so connections get removed quickly on the server
+ cf.setConnectionTTL(connectionTTL);
+
+ Connection conn = cf.createConnection();
+
+ assertEquals(1, server.getRemotingService().getConnections().size());
+
+ Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ //Give time for the initial ping to reach the server before we fail (it has connection TTL in it)
+ Thread.sleep(500);
+
+ String queueName = "myqueue";
+
+ Queue queue = new HornetQQueue(queueName);
+
+ super.createQueue(queueName);
+
+ MessageConsumer consumer = sess.createConsumer(queue);
+
+ MessageProducer producer = sess.createProducer(queue);
+
+ QueueBrowser browser = sess.createBrowser(queue);
+
+ // Now fail the underlying connection
+
+ ClientSessionInternal sessi = (ClientSessionInternal)((HornetQSession)sess).getCoreSession();
+
+ RemotingConnection rc = sessi.getConnection();
+
+ rc.fail(new HornetQException(HornetQException.INTERNAL_ERROR));
+
+ // Now close the connection
+
+ conn.close();
+
+ long start = System.currentTimeMillis();
+
+ while (true)
+ {
+ int cons = server.getRemotingService().getConnections().size();
+
+ if (cons == 0)
+ {
+ break;
+ }
+
+ long now = System.currentTimeMillis();
+
+ if (now - start > 10000)
+ {
+ throw new Exception("Timed out waiting for connections to close");
+ }
+
+ Thread.sleep(50);
+ }
+ }
+}
More information about the hornetq-commits
mailing list