[hornetq-commits] JBoss hornetq SVN: r11191 - in branches/Branch_2_2_EAP: src/main/org/hornetq/utils and 1 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Thu Aug 11 15:58:06 EDT 2011


Author: clebert.suconic at jboss.com
Date: 2011-08-11 15:58:06 -0400 (Thu, 11 Aug 2011)
New Revision: 11191

Added:
   branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/JmsNettyNioStressTest.java
Modified:
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java
   branches/Branch_2_2_EAP/src/main/org/hornetq/utils/ConfigurationHelper.java
Log:
https://issues.jboss.org/browse/HORNETQ-746 - Fixing a deadlock with Netty NIO

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java	2011-08-11 17:09:49 UTC (rev 11190)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java	2011-08-11 19:58:06 UTC (rev 11191)
@@ -93,8 +93,6 @@
 
    private volatile LargeMessageDeliverer largeMessageDeliverer = null;
 
-   private boolean largeMessageInDelivery;
-
    /**
     * if we are a browse only consumer we don't need to worry about acknowledgemenets or being started/stopeed by the session.
     */
@@ -235,7 +233,7 @@
          
          // If there is a pendingLargeMessage we can't take another message
          // This has to be checked inside the lock as the set to null is done inside the lock
-         if (largeMessageInDelivery)
+         if (largeMessageDeliverer != null)
          {
             return HandleStatus.BUSY;
          }
@@ -463,21 +461,6 @@
       synchronized (lock)
       {
          this.transferring = transferring;
-
-         if (transferring)
-         {
-            // Now we must wait for any large message delivery to finish
-            while (largeMessageInDelivery)
-            {
-               try
-               {
-                  Thread.sleep(1);
-               }
-               catch (InterruptedException ignore)
-               {
-               }
-            }
-         }
       }
 
       // Outside the lock
@@ -662,28 +645,30 @@
 
    private void promptDelivery()
    {
-      synchronized (lock)
+      // largeMessageDeliverer is aways set inside a lock
+      // if we don't acquire a lock, we will have NPE eventually
+      if (largeMessageDeliverer != null)
       {
-         // largeMessageDeliverer is aways set inside a lock
-         // if we don't acquire a lock, we will have NPE eventually
-         if (largeMessageDeliverer != null)
-         {
-            resumeLargeMessage();
-         }
-         else
-         {
-            if (browseOnly)
-            {
-               messageQueue.getExecutor().execute(browserDeliverer);
-            }
-            else
-            {
-               messageQueue.forceDelivery();
-            }
-         }
+         resumeLargeMessage();
       }
+      else
+      {
+         forceDelivery();
+      }
    }
 
+   private void forceDelivery()
+   {
+      if (browseOnly)
+      {
+         messageQueue.getExecutor().execute(browserDeliverer);
+      }
+      else
+      {
+         messageQueue.deliverAsync();
+      }
+   }
+
    private void resumeLargeMessage()
    {
       messageQueue.getExecutor().execute(resumeLargeMessageRunnable);
@@ -691,8 +676,6 @@
 
    private void deliverLargeMessage(final MessageReference ref, final ServerMessage message) throws Exception
    {
-      largeMessageInDelivery = true;
-
       final LargeMessageDeliverer localDeliverer = new LargeMessageDeliverer((LargeServerMessage)message, ref);
 
       // it doesn't need lock because deliverLargeMesasge is already inside the lock()
@@ -714,6 +697,7 @@
       }
    }
 
+
    // Inner classes
    // ------------------------------------------------------------------------
 
@@ -727,16 +711,7 @@
             {
                if (largeMessageDeliverer == null || largeMessageDeliverer.deliver())
                {
-                  if (browseOnly)
-                  {
-                     messageQueue.getExecutor().execute(browserDeliverer);
-                  }
-                  else
-                  {
-                     // prompt Delivery only if chunk was finished
-
-                     messageQueue.deliverAsync();
-                  }
+                  forceDelivery();
                }
             }
             catch (Exception e)
@@ -901,8 +876,6 @@
 
             largeMessageDeliverer = null;
 
