[hornetq-commits] JBoss hornetq SVN: r8015 - in trunk: src/main/org/hornetq/core/remoting/impl and 2 other directories.
do-not-reply at jboss.org
do-not-reply at jboss.org
Wed Sep 30 13:44:47 EDT 2009
Author: timfox
Date: 2009-09-30 13:44:47 -0400 (Wed, 30 Sep 2009)
New Revision: 8015
Added:
trunk/tests/src/org/hornetq/tests/integration/client/HornetQCrashTest.java
Modified:
trunk/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java
trunk/src/main/org/hornetq/core/remoting/impl/ChannelImpl.java
Log:
https://jira.jboss.org/jira/browse/HORNETQ-144
Modified: trunk/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java 2009-09-30 16:24:22 UTC (rev 8014)
+++ trunk/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java 2009-09-30 17:44:47 UTC (rev 8015)
@@ -275,7 +275,7 @@
{
printedDropMessagesWarning = true;
- log.warn("Messages are being dropped on adress " + getStoreName());
+ log.warn("Messages are being dropped on address " + getStoreName());
}
// Address is full, we just pretend we are paging, and drop the data
Modified: trunk/src/main/org/hornetq/core/remoting/impl/ChannelImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/remoting/impl/ChannelImpl.java 2009-09-30 16:24:22 UTC (rev 8014)
+++ trunk/src/main/org/hornetq/core/remoting/impl/ChannelImpl.java 2009-09-30 17:44:47 UTC (rev 8015)
@@ -198,7 +198,7 @@
// Must block on semaphore outside the main lock or this can prevent failover from occurring, also after the
// packet is sent to assure we get some credits back
if (sendSemaphore != null && packet.getType() != PACKETS_CONFIRMED)
- {
+ {
try
{
sendSemaphore.acquire(size);
@@ -343,6 +343,12 @@
{
return;
}
+
+ if (sendSemaphore != null)
+ {
+ //Any threads blocking on the send semaphore should be allowed to return
+ sendSemaphore.release(Integer.MAX_VALUE);
+ }
if (!connection.isDestroyed() && !connection.removeChannel(id))
{
Added: trunk/tests/src/org/hornetq/tests/integration/client/HornetQCrashTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/client/HornetQCrashTest.java (rev 0)
+++ trunk/tests/src/org/hornetq/tests/integration/client/HornetQCrashTest.java 2009-09-30 17:44:47 UTC (rev 8015)
@@ -0,0 +1,120 @@
+package org.hornetq.tests.integration.client;
+
+import junit.framework.TestCase;
+
+import org.hornetq.core.client.ClientMessage;
+import org.hornetq.core.client.ClientProducer;
+import org.hornetq.core.client.ClientSession;
+import org.hornetq.core.client.ClientSessionFactory;
+import org.hornetq.core.client.SendAcknowledgementHandler;
+import org.hornetq.core.client.impl.ClientSessionFactoryImpl;
+import org.hornetq.core.config.Configuration;
+import org.hornetq.core.config.TransportConfiguration;
+import org.hornetq.core.config.impl.ConfigurationImpl;
+import org.hornetq.core.exception.HornetQException;
+import org.hornetq.core.logging.Logger;
+import org.hornetq.core.message.Message;
+import org.hornetq.core.remoting.Interceptor;
+import org.hornetq.core.remoting.Packet;
+import org.hornetq.core.remoting.RemotingConnection;
+import org.hornetq.core.remoting.impl.invm.InVMAcceptorFactory;
+import org.hornetq.core.remoting.impl.invm.InVMConnectorFactory;
+import org.hornetq.core.remoting.impl.wireformat.PacketImpl;
+import org.hornetq.core.server.HornetQ;
+import org.hornetq.core.server.HornetQServer;
+
+/**
+ *
+ * From https://jira.jboss.org/jira/browse/HORNETQ-144
+ *
+ */
+public class HornetQCrashTest extends TestCase
+{
+ private static final Logger log = Logger.getLogger(HornetQCrashTest.class);
+
+ public HornetQServer server;
+
+ private volatile boolean ackReceived;
+
+ public void testHang() throws Exception
+ {
+ Configuration configuration = new ConfigurationImpl();
+ configuration.setPersistenceEnabled(false);
+ configuration.setSecurityEnabled(false);
+ configuration.getAcceptorConfigurations().add(new TransportConfiguration(InVMAcceptorFactory.class.getName()));
+
+ server = HornetQ.newHornetQServer(configuration);
+
+ server.start();
+
+ server.getRemotingService().addInterceptor(new AckInterceptor(server));
+
+ ClientSessionFactory clientSessionFactory = new ClientSessionFactoryImpl(new TransportConfiguration(InVMConnectorFactory.class.getName()));
+
+ // Force an ack at once - this means the send call will block
+ clientSessionFactory.setProducerWindowSize(1);
+
+ ClientSession session = clientSessionFactory.createSession();
+
+ session.setSendAcknowledgementHandler(new SendAcknowledgementHandler()
+ {
+ public void sendAcknowledged(Message message)
+ {
+ ackReceived = true;
+ }
+ });
+
+ ClientProducer producer = session.createProducer("fooQueue");
+
+ ClientMessage msg = session.createClientMessage(false);
+
+ msg.putStringProperty("someKey", "someValue");
+
+ producer.send(msg);
+
+ Thread.sleep(250);
+
+ assertFalse(ackReceived);
+ }
+
+ public static class AckInterceptor implements Interceptor
+ {
+ private HornetQServer server;
+
+ AckInterceptor(HornetQServer server)
+ {
+ this.server = server;
+ }
+
+ public boolean intercept(Packet packet, RemotingConnection connection) throws HornetQException
+ {
+ log.info("AckInterceptor.intercept " + packet);
+
+ if (packet.getType() == PacketImpl.SESS_SEND)
+ {
+ try
+ {
+ log.info("Stopping server");
+
+ // Stop the server when a message arrives, to simulate a crash
+ server.stop();
+ }
+ catch (Exception e)
+ {
+ e.printStackTrace();
+ }
+
+ return false;
+ }
+ return true;
+ }
+
+ }
+
+ protected void tearDown() throws Exception
+ {
+ super.tearDown();
+
+ server = null;
+ }
+}
More information about the hornetq-commits mailing list