Author: timfox
Date: 2010-09-28 05:33:31 -0400 (Tue, 28 Sep 2010)
New Revision: 9725
Added:
trunk/tests/src/org/hornetq/tests/integration/jms/connection/ConcurrentSessionCloseTest.java
Modified:
trunk/src/main/org/hornetq/jms/client/HornetQConnection.java
trunk/src/main/org/hornetq/jms/client/HornetQSession.java
Log:
https://jira.jboss.org/browse/HORNETQ-525
Modified: trunk/src/main/org/hornetq/jms/client/HornetQConnection.java
===================================================================
--- trunk/src/main/org/hornetq/jms/client/HornetQConnection.java 2010-09-27 12:27:02 UTC
(rev 9724)
+++ trunk/src/main/org/hornetq/jms/client/HornetQConnection.java 2010-09-28 09:33:31 UTC
(rev 9725)
@@ -216,7 +216,7 @@
justCreated = false;
}
- public void start() throws JMSException
+ public synchronized void start() throws JMSException
{
checkClosed();
@@ -229,7 +229,7 @@
started = true;
}
- public void stop() throws JMSException
+ public synchronized void stop() throws JMSException
{
checkClosed();
Modified: trunk/src/main/org/hornetq/jms/client/HornetQSession.java
===================================================================
--- trunk/src/main/org/hornetq/jms/client/HornetQSession.java 2010-09-27 12:27:02 UTC (rev
9724)
+++ trunk/src/main/org/hornetq/jms/client/HornetQSession.java 2010-09-28 09:33:31 UTC (rev
9725)
@@ -111,11 +111,11 @@
// Constructors --------------------------------------------------
protected HornetQSession(final HornetQConnection connection,
- final boolean transacted,
- final boolean xa,
- final int ackMode,
- final ClientSession session,
- final int sessionType)
+ final boolean transacted,
+ final boolean xa,
+ final int ackMode,
+ final ClientSession session,
+ final int sessionType)
{
this.connection = connection;
@@ -213,7 +213,7 @@
return ackMode;
}
-
+
public boolean isXA()
{
return xa;
@@ -262,21 +262,24 @@
public void close() throws JMSException
{
- try
+ synchronized (connection)
{
- for (HornetQMessageConsumer cons : new
HashSet<HornetQMessageConsumer>(consumers))
+ try
{
- cons.close();
- }
+ for (HornetQMessageConsumer cons : new
HashSet<HornetQMessageConsumer>(consumers))
+ {
+ cons.close();
+ }
- session.close();
+ session.close();
- connection.removeSession(this);
+ connection.removeSession(this);
+ }
+ catch (HornetQException e)
+ {
+ throw JMSExceptionHelper.convertFromHornetQException(e);
+ }
}
- catch (HornetQException e)
- {
- throw JMSExceptionHelper.convertFromHornetQException(e);
- }
}
public void recover() throws JMSException
@@ -393,7 +396,7 @@
try
{
HornetQQueue queue = lookupQueue(queueName, false);
-
+
if (queue == null)
{
queue = lookupQueue(queueName, true);
@@ -413,7 +416,6 @@
throw JMSExceptionHelper.convertFromHornetQException(e);
}
}
-
public Topic createTopic(final String topicName) throws JMSException
{
@@ -423,7 +425,6 @@
throw new IllegalStateException("Cannot create a topic on a
QueueSession");
}
-
try
{
HornetQTopic topic = lookupTopic(topicName, false);
@@ -477,7 +478,7 @@
}
HornetQDestination jbdest = (HornetQDestination)topic;
-
+
if (jbdest.isQueue())
{
throw new InvalidDestinationException("Cannot create a subscriber on a
queue");
@@ -490,7 +491,7 @@
final String subscriptionName,
String selectorString,
final boolean noLocal) throws
JMSException
- {
+ {
try
{
selectorString = "".equals(selectorString) ? null : selectorString;
@@ -525,7 +526,7 @@
SimpleString autoDeleteQueueName = null;
if (dest.isQueue())
- {
+ {
BindingQuery response = session.bindingQuery(dest.getSimpleAddress());
if (!response.isExists())
@@ -573,7 +574,7 @@
}
queueName = new
SimpleString(HornetQDestination.createQueueNameForDurableSubscription(connection.getClientID(),
-
subscriptionName));
+
subscriptionName));
QueueQuery subResponse = session.queueQuery(queueName);
@@ -678,10 +679,10 @@
}
HornetQDestination jbq = (HornetQDestination)queue;
-
+
if (!jbq.isQueue())
{
- throw new InvalidDestinationException("Cannot create a browser on a
topic");
+ throw new InvalidDestinationException("Cannot create a browser on a
topic");
}
try
@@ -767,7 +768,7 @@
}
SimpleString queueName = new
SimpleString(HornetQDestination.createQueueNameForDurableSubscription(connection.getClientID(),
-
name));
+
name));
try
{
@@ -887,7 +888,7 @@
{
throw new InvalidDestinationException("Not a temporary topic " +
tempTopic);
}
-
+
try
{
BindingQuery response = session.bindingQuery(tempTopic.getSimpleAddress());
@@ -949,7 +950,7 @@
throw JMSExceptionHelper.convertFromHornetQException(e);
}
}
-
+
public void start() throws JMSException
{
try
@@ -991,11 +992,11 @@
}
catch (HornetQException ignore)
{
- //Exception on deleting queue shouldn't prevent close from completing
+ // Exception on deleting queue shouldn't prevent close from completing
}
}
}
-
+
// Protected -----------------------------------------------------
// Private -------------------------------------------------------
@@ -1007,11 +1008,11 @@
throw new IllegalStateException("Session is closed");
}
}
-
+
private HornetQQueue lookupQueue(final String queueName, boolean isTemporary) throws
HornetQException
{
HornetQQueue queue;
-
+
if (isTemporary)
{
queue = HornetQDestination.createTemporaryQueue(queueName);
@@ -1020,7 +1021,7 @@
{
queue = HornetQDestination.createQueue(queueName);
}
-
+
QueueQuery response = session.queueQuery(queue.getSimpleAddress());
if (response.isExists())
@@ -1032,12 +1033,12 @@
return null;
}
}
-
+
private HornetQTopic lookupTopic(final String topicName, final boolean isTemporary)
throws HornetQException
{
HornetQTopic topic;
-
+
if (isTemporary)
{
topic = HornetQDestination.createTemporaryTopic(topicName);
@@ -1046,7 +1047,7 @@
{
topic = HornetQDestination.createTopic(topicName);
}
-
+
BindingQuery query = session.bindingQuery(topic.getSimpleAddress());
if (!query.isExists())
@@ -1059,7 +1060,6 @@
}
}
-
// Inner classes -------------------------------------------------
}
Added:
trunk/tests/src/org/hornetq/tests/integration/jms/connection/ConcurrentSessionCloseTest.java
===================================================================
---
trunk/tests/src/org/hornetq/tests/integration/jms/connection/ConcurrentSessionCloseTest.java
(rev 0)
+++
trunk/tests/src/org/hornetq/tests/integration/jms/connection/ConcurrentSessionCloseTest.java 2010-09-28
09:33:31 UTC (rev 9725)
@@ -0,0 +1,110 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.hornetq.tests.integration.jms.connection;
+
+import java.util.Random;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import javax.jms.Connection;
+import javax.jms.Session;
+
+import org.hornetq.api.core.TransportConfiguration;
+import org.hornetq.api.jms.HornetQJMSClient;
+import org.hornetq.core.logging.Logger;
+import org.hornetq.jms.client.HornetQConnectionFactory;
+import org.hornetq.tests.util.JMSTestBase;
+
+/**
+ *
+ * A ConcurrentSessionCloseTest
+ *
+ * @author <a href="mailto:tim.fox@jboss.com">Tim Fox</a>
+ *
+ *
+ */
+public class ConcurrentSessionCloseTest extends JMSTestBase
+{
+ private static final Logger log = Logger.getLogger(ConcurrentSessionCloseTest.class);
+
+ private HornetQConnectionFactory cf;
+
+ @Override
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+
+ cf = HornetQJMSClient.createConnectionFactory(new
TransportConfiguration("org.hornetq.core.remoting.impl.invm.InVMConnectorFactory"));
+
+ }
+
+ @Override
+ protected void tearDown() throws Exception
+ {
+ cf = null;
+
+ super.tearDown();
+ }
+
+ //
https://jira.jboss.org/browse/HORNETQ-525
+ public void testConcurrentClose() throws Exception
+ {
+ final Connection con = cf.createConnection();
+
+ for (int j = 0; j < 100; j++)
+ {
+ final AtomicBoolean failed = new AtomicBoolean(false);
+
+ int threadCount = 10;
+
+ ThreadGroup group = new ThreadGroup("Test");
+
+ Thread[] threads = new Thread[threadCount];
+
+ for (int i = 0; i < threadCount; i++)
+ {
+ threads[i] = new Thread(group, "thread " + i)
+ {
+ public void run()
+ {
+ try
+ {
+ con.start();
+
+ Session session = con.createSession(false,
Session.AUTO_ACKNOWLEDGE);
+
+ session.close();
+ }
+ catch (Exception e)
+ {
+ e.printStackTrace();
+
+ failed.set(true);
+ }
+
+ };
+ };
+ threads[i].start();
+ }
+
+ for (int i = 0; i < threadCount; i++)
+ {
+ threads[i].join();
+ }
+
+ assertFalse(failed.get());
+ }
+
+ jmsServer.stop();
+ }
+
+}