[jboss-cvs] JBoss Messaging SVN: r4282 - in trunk: src/main/org/jboss/messaging/core/client and 10 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Thu May 22 08:29:03 EDT 2008
Author: ataylor
Date: 2008-05-22 08:29:03 -0400 (Thu, 22 May 2008)
New Revision: 4282
Added:
trunk/src/main/org/jboss/messaging/core/client/ServerPonger.java
trunk/src/main/org/jboss/messaging/core/client/impl/ServerPongerImpl.java
trunk/src/main/org/jboss/messaging/core/remoting/KeepAliveHandler.java
trunk/src/main/org/jboss/messaging/core/remoting/impl/ClientKeepAliveHandler.java
trunk/src/main/org/jboss/messaging/core/server/ClientPinger.java
trunk/src/main/org/jboss/messaging/core/server/impl/ClientPingerImpl.java
Removed:
trunk/src/main/org/jboss/messaging/core/remoting/KeepAliveFactory.java
trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/ClientKeepAliveFactory.java
trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaKeepAliveFactory.java
trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/ServerKeepAliveFactory.java
trunk/tests/src/org/jboss/messaging/tests/integration/core/remoting/mina/MinaKeepAliveFactoryTest.java
Modified:
trunk/examples/jms/src/org/jboss/jms/example/QueueExample.java
trunk/src/main/org/jboss/messaging/core/client/impl/RemotingConnectionImpl.java
trunk/src/main/org/jboss/messaging/core/remoting/RemotingService.java
trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/FilterChainSupport.java
trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaConnector.java
trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaService.java
trunk/src/main/org/jboss/messaging/core/server/ServerConnection.java
trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java
trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerPacketHandler.java
trunk/src/main/org/jboss/messaging/core/server/impl/ServerConnectionImpl.java
trunk/src/main/org/jboss/messaging/core/server/impl/ServerConnectionPacketHandler.java
trunk/tests/jms-tests/src/org/jboss/test/messaging/JBMServerTestCase.java
trunk/tests/src/org/jboss/messaging/tests/integration/core/remoting/impl/ClientCrashTest.java
trunk/tests/src/org/jboss/messaging/tests/integration/core/remoting/mina/ClientKeepAliveTest.java
trunk/tests/src/org/jboss/messaging/tests/integration/core/remoting/mina/FilterChainSupportTest.java
trunk/tests/src/org/jboss/messaging/tests/integration/core/remoting/mina/ServerKeepAliveTest.java
trunk/tests/src/org/jboss/messaging/tests/unit/jms/network/ClientNetworkFailureTest.java
Log:
added keepalive functionality
Modified: trunk/examples/jms/src/org/jboss/jms/example/QueueExample.java
===================================================================
--- trunk/examples/jms/src/org/jboss/jms/example/QueueExample.java 2008-05-22 09:39:31 UTC (rev 4281)
+++ trunk/examples/jms/src/org/jboss/jms/example/QueueExample.java 2008-05-22 12:29:03 UTC (rev 4282)
@@ -40,6 +40,7 @@
public static void main(String[] args)
{
Connection connection = null;
+ Connection connection2 = null;
try
{
//create an initial context, env will be picked up from jndi.properties
@@ -47,6 +48,7 @@
Queue queue = (Queue) initialContext.lookup("/queue/testQueue");
ConnectionFactory cf = (ConnectionFactory) initialContext.lookup("/ConnectionFactory");
connection = cf.createConnection();
+ connection2 = cf.createConnection();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageProducer producer = session.createProducer(queue);
Message message = session.createTextMessage("This is a text message!");
@@ -57,6 +59,14 @@
TextMessage message2 = (TextMessage) messageConsumer.receive(5000);
log.info("message received from queue");
log.info("message = " + message2.getText());
+ try
+ {
+ Thread.sleep(200000);
+ }
+ catch (InterruptedException e)
+ {
+ e.printStackTrace();
+ }
}
catch (NamingException e)
{
@@ -72,6 +82,7 @@
try
{
connection.close();
+ connection2.close();
}
catch (JMSException e)
{
Added: trunk/src/main/org/jboss/messaging/core/client/ServerPonger.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/ServerPonger.java (rev 0)
+++ trunk/src/main/org/jboss/messaging/core/client/ServerPonger.java 2008-05-22 12:29:03 UTC (rev 4282)
@@ -0,0 +1,8 @@
+package org.jboss.messaging.core.client;
+
+/**
+ * @author <a href="ataylor at redhat.com">Andy Taylor</a>
+ */
+public interface ServerPonger extends Runnable
+{
+}
Modified: trunk/src/main/org/jboss/messaging/core/client/impl/RemotingConnectionImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/RemotingConnectionImpl.java 2008-05-22 09:39:31 UTC (rev 4281)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/RemotingConnectionImpl.java 2008-05-22 12:29:03 UTC (rev 4282)
@@ -25,6 +25,9 @@
import java.util.Timer;
import java.util.TimerTask;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
import org.jboss.messaging.core.client.ConnectionParams;
import org.jboss.messaging.core.client.Location;
@@ -83,6 +86,18 @@
log.trace(this + " created with configuration " + location);
}
+ public RemotingConnectionImpl(final Location location, ConnectionParams connectionParams, NIOConnector nioConnector) throws Exception
+ {
+ assert location != null;
+ assert connectionParams != null;
+
+ this.location = location;
+ this.connectionParams = connectionParams;
+ connector = nioConnector;
+ session = connector.connect();
+ log.trace(this + " created with connector " + nioConnector);
+ }
+
// Public ---------------------------------------------------------------------------------------
// RemotingConnection implementation ------------------------------------------------------------
@@ -99,7 +114,8 @@
log.trace(this + " started");
}
-
+
+
public void stop()
{
log.trace(this + " stop");
@@ -124,7 +140,7 @@
log.trace(this + " closed");
}
-
+
public long getSessionID()
{
if (session == null || !session.isConnected())
Added: trunk/src/main/org/jboss/messaging/core/client/impl/ServerPongerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ServerPongerImpl.java (rev 0)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ServerPongerImpl.java 2008-05-22 12:29:03 UTC (rev 4282)
@@ -0,0 +1,99 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.messaging.core.client.impl;
+
+import org.jboss.messaging.core.remoting.impl.wireformat.Ping;
+import org.jboss.messaging.core.remoting.impl.wireformat.Pong;
+import org.jboss.messaging.core.remoting.impl.mina.CleanUpNotifier;
+import org.jboss.messaging.core.remoting.*;
+import org.jboss.messaging.core.logging.Logger;
+import org.jboss.messaging.core.client.ServerPonger;
+import org.jboss.messaging.core.exception.MessagingException;
+import org.apache.mina.common.IoSession;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * @author <a href="ataylor at redhat.com">Andy Taylor</a>
+ */
+public class ServerPongerImpl implements PacketHandler, ServerPonger
+{
+ private static Logger log = Logger.getLogger(ServerPongerImpl.class);
+ private static boolean traceEnabled = log.isTraceEnabled();
+ IoSession session;
+ private long id;
+ long interval;
+ long timeout;
+ CleanUpNotifier cleanUpNotifier;
+ KeepAliveHandler keepAliveHandler;
+ CountDownLatch latch = new CountDownLatch(1);
+
+ public ServerPongerImpl(CleanUpNotifier cleanUpNotifier, KeepAliveHandler keepAliveHandler, IoSession session, long id, long timeout, long interval)
+ {
+ this.cleanUpNotifier = cleanUpNotifier;
+ this.keepAliveHandler = keepAliveHandler;
+ this.session = session;
+ this.id = id;
+ this.timeout = timeout;
+ this.interval = interval;
+ }
+
+
+ public void run()
+ {
+ boolean pinged;
+ latch = new CountDownLatch(1);
+ try
+ {
+ pinged = latch.await(timeout + interval, TimeUnit.MILLISECONDS);
+ if(!pinged)
+ {
+ log.warn("no ping received from server, cleaning up connection.");
+ cleanUpNotifier.fireCleanup(session.getId(), new MessagingException(MessagingException.CONNECTION_TIMEDOUT, "no ping received from server"));
+ }
+ }
+ catch (InterruptedException e)
+ {
+ }
+ }
+
+ public long getID()
+ {
+ return id;
+ }
+
+ public void handle(Packet packet, PacketReturner sender)
+ {
+ Ping ping = (Ping) packet;
+ latch.countDown();
+ if(traceEnabled)
+ {
+ log.trace("received ping:" + ping);
+ }
+ Pong pong = keepAliveHandler.ping(ping);
+ if(pong != null)
+ {
+ session.write(pong);
+ }
+ }
+}
Deleted: trunk/src/main/org/jboss/messaging/core/remoting/KeepAliveFactory.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/KeepAliveFactory.java 2008-05-22 09:39:31 UTC (rev 4281)
+++ trunk/src/main/org/jboss/messaging/core/remoting/KeepAliveFactory.java 2008-05-22 12:29:03 UTC (rev 4282)
@@ -1,26 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- *
- * Distributable under LGPL license.
- * See terms of license at gnu.org.
- */
-package org.jboss.messaging.core.remoting;
-
-import org.jboss.messaging.core.remoting.impl.wireformat.Ping;
-import org.jboss.messaging.core.remoting.impl.wireformat.Pong;
-
-/**
- * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>
- *
- * @version <tt>$Revision$</tt>
- *
- */
-public interface KeepAliveFactory
-{
-
- Ping ping(long sessionID);
-
- Pong pong(long sessionID, Ping ping);
-
- boolean isPing(long sessionID, Object message);
-}
Added: trunk/src/main/org/jboss/messaging/core/remoting/KeepAliveHandler.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/KeepAliveHandler.java (rev 0)
+++ trunk/src/main/org/jboss/messaging/core/remoting/KeepAliveHandler.java 2008-05-22 12:29:03 UTC (rev 4282)
@@ -0,0 +1,14 @@
+package org.jboss.messaging.core.remoting;
+
+import org.jboss.messaging.core.remoting.impl.wireformat.Ping;
+import org.jboss.messaging.core.remoting.impl.wireformat.Pong;
+
+/**
+ * Pluggable component that defines how a client responds to a server ping command.
+ *
+ * @author <a href="ataylor at redhat.com">Andy Taylor</a>
+ */
+public interface KeepAliveHandler
+{
+ Pong ping(Ping pong);
+}
Modified: trunk/src/main/org/jboss/messaging/core/remoting/RemotingService.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/RemotingService.java 2008-05-22 09:39:31 UTC (rev 4281)
+++ trunk/src/main/org/jboss/messaging/core/remoting/RemotingService.java 2008-05-22 12:29:03 UTC (rev 4282)
@@ -9,6 +9,8 @@
import org.jboss.messaging.core.client.RemotingSessionListener;
import org.jboss.messaging.core.config.Configuration;
import org.jboss.messaging.core.server.MessagingComponent;
+import org.jboss.messaging.core.server.ClientPinger;
+import org.jboss.messaging.core.exception.MessagingException;
/**
* @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>
@@ -29,4 +31,6 @@
void addRemotingSessionListener(RemotingSessionListener listener);
void removeRemotingSessionListener(RemotingSessionListener listener);
+
+ void setClientPinger(ClientPinger clientPinger);
}
Added: trunk/src/main/org/jboss/messaging/core/remoting/impl/ClientKeepAliveHandler.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/ClientKeepAliveHandler.java (rev 0)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/ClientKeepAliveHandler.java 2008-05-22 12:29:03 UTC (rev 4282)
@@ -0,0 +1,43 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.messaging.core.remoting.impl;
+
+import org.jboss.messaging.core.remoting.KeepAliveHandler;
+import org.jboss.messaging.core.remoting.NIOConnector;
+import org.jboss.messaging.core.remoting.impl.wireformat.Ping;
+import org.jboss.messaging.core.remoting.impl.wireformat.Pong;
+
+/**
+ * Pluggable component that defines how a client responds to a server ping command. This simple implementation returns a
+ * valid pong.
+ * @author <a href="ataylor at redhat.com">Andy Taylor</a>
+ */
+public class ClientKeepAliveHandler implements KeepAliveHandler
+{
+
+ public Pong ping(Ping ping)
+ {
+ Pong pong = new Pong(ping.getSessionID(), false);
+ pong.setTargetID(0);
+ return pong;
+ }
+}
Deleted: trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/ClientKeepAliveFactory.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/ClientKeepAliveFactory.java 2008-05-22 09:39:31 UTC (rev 4281)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/ClientKeepAliveFactory.java 2008-05-22 12:29:03 UTC (rev 4282)
@@ -1,55 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- *
- * Distributable under LGPL license.
- * See terms of license at gnu.org.
- */
-package org.jboss.messaging.core.remoting.impl.mina;
-
-import org.jboss.messaging.core.remoting.KeepAliveFactory;
-import org.jboss.messaging.core.remoting.impl.wireformat.Ping;
-import org.jboss.messaging.core.remoting.impl.wireformat.Pong;
-
-/**
- * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>
- *
- * @version <tt>$Revision$</tt>
- *
- */
-public class ClientKeepAliveFactory implements KeepAliveFactory
-{
- // Constants -----------------------------------------------------
-
- // Attributes ----------------------------------------------------
-
- // Static --------------------------------------------------------
-
- // Constructors --------------------------------------------------
-
- // Public --------------------------------------------------------
-
- // KeepAliveFactory implementation -------------------------------
-
- public Ping ping(long clientSessionID)
- {
- return new Ping(clientSessionID);
- }
-
- public boolean isPing(long sessionID, Object message)
- {
- return (message instanceof Ping);
- }
-
- public Pong pong(long sessionID, Ping ping)
- {
- return new Pong(sessionID, false);
- }
-
- // Package protected ---------------------------------------------
-
- // Protected -----------------------------------------------------
-
- // Private -------------------------------------------------------
-
- // Inner classes -------------------------------------------------
-}
Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/FilterChainSupport.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/FilterChainSupport.java 2008-05-22 09:39:31 UTC (rev 4281)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/FilterChainSupport.java 2008-05-22 12:29:03 UTC (rev 4282)
@@ -16,7 +16,6 @@
import org.apache.mina.filter.keepalive.KeepAliveFilter;
import org.apache.mina.filter.ssl.SslFilter;
import org.jboss.messaging.core.logging.Logger;
-import org.jboss.messaging.core.remoting.KeepAliveFactory;
import org.jboss.messaging.core.remoting.impl.ssl.SSLSupport;
/**
@@ -46,29 +45,7 @@
filterChain.addLast("codec", new ProtocolCodecFilter(new MessagingCodec()));
}
-
- public static void addKeepAliveFilter(final DefaultIoFilterChainBuilder filterChain,
- final KeepAliveFactory factory, final int keepAliveInterval,
- final int keepAliveTimeout, final CleanUpNotifier notifier)
- {
- assert filterChain != null;
- assert factory != null;
- assert notifier != null;
-
- if (keepAliveTimeout > keepAliveInterval)
- {
- throw new IllegalArgumentException("timeout must be greater than the interval: "
- + "keepAliveTimeout= " + keepAliveTimeout
- + ", keepAliveInterval=" + keepAliveInterval);
- }
- KeepAliveFilter filter = new KeepAliveFilter(
- new MinaKeepAliveFactory(factory, notifier), BOTH_IDLE, EXCEPTION, keepAliveInterval,
- keepAliveTimeout);
- filter.setForwardEvent(true);
- filterChain.addLast("keep-alive", filter);
- }
-
public static void addSSLFilter(
final DefaultIoFilterChainBuilder filterChain, final boolean client,
final String keystorePath, final String keystorePassword, final String trustStorePath,
Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaConnector.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaConnector.java 2008-05-22 09:39:31 UTC (rev 4281)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaConnector.java 2008-05-22 12:29:03 UTC (rev 4282)
@@ -6,14 +6,14 @@
*/
package org.jboss.messaging.core.remoting.impl.mina;
-import static org.jboss.messaging.core.remoting.impl.mina.FilterChainSupport.*;
+import static org.jboss.messaging.core.remoting.impl.mina.FilterChainSupport.addCodecFilter;
+import static org.jboss.messaging.core.remoting.impl.mina.FilterChainSupport.addSSLFilter;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.List;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
+import java.util.concurrent.*;
import org.apache.mina.common.CloseFuture;
import org.apache.mina.common.ConnectFuture;
@@ -27,13 +27,13 @@
import org.jboss.messaging.core.client.ConnectionParams;
import org.jboss.messaging.core.client.Location;
import org.jboss.messaging.core.client.RemotingSessionListener;
+import org.jboss.messaging.core.client.ServerPonger;
import org.jboss.messaging.core.client.impl.ConnectionParamsImpl;
+import org.jboss.messaging.core.client.impl.ServerPongerImpl;
import org.jboss.messaging.core.exception.MessagingException;
import org.jboss.messaging.core.logging.Logger;
-import org.jboss.messaging.core.remoting.KeepAliveFactory;
-import org.jboss.messaging.core.remoting.NIOConnector;
-import org.jboss.messaging.core.remoting.NIOSession;
-import org.jboss.messaging.core.remoting.PacketDispatcher;
+import org.jboss.messaging.core.remoting.*;
+import org.jboss.messaging.core.remoting.impl.ClientKeepAliveHandler;
/**
* @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>
@@ -68,6 +68,11 @@
private MinaHandler handler;
+ KeepAliveHandler keepAliveHandler;
+
+ private ScheduledExecutorService scheduledExecutor;
+
+
// Static --------------------------------------------------------
// Constructors --------------------------------------------------
@@ -75,22 +80,22 @@
// Public --------------------------------------------------------
public MinaConnector(Location location, PacketDispatcher dispatcher)
{
- this(location, new ConnectionParamsImpl(), dispatcher, new ClientKeepAliveFactory());
+ this(location, new ConnectionParamsImpl(), dispatcher, new ClientKeepAliveHandler());
}
public MinaConnector(Location location, ConnectionParams connectionParams, PacketDispatcher dispatcher)
{
- this(location, connectionParams, dispatcher, new ClientKeepAliveFactory());
+ this(location, connectionParams, dispatcher, new ClientKeepAliveHandler());
}
public MinaConnector(Location location, PacketDispatcher dispatcher,
- KeepAliveFactory keepAliveFactory)
+ KeepAliveHandler keepAliveFactory)
{
this(location, new ConnectionParamsImpl(), dispatcher, keepAliveFactory);
}
public MinaConnector(Location location, ConnectionParams connectionParams, PacketDispatcher dispatcher,
- KeepAliveFactory keepAliveFactory)
+ KeepAliveHandler keepAliveFactory)
{
assert location != null;
assert dispatcher != null;
@@ -100,8 +105,8 @@
this.location = location;
this.connectionParams = connectionParams;
this.dispatcher = dispatcher;
-
- connector = new NioSocketConnector();
+ this.keepAliveHandler = keepAliveFactory;
+ this.connector = new NioSocketConnector();
DefaultIoFilterChainBuilder filterChain = connector.getFilterChain();
connector.setSessionDataStructureFactory(new MessagingIOSessionDataStructureFactory());
@@ -120,7 +125,7 @@
}
}
addCodecFilter(filterChain);
-// addKeepAliveFilter(filterChain, keepAliveFactory, connectionParams.getKeepAliveInterval(),
+// addKeepAliveFilter(filterChain, keepAliveFactory, connectionParams.getKeepAliveInterval(),
// connectionParams.getKeepAliveTimeout(), this);
connector.getSessionConfig().setTcpNoDelay(connectionParams.isTcpNoDelay());
int receiveBufferSize = connectionParams.getTcpReceiveBufferSize();
@@ -166,9 +171,15 @@
throw new IOException("Cannot connect to " + address.toString());
}
session = future.getSession();
-// Packet packet = new Ping(session.getId());
-// session.write(packet);
+ ServerPongerImpl pinger = new ServerPongerImpl(this, keepAliveHandler, session, 0, connectionParams.getKeepAliveTimeout() * 1000, connectionParams.getKeepAliveInterval() * 1000);
+
+ getDispatcher().register(pinger);
+ if (connectionParams.getKeepAliveInterval() > 0)
+ {
+ scheduledExecutor = new ScheduledThreadPoolExecutor(1);
+ scheduledExecutor.scheduleAtFixedRate(pinger, 0, connectionParams.getKeepAliveInterval(), TimeUnit.SECONDS);
+ }
return new MinaSession(session, handler);
}
@@ -204,7 +215,10 @@
connector = null;
session = null;
-
+ if (scheduledExecutor != null && !scheduledExecutor.isShutdown())
+ {
+ scheduledExecutor.shutdown();
+ }
return closed;
}
@@ -248,6 +262,10 @@
public synchronized void fireCleanup(long sessionID, MessagingException me)
{
+ if (scheduledExecutor != null && !scheduledExecutor.isShutdown())
+ {
+ scheduledExecutor.shutdown();
+ }
for (RemotingSessionListener listener: listeners)
{
listener.sessionDestroyed(sessionID, me);
Deleted: trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaKeepAliveFactory.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaKeepAliveFactory.java 2008-05-22 09:39:31 UTC (rev 4281)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaKeepAliveFactory.java 2008-05-22 12:29:03 UTC (rev 4282)
@@ -1,93 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- *
- * Distributable under LGPL license.
- * See terms of license at gnu.org.
- */
-package org.jboss.messaging.core.remoting.impl.mina;
-
-import org.apache.mina.common.IoSession;
-import org.apache.mina.filter.keepalive.KeepAliveMessageFactory;
-import org.jboss.messaging.core.exception.MessagingException;
-import org.jboss.messaging.core.logging.Logger;
-import org.jboss.messaging.core.remoting.KeepAliveFactory;
-import org.jboss.messaging.core.remoting.impl.wireformat.Ping;
-import org.jboss.messaging.core.remoting.impl.wireformat.Pong;
-
-/**
- * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>
- *
- * @version <tt>$Revision$</tt>
- *
- */
-public class MinaKeepAliveFactory implements KeepAliveMessageFactory
-{
- // Constant ------------------------------------------------------
-
- private static final Logger log = Logger
- .getLogger(MinaKeepAliveFactory.class);
-
- // Attributes ----------------------------------------------------
-
- private KeepAliveFactory innerFactory;
-
- private CleanUpNotifier notifier;
-
- // Static --------------------------------------------------------
-
- // Constructors --------------------------------------------------
-
- public MinaKeepAliveFactory(KeepAliveFactory factory,
- CleanUpNotifier notifier)
- {
- assert factory != null;
-
- this.notifier = notifier;
- this.innerFactory = factory;
- }
-
- // Public --------------------------------------------------------
-
- // KeepAliveMessageFactory implementation ------------------------
-
- public Object getRequest(IoSession session)
- {
- return innerFactory.ping(session.getId());
- }
-
- public Object getResponse(IoSession session, Object request)
- {
- return innerFactory.pong(session.getId(), (Ping) request);
- }
-
- public boolean isRequest(IoSession session, Object request)
- {
- return innerFactory.isPing(session.getId(), request);
- }
-
- public boolean isResponse(IoSession session, Object response)
- {
- if (response instanceof Pong)
- {
- Pong pong = (Pong) response;
- if (pong.isSessionFailed() && notifier != null)
- {
- notifier.fireCleanup(session.getId(), new MessagingException(
- MessagingException.CONNECTION_TIMEDOUT,
- "Session has failed on the server"));
- }
- return true;
- } else
- {
- return false;
- }
- }
-
- // Package protected ---------------------------------------------
-
- // Protected -----------------------------------------------------
-
- // Private -------------------------------------------------------
-
- // Inner classes -------------------------------------------------
-}
Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaService.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaService.java 2008-05-22 09:39:31 UTC (rev 4281)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaService.java 2008-05-22 12:29:03 UTC (rev 4282)
@@ -34,6 +34,8 @@
import org.jboss.messaging.core.remoting.PacketDispatcher;
import org.jboss.messaging.core.remoting.RemotingService;
import org.jboss.messaging.core.remoting.impl.PacketDispatcherImpl;
+import org.jboss.messaging.core.server.ClientPinger;
+import org.jboss.messaging.core.server.impl.ClientPingerImpl;
/**
* @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>
@@ -61,11 +63,11 @@
private ExecutorService threadPool;
- private final List<RemotingSessionListener> listeners = new ArrayList<RemotingSessionListener>();
+ private List<RemotingSessionListener> listeners = new ArrayList<RemotingSessionListener>();
- private ServerKeepAliveFactory factory;
+ private List<Interceptor> filters = new CopyOnWriteArrayList<Interceptor>();
- private final List<Interceptor> filters = new CopyOnWriteArrayList<Interceptor>();
+ private ClientPinger clientPinger;
// Static --------------------------------------------------------
@@ -73,18 +75,11 @@
public MinaService(Configuration config)
{
- this(config, new ServerKeepAliveFactory());
- }
-
- public MinaService(Configuration config, ServerKeepAliveFactory factory)
- {
assert config != null;
- assert factory != null;
validate(config);
this.config = config;
- this.factory = factory;
dispatcher = new PacketDispatcherImpl(filters);
}
@@ -114,6 +109,12 @@
listeners.remove(listener);
}
+ public void setClientPinger(ClientPinger clientPinger)
+ {
+ this.clientPinger = clientPinger;
+ clientPinger.registerCleanUpNotifier(this);
+ }
+
// TransportService implementation -------------------------------
public void start() throws Exception
@@ -228,26 +229,15 @@
public void fireCleanup(long sessionID, MessagingException me)
{
- if (factory.getSessions().containsKey(sessionID))
+ for (RemotingSessionListener listener : listeners)
{
- long clientSessionID = factory.getSessions().containsKey(sessionID)?factory.getSessions().get(sessionID):0;
- for (RemotingSessionListener listener : listeners)
- {
- listener.sessionDestroyed(clientSessionID, me);
- }
- factory.getSessions().remove(sessionID);
+ listener.sessionDestroyed(sessionID, me);
}
}
// Public --------------------------------------------------------
- public void setKeepAliveFactory(ServerKeepAliveFactory factory)
- {
- assert factory != null;
- this.factory = factory;
- }
-
public void setRemotingConfiguration(Configuration remotingConfig)
{
assert started == false;
Deleted: trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/ServerKeepAliveFactory.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/ServerKeepAliveFactory.java 2008-05-22 09:39:31 UTC (rev 4281)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/ServerKeepAliveFactory.java 2008-05-22 12:29:03 UTC (rev 4282)
@@ -1,95 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- *
- * Distributable under LGPL license.
- * See terms of license at gnu.org.
- */
-package org.jboss.messaging.core.remoting.impl.mina;
-
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-
-import org.jboss.messaging.core.logging.Logger;
-import org.jboss.messaging.core.remoting.KeepAliveFactory;
-import org.jboss.messaging.core.remoting.impl.wireformat.Ping;
-import org.jboss.messaging.core.remoting.impl.wireformat.Pong;
-
-/**
- * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>
- *
- * @version <tt>$Revision$</tt>
- *
- */
-public class ServerKeepAliveFactory implements KeepAliveFactory
-{
- // Constants -----------------------------------------------------
-
- private static final Logger log = Logger
- .getLogger(ServerKeepAliveFactory.class);
-
- // Attributes ----------------------------------------------------
-
- // FIXME session mapping must be cleaned when the server session is closed:
- // either normally or exceptionally
- /**
- * Key = server session ID Value = client session ID
- */
- private Map<Long, Long> sessions = new ConcurrentHashMap<Long, Long>();
-
- // Static --------------------------------------------------------
-
- // Constructors --------------------------------------------------
-
- // Public --------------------------------------------------------
-
- // KeepAliveFactory implementation -------------------------------
-
- public Ping ping(long sessionID)
- {
- return new Ping(sessionID);
- }
-
- public boolean isPing(long sessionID, Object message)
- {
- if (!(message instanceof Ping))
- {
- return false;
- }
- else
- {
- Ping ping = (Ping) message;
- long clientSessionID = ping.getSessionID();
- if (clientSessionID == sessionID)
- {
- return false;
- }
- else
- {
- if (log.isDebugEnabled())
- log.debug("associated server session " + sessionID
- + " to client " + clientSessionID);
- sessions.put(sessionID, clientSessionID);
- return true;
- }
- }
- }
-
- public Pong pong(long sessionID, Ping ping)
- {
- long clientSessionID = ping.getSessionID();
- return new Pong(sessionID, sessions.containsKey(clientSessionID));
- }
-
- public Map<Long, Long> getSessions()
- {
- return sessions;
- }
-
- // Package protected ---------------------------------------------
-
- // Protected -----------------------------------------------------
-
- // Private -------------------------------------------------------
-
- // Inner classes -------------------------------------------------
-}
Added: trunk/src/main/org/jboss/messaging/core/server/ClientPinger.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/ClientPinger.java (rev 0)
+++ trunk/src/main/org/jboss/messaging/core/server/ClientPinger.java 2008-05-22 12:29:03 UTC (rev 4282)
@@ -0,0 +1,45 @@
+package org.jboss.messaging.core.server;
+
+import org.jboss.messaging.core.remoting.impl.wireformat.Pong;
+import org.jboss.messaging.core.remoting.impl.mina.CleanUpNotifier;
+import org.jboss.messaging.core.remoting.PacketReturner;
+
+/**
+ * Used by a MessagingServer to detect that a client is still alive.
+ * @author <a href="ataylor at redhat.com">Andy Taylor</a>
+ */
+public interface ClientPinger extends Runnable
+{
+ /**
+ * Runs one ping cycle; meant to be scheduled to run at the keep alive interval period.
+ */
+ void run();
+
+ /**
+ * Notifies this pinger that a pong has been received from a client.
+ * @param pong the pong that was received
+ */
+ void pong(Pong pong);
+
+ /**
+ * Registers a connection so that its client can be pinged.
+ *
+ * @param remotingSessionID the id of the remoting session the connection belongs to
+ * @param sender the returner used to send packets back to the client
+ */
+ void registerConnection(long remotingSessionID, PacketReturner sender);
+
+ /**
+ * Unregisters a connection.
+ *
+ * @param remotingSessionID the id of the remoting session the connection belongs to
+ */
+ void unregister(long remotingSessionID);
+
+ /**
+ * Registers the cleanup notifier to use.
+ *
+ * @param cleanUpNotifier the notifier
+ */
+ void registerCleanUpNotifier(CleanUpNotifier cleanUpNotifier);
+}
Modified: trunk/src/main/org/jboss/messaging/core/server/ServerConnection.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/ServerConnection.java 2008-05-22 09:39:31 UTC (rev 4281)
+++ trunk/src/main/org/jboss/messaging/core/server/ServerConnection.java 2008-05-22 12:29:03 UTC (rev 4282)
@@ -73,5 +73,7 @@
long getCreated();
+ long getRemotingClientSessionID();
+
Collection<ServerSession> getSessions();
}
Added: trunk/src/main/org/jboss/messaging/core/server/impl/ClientPingerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/ClientPingerImpl.java (rev 0)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/ClientPingerImpl.java 2008-05-22 12:29:03 UTC (rev 4282)
@@ -0,0 +1,304 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.messaging.core.server.impl;
+
+import org.jboss.messaging.core.remoting.impl.wireformat.Ping;
+import org.jboss.messaging.core.remoting.impl.wireformat.Pong;
+import org.jboss.messaging.core.remoting.impl.mina.CleanUpNotifier;
+import org.jboss.messaging.core.remoting.PacketReturner;
+import org.jboss.messaging.core.logging.Logger;
+import org.jboss.messaging.core.server.MessagingServer;
+import org.jboss.messaging.core.server.ServerConnection;
+import org.jboss.messaging.core.server.ClientPinger;
+import org.jboss.messaging.core.exception.MessagingException;
+
+import java.util.Map;
+import java.util.List;
+import java.util.ArrayList;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Default {@link ClientPinger} implementation: pings every registered remoting session and
+ * fires the {@link CleanUpNotifier} for each session that has not answered within the keep
+ * alive timeout.
+ * <p>
+ * {@link #run()} is meant to be executed by a scheduler while the other methods are invoked by
+ * the packet handlers, so the list of outstanding replies is guarded by this object's monitor.
+ *
+ * @author <a href="ataylor at redhat.com">Andy Taylor</a>
+ */
+public class ClientPingerImpl implements ClientPinger
+{
+   private static final Logger log = Logger.getLogger(ClientPingerImpl.class);
+
+   private static final boolean isTraceEnabled = log.isTraceEnabled();
+   /**
+    * the current active connections, keyed by remoting session id
+    */
+   private final ConcurrentHashMap<Long, ConnectionHolder> connections = new ConcurrentHashMap<Long, ConnectionHolder>();
+   /**
+    * sessions pinged in the current cycle that have not answered yet,
+    * guarded by this object's monitor
+    */
+   final List<Long> replies = new ArrayList<Long>();
+   /**
+    * the server, used to read the keep alive timeout
+    */
+   private final MessagingServer server;
+   /**
+    * the cleanupnotifier to use on failed pings, volatile as it is set and read by different threads
+    */
+   private volatile CleanUpNotifier cleanUpNotifier;
+
+   public ClientPingerImpl(MessagingServer server)
+   {
+      this.server = server;
+   }
+
+   /**
+    * Runs one ping cycle: pings every registered session, waits up to the keep alive timeout
+    * for the pongs, then fires the cleanup notifier for every session that did not answer.
+    */
+   public void run()
+   {
+      // nothing may escape: an exception thrown by a periodic task cancels its later executions
+      try
+      {
+         long timeoutMillis = server.getConfiguration().getKeepAliveTimeout() * 1000L;
+         if (timeoutMillis <= 0)
+         {
+            // Object.wait(0) blocks until notified and a negative timeout is rejected,
+            // so no client could be given a meaningful time to answer
+            log.warn("keep alive timeout is not positive, skipping ping cycle");
+            return;
+         }
+         List<Long> expired;
+         synchronized (this)
+         {
+            replies.clear();
+            pingAll();
+            if (!awaitReplies(timeoutMillis))
+            {
+               // interrupted before the timeout elapsed (e.g. the executor is being shut
+               // down): the clients must not be reported as dead
+               return;
+            }
+            // snapshot under the lock, pong() may still remove late replies afterwards
+            expired = new ArrayList<Long>(replies);
+         }
+         //at this point cleanup any replies we havent received
+         CleanUpNotifier notifier = cleanUpNotifier;
+         for (Long sessionId : expired)
+         {
+            if (notifier != null)
+            {
+               notifier.fireCleanup(sessionId, new MessagingException(MessagingException.CONNECTION_TIMEDOUT, "unable to ping client"));
+            }
+            connections.remove(sessionId);
+         }
+      }
+      catch (Exception e)
+      {
+         log.error("keep alive ping cycle failed", e);
+      }
+   }
+
+   /**
+    * Sends a ping to every registered session and records it as awaiting a reply.
+    * Must be called while holding this object's monitor.
+    */
+   private void pingAll()
+   {
+      for (ConnectionHolder holder : connections.values())
+      {
+         long sessionId = holder.getSessionId();
+         try
+         {
+            Ping ping = new Ping(sessionId);
+            ping.setTargetID(0);
+            holder.getPacketReturner().send(ping);
+            replies.add(sessionId);
+            if (isTraceEnabled)
+            {
+               log.trace("sending " + ping);
+            }
+         }
+         catch (Exception e)
+         {
+            // a failed send must not prevent the remaining sessions from being pinged
+            log.warn("unable to send ping to session " + sessionId, e);
+         }
+      }
+   }
+
+   /**
+    * Waits until every pinged session has answered or the timeout has elapsed.
+    * Must be called while holding this object's monitor.
+    *
+    * @param timeoutMillis the maximum time to wait, in milliseconds
+    * @return false if the thread was interrupted while waiting, true otherwise
+    */
+   private boolean awaitReplies(long timeoutMillis)
+   {
+      long deadline = System.currentTimeMillis() + timeoutMillis;
+      long remaining = timeoutMillis;
+      try
+      {
+         // loop to guard against spurious wakeups, pong() notifies once nothing is outstanding
+         while (remaining > 0 && !replies.isEmpty())
+         {
+            wait(remaining);
+            remaining = deadline - System.currentTimeMillis();
+         }
+         return true;
+      }
+      catch (InterruptedException e)
+      {
+         // restore the flag so that the executing thread can observe the interruption
+         Thread.currentThread().interrupt();
+         return false;
+      }
+   }
+
+   /**
+    * pong received from client
+    *
+    * @param pong the pong, its session id is no longer considered outstanding
+    */
+   public synchronized void pong(Pong pong)
+   {
+      if (isTraceEnabled)
+      {
+         log.trace("received reply" + pong);
+      }
+      // Long.valueOf makes sure List.remove(Object) is used and not remove(int index)
+      replies.remove(Long.valueOf(pong.getSessionID()));
+      if (replies.isEmpty())
+      {
+         // let run() stop waiting early, every pinged session has answered
+         notifyAll();
+      }
+   }
+
+   /**
+    * register a connection. Registering an already known session only increments its
+    * connection count.
+    *
+    * @param remotingSessionID the remoting session id
+    * @param sender the returner used to send pings to the client
+    */
+   public void registerConnection(long remotingSessionID, PacketReturner sender)
+   {
+      // putIfAbsent so that two concurrent first registrations cannot overwrite each other
+      ConnectionHolder existing = connections.putIfAbsent(remotingSessionID, new ConnectionHolder(remotingSessionID, sender));
+      if (existing != null)
+      {
+         existing.increment();
+      }
+   }
+
+   /**
+    * unregister a connection. The session stops being pinged once its connection count
+    * reaches zero.
+    *
+    * @param remotingSessionID the remoting session id
+    */
+   public void unregister(long remotingSessionID)
+   {
+      ConnectionHolder connectionHolder = connections.get(remotingSessionID);
+      // decrement and test in one atomic step instead of a separate decrement and read
+      if (connectionHolder != null && connectionHolder.connectionCount.decrementAndGet() == 0)
+      {
+         connections.remove(remotingSessionID);
+      }
+   }
+
+   /**
+    * register the cleanup notifier to use
+    *
+    * @param cleanUpNotifier the notifier fired for sessions that fail to answer a ping
+    */
+   public void registerCleanUpNotifier(CleanUpNotifier cleanUpNotifier)
+   {
+      this.cleanUpNotifier = cleanUpNotifier;
+   }
+
+   /**
+    * simple holder class for sessions: the session id, the returner used to reach the
+    * client and the number of connections registered for that session
+    */
+   static class ConnectionHolder
+   {
+      final AtomicInteger connectionCount = new AtomicInteger(1);
+      final Long sessionId;
+      final PacketReturner packetReturner;
+
+      public ConnectionHolder(Long sessionId, PacketReturner packetReturner)
+      {
+         this.sessionId = sessionId;
+         this.packetReturner = packetReturner;
+      }
+
+      public Integer increment()
+      {
+         return connectionCount.getAndIncrement();
+      }
+
+      public Integer decrement()
+      {
+         return connectionCount.getAndDecrement();
+      }
+
+      public Integer get()
+      {
+         return connectionCount.get();
+      }
+
+      public boolean equals(Object o)
+      {
+         if (this == o) return true;
+         if (o == null || getClass() != o.getClass()) return false;
+
+         ConnectionHolder that = (ConnectionHolder) o;
+
+         if (!sessionId.equals(that.sessionId)) return false;
+
+         return true;
+      }
+
+      public int hashCode()
+      {
+         return sessionId.hashCode();
+      }
+
+      public long getSessionId()
+      {
+         return sessionId;
+      }
+
+      public PacketReturner getPacketReturner()
+      {
+         return packetReturner;
+      }
+   }
+}
Modified: trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java 2008-05-22 09:39:31 UTC (rev 4281)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java 2008-05-22 12:29:03 UTC (rev 4282)
@@ -44,16 +44,14 @@
import org.jboss.messaging.core.remoting.Interceptor;
import org.jboss.messaging.core.remoting.RemotingService;
import org.jboss.messaging.core.remoting.impl.mina.MinaService;
+import org.jboss.messaging.core.remoting.impl.mina.CleanUpNotifier;
import org.jboss.messaging.core.remoting.impl.wireformat.CreateConnectionResponse;
import org.jboss.messaging.core.security.JBMSecurityManager;
import org.jboss.messaging.core.security.Role;
import org.jboss.messaging.core.security.SecurityStore;
import org.jboss.messaging.core.security.impl.JBMSecurityManagerImpl;
import org.jboss.messaging.core.security.impl.SecurityStoreImpl;
-import org.jboss.messaging.core.server.ConnectionManager;
-import org.jboss.messaging.core.server.MessagingServer;
-import org.jboss.messaging.core.server.QueueFactory;
-import org.jboss.messaging.core.server.ServerConnection;
+import org.jboss.messaging.core.server.*;
import org.jboss.messaging.core.settings.HierarchicalRepository;
import org.jboss.messaging.core.settings.impl.HierarchicalObjectRepository;
import org.jboss.messaging.core.settings.impl.QueueSettings;
@@ -98,6 +96,7 @@
private Deployer queueSettingsDeployer;
private JBMSecurityManager securityManager = new JBMSecurityManagerImpl(true);
private DeploymentManager deploymentManager = new FileDeploymentManager();
+ private ClientPinger clientPinger;
// plugins
@@ -112,6 +111,7 @@
private QueueFactory queueFactory;
private ResourceManager resourceManager = new ResourceManagerImpl(0);
private ScheduledExecutorService scheduledExecutor;
+ private MessagingServerPacketHandler serverPacketHandler;
// Constructors ---------------------------------------------------------------------------------
/**
@@ -172,6 +172,8 @@
}
// Start the wired components
securityDeployer.start();
+ clientPinger = new ClientPingerImpl(this);
+ remotingService.setClientPinger(clientPinger);
remotingService.addRemotingSessionListener(connectionManager);
memoryManager.start();
deploymentManager.start(1);
@@ -179,10 +181,9 @@
deploymentManager.registerDeployer(queueSettingsDeployer);
postOffice.start();
deploymentManager.start(2);
-
- MessagingServerPacketHandler serverPacketHandler = new MessagingServerPacketHandler(this);
+ serverPacketHandler = new MessagingServerPacketHandler(this, clientPinger);
getRemotingService().getDispatcher().register(serverPacketHandler);
-
+ serverPacketHandler.start();
ClassLoader loader = Thread.currentThread().getContextClassLoader();
for (String interceptorClass : configuration.getDefaultInterceptors())
{
@@ -216,6 +217,7 @@
queueSettingsDeployer.stop();
deploymentManager.stop();
remotingService.removeRemotingSessionListener(connectionManager);
+ serverPacketHandler.stop();
connectionManager = null;
memoryManager.stop();
memoryManager = null;
@@ -292,6 +294,7 @@
this.postOffice = postOffice;
}
+
public HierarchicalRepository<HashSet<Role>> getSecurityRepository()
{
return securityRepository;
@@ -345,7 +348,7 @@
queueSettingsRepository,
postOffice, securityStore, connectionManager);
- remotingService.getDispatcher().register(new ServerConnectionPacketHandler(connection));
+ remotingService.getDispatcher().register(new ServerConnectionPacketHandler(connection, clientPinger));
return new CreateConnectionResponse(connection.getID(), version);
}
Modified: trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerPacketHandler.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerPacketHandler.java 2008-05-22 09:39:31 UTC (rev 4281)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerPacketHandler.java 2008-05-22 12:29:03 UTC (rev 4282)
@@ -27,10 +27,15 @@
import org.jboss.messaging.core.exception.MessagingException;
import org.jboss.messaging.core.remoting.Packet;
import org.jboss.messaging.core.remoting.PacketReturner;
-import org.jboss.messaging.core.remoting.impl.wireformat.CreateConnectionRequest;
-import org.jboss.messaging.core.remoting.impl.wireformat.EmptyPacket;
+import org.jboss.messaging.core.remoting.impl.wireformat.*;
import org.jboss.messaging.core.server.MessagingServer;
+import org.jboss.messaging.core.server.ClientPinger;
+import org.jboss.messaging.core.server.MessagingComponent;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
/**
* A packet handler for all packets that need to be handled at the server level
*
@@ -38,17 +43,39 @@
* @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
* @author <a href="ataylor at redhat.com">Andy Taylor</a>
*/
-public class MessagingServerPacketHandler extends ServerPacketHandlerSupport
+public class MessagingServerPacketHandler extends ServerPacketHandlerSupport implements MessagingComponent
{
private static final Logger log = Logger.getLogger(MessagingServerPacketHandler.class);
private final MessagingServer server;
- public MessagingServerPacketHandler(final MessagingServer server)
+ private final ClientPinger clientPinger;
+
+ private ScheduledExecutorService scheduledExecutor;
+
+ public MessagingServerPacketHandler(final MessagingServer server, ClientPinger clientPinger)
{
this.server = server;
+ this.clientPinger = clientPinger;
+
}
-
+
+   /**
+    * Schedules the client pinger: when the configured keep alive interval is positive, a
+    * single threaded executor is created and the pinger is run on it at that interval,
+    * expressed in seconds, starting immediately.
+    */
+   public void start() throws Exception
+   {
+      final long keepAliveInterval = server.getConfiguration().getKeepAliveInterval();
+      if (keepAliveInterval <= 0)
+      {
+         // non-positive interval: nothing is scheduled
+         return;
+      }
+      final ScheduledExecutorService pingExecutor = new ScheduledThreadPoolExecutor(1);
+      scheduledExecutor = pingExecutor;
+      pingExecutor.scheduleAtFixedRate(clientPinger, 0, keepAliveInterval, TimeUnit.SECONDS);
+   }
+
+   /**
+    * Stops the periodic pinging by shutting down the executor created by start(), if any.
+    */
+   public void stop() throws Exception
+   {
+      // test the executor rather than the configuration: start() may never have created one,
+      // or the interval may have changed since, which ended in a NullPointerException or in
+      // an executor that was never shut down
+      if (scheduledExecutor != null)
+      {
+         scheduledExecutor.shutdownNow();
+         scheduledExecutor = null;
+      }
+   }
/*
* The advantage to use String as ID is that we can leverage Java 5 UUID to
* generate these IDs. However theses IDs are 128 bite long and it increases
@@ -73,11 +100,19 @@
{
CreateConnectionRequest request = (CreateConnectionRequest) packet;
- response = server.createConnection(request.getUsername(), request.getPassword(),
+ CreateConnectionResponse createConnectionResponse = server.createConnection(request.getUsername(), request.getPassword(),
request.getRemotingSessionID(),
sender.getRemoteAddress(),
request.getVersion());
- }
+ clientPinger.registerConnection(request.getRemotingSessionID(), sender);
+ response = createConnectionResponse;
+
+ }
+ else if(type == EmptyPacket.PONG)
+ {
+ Pong decodedPong = (Pong) packet;
+ clientPinger.pong(decodedPong);
+ }
else
{
throw new MessagingException(MessagingException.UNSUPPORTED_PACKET,
Modified: trunk/src/main/org/jboss/messaging/core/server/impl/ServerConnectionImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/ServerConnectionImpl.java 2008-05-22 09:39:31 UTC (rev 4281)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/ServerConnectionImpl.java 2008-05-22 12:29:03 UTC (rev 4282)
@@ -283,6 +283,11 @@
return createdTime;
}
+ public long getRemotingClientSessionID()
+ {
+ return remotingClientSessionID;
+ }
+
public Collection<ServerSession> getSessions()
{
return sessions;
Modified: trunk/src/main/org/jboss/messaging/core/server/impl/ServerConnectionPacketHandler.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/ServerConnectionPacketHandler.java 2008-05-22 09:39:31 UTC (rev 4281)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/ServerConnectionPacketHandler.java 2008-05-22 12:29:03 UTC (rev 4282)
@@ -27,6 +27,7 @@
import org.jboss.messaging.core.remoting.impl.wireformat.ConnectionCreateSessionMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.EmptyPacket;
import org.jboss.messaging.core.server.ServerConnection;
+import org.jboss.messaging.core.server.ClientPinger;
/**
*
@@ -39,10 +40,12 @@
public class ServerConnectionPacketHandler extends ServerPacketHandlerSupport
{
private final ServerConnection connection;
+ final ClientPinger clientPinger;
- public ServerConnectionPacketHandler(final ServerConnection connection)
+ public ServerConnectionPacketHandler(final ServerConnection connection, final ClientPinger clientPinger)
{
this.connection = connection;
+ this.clientPinger = clientPinger;
}
public long getID()
@@ -69,6 +72,7 @@
connection.stop();
break;
case EmptyPacket.CLOSE:
+ clientPinger.unregister(connection.getRemotingClientSessionID());
connection.close();
break;
default:
Modified: trunk/tests/jms-tests/src/org/jboss/test/messaging/JBMServerTestCase.java
===================================================================
--- trunk/tests/jms-tests/src/org/jboss/test/messaging/JBMServerTestCase.java 2008-05-22 09:39:31 UTC (rev 4281)
+++ trunk/tests/jms-tests/src/org/jboss/test/messaging/JBMServerTestCase.java 2008-05-22 12:29:03 UTC (rev 4282)
@@ -40,6 +40,7 @@
import org.jboss.messaging.core.security.Role;
import org.jboss.messaging.core.server.ConnectionManager;
import org.jboss.messaging.core.server.MessagingServer;
+import org.jboss.messaging.core.remoting.TransportType;
import org.jboss.messaging.jms.client.JBossConnectionFactory;
import org.jboss.messaging.jms.server.JMSServerManager;
import org.jboss.messaging.microcontainer.JBMBootstrapServer;
@@ -967,7 +968,7 @@
protected void sleepIfRemoting(int time) throws Exception
{
Configuration config = servers.get(0).getMessagingServer().getRemotingService().getConfiguration();
- if (config.isInvmDisabled())
+ if (config.getTransport() == TransportType.TCP)
{
Thread.sleep(time);
}
Modified: trunk/tests/src/org/jboss/messaging/tests/integration/core/remoting/impl/ClientCrashTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/core/remoting/impl/ClientCrashTest.java 2008-05-22 09:39:31 UTC (rev 4281)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/core/remoting/impl/ClientCrashTest.java 2008-05-22 12:29:03 UTC (rev 4282)
@@ -132,7 +132,7 @@
assertEquals(9, p.exitValue());
- Thread.sleep(2000);
+ Thread.sleep(4000);
// the crash must have been detected and the client resources cleaned
// up only the local connection remains
assertActiveConnections(1);
@@ -161,6 +161,8 @@
super.setUp();
ConfigurationImpl config = ConfigurationHelper.newTCPConfiguration("localhost", ConfigurationImpl.DEFAULT_REMOTING_PORT);
+ config.setKeepAliveInterval(2);
+ config.setKeepAliveTimeout(1);
server = new MessagingServerImpl(config);
server.start();
Modified: trunk/tests/src/org/jboss/messaging/tests/integration/core/remoting/mina/ClientKeepAliveTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/core/remoting/mina/ClientKeepAliveTest.java 2008-05-22 09:39:31 UTC (rev 4281)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/core/remoting/mina/ClientKeepAliveTest.java 2008-05-22 12:29:03 UTC (rev 4282)
@@ -7,6 +7,7 @@
package org.jboss.messaging.tests.integration.core.remoting.mina;
import static java.util.concurrent.TimeUnit.SECONDS;
+
import static org.easymock.EasyMock.anyLong;
import static org.easymock.EasyMock.createMock;
import static org.easymock.EasyMock.expect;
@@ -21,25 +22,27 @@
import junit.framework.TestCase;
-import org.jboss.messaging.core.client.RemotingSessionListener;
-import org.jboss.messaging.core.client.impl.LocationImpl;
+import org.jboss.messaging.core.client.*;
+import org.jboss.messaging.core.client.impl.*;
import org.jboss.messaging.core.config.impl.ConfigurationImpl;
import org.jboss.messaging.core.exception.MessagingException;
-import org.jboss.messaging.core.remoting.KeepAliveFactory;
-import org.jboss.messaging.core.remoting.NIOSession;
+import org.jboss.messaging.core.remoting.*;
import org.jboss.messaging.core.remoting.impl.PacketDispatcherImpl;
-import org.jboss.messaging.core.remoting.impl.mina.ClientKeepAliveFactory;
+import org.jboss.messaging.core.remoting.impl.ClientKeepAliveHandler;
import org.jboss.messaging.core.remoting.impl.mina.MinaConnector;
import org.jboss.messaging.core.remoting.impl.mina.MinaService;
import org.jboss.messaging.core.remoting.impl.wireformat.Ping;
import org.jboss.messaging.core.remoting.impl.wireformat.Pong;
+import org.jboss.messaging.core.remoting.impl.wireformat.CreateConnectionRequest;
+import org.jboss.messaging.core.remoting.impl.wireformat.CreateConnectionResponse;
+import org.jboss.messaging.core.server.impl.MessagingServerPacketHandler;
+import org.jboss.messaging.core.server.impl.MessagingServerImpl;
+import org.jboss.messaging.core.server.MessagingServer;
import org.jboss.messaging.tests.unit.core.remoting.impl.ConfigurationHelper;
/**
* @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>
- *
* @version <tt>$Revision$</tt>
- *
*/
public class ClientKeepAliveTest extends TestCase
{
@@ -47,7 +50,7 @@
// Attributes ----------------------------------------------------
- private MinaService service;
+ private MessagingServer messagingServer;
// Static --------------------------------------------------------
@@ -61,141 +64,156 @@
ConfigurationImpl config = ConfigurationHelper.newTCPConfiguration("localhost", TestSupport.PORT);
config.setKeepAliveInterval(TestSupport.KEEP_ALIVE_INTERVAL);
config.setKeepAliveTimeout(TestSupport.KEEP_ALIVE_TIMEOUT);
- service = new MinaService(config);
- service.start();
+ messagingServer = new MessagingServerImpl(config);
+ messagingServer.start();
}
@Override
protected void tearDown() throws Exception
{
- service.stop();
- service = null;
+ messagingServer.stop();
}
public void testKeepAliveWithClientOK() throws Exception
- {
- KeepAliveFactory factory = createMock(KeepAliveFactory.class);
+ {
+ KeepAliveHandler factory = createMock(KeepAliveHandler.class);
// client never send ping
- expect(factory.ping(anyLong())).andStubReturn(null);
- expect(factory.isPing(anyLong(), isA(Ping.class))).andStubReturn(true);
- expect(factory.isPing(anyLong(), isA(Object.class))).andStubReturn(false);
- // client is responding
- expect(factory.pong(anyLong(), isA(Ping.class))).andReturn(new Pong(randomLong(), false)).atLeastOnce();
+ //expect(factory.isAlive(isA(Ping.class), isA(Pong.class))).andStubReturn(true);
+ //expect(factory.isAlive(isA(Ping.class), isA(Pong.class))).andStubReturn(false);
replay(factory);
final CountDownLatch latch = new CountDownLatch(1);
- RemotingSessionListener listener = new RemotingSessionListener() {
+ RemotingSessionListener listener = new RemotingSessionListener()
+ {
public void sessionDestroyed(long sessionID, MessagingException me)
{
latch.countDown();
}
};
- service.addRemotingSessionListener(listener);
-
- MinaConnector connector = new MinaConnector(new LocationImpl(TCP, "localhost", TestSupport.PORT), new PacketDispatcherImpl(null), factory);
+ messagingServer.getRemotingService().addRemotingSessionListener(listener);
+ ConnectionParams connectionParams = new ConnectionParamsImpl();
+ connectionParams.setKeepAliveInterval(TestSupport.KEEP_ALIVE_INTERVAL);
+ connectionParams.setKeepAliveTimeout(TestSupport.KEEP_ALIVE_TIMEOUT);
+ MinaConnector connector = new MinaConnector(new LocationImpl(TCP, "localhost", TestSupport.PORT), connectionParams, new PacketDispatcherImpl(null), factory);
connector.connect();
boolean firedKeepAliveNotification = latch.await(TestSupport.KEEP_ALIVE_INTERVAL
- + TestSupport.KEEP_ALIVE_TIMEOUT + 2, SECONDS);
+ + TestSupport.KEEP_ALIVE_TIMEOUT + 2, SECONDS);
assertFalse(firedKeepAliveNotification);
- service.removeRemotingSessionListener(listener);
- connector.disconnect();
+ messagingServer.getRemotingService().removeRemotingSessionListener(listener);
+ //connector.disconnect();
verify(factory);
}
-
- public void testKeepAliveWithClientNotResponding() throws Exception
+
+ public void testKeepAliveWithClientNotResponding() throws Throwable
{
- KeepAliveFactory factory = new ClientKeepAliveFactoryNotResponding();
+ final KeepAliveHandler factory = new ClientKeepAliveFactoryNotResponding();
final long[] clientSessionIDNotResponding = new long[1];
final CountDownLatch latch = new CountDownLatch(1);
- RemotingSessionListener listener = new RemotingSessionListener() {
+ RemotingSessionListener listener = new RemotingSessionListener()
+ {
public void sessionDestroyed(long sessionID, MessagingException me)
{
clientSessionIDNotResponding[0] = sessionID;
latch.countDown();
}
};
- service.addRemotingSessionListener(listener);
-
- MinaConnector connector = new MinaConnector(new LocationImpl(TCP, "localhost", TestSupport.PORT), new PacketDispatcherImpl(null), factory);
+ messagingServer.getRemotingService().addRemotingSessionListener(listener);
+ ConnectionParams connectionParams = new ConnectionParamsImpl();
+ connectionParams.setKeepAliveInterval(TestSupport.KEEP_ALIVE_INTERVAL);
+ connectionParams.setKeepAliveTimeout(TestSupport.KEEP_ALIVE_TIMEOUT);
+ LocationImpl location = new LocationImpl(TCP, "localhost", TestSupport.PORT);
+ MinaConnector connector = new MinaConnector(location, connectionParams, new PacketDispatcherImpl(null), factory);
+
NIOSession session = connector.connect();
+ RemotingConnection remotingConnection = new RemotingConnectionImpl(location, connectionParams, connector);
+ createConnection(messagingServer, remotingConnection);
long clientSessionID = session.getID();
boolean firedKeepAliveNotification = latch.await(TestSupport.KEEP_ALIVE_INTERVAL
- + TestSupport.KEEP_ALIVE_TIMEOUT + 2, SECONDS);
+ + TestSupport.KEEP_ALIVE_TIMEOUT + 2, SECONDS);
assertTrue("notification has not been received", firedKeepAliveNotification);
assertNotNull(clientSessionIDNotResponding[0]);
assertEquals(clientSessionID, clientSessionIDNotResponding[0]);
- service.removeRemotingSessionListener(listener);
+ messagingServer.getRemotingService().removeRemotingSessionListener(listener);
connector.disconnect();
}
- public void testKeepAliveWithClientTooLongToRespond() throws Exception
+ public void testKeepAliveWithClientTooLongToRespond() throws Throwable
{
- KeepAliveFactory factory = new KeepAliveFactory()
+ KeepAliveHandler factory = new KeepAliveHandler()
{
- public Ping ping(long sessionID)
+ public boolean isAlive(Ping ping, Pong pong)
{
- return null;
+ return false; //todo
}
-
- public boolean isPing(long sessionID, Object message)
+
+ public void handleDeath(long sessionId)
{
- return (message instanceof Ping);
+ //todo
}
- public synchronized Pong pong(long sessionID, Ping ping)
+ public synchronized Pong ping(Ping pong)
{
- // like a TCP timeout, there is no response in the next 2 hours
+ // synchronized is required: wait() throws IllegalMonitorStateException unless the monitor is held
try
{
wait(2 * 3600);
- } catch (InterruptedException e)
+ }
+ catch (InterruptedException e)
{
e.printStackTrace();
}
return new Pong(randomLong(), false);
- }
+ }
};
try
{
- MinaConnector connector = new MinaConnector(new LocationImpl(TCP, "localhost", TestSupport.PORT),
- new PacketDispatcherImpl(null), factory);
+ ConnectionParams connectionParams = new ConnectionParamsImpl();
+ connectionParams.setKeepAliveInterval(TestSupport.KEEP_ALIVE_INTERVAL);
+ connectionParams.setKeepAliveTimeout(TestSupport.KEEP_ALIVE_TIMEOUT);
+ LocationImpl location = new LocationImpl(TCP, "localhost", TestSupport.PORT);
+ MinaConnector connector = new MinaConnector(location, connectionParams,
+ new PacketDispatcherImpl(null), factory);
NIOSession session = connector.connect();
+ //create a connection properly to initiate ping
+ RemotingConnection remotingConnection = new RemotingConnectionImpl(location, connectionParams, connector);
+ createConnection(messagingServer, remotingConnection);
long clientSessionID = session.getID();
final AtomicLong clientSessionIDNotResponding = new AtomicLong(-1);
final CountDownLatch latch = new CountDownLatch(1);
- RemotingSessionListener listener = new RemotingSessionListener() {
+ RemotingSessionListener listener = new RemotingSessionListener()
+ {
public void sessionDestroyed(long sessionID, MessagingException me)
{
clientSessionIDNotResponding.set(sessionID);
latch.countDown();
}
};
- service.addRemotingSessionListener(listener);
+ messagingServer.getRemotingService().addRemotingSessionListener(listener);
boolean firedKeepAliveNotification = latch.await(TestSupport.KEEP_ALIVE_INTERVAL
- + TestSupport.KEEP_ALIVE_TIMEOUT + 2, SECONDS);
+ + TestSupport.KEEP_ALIVE_TIMEOUT + 2, SECONDS);
assertTrue("notification has not been received", firedKeepAliveNotification);
assertEquals(clientSessionID, clientSessionIDNotResponding.longValue());
- service.removeRemotingSessionListener(listener);
+ messagingServer.getRemotingService().removeRemotingSessionListener(listener);
connector.disconnect();
- } finally
+ }
+ finally
{
// test is done: wake up the factory
synchronized (factory)
@@ -206,65 +224,77 @@
}
public void testKeepAliveWithClientRespondingAndClientNotResponding()
- throws Exception
+ throws Throwable
{
- KeepAliveFactory notRespondingfactory = new ClientKeepAliveFactoryNotResponding();
- KeepAliveFactory respondingfactory = new ClientKeepAliveFactory();
+ KeepAliveHandler notRespondingfactory = new ClientKeepAliveFactoryNotResponding();
+ KeepAliveHandler respondingfactory = new ClientKeepAliveHandler();
final AtomicLong sessionIDNotResponding = new AtomicLong(-1);
final CountDownLatch latch = new CountDownLatch(1);
- RemotingSessionListener listener = new RemotingSessionListener() {
+ RemotingSessionListener listener = new RemotingSessionListener()
+ {
public void sessionDestroyed(long sessionID, MessagingException me)
{
sessionIDNotResponding.set(sessionID);
latch.countDown();
}
};
- service.addRemotingSessionListener(listener);
-
- MinaConnector connectorNotResponding = new MinaConnector(new LocationImpl(TCP, "localhost", TestSupport.PORT), new PacketDispatcherImpl(null), notRespondingfactory);
- MinaConnector connectorResponding = new MinaConnector(new LocationImpl(TCP, "localhost", TestSupport.PORT), new PacketDispatcherImpl(null), respondingfactory);
+ messagingServer.getRemotingService().addRemotingSessionListener(listener);
+ ConnectionParams connectionParams = new ConnectionParamsImpl();
+ connectionParams.setKeepAliveInterval(TestSupport.KEEP_ALIVE_INTERVAL);
+ connectionParams.setKeepAliveTimeout(TestSupport.KEEP_ALIVE_TIMEOUT);
+ LocationImpl location = new LocationImpl(TCP, "localhost", TestSupport.PORT);
+ MinaConnector connectorNotResponding = new MinaConnector(location, new PacketDispatcherImpl(null), notRespondingfactory);
+ MinaConnector connectorResponding = new MinaConnector(location, new PacketDispatcherImpl(null), respondingfactory);
NIOSession sessionNotResponding = connectorNotResponding.connect();
+ //create a connection properly to initiate ping
+ RemotingConnection remotingConnection = new RemotingConnectionImpl(location, connectionParams, connectorNotResponding);
+ createConnection(messagingServer, remotingConnection);
long clientSessionIDNotResponding = sessionNotResponding.getID();
-
+
NIOSession sessionResponding = connectorResponding.connect();
+ // the responding client has to be registered through its own connector, otherwise it is never pinged
+ RemotingConnection remotingConnection2 = new RemotingConnectionImpl(location, connectionParams, connectorResponding);
+ createConnection(messagingServer, remotingConnection2);
long clientSessionIDResponding = sessionResponding.getID();
boolean firedKeepAliveNotification = latch.await(TestSupport.KEEP_ALIVE_INTERVAL
- + TestSupport.KEEP_ALIVE_TIMEOUT + 2, SECONDS);
+ + TestSupport.KEEP_ALIVE_TIMEOUT + 2, SECONDS);
assertTrue("notification has not been received", firedKeepAliveNotification);
assertEquals(clientSessionIDNotResponding, sessionIDNotResponding.longValue());
assertNotSame(clientSessionIDResponding, sessionIDNotResponding.longValue());
- service.removeRemotingSessionListener(listener);
+ messagingServer.getRemotingService().removeRemotingSessionListener(listener);
connectorNotResponding.disconnect();
connectorResponding.disconnect();
}
-
+
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------
// Private -------------------------------------------------------
+ private void createConnection(MessagingServer server, RemotingConnection remotingConnection) throws Throwable
+ {
+ long sessionID = remotingConnection.getSessionID();
+
+ CreateConnectionRequest request =
+ new CreateConnectionRequest(server.getVersion().getIncrementingVersion(), sessionID, null, null);
+
+ CreateConnectionResponse response =
+ (CreateConnectionResponse) remotingConnection.sendBlocking(0, 0, request);
+ }
// Inner classes -------------------------------------------------
-
- private class ClientKeepAliveFactoryNotResponding extends ClientKeepAliveFactory
+
+ private class ClientKeepAliveFactoryNotResponding extends ClientKeepAliveHandler
{
- @Override
- public Ping ping(long clientSessionID)
+ public Pong ping(Ping ping)
{
return null;
}
-
- @Override
- public Pong pong(long sessionID, Ping ping)
- {
- return null;
- }
}
}
\ No newline at end of file
Modified: trunk/tests/src/org/jboss/messaging/tests/integration/core/remoting/mina/FilterChainSupportTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/core/remoting/mina/FilterChainSupportTest.java 2008-05-22 09:39:31 UTC (rev 4281)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/core/remoting/mina/FilterChainSupportTest.java 2008-05-22 12:29:03 UTC (rev 4282)
@@ -28,9 +28,6 @@
import org.apache.mina.filter.ssl.SslFilter;
import org.apache.mina.transport.socket.nio.NioSocketAcceptor;
import org.apache.mina.transport.socket.nio.NioSocketConnector;
-import org.jboss.messaging.core.remoting.KeepAliveFactory;
-import org.jboss.messaging.core.remoting.impl.mina.CleanUpNotifier;
-import org.jboss.messaging.core.remoting.impl.mina.FilterChainSupport;
import org.jboss.messaging.core.remoting.impl.wireformat.Ping;
/**
Deleted: trunk/tests/src/org/jboss/messaging/tests/integration/core/remoting/mina/MinaKeepAliveFactoryTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/core/remoting/mina/MinaKeepAliveFactoryTest.java 2008-05-22 09:39:31 UTC (rev 4281)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/core/remoting/mina/MinaKeepAliveFactoryTest.java 2008-05-22 12:29:03 UTC (rev 4282)
@@ -1,100 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- *
- * Distributable under LGPL license.
- * See terms of license at gnu.org.
- */
-package org.jboss.messaging.tests.integration.core.remoting.mina;
-
-import static org.easymock.EasyMock.anyLong;
-import static org.easymock.EasyMock.createMock;
-import static org.easymock.EasyMock.expect;
-import static org.easymock.EasyMock.expectLastCall;
-import static org.easymock.EasyMock.isA;
-import static org.easymock.EasyMock.replay;
-import static org.easymock.EasyMock.verify;
-import static org.jboss.messaging.tests.util.RandomUtil.randomLong;
-import junit.framework.TestCase;
-
-import org.apache.mina.common.IoSession;
-import org.jboss.messaging.core.exception.MessagingException;
-import org.jboss.messaging.core.remoting.KeepAliveFactory;
-import org.jboss.messaging.core.remoting.impl.mina.CleanUpNotifier;
-import org.jboss.messaging.core.remoting.impl.mina.MinaKeepAliveFactory;
-import org.jboss.messaging.core.remoting.impl.wireformat.Pong;
-
-/**
- * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>
- *
- * @version <tt>$Revision$</tt>
- *
- */
-public class MinaKeepAliveFactoryTest extends TestCase
-{
- // Constants -----------------------------------------------------
-
- // Attributes ----------------------------------------------------
-
- // Static --------------------------------------------------------
-
- // Constructors --------------------------------------------------
-
- // Public --------------------------------------------------------
-
- public void testResponseIsNotAPong()
- {
- IoSession session = createMock(IoSession.class);
- KeepAliveFactory factory = createMock(KeepAliveFactory.class);
- CleanUpNotifier notifier = createMock(CleanUpNotifier.class);
- replay(session, factory, notifier);
-
- MinaKeepAliveFactory minaFactory = new MinaKeepAliveFactory(factory, notifier);
-
- assertFalse(minaFactory.isResponse(session, new Object()));
-
- verify(session, factory, notifier);
- }
-
- public void testResponseIsAPongWithSessionNotFailed()
- {
- IoSession session = createMock(IoSession.class);
- long sessionID = randomLong();
- Pong pong = new Pong(sessionID, false);
- KeepAliveFactory factory = createMock(KeepAliveFactory.class);
- CleanUpNotifier notifier = createMock(CleanUpNotifier.class);
- replay(session, factory, notifier);
-
- MinaKeepAliveFactory minaFactory = new MinaKeepAliveFactory(factory, notifier);
-
- assertTrue(minaFactory.isResponse(session, pong));
-
- verify(session, factory, notifier);
- }
-
- public void testResponseIsAPongWithSessionFailed()
- {
- IoSession session = createMock(IoSession.class);
- long sessionID = randomLong();
- expect(session.getId()).andStubReturn(sessionID);
- Pong pong = new Pong(sessionID, true);
- KeepAliveFactory factory = createMock(KeepAliveFactory.class);
- CleanUpNotifier notifier = createMock(CleanUpNotifier.class);
- notifier.fireCleanup(anyLong(), isA(MessagingException.class));
- expectLastCall().once();
- replay(session, factory, notifier);
-
- MinaKeepAliveFactory minaFactory = new MinaKeepAliveFactory(factory, notifier);
-
- assertTrue(minaFactory.isResponse(session, pong));
-
- verify(session, factory, notifier);
- }
-
- // Package protected ---------------------------------------------
-
- // Protected -----------------------------------------------------
-
- // Private -------------------------------------------------------
-
- // Inner classes -------------------------------------------------
-}
Modified: trunk/tests/src/org/jboss/messaging/tests/integration/core/remoting/mina/ServerKeepAliveTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/core/remoting/mina/ServerKeepAliveTest.java 2008-05-22 09:39:31 UTC (rev 4281)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/core/remoting/mina/ServerKeepAliveTest.java 2008-05-22 12:29:03 UTC (rev 4282)
@@ -14,15 +14,20 @@
import junit.framework.TestCase;
import org.jboss.messaging.core.client.RemotingSessionListener;
+import org.jboss.messaging.core.client.impl.RemotingConnection;
+import org.jboss.messaging.core.client.impl.RemotingConnectionImpl;
import org.jboss.messaging.core.config.impl.ConfigurationImpl;
import org.jboss.messaging.core.exception.MessagingException;
import org.jboss.messaging.core.remoting.NIOSession;
import org.jboss.messaging.core.remoting.impl.PacketDispatcherImpl;
import org.jboss.messaging.core.remoting.impl.mina.MinaConnector;
import org.jboss.messaging.core.remoting.impl.mina.MinaService;
-import org.jboss.messaging.core.remoting.impl.mina.ServerKeepAliveFactory;
import org.jboss.messaging.core.remoting.impl.wireformat.Ping;
import org.jboss.messaging.core.remoting.impl.wireformat.Pong;
+import org.jboss.messaging.core.remoting.impl.wireformat.CreateConnectionRequest;
+import org.jboss.messaging.core.remoting.impl.wireformat.CreateConnectionResponse;
+import org.jboss.messaging.core.server.MessagingServer;
+import org.jboss.messaging.core.server.impl.MessagingServerImpl;
import org.jboss.messaging.tests.unit.core.remoting.impl.ConfigurationHelper;
/**
@@ -37,7 +42,7 @@
// Attributes ----------------------------------------------------
- private MinaService service;
+ private MessagingServer messagingServer;
// Static --------------------------------------------------------
@@ -53,93 +58,25 @@
@Override
protected void tearDown() throws Exception
{
- service.stop();
- service = null;
+ messagingServer.stop();
+ messagingServer = null;
}
- public void testKeepAliveWithServerNotResponding() throws Exception
+ public void testKeepAliveWithServerNotResponding() throws Throwable
{
- ServerKeepAliveFactory factory = new ServerKeepAliveFactory()
- {
- // server does not send ping
- @Override
- public Ping ping(long sessionID)
- {
- return null;
- }
-
- @Override
- public Pong pong(long sessionID, Ping ping)
- {
- // no pong -> server is not responding
- super.pong(sessionID, ping);
- return null;
- }
- };
-
+ //set the server timeouts to be twice that of the client to force failure
ConfigurationImpl config = ConfigurationHelper.newTCPConfiguration(
"localhost", TestSupport.PORT);
- config.setKeepAliveInterval(TestSupport.KEEP_ALIVE_INTERVAL);
- config.setKeepAliveTimeout(TestSupport.KEEP_ALIVE_TIMEOUT);
- service = new MinaService(config, factory);
- service.start();
-
- MinaConnector connector = new MinaConnector(service
- .getConfiguration().getLocation(), service.getConfiguration().getConnectionParams(), new PacketDispatcherImpl(null));
-
- final AtomicLong sessionIDNotResponding = new AtomicLong(-1);
- final CountDownLatch latch = new CountDownLatch(1);
-
- RemotingSessionListener listener = new RemotingSessionListener()
- {
- public void sessionDestroyed(long sessionID, MessagingException me)
- {
- sessionIDNotResponding.set(sessionID);
- latch.countDown();
- }
- };
- connector.addSessionListener(listener);
-
- NIOSession session = connector.connect();
-
- boolean firedKeepAliveNotification = latch.await(TestSupport.KEEP_ALIVE_INTERVAL
- + TestSupport.KEEP_ALIVE_TIMEOUT + 2, SECONDS);
- assertTrue(firedKeepAliveNotification);
- assertEquals(session.getID(), sessionIDNotResponding.longValue());
-
- connector.removeSessionListener(listener);
- connector.disconnect();
- }
-
- public void testKeepAliveWithServerSessionFailed() throws Exception
- {
- ServerKeepAliveFactory factory = new ServerKeepAliveFactory()
- {
- // server does not send ping
- @Override
- public Ping ping(long sessionID)
- {
- return null;
- }
-
- @Override
- public Pong pong(long sessionID, Ping ping)
- {
- // no pong -> server is not responding
- super.pong(sessionID, ping);
- return new Pong(sessionID, true);
- }
- };
-
- ConfigurationImpl config = ConfigurationHelper.newTCPConfiguration(
+ config.setKeepAliveInterval(TestSupport.KEEP_ALIVE_INTERVAL * 2);
+ config.setKeepAliveTimeout(TestSupport.KEEP_ALIVE_TIMEOUT * 2);
+ ConfigurationImpl clientConfig = ConfigurationHelper.newTCPConfiguration(
"localhost", TestSupport.PORT);
- config.setKeepAliveInterval(TestSupport.KEEP_ALIVE_INTERVAL);
- config.setKeepAliveTimeout(TestSupport.KEEP_ALIVE_TIMEOUT);
- service = new MinaService(config, factory);
- service.start();
+ clientConfig.setKeepAliveInterval(TestSupport.KEEP_ALIVE_INTERVAL);
+ clientConfig.setKeepAliveTimeout(TestSupport.KEEP_ALIVE_TIMEOUT);
+ messagingServer = new MessagingServerImpl(config);
+ messagingServer.start();
- MinaConnector connector = new MinaConnector(service
- .getConfiguration().getLocation(), new PacketDispatcherImpl(null));
+ MinaConnector connector = new MinaConnector(clientConfig.getLocation(), clientConfig.getConnectionParams(), new PacketDispatcherImpl(null));
final AtomicLong sessionIDNotResponding = new AtomicLong(-1);
final CountDownLatch latch = new CountDownLatch(1);
@@ -155,7 +92,8 @@
connector.addSessionListener(listener);
NIOSession session = connector.connect();
-
+ RemotingConnection remotingConnection = new RemotingConnectionImpl(config.getLocation(), config.getConnectionParams(), connector);
+ createConnection(messagingServer, remotingConnection);
boolean firedKeepAliveNotification = latch.await(TestSupport.KEEP_ALIVE_INTERVAL
+ TestSupport.KEEP_ALIVE_TIMEOUT + 2, SECONDS);
assertTrue(firedKeepAliveNotification);
@@ -165,11 +103,21 @@
connector.disconnect();
}
+
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------
// Private -------------------------------------------------------
+ private void createConnection(MessagingServer server, RemotingConnection remotingConnection) throws Throwable
+ {
+ long sessionID = remotingConnection.getSessionID();
+ CreateConnectionRequest request =
+ new CreateConnectionRequest(server.getVersion().getIncrementingVersion(), sessionID, null, null);
+
+ CreateConnectionResponse response =
+ (CreateConnectionResponse) remotingConnection.sendBlocking(0, 0, request);
+ }
// Inner classes -------------------------------------------------
}
\ No newline at end of file
Modified: trunk/tests/src/org/jboss/messaging/tests/unit/jms/network/ClientNetworkFailureTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/jms/network/ClientNetworkFailureTest.java 2008-05-22 09:39:31 UTC (rev 4281)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/jms/network/ClientNetworkFailureTest.java 2008-05-22 12:29:03 UTC (rev 4282)
@@ -130,6 +130,8 @@
boolean gotExceptionsOnTheServerAndTheClient = exceptionLatch.await(
KEEP_ALIVE_INTERVAL + KEEP_ALIVE_TIMEOUT + 2, SECONDS);
assertTrue(gotExceptionsOnTheServerAndTheClient);
+ //now we need to wait for the server to detect the client failure
+ Thread.sleep((KEEP_ALIVE_INTERVAL + KEEP_ALIVE_TIMEOUT) * 1000);
assertActiveConnectionsOnTheServer(0);
try
@@ -163,6 +165,8 @@
boolean gotExceptionOnTheServer = exceptionLatch.await(
KEEP_ALIVE_INTERVAL + KEEP_ALIVE_TIMEOUT + 5, SECONDS);
assertTrue(gotExceptionOnTheServer);
+ //now we need to wait for the server to detect the client failure
+ Thread.sleep((KEEP_ALIVE_INTERVAL + KEEP_ALIVE_TIMEOUT) * 1000);
assertActiveConnectionsOnTheServer(0);
try
More information about the jboss-cvs-commits
mailing list