Author: clebert.suconic(a)jboss.com
Date: 2011-04-15 01:17:26 -0400 (Fri, 15 Apr 2011)
New Revision: 10514
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/core/ServerSessionPacketHandler.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/core/impl/HornetQPacketHandler.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/RequestorTest.java
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/TemporaryQueueTest.java
Log:
JBPAPP-6314 and JBPAPP-6316 - Client ping timeout adding TTL and deleting temporary queues
if session can't be reconnected
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java
===================================================================
---
branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java 2011-04-15
02:35:06 UTC (rev 10513)
+++
branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java 2011-04-15
05:17:26 UTC (rev 10514)
@@ -1093,6 +1093,8 @@
0,
clientFailureCheckPeriod,
TimeUnit.MILLISECONDS);
+ // To make sure the first ping will be sent
+ pingRunnable.send();
}
// send a ping every time we create a new remoting connection
// to set up its TTL on the server side
@@ -1313,8 +1315,8 @@
first = false;
long now = System.currentTimeMillis();
-
- if (clientFailureCheckPeriod != -1 && now >= lastCheck +
clientFailureCheckPeriod)
+
+ if (clientFailureCheckPeriod != -1 && connectionTTL != -1 && now
>= lastCheck + connectionTTL )
{
if (!connection.checkDataReceived())
{
@@ -1340,6 +1342,14 @@
}
}
+ send();
+ }
+
+ /**
+ *
+ */
+ public void send()
+ {
// Send a ping
Ping ping = new Ping(connectionTTL);
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java
===================================================================
---
branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java 2011-04-15
02:35:06 UTC (rev 10513)
+++
branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java 2011-04-15
05:17:26 UTC (rev 10514)
@@ -1606,6 +1606,16 @@
{
return remotingConnection;
}
+
+ public String toString()
+ {
+ StringBuffer buffer = new StringBuffer();
+ for (Map.Entry<String, String> entry : metadata.entrySet())
+ {
+ buffer.append(entry.getKey() + "=" + entry.getValue() +
",");
+ }
+ return "ClientSessionImpl::(" + buffer.toString() + ")";
+ }
// Protected
// ----------------------------------------------------------------------------
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/core/ServerSessionPacketHandler.java
===================================================================
---
branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/core/ServerSessionPacketHandler.java 2011-04-15
02:35:06 UTC (rev 10513)
+++
branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/core/ServerSessionPacketHandler.java 2011-04-15
05:17:26 UTC (rev 10514)
@@ -587,6 +587,20 @@
}
}
+ public void closeListeners()
+ {
+ List<CloseListener> listeners = remotingConnection.removeCloseListeners();
+
+ for (CloseListener closeListener: listeners)
+ {
+ closeListener.connectionClosed();
+ if (closeListener instanceof FailureListener)
+ {
+ remotingConnection.removeFailureListener((FailureListener)closeListener);
+ }
+ }
+ }
+
public int transferConnection(final CoreRemotingConnection newConnection, final int
lastReceivedCommandID)
{
// We need to disable delivery on all the consumers while the transfer is
occurring- otherwise packets might get
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/core/impl/HornetQPacketHandler.java
===================================================================
---
branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/core/impl/HornetQPacketHandler.java 2011-04-15
02:35:06 UTC (rev 10513)
+++
branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/core/impl/HornetQPacketHandler.java 2011-04-15
05:17:26 UTC (rev 10514)
@@ -235,17 +235,15 @@
response = new ReattachSessionResponseMessage(-1, false);
}
+ log.debug("Reattaching request from " +
connection.getRemoteAddress());
+
+
ServerSessionPacketHandler sessionHandler =
protocolManager.getSessionHandler(request.getName());
- if (!server.checkActivate())
+ if (!server.checkActivate() || sessionHandler == null)
{
response = new ReattachSessionResponseMessage(-1, false);
}
-
- if (sessionHandler == null)
- {
- response = new ReattachSessionResponseMessage(-1, false);
- }
else
{
if (sessionHandler.getChannel().getConfirmationWindowSize() == -1)
@@ -253,7 +251,10 @@
// Even though session exists, we can't reattach since confi window
size == -1,
// i.e. we don't have a resend cache for commands, so we just close
the old session
// and let the client recreate
+
+ log.warn("Reattach request from " +
connection.getRemoteAddress() + " failed as there is no confirmationWindowSize
configured, which may be ok for your system");
+ sessionHandler.closeListeners();
sessionHandler.close();
response = new ReattachSessionResponseMessage(-1, false);
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
===================================================================
---
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java 2011-04-15
02:35:06 UTC (rev 10513)
+++
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java 2011-04-15
05:17:26 UTC (rev 10514)
@@ -447,6 +447,11 @@
{
run();
}
+
+ public String toString()
+ {
+ return "Temporary Cleaner for queue " + bindingName;
+ }
}
Modified:
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/RequestorTest.java
===================================================================
---
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/RequestorTest.java 2011-04-15
02:35:06 UTC (rev 10513)
+++
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/RequestorTest.java 2011-04-15
05:17:26 UTC (rev 10514)
@@ -114,7 +114,6 @@
for (int i = 0 ; i < 2000; i++)
{
- System.out.println("i = " + i);
if (i % 100 == 0)
{
System.out.println(i);
Modified:
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/TemporaryQueueTest.java
===================================================================
---
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/TemporaryQueueTest.java 2011-04-15
02:35:06 UTC (rev 10513)
+++
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/TemporaryQueueTest.java 2011-04-15
05:17:26 UTC (rev 10514)
@@ -23,7 +23,13 @@
import org.hornetq.api.core.Interceptor;
import org.hornetq.api.core.SimpleString;
import org.hornetq.api.core.TransportConfiguration;
-import org.hornetq.api.core.client.*;
+import org.hornetq.api.core.client.ClientConsumer;
+import org.hornetq.api.core.client.ClientMessage;
+import org.hornetq.api.core.client.ClientProducer;
+import org.hornetq.api.core.client.ClientSession;
+import org.hornetq.api.core.client.ClientSessionFactory;
+import org.hornetq.api.core.client.HornetQClient;
+import org.hornetq.api.core.client.ServerLocator;
import org.hornetq.core.client.impl.ClientSessionInternal;
import org.hornetq.core.config.Configuration;
import org.hornetq.core.logging.Logger;
@@ -31,7 +37,6 @@
import org.hornetq.core.protocol.core.impl.PacketImpl;
import org.hornetq.core.protocol.core.impl.RemotingConnectionImpl;
import org.hornetq.core.remoting.CloseListener;
-import org.hornetq.core.remoting.impl.invm.InVMConnectorFactory;
import org.hornetq.core.remoting.server.impl.RemotingServiceImpl;
import org.hornetq.core.server.HornetQServer;
import org.hornetq.spi.core.protocol.RemotingConnection;
@@ -43,6 +48,7 @@
* A TemporaryQueueTest
*
* @author <a href="mailto:jmesnil@redhat.com">Jeff Mesnil</a>
+ * @author Clebert Suconic
*/
public class TemporaryQueueTest extends ServiceTestBase
{
@@ -288,7 +294,7 @@
session.close();
}
- public void _testQueueWithWildcard3() throws Exception
+ public void testQueueWithWildcard3() throws Exception
{
session.createQueue("a.b", "queue1");
session.createTemporaryQueue("a.#", "queue2");
@@ -323,7 +329,41 @@
session2.close();
}
+
+ public void testRecreateConsumerOverServerFailure() throws Exception
+ {
+ ServerLocator serverWithReattach = createLocator();
+ serverWithReattach.setReconnectAttempts(-1);
+ serverWithReattach.setRetryInterval(1000);
+ serverWithReattach.setConfirmationWindowSize(-1);
+ ClientSessionFactory reattachSF = serverWithReattach.createSessionFactory();
+
+ ClientSession session = reattachSF.createSession(false, false);
+ session.createTemporaryQueue("tmpAd", "tmpQ");
+ ClientConsumer consumer = session.createConsumer("tmpQ");
+
+ ClientProducer prod = session.createProducer("tmpAd");
+
+ session.start();
+
+ RemotingConnectionImpl conn =
(RemotingConnectionImpl)((ClientSessionInternal)session).getConnection();
+ conn.fail(new HornetQException(HornetQException.IO_ERROR));
+
+ prod.send(session.createMessage(false));
+ session.commit();
+
+ assertNotNull(consumer.receive(1000));
+
+ session.close();
+
+ reattachSF.close();
+
+ serverWithReattach.close();
+
+
+ }
+
public void testDeleteTemporaryQueueWhenClientCrash() throws Exception
{
session.close();
@@ -416,12 +456,18 @@
server = createServer(false, configuration);
server.start();
- locator = HornetQClient.createServerLocatorWithoutHA(new
TransportConfiguration(UnitTestCase.INVM_CONNECTOR_FACTORY));
- locator.setConnectionTTL(TemporaryQueueTest.CONNECTION_TTL);
+ locator = createLocator();
sf = locator.createSessionFactory();
session = sf.createSession(false, true, true);
}
+ protected ServerLocator createLocator()
+ {
+ ServerLocator retlocator = HornetQClient.createServerLocatorWithoutHA(new
TransportConfiguration(UnitTestCase.INVM_CONNECTOR_FACTORY));
+ retlocator.setConnectionTTL(TemporaryQueueTest.CONNECTION_TTL);
+ return retlocator;
+ }
+
@Override
protected void tearDown() throws Exception
{