[hornetq-commits] JBoss hornetq SVN: r8015 - in trunk: src/main/org/hornetq/core/remoting/impl and 1 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Wed Sep 30 13:44:47 EDT 2009


Author: timfox
Date: 2009-09-30 13:44:47 -0400 (Wed, 30 Sep 2009)
New Revision: 8015

Added:
   trunk/tests/src/org/hornetq/tests/integration/client/HornetQCrashTest.java
Modified:
   trunk/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java
   trunk/src/main/org/hornetq/core/remoting/impl/ChannelImpl.java
Log:
https://jira.jboss.org/jira/browse/HORNETQ-144

Modified: trunk/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java	2009-09-30 16:24:22 UTC (rev 8014)
+++ trunk/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java	2009-09-30 17:44:47 UTC (rev 8015)
@@ -275,7 +275,7 @@
             {
                printedDropMessagesWarning = true;
 
-               log.warn("Messages are being dropped on adress " + getStoreName());
+               log.warn("Messages are being dropped on address " + getStoreName());
             }
 
             // Address is full, we just pretend we are paging, and drop the data

Modified: trunk/src/main/org/hornetq/core/remoting/impl/ChannelImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/remoting/impl/ChannelImpl.java	2009-09-30 16:24:22 UTC (rev 8014)
+++ trunk/src/main/org/hornetq/core/remoting/impl/ChannelImpl.java	2009-09-30 17:44:47 UTC (rev 8015)
@@ -198,7 +198,7 @@
          // Must block on semaphore outside the main lock or this can prevent failover from occurring, also after the
          // packet is sent to assure we get some credits back
          if (sendSemaphore != null && packet.getType() != PACKETS_CONFIRMED)
-         {
+         {            
             try
             {
                sendSemaphore.acquire(size);
@@ -343,6 +343,12 @@
       {
          return;
       }
+      
+      if (sendSemaphore != null)
+      {
+         //Any threads blocking on the send semaphore should be allowed to return
+         sendSemaphore.release(Integer.MAX_VALUE);
+      }
 
       if (!connection.isDestroyed() && !connection.removeChannel(id))
       {

Added: trunk/tests/src/org/hornetq/tests/integration/client/HornetQCrashTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/client/HornetQCrashTest.java	                        (rev 0)
+++ trunk/tests/src/org/hornetq/tests/integration/client/HornetQCrashTest.java	2009-09-30 17:44:47 UTC (rev 8015)
@@ -0,0 +1,120 @@
+package org.hornetq.tests.integration.client;
+
+import junit.framework.TestCase;
+
+import org.hornetq.core.client.ClientMessage;
+import org.hornetq.core.client.ClientProducer;
+import org.hornetq.core.client.ClientSession;
+import org.hornetq.core.client.ClientSessionFactory;
+import org.hornetq.core.client.SendAcknowledgementHandler;
+import org.hornetq.core.client.impl.ClientSessionFactoryImpl;
+import org.hornetq.core.config.Configuration;
+import org.hornetq.core.config.TransportConfiguration;
+import org.hornetq.core.config.impl.ConfigurationImpl;
+import org.hornetq.core.exception.HornetQException;
+import org.hornetq.core.logging.Logger;
+import org.hornetq.core.message.Message;
+import org.hornetq.core.remoting.Interceptor;
+import org.hornetq.core.remoting.Packet;
+import org.hornetq.core.remoting.RemotingConnection;
+import org.hornetq.core.remoting.impl.invm.InVMAcceptorFactory;
+import org.hornetq.core.remoting.impl.invm.InVMConnectorFactory;
+import org.hornetq.core.remoting.impl.wireformat.PacketImpl;
+import org.hornetq.core.server.HornetQ;
+import org.hornetq.core.server.HornetQServer;
+
+/**
+ * Regression test for https://jira.jboss.org/jira/browse/HORNETQ-144: the server is
+ * stopped while a message is being sent, and the blocked send must still return.
+ */
+public class HornetQCrashTest extends TestCase
+{
+   private static final Logger log = Logger.getLogger(HornetQCrashTest.class);
+
+   public HornetQServer server;
+   private volatile boolean ackReceived;
+
+   /** Sends a message while the server is being stopped; send() must return and no ack may arrive. */
+   public void testHang() throws Exception
+   {
+      Configuration configuration = new ConfigurationImpl();
+      configuration.setPersistenceEnabled(false);
+      configuration.setSecurityEnabled(false);
+      configuration.getAcceptorConfigurations().add(new TransportConfiguration(InVMAcceptorFactory.class.getName()));
+
+      server = HornetQ.newHornetQServer(configuration);
+      server.start();
+      server.getRemotingService().addInterceptor(new AckInterceptor(server));
+
+      ClientSessionFactory clientSessionFactory = new ClientSessionFactoryImpl(new TransportConfiguration(InVMConnectorFactory.class.getName()));
+
+      // Force an ack at once - this means the send call will block
+      clientSessionFactory.setProducerWindowSize(1);
+
+      ClientSession session = clientSessionFactory.createSession();
+      session.setSendAcknowledgementHandler(new SendAcknowledgementHandler()
+      {
+         public void sendAcknowledged(Message message)
+         {
+            ackReceived = true;
+         }
+      });
+
+      ClientProducer producer = session.createProducer("fooQueue");
+      ClientMessage msg = session.createClientMessage(false);
+      msg.putStringProperty("someKey", "someValue");
+
+      producer.send(msg);
+
+      // Leave time for an acknowledgement to arrive, should one be (wrongly) delivered
+      Thread.sleep(250);
+      assertFalse(ackReceived);
+   }
+
+   /** Stops the server whenever a SESS_SEND packet is intercepted, simulating a crash. */
+   public static class AckInterceptor implements Interceptor
+   {
+      private final HornetQServer server;
+
+      AckInterceptor(HornetQServer server)
+      {
+         this.server = server;
+      }
+
+      public boolean intercept(Packet packet, RemotingConnection connection) throws HornetQException
+      {
+         log.info("AckInterceptor.intercept " + packet);
+         if (packet.getType() != PacketImpl.SESS_SEND)
+         {
+            return true;
+         }
+         try
+         {
+            log.info("Stopping server");
+            server.stop();
+         }
+         catch (Exception e)
+         {
+            log.error("Failed to stop server", e);
+         }
+         return false;
+      }
+   }
+
+   /** Stops the server if the interceptor never did, so a failed run cannot leave it running. */
+   protected void tearDown() throws Exception
+   {
+      try
+      {
+         if (server != null && server.isStarted())
+         {
+            server.stop();
+         }
+      }
+      finally
+      {
+         server = null;
+         super.tearDown();
+      }
+   }
+}



More information about the hornetq-commits mailing list