[hornetq-commits] JBoss hornetq SVN: r9725 - in trunk: tests/src/org/hornetq/tests/integration/jms/connection and 1 other directory.

do-not-reply at jboss.org do-not-reply at jboss.org
Tue Sep 28 05:33:31 EDT 2010


Author: timfox
Date: 2010-09-28 05:33:31 -0400 (Tue, 28 Sep 2010)
New Revision: 9725

Added:
   trunk/tests/src/org/hornetq/tests/integration/jms/connection/ConcurrentSessionCloseTest.java
Modified:
   trunk/src/main/org/hornetq/jms/client/HornetQConnection.java
   trunk/src/main/org/hornetq/jms/client/HornetQSession.java
Log:
https://jira.jboss.org/browse/HORNETQ-525

Modified: trunk/src/main/org/hornetq/jms/client/HornetQConnection.java
===================================================================
--- trunk/src/main/org/hornetq/jms/client/HornetQConnection.java	2010-09-27 12:27:02 UTC (rev 9724)
+++ trunk/src/main/org/hornetq/jms/client/HornetQConnection.java	2010-09-28 09:33:31 UTC (rev 9725)
@@ -216,7 +216,7 @@
       justCreated = false;
    }
 
-   public void start() throws JMSException
+   public synchronized void start() throws JMSException
    {
       checkClosed();
 
@@ -229,7 +229,7 @@
       started = true;
    }
 
-   public void stop() throws JMSException
+   public synchronized void stop() throws JMSException
    {
       checkClosed();
 

Modified: trunk/src/main/org/hornetq/jms/client/HornetQSession.java
===================================================================
--- trunk/src/main/org/hornetq/jms/client/HornetQSession.java	2010-09-27 12:27:02 UTC (rev 9724)
+++ trunk/src/main/org/hornetq/jms/client/HornetQSession.java	2010-09-28 09:33:31 UTC (rev 9725)
@@ -111,11 +111,11 @@
    // Constructors --------------------------------------------------
 
    protected HornetQSession(final HornetQConnection connection,
-                         final boolean transacted,
-                         final boolean xa,
-                         final int ackMode,
-                         final ClientSession session,
-                         final int sessionType)
+                            final boolean transacted,
+                            final boolean xa,
+                            final int ackMode,
+                            final ClientSession session,
+                            final int sessionType)
    {
       this.connection = connection;
 
@@ -213,7 +213,7 @@
 
       return ackMode;
    }
-   
+
    public boolean isXA()
    {
       return xa;
@@ -262,21 +262,24 @@
 
    public void close() throws JMSException
    {
-      try
+      synchronized (connection)
       {
-         for (HornetQMessageConsumer cons : new HashSet<HornetQMessageConsumer>(consumers))
+         try
          {
-            cons.close();
-         }
+            for (HornetQMessageConsumer cons : new HashSet<HornetQMessageConsumer>(consumers))
+            {
+               cons.close();
+            }
 
-         session.close();
+            session.close();
 
-         connection.removeSession(this);
+            connection.removeSession(this);
+         }
+         catch (HornetQException e)
+         {
+            throw JMSExceptionHelper.convertFromHornetQException(e);
+         }
       }
-      catch (HornetQException e)
-      {
-         throw JMSExceptionHelper.convertFromHornetQException(e);
-      }
    }
 
    public void recover() throws JMSException
@@ -393,7 +396,7 @@
       try
       {
          HornetQQueue queue = lookupQueue(queueName, false);
-         
+
          if (queue == null)
          {
             queue = lookupQueue(queueName, true);
@@ -413,7 +416,6 @@
          throw JMSExceptionHelper.convertFromHornetQException(e);
       }
    }
-   
 
    public Topic createTopic(final String topicName) throws JMSException
    {
@@ -423,7 +425,6 @@
          throw new IllegalStateException("Cannot create a topic on a QueueSession");
       }
 
-      
       try
       {
          HornetQTopic topic = lookupTopic(topicName, false);
@@ -477,7 +478,7 @@
       }
 
       HornetQDestination jbdest = (HornetQDestination)topic;
-      
+
       if (jbdest.isQueue())
       {
          throw new InvalidDestinationException("Cannot create a subscriber on a queue");
@@ -490,7 +491,7 @@
                                                  final String subscriptionName,
                                                  String selectorString,
                                                  final boolean noLocal) throws JMSException
-   {     
+   {
       try
       {
          selectorString = "".equals(selectorString) ? null : selectorString;
@@ -525,7 +526,7 @@
          SimpleString autoDeleteQueueName = null;
 
          if (dest.isQueue())
-         {            
+         {
             BindingQuery response = session.bindingQuery(dest.getSimpleAddress());
 
             if (!response.isExists())
@@ -573,7 +574,7 @@
                }
 
                queueName = new SimpleString(HornetQDestination.createQueueNameForDurableSubscription(connection.getClientID(),
-                                                                                               subscriptionName));
+                                                                                                     subscriptionName));
 
                QueueQuery subResponse = session.queueQuery(queueName);
 
@@ -678,10 +679,10 @@
       }
 
       HornetQDestination jbq = (HornetQDestination)queue;
-      
+
       if (!jbq.isQueue())
       {
-         throw new InvalidDestinationException("Cannot create a browser on a topic");  
+         throw new InvalidDestinationException("Cannot create a browser on a topic");
       }
 
       try
@@ -767,7 +768,7 @@
       }
 
       SimpleString queueName = new SimpleString(HornetQDestination.createQueueNameForDurableSubscription(connection.getClientID(),
-                                                                                                   name));
+                                                                                                         name));
 
       try
       {
@@ -887,7 +888,7 @@
       {
          throw new InvalidDestinationException("Not a temporary topic " + tempTopic);
       }
-      
+
       try
       {
          BindingQuery response = session.bindingQuery(tempTopic.getSimpleAddress());
@@ -949,7 +950,7 @@
          throw JMSExceptionHelper.convertFromHornetQException(e);
       }
    }