-            largeMessageInDelivery = false;
-
             largeMessage = null;
          }
       }

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/utils/ConfigurationHelper.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/utils/ConfigurationHelper.java	2011-08-11 17:09:49 UTC (rev 11190)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/utils/ConfigurationHelper.java	2011-08-11 19:58:06 UTC (rev 11191)
@@ -73,7 +73,7 @@
          {
             return Integer.valueOf((String)prop);
          }
-         else if (prop instanceof Integer == false)
+         else if (prop instanceof Number == false)
          {
             ConfigurationHelper.log.warn("Property " + propName +
                                          " must be an Integer, it is " +
@@ -83,7 +83,7 @@
          }
          else
          {
-            return (Integer)prop;
+            return ((Number)prop).intValue();
          }
       }
    }
@@ -108,7 +108,7 @@
          {
             return Long.valueOf((String)prop);
          }
-         else if (prop instanceof Long == false)
+         else if (prop instanceof Number == false)
          {
             ConfigurationHelper.log.warn("Property " + propName +
                                          " must be an Long, it is " +
@@ -118,7 +118,7 @@
          }
          else
          {
-            return (Long)prop;
+            return ((Number)prop).longValue();
          }
       }
    }

Added: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/JmsNettyNioStressTest.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/JmsNettyNioStressTest.java	                        (rev 0)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/JmsNettyNioStressTest.java	2011-08-11 19:58:06 UTC (rev 11191)
@@ -0,0 +1,332 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.tests.integration.client;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import javax.jms.BytesMessage;
+import javax.jms.Connection;
+import javax.jms.DeliveryMode;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+
+import junit.framework.Assert;
+
+import org.hornetq.api.core.TransportConfiguration;
+import org.hornetq.api.core.client.ClientSession;
+import org.hornetq.api.core.client.ClientSessionFactory;
+import org.hornetq.api.core.client.ServerLocator;
+import org.hornetq.api.jms.HornetQJMSClient;
+import org.hornetq.api.jms.JMSFactoryType;
+import org.hornetq.core.config.Configuration;
+import org.hornetq.core.remoting.impl.netty.NettyConnectorFactory;
+import org.hornetq.core.remoting.impl.netty.TransportConstants;
+import org.hornetq.core.server.HornetQServer;
+import org.hornetq.jms.client.HornetQConnectionFactory;
+import org.hornetq.jms.client.HornetQDestination;
+import org.hornetq.tests.util.ServiceTestBase;
+import org.hornetq.tests.util.UnitTestCase;
+
+/**
+ * -- https://issues.jboss.org/browse/HORNETQ-746
+ * Stress test using netty with NIO and many JMS clients concurrently, to try
+ * and induce a deadlock.
+ * <p>
+ * A large number of JMS clients are started concurrently. Some produce to queue
+ * 1 over one connection, others consume from queue 1 and produce to queue 2
+ * over a second connection, and others consume from queue 2 over a third
+ * connection.
+ * <p>
+ * Each operation is done in a JMS transaction, sending/consuming one message
+ * per transaction.
+ * <p>
+ * The server is set up with netty, with only one NIO worker and 1 hornetq
+ * server worker. This increases the chance for the deadlock to occur.
+ * <p>
+ * If the deadlock occurs, all threads will block/die. A simple transaction
+ * counting strategy is used to verify that the count has reached the expected
+ * value.
+ * @author Carl Heymann
+ */
+public class JmsNettyNioStressTest extends ServiceTestBase
+{
+
+   // Constants -----------------------------------------------------
+
+   // Attributes ----------------------------------------------------
+
+   // Static --------------------------------------------------------
+
+   // Constructors --------------------------------------------------
+
+   // Public --------------------------------------------------------
+
+   // Remove this method to re-enable those tests
+   public void testStressSendNetty() throws Exception
+   {
+      doTestStressSend(true);
+   }
+
+   public void doTestStressSend(final boolean netty) throws Exception
+   {
+      // first set up the server
+      Map<String, Object> params = new HashMap<String, Object>();
+      params.put(TransportConstants.PORT_PROP_NAME, 5445);
+      params.put(TransportConstants.HOST_PROP_NAME, "localhost");
+      params.put(TransportConstants.USE_NIO_PROP_NAME, true);
+      // minimize threads to maximize possibility for deadlock
+      params.put(TransportConstants.NIO_REMOTING_THREADS_PROPNAME, 1);
+      params.put(TransportConstants.BATCH_DELAY, 50);
+      Configuration config = UnitTestCase.createDefaultConfig(params, ServiceTestBase.NETTY_ACCEPTOR_FACTORY);
+      HornetQServer server = createServer(true, config);
+      server.getConfiguration().setThreadPoolMaxSize(2);
+      server.start();
+
+      // now the client side
+      Map<String, Object> connectionParams = new HashMap<String, Object>();
+      connectionParams.put(TransportConstants.PORT_PROP_NAME, 5445);
+      connectionParams.put(TransportConstants.HOST_PROP_NAME, "localhost");
+      connectionParams.put(TransportConstants.USE_NIO_PROP_NAME, true);
+      connectionParams.put(TransportConstants.BATCH_DELAY, 50);
+      connectionParams.put(TransportConstants.NIO_REMOTING_THREADS_PROPNAME, 6);
+      final TransportConfiguration transpConf = new TransportConfiguration(NettyConnectorFactory.class.getName(),
+                                                                           connectionParams);
+      final ServerLocator locator = createNonHALocator(netty);
+
+      // each thread will do this number of transactions
+      final int numberOfMessages = 100;
+
+      // these must all be the same
+      final int numProducers = 30;
+      final int numConsumerProducers = 30;
+      final int numConsumers = 30;
+
+      // each produce, consume+produce and consume increments this counter
+      final AtomicInteger totalCount = new AtomicInteger(0);
+
+      // the total we expect if all producers, consumer-producers and
+      // consumers complete normally
+      int totalExpectedCount = (numProducers + numConsumerProducers + numConsumerProducers) * numberOfMessages;
+
+      // each group gets a separate connection
+      final Connection connectionProducer;
+      final Connection connectionConsumerProducer;
+      final Connection connectionConsumer;
+
+      // create the 2 queues used in the test
+      ClientSessionFactory sf = locator.createSessionFactory(transpConf);
+      ClientSession session = sf.createTransactedSession();
+      session.createQueue("jms.queue.queue", "jms.queue.queue");
+      session.createQueue("jms.queue.queue2", "jms.queue.queue2");
+      session.commit();
+      sf.close();
+      session.close();
+      locator.close();
+
+      // create and start JMS connections
+      HornetQConnectionFactory cf = HornetQJMSClient.createConnectionFactoryWithoutHA(JMSFactoryType.CF, transpConf);
+      connectionProducer = cf.createConnection();
+      connectionProducer.start();
+
+      connectionConsumerProducer = cf.createConnection();
+      connectionConsumerProducer.start();
+
+      connectionConsumer = cf.createConnection();
+      connectionConsumer.start();
+
+      // these threads produce messages on the the first queue
+      for (int i = 0; i < numProducers; i++)
+      {
+         new Thread()
+         {
+            @Override
+            public void run()
+            {
+
+               Session session = null;
+               try
+               {
+                  session = connectionProducer.createSession(true, Session.SESSION_TRANSACTED);
+                  MessageProducer messageProducer = session.createProducer(HornetQDestination.createQueue("queue"));
+                  messageProducer.setDeliveryMode(DeliveryMode.PERSISTENT);
+
+                  for (int i = 0; i < numberOfMessages; i++)
+                  {
+                     BytesMessage message = session.createBytesMessage();
+                     message.writeBytes(new byte[3000]);
+                     message.setStringProperty("Service", "LoadShedService");
+                     message.setStringProperty("Action", "testAction");
+
+                     messageProducer.send(message);
+                     session.commit();
+
+                     totalCount.incrementAndGet();
+                  }
+               }
+               catch (Exception e)
+               {
+                  throw new RuntimeException(e);
+               }
+               finally
+               {
+                  if (session != null)
+                  {
+                     try
+                     {
+                        session.close();
+                     }
+                     catch (Exception e)
+                     {
+                        e.printStackTrace();
+                     }
+                  }
+               }
+            }
+         }.start();
+      }
+
+      // these threads just consume from the one and produce on a second queue
+      for (int i = 0; i < numConsumerProducers; i++)
+      {
+         new Thread()
+         {
+            @Override
+            public void run()
+            {
+               Session session = null;
+               try
+               {
+                  session = connectionConsumerProducer.createSession(true, Session.SESSION_TRANSACTED);
+                  MessageConsumer consumer = session.createConsumer(HornetQDestination.createQueue("queue"));
+                  MessageProducer messageProducer = session.createProducer(HornetQDestination.createQueue("queue2"));
+                  messageProducer.setDeliveryMode(DeliveryMode.PERSISTENT);
+                  for (int i = 0; i < numberOfMessages; i++)
+                  {
+                     BytesMessage message = (BytesMessage)consumer.receive(5000);
+                     if (message == null)
+                     {
+                        return;
+                     }
+                     message = session.createBytesMessage();
+                     message.writeBytes(new byte[3000]);
+                     message.setStringProperty("Service", "LoadShedService");
+                     message.setStringProperty("Action", "testAction");
+                     messageProducer.send(message);
+                     session.commit();
+
+                     totalCount.incrementAndGet();
+                  }
+               }
+               catch (Exception e)
+               {
+                  throw new RuntimeException(e);
+               }
+               finally
+               {
+                  if (session != null)
+                  {
+                     try
+                     {
+                        session.close();
+                     }
+                     catch (Exception e)
+                     {
+                        e.printStackTrace();
+                     }
+                  }
+               }
+            }
+         }.start();
+      }
+
+      // these threads consume from the second queue
+      for (int i = 0; i < numConsumers; i++)
+      {
+         new Thread()
+         {
+            @Override
+            public void run()
+            {
+               Session session = null;
+               try
+               {
+                  session = connectionConsumer.createSession(true, Session.SESSION_TRANSACTED);
+                  MessageConsumer consumer = session.createConsumer(HornetQDestination.createQueue("queue2"));
+                  for (int i = 0; i < numberOfMessages; i++)
+                  {
+                     BytesMessage message = (BytesMessage)consumer.receive(5000);
+                     if (message == null)
+                     {
+                        return;
+                     }
+                     session.commit();
+
+                     totalCount.incrementAndGet();
+                  }
+               }
+               catch (Exception e)
+               {
+                  throw new RuntimeException(e);
+               }
+               finally
+               {
+                  if (session != null)
+                  {
+                     try
+                     {
+                        session.close();
+                     }
+                     catch (Exception e)
+                     {
+                        e.printStackTrace();
+                     }
+                  }
+               }
+            }
+         }.start();
+      }
+
+      // check that the overall transaction count reaches the expected number,
+      // which would indicate that the system didn't stall
+      int timeoutCounter = 0;
+      int maxSecondsToWait = 60;
+      while (timeoutCounter < maxSecondsToWait && totalCount.get() < totalExpectedCount)
+      {
+         timeoutCounter++;
+         Thread.sleep(1000);
+         System.out.println("Not done yet.. " + (maxSecondsToWait - timeoutCounter) + "; " + totalCount.get());
+      }
+      System.out.println("Done.." + totalCount.get() + ", expected " + totalExpectedCount);
+      Assert.assertEquals("Possible deadlock", totalExpectedCount, totalCount.get());
+      System.out.println("After assert");
+
+      // attempt cleaning up (this is not in a finally, still needs some work)
+      connectionProducer.close();
+      connectionConsumerProducer.close();
+      connectionConsumer.close();
+
+      server.stop();
+   }
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+
+   // Private -------------------------------------------------------
+
+   // Inner classes -------------------------------------------------
+
+}
\ No newline at end of file



More information about the hornetq-commits mailing list