-   
+
    public void start() throws JMSException
    {
       try
@@ -991,11 +992,11 @@
          }
          catch (HornetQException ignore)
          {
-            //Exception on deleting queue shouldn't prevent close from completing
+            // Exception on deleting queue shouldn't prevent close from completing
          }
       }
    }
-   
+
    // Protected -----------------------------------------------------
 
    // Private -------------------------------------------------------
@@ -1007,11 +1008,11 @@
          throw new IllegalStateException("Session is closed");
       }
    }
-   
+
    private HornetQQueue lookupQueue(final String queueName, boolean isTemporary) throws HornetQException
    {
       HornetQQueue queue;
-      
+
       if (isTemporary)
       {
          queue = HornetQDestination.createTemporaryQueue(queueName);
@@ -1020,7 +1021,7 @@
       {
          queue = HornetQDestination.createQueue(queueName);
       }
-      
+
       QueueQuery response = session.queueQuery(queue.getSimpleAddress());
 
       if (response.isExists())
@@ -1032,12 +1033,12 @@
          return null;
       }
    }
-   
+
    private HornetQTopic lookupTopic(final String topicName, final boolean isTemporary) throws HornetQException
    {
 
       HornetQTopic topic;
-      
+
       if (isTemporary)
       {
          topic = HornetQDestination.createTemporaryTopic(topicName);
@@ -1046,7 +1047,7 @@
       {
          topic = HornetQDestination.createTopic(topicName);
       }
-      
+
       BindingQuery query = session.bindingQuery(topic.getSimpleAddress());
 
       if (!query.isExists())
@@ -1059,7 +1060,6 @@
       }
    }
 
-
    // Inner classes -------------------------------------------------
 
 }

Added: trunk/tests/src/org/hornetq/tests/integration/jms/connection/ConcurrentSessionCloseTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/jms/connection/ConcurrentSessionCloseTest.java	                        (rev 0)
+++ trunk/tests/src/org/hornetq/tests/integration/jms/connection/ConcurrentSessionCloseTest.java	2010-09-28 09:33:31 UTC (rev 9725)
@@ -0,0 +1,110 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.hornetq.tests.integration.jms.connection;
+
+import java.util.Random;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import javax.jms.Connection;
+import javax.jms.Session;
+
+import org.hornetq.api.core.TransportConfiguration;
+import org.hornetq.api.jms.HornetQJMSClient;
+import org.hornetq.core.logging.Logger;
+import org.hornetq.jms.client.HornetQConnectionFactory;
+import org.hornetq.tests.util.JMSTestBase;
+
+/**
+ * 
+ * A ConcurrentSessionCloseTest
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ *
+ *
+ */
+public class ConcurrentSessionCloseTest extends JMSTestBase
+{
+   private static final Logger log = Logger.getLogger(ConcurrentSessionCloseTest.class);
+
+   private HornetQConnectionFactory cf;
+
+   @Override
+   protected void setUp() throws Exception
+   {
+      super.setUp();
+
+      cf = HornetQJMSClient.createConnectionFactory(new TransportConfiguration("org.hornetq.core.remoting.impl.invm.InVMConnectorFactory"));
+
+   }
+
+   @Override
+   protected void tearDown() throws Exception
+   {
+      cf = null;
+
+      super.tearDown();
+   }
+
+   // https://jira.jboss.org/browse/HORNETQ-525
+   public void testConcurrentClose() throws Exception
+   {
+      final Connection con = cf.createConnection();
+
+      for (int j = 0; j < 100; j++)
+      {
+         final AtomicBoolean failed = new AtomicBoolean(false);
+
+         int threadCount = 10;
+
+         ThreadGroup group = new ThreadGroup("Test");
+
+         Thread[] threads = new Thread[threadCount];
+
+         for (int i = 0; i < threadCount; i++)
+         {
+            threads[i] = new Thread(group, "thread " + i)
+            {
+               public void run()
+               {
+                  try
+                  {
+                     con.start();
+                     
+                     Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+                     session.close();
+                  }
+                  catch (Exception e)
+                  {
+                     e.printStackTrace();
+
+                     failed.set(true);
+                  }
+
+               };
+            };
+            threads[i].start();
+         }
+
+         for (int i = 0; i < threadCount; i++)
+         {
+            threads[i].join();
+         }
+
+         assertFalse(failed.get());
+      }
+
+      jmsServer.stop();
+   }
+
+}



More information about the hornetq-commits mailing list