[jboss-cvs] JBoss Messaging SVN: r4282 - in trunk: src/main/org/jboss/messaging/core/client and 10 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Thu May 22 08:29:03 EDT 2008


Author: ataylor
Date: 2008-05-22 08:29:03 -0400 (Thu, 22 May 2008)
New Revision: 4282

Added:
   trunk/src/main/org/jboss/messaging/core/client/ServerPonger.java
   trunk/src/main/org/jboss/messaging/core/client/impl/ServerPongerImpl.java
   trunk/src/main/org/jboss/messaging/core/remoting/KeepAliveHandler.java
   trunk/src/main/org/jboss/messaging/core/remoting/impl/ClientKeepAliveHandler.java
   trunk/src/main/org/jboss/messaging/core/server/ClientPinger.java
   trunk/src/main/org/jboss/messaging/core/server/impl/ClientPingerImpl.java
Removed:
   trunk/src/main/org/jboss/messaging/core/remoting/KeepAliveFactory.java
   trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/ClientKeepAliveFactory.java
   trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaKeepAliveFactory.java
   trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/ServerKeepAliveFactory.java
   trunk/tests/src/org/jboss/messaging/tests/integration/core/remoting/mina/MinaKeepAliveFactoryTest.java
Modified:
   trunk/examples/jms/src/org/jboss/jms/example/QueueExample.java
   trunk/src/main/org/jboss/messaging/core/client/impl/RemotingConnectionImpl.java
   trunk/src/main/org/jboss/messaging/core/remoting/RemotingService.java
   trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/FilterChainSupport.java
   trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaConnector.java
   trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaService.java
   trunk/src/main/org/jboss/messaging/core/server/ServerConnection.java
   trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java
   trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerPacketHandler.java
   trunk/src/main/org/jboss/messaging/core/server/impl/ServerConnectionImpl.java
   trunk/src/main/org/jboss/messaging/core/server/impl/ServerConnectionPacketHandler.java
   trunk/tests/jms-tests/src/org/jboss/test/messaging/JBMServerTestCase.java
   trunk/tests/src/org/jboss/messaging/tests/integration/core/remoting/impl/ClientCrashTest.java
   trunk/tests/src/org/jboss/messaging/tests/integration/core/remoting/mina/ClientKeepAliveTest.java
   trunk/tests/src/org/jboss/messaging/tests/integration/core/remoting/mina/FilterChainSupportTest.java
   trunk/tests/src/org/jboss/messaging/tests/integration/core/remoting/mina/ServerKeepAliveTest.java
   trunk/tests/src/org/jboss/messaging/tests/unit/jms/network/ClientNetworkFailureTest.java
Log:
added keepalive functionality

Modified: trunk/examples/jms/src/org/jboss/jms/example/QueueExample.java
===================================================================
--- trunk/examples/jms/src/org/jboss/jms/example/QueueExample.java	2008-05-22 09:39:31 UTC (rev 4281)
+++ trunk/examples/jms/src/org/jboss/jms/example/QueueExample.java	2008-05-22 12:29:03 UTC (rev 4282)
@@ -40,6 +40,7 @@
    public static void main(String[] args)
    {
       Connection connection = null;
+      Connection connection2 = null;
       try
       {
          //create an initial context, env will be picked up from jndi.properties
@@ -47,6 +48,7 @@
          Queue queue = (Queue) initialContext.lookup("/queue/testQueue");
          ConnectionFactory cf = (ConnectionFactory) initialContext.lookup("/ConnectionFactory");
          connection = cf.createConnection();
+         connection2 = cf.createConnection();
          Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
          MessageProducer producer = session.createProducer(queue);
          Message message = session.createTextMessage("This is a text message!");
@@ -57,6 +59,14 @@
          TextMessage message2 = (TextMessage) messageConsumer.receive(5000);
          log.info("message received from queue");
          log.info("message = " + message2.getText());
+         try
+         {
+            Thread.sleep(200000);
+         }
+         catch (InterruptedException e)
+         {
+            e.printStackTrace();
+         }
       }
       catch (NamingException e)
       {
@@ -72,6 +82,7 @@
             try
             {
                connection.close();
+               connection2.close();
             }
             catch (JMSException e)
             {

Added: trunk/src/main/org/jboss/messaging/core/client/ServerPonger.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/ServerPonger.java	                        (rev 0)
+++ trunk/src/main/org/jboss/messaging/core/client/ServerPonger.java	2008-05-22 12:29:03 UTC (rev 4282)
@@ -0,0 +1,8 @@
+package org.jboss.messaging.core.client;
+
+/**
+ * @author <a href="ataylor at redhat.com">Andy Taylor</a>
+ */
+public interface ServerPonger extends Runnable
+{
+}

Modified: trunk/src/main/org/jboss/messaging/core/client/impl/RemotingConnectionImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/RemotingConnectionImpl.java	2008-05-22 09:39:31 UTC (rev 4281)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/RemotingConnectionImpl.java	2008-05-22 12:29:03 UTC (rev 4282)
@@ -25,6 +25,9 @@
 
 import java.util.Timer;
 import java.util.TimerTask;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
 
 import org.jboss.messaging.core.client.ConnectionParams;
 import org.jboss.messaging.core.client.Location;
@@ -83,6 +86,18 @@
       log.trace(this + " created with configuration " + location);
    }
 
+   public RemotingConnectionImpl(final Location location, ConnectionParams connectionParams, NIOConnector nioConnector) throws Exception
+   {
+      assert location != null;
+      assert connectionParams != null;
+
+      this.location = location;
+      this.connectionParams = connectionParams;
+      connector = nioConnector;
+      session = connector.connect();
+      log.trace(this + " created with connector " + nioConnector);
+   }
+
    // Public ---------------------------------------------------------------------------------------
 
    // RemotingConnection implementation ------------------------------------------------------------
@@ -99,7 +114,8 @@
 
       log.trace(this + " started");
    }
-   
+
+
    public void stop()
    {
       log.trace(this + " stop");
@@ -124,7 +140,7 @@
       
       log.trace(this + " closed");
    }
-   
+
    public long getSessionID()
    {
       if (session == null || !session.isConnected())

Added: trunk/src/main/org/jboss/messaging/core/client/impl/ServerPongerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ServerPongerImpl.java	                        (rev 0)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ServerPongerImpl.java	2008-05-22 12:29:03 UTC (rev 4282)
@@ -0,0 +1,99 @@
+/*
+   * JBoss, Home of Professional Open Source
+   * Copyright 2005, JBoss Inc., and individual contributors as indicated
+   * by the @authors tag. See the copyright.txt in the distribution for a
+   * full listing of individual contributors.
+   *
+   * This is free software; you can redistribute it and/or modify it
+   * under the terms of the GNU Lesser General Public License as
+   * published by the Free Software Foundation; either version 2.1 of
+   * the License, or (at your option) any later version.
+   *
+   * This software is distributed in the hope that it will be useful,
+   * but WITHOUT ANY WARRANTY; without even the implied warranty of
+   * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+   * Lesser General Public License for more details.
+   *
+   * You should have received a copy of the GNU Lesser General Public
+   * License along with this software; if not, write to the Free
+   * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+   * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+   */
+package org.jboss.messaging.core.client.impl;
+
+import org.jboss.messaging.core.remoting.impl.wireformat.Ping;
+import org.jboss.messaging.core.remoting.impl.wireformat.Pong;
+import org.jboss.messaging.core.remoting.impl.mina.CleanUpNotifier;
+import org.jboss.messaging.core.remoting.*;
+import org.jboss.messaging.core.logging.Logger;
+import org.jboss.messaging.core.client.ServerPonger;
+import org.jboss.messaging.core.exception.MessagingException;
+import org.apache.mina.common.IoSession;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * @author <a href="ataylor at redhat.com">Andy Taylor</a>
+ */
+public class ServerPongerImpl implements PacketHandler, ServerPonger
+{
+   private static Logger log = Logger.getLogger(ServerPongerImpl.class);
+   private static boolean traceEnabled = log.isTraceEnabled();
+   IoSession session;
+   private long id;
+   long interval;
+   long timeout;
+   CleanUpNotifier cleanUpNotifier;
+   KeepAliveHandler keepAliveHandler;
+   CountDownLatch latch = new CountDownLatch(1);
+
+   public ServerPongerImpl(CleanUpNotifier cleanUpNotifier, KeepAliveHandler keepAliveHandler, IoSession session, long id, long timeout, long interval)
+   {
+      this.cleanUpNotifier = cleanUpNotifier;
+      this.keepAliveHandler = keepAliveHandler;
+      this.session = session;
+      this.id = id;
+      this.timeout = timeout;
+      this.interval = interval;
+   }
+
+
+   public void run()
+   {
+      boolean pinged;
+      latch = new CountDownLatch(1);
+      try
+      {
+         pinged = latch.await(timeout + interval, TimeUnit.MILLISECONDS);
+         if(!pinged)
+         {
+            log.warn("no ping received from server, cleaning up connection.");
+            cleanUpNotifier.fireCleanup(session.getId(), new MessagingException(MessagingException.CONNECTION_TIMEDOUT, "no ping received from server"));
+         }
+      }
+      catch (InterruptedException e)
+      {
+      }
+   }
+
+   public long getID()
+   {
+      return id;
+   }
+
+   public void handle(Packet packet, PacketReturner sender)
+   {
+      Ping ping = (Ping) packet;
+      latch.countDown();
+      if(traceEnabled)
+      {
+         log.trace("received ping:" + ping);
+      }
+      Pong pong = keepAliveHandler.ping(ping);
+      if(pong != null)
+      {
+         session.write(pong);
+      }
+   }
+}

Deleted: trunk/src/main/org/jboss/messaging/core/remoting/KeepAliveFactory.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/KeepAliveFactory.java	2008-05-22 09:39:31 UTC (rev 4281)
+++ trunk/src/main/org/jboss/messaging/core/remoting/KeepAliveFactory.java	2008-05-22 12:29:03 UTC (rev 4282)
@@ -1,26 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- *
- * Distributable under LGPL license.
- * See terms of license at gnu.org.
- */
-package org.jboss.messaging.core.remoting;
-
-import org.jboss.messaging.core.remoting.impl.wireformat.Ping;
-import org.jboss.messaging.core.remoting.impl.wireformat.Pong;
-
-/**
- * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>
- *
- * @version <tt>$Revision$</tt>
- *
- */
-public interface KeepAliveFactory
-{
-
-   Ping ping(long sessionID);
-   
-   Pong pong(long sessionID, Ping ping);
-
-   boolean isPing(long sessionID, Object message);
-}

Added: trunk/src/main/org/jboss/messaging/core/remoting/KeepAliveHandler.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/KeepAliveHandler.java	                        (rev 0)
+++ trunk/src/main/org/jboss/messaging/core/remoting/KeepAliveHandler.java	2008-05-22 12:29:03 UTC (rev 4282)
@@ -0,0 +1,14 @@
+package org.jboss.messaging.core.remoting;
+
+import org.jboss.messaging.core.remoting.impl.wireformat.Ping;
+import org.jboss.messaging.core.remoting.impl.wireformat.Pong;
+
+/**
+ * Pluggable component that defines how a client responds to a server ping command.
+ * 
+ * @author <a href="ataylor at redhat.com">Andy Taylor</a>
+ */
+public interface KeepAliveHandler
+{
+   Pong ping(Ping pong);
+}

Modified: trunk/src/main/org/jboss/messaging/core/remoting/RemotingService.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/RemotingService.java	2008-05-22 09:39:31 UTC (rev 4281)
+++ trunk/src/main/org/jboss/messaging/core/remoting/RemotingService.java	2008-05-22 12:29:03 UTC (rev 4282)
@@ -9,6 +9,8 @@
 import org.jboss.messaging.core.client.RemotingSessionListener;
 import org.jboss.messaging.core.config.Configuration;
 import org.jboss.messaging.core.server.MessagingComponent;
+import org.jboss.messaging.core.server.ClientPinger;
+import org.jboss.messaging.core.exception.MessagingException;
 
 /**
  * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>
@@ -29,4 +31,6 @@
    void addRemotingSessionListener(RemotingSessionListener listener);
 
    void removeRemotingSessionListener(RemotingSessionListener listener);
+
+   void setClientPinger(ClientPinger clientPinger);
 }

Added: trunk/src/main/org/jboss/messaging/core/remoting/impl/ClientKeepAliveHandler.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/ClientKeepAliveHandler.java	                        (rev 0)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/ClientKeepAliveHandler.java	2008-05-22 12:29:03 UTC (rev 4282)
@@ -0,0 +1,43 @@
+/*
+   * JBoss, Home of Professional Open Source
+   * Copyright 2005, JBoss Inc., and individual contributors as indicated
+   * by the @authors tag. See the copyright.txt in the distribution for a
+   * full listing of individual contributors.
+   *
+   * This is free software; you can redistribute it and/or modify it
+   * under the terms of the GNU Lesser General Public License as
+   * published by the Free Software Foundation; either version 2.1 of
+   * the License, or (at your option) any later version.
+   *
+   * This software is distributed in the hope that it will be useful,
+   * but WITHOUT ANY WARRANTY; without even the implied warranty of
+   * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+   * Lesser General Public License for more details.
+   *
+   * You should have received a copy of the GNU Lesser General Public
+   * License along with this software; if not, write to the Free
+   * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+   * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+   */
+package org.jboss.messaging.core.remoting.impl;
+
+import org.jboss.messaging.core.remoting.KeepAliveHandler;
+import org.jboss.messaging.core.remoting.NIOConnector;
+import org.jboss.messaging.core.remoting.impl.wireformat.Ping;
+import org.jboss.messaging.core.remoting.impl.wireformat.Pong;
+
+/**
+ * Pluggable component that defines how a client responds to a server ping command. This simple implementation returns a
+ * valid pong.
+ * @author <a href="ataylor at redhat.com">Andy Taylor</a>
+ */
+public class ClientKeepAliveHandler implements KeepAliveHandler
+{
+
+   public Pong ping(Ping ping)
+   {
+      Pong pong = new Pong(ping.getSessionID(), false);
+      pong.setTargetID(0);
+      return pong;
+   }
+}

Deleted: trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/ClientKeepAliveFactory.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/ClientKeepAliveFactory.java	2008-05-22 09:39:31 UTC (rev 4281)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/ClientKeepAliveFactory.java	2008-05-22 12:29:03 UTC (rev 4282)
@@ -1,55 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- *
- * Distributable under LGPL license.
- * See terms of license at gnu.org.
- */
-package org.jboss.messaging.core.remoting.impl.mina;
-
-import org.jboss.messaging.core.remoting.KeepAliveFactory;
-import org.jboss.messaging.core.remoting.impl.wireformat.Ping;
-import org.jboss.messaging.core.remoting.impl.wireformat.Pong;
-
-/**
- * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>
- *
- * @version <tt>$Revision$</tt>
- *
- */
-public class ClientKeepAliveFactory implements KeepAliveFactory
-{
-   // Constants -----------------------------------------------------
-
-   // Attributes ----------------------------------------------------
-   
-   // Static --------------------------------------------------------
-
-   // Constructors --------------------------------------------------
-
-   // Public --------------------------------------------------------
-
-   // KeepAliveFactory implementation -------------------------------
-   
-   public Ping ping(long clientSessionID)
-   {
-      return new Ping(clientSessionID);
-   }
-
-   public boolean isPing(long sessionID, Object message)
-   {
-      return (message instanceof Ping);
-   }
-
-   public Pong pong(long sessionID, Ping ping)
-   {
-      return new Pong(sessionID, false);
-   }
-
-   // Package protected ---------------------------------------------
-
-   // Protected -----------------------------------------------------
-
-   // Private -------------------------------------------------------
-
-   // Inner classes -------------------------------------------------
-}

Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/FilterChainSupport.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/FilterChainSupport.java	2008-05-22 09:39:31 UTC (rev 4281)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/FilterChainSupport.java	2008-05-22 12:29:03 UTC (rev 4282)
@@ -16,7 +16,6 @@
 import org.apache.mina.filter.keepalive.KeepAliveFilter;
 import org.apache.mina.filter.ssl.SslFilter;
 import org.jboss.messaging.core.logging.Logger;
-import org.jboss.messaging.core.remoting.KeepAliveFactory;
 import org.jboss.messaging.core.remoting.impl.ssl.SSLSupport;
 
 /**
@@ -46,29 +45,7 @@
 
       filterChain.addLast("codec", new ProtocolCodecFilter(new MessagingCodec()));
    }
-   
-   public static void addKeepAliveFilter(final DefaultIoFilterChainBuilder filterChain,
-         final KeepAliveFactory factory, final int keepAliveInterval,
-         final int keepAliveTimeout, final CleanUpNotifier notifier)
-   {
-      assert filterChain != null;
-      assert factory != null;
-      assert notifier != null; 
-     
-      if (keepAliveTimeout > keepAliveInterval)
-      {
-         throw new IllegalArgumentException("timeout must be greater than the interval: "
-               + "keepAliveTimeout= " + keepAliveTimeout
-               + ", keepAliveInterval=" + keepAliveInterval);
-      }
 
-      KeepAliveFilter filter = new KeepAliveFilter(
-            new MinaKeepAliveFactory(factory, notifier), BOTH_IDLE, EXCEPTION, keepAliveInterval,
-            keepAliveTimeout);
-      filter.setForwardEvent(true);
-      filterChain.addLast("keep-alive", filter);
-   }
-
    public static void addSSLFilter(
          final DefaultIoFilterChainBuilder filterChain, final boolean client,
          final String keystorePath, final String keystorePassword, final String trustStorePath,

Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaConnector.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaConnector.java	2008-05-22 09:39:31 UTC (rev 4281)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaConnector.java	2008-05-22 12:29:03 UTC (rev 4282)
@@ -6,14 +6,14 @@
  */
 package org.jboss.messaging.core.remoting.impl.mina;
 
-import static org.jboss.messaging.core.remoting.impl.mina.FilterChainSupport.*;
+import static org.jboss.messaging.core.remoting.impl.mina.FilterChainSupport.addCodecFilter;
+import static org.jboss.messaging.core.remoting.impl.mina.FilterChainSupport.addSSLFilter;
 
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.util.ArrayList;
 import java.util.List;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
+import java.util.concurrent.*;
 
 import org.apache.mina.common.CloseFuture;
 import org.apache.mina.common.ConnectFuture;
@@ -27,13 +27,13 @@
 import org.jboss.messaging.core.client.ConnectionParams;
 import org.jboss.messaging.core.client.Location;
 import org.jboss.messaging.core.client.RemotingSessionListener;
+import org.jboss.messaging.core.client.ServerPonger;
 import org.jboss.messaging.core.client.impl.ConnectionParamsImpl;
+import org.jboss.messaging.core.client.impl.ServerPongerImpl;
 import org.jboss.messaging.core.exception.MessagingException;
 import org.jboss.messaging.core.logging.Logger;
-import org.jboss.messaging.core.remoting.KeepAliveFactory;
-import org.jboss.messaging.core.remoting.NIOConnector;
-import org.jboss.messaging.core.remoting.NIOSession;
-import org.jboss.messaging.core.remoting.PacketDispatcher;
+import org.jboss.messaging.core.remoting.*;
+import org.jboss.messaging.core.remoting.impl.ClientKeepAliveHandler;
 
 /**
  * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>
@@ -68,6 +68,11 @@
 
    private MinaHandler handler;
 
+   KeepAliveHandler keepAliveHandler;
+
+   private ScheduledExecutorService scheduledExecutor;
+
+
    // Static --------------------------------------------------------
 
    // Constructors --------------------------------------------------
@@ -75,22 +80,22 @@
    // Public --------------------------------------------------------
    public MinaConnector(Location location, PacketDispatcher dispatcher)
    {
-      this(location, new ConnectionParamsImpl(),  dispatcher, new ClientKeepAliveFactory());
+      this(location, new ConnectionParamsImpl(),  dispatcher, new ClientKeepAliveHandler());
    }
 
    public MinaConnector(Location location, ConnectionParams connectionParams, PacketDispatcher dispatcher)
    {
-      this(location, connectionParams,  dispatcher, new ClientKeepAliveFactory());
+      this(location, connectionParams,  dispatcher, new ClientKeepAliveHandler());
    }
 
    public MinaConnector(Location location, PacketDispatcher dispatcher,
-         KeepAliveFactory keepAliveFactory)
+         KeepAliveHandler keepAliveFactory)
    {
       this(location, new ConnectionParamsImpl(), dispatcher, keepAliveFactory);
    }
 
    public MinaConnector(Location location, ConnectionParams connectionParams, PacketDispatcher dispatcher,
-         KeepAliveFactory keepAliveFactory)
+         KeepAliveHandler keepAliveFactory)
    {
       assert location != null;
       assert dispatcher != null;
@@ -100,8 +105,8 @@
       this.location = location;
       this.connectionParams = connectionParams;
       this.dispatcher = dispatcher;
-
-      connector = new NioSocketConnector();
+      this.keepAliveHandler = keepAliveFactory;
+      this.connector = new NioSocketConnector();
       DefaultIoFilterChainBuilder filterChain = connector.getFilterChain();
 
       connector.setSessionDataStructureFactory(new MessagingIOSessionDataStructureFactory());
@@ -120,7 +125,7 @@
          }
       }
       addCodecFilter(filterChain);
-//      addKeepAliveFilter(filterChain, keepAliveFactory, connectionParams.getKeepAliveInterval(),
+//     addKeepAliveFilter(filterChain, keepAliveFactory, connectionParams.getKeepAliveInterval(),
 //            connectionParams.getKeepAliveTimeout(), this);
       connector.getSessionConfig().setTcpNoDelay(connectionParams.isTcpNoDelay());
       int receiveBufferSize = connectionParams.getTcpReceiveBufferSize();
@@ -166,9 +171,15 @@
          throw new IOException("Cannot connect to " + address.toString());
       }
       session = future.getSession();
-//      Packet packet = new Ping(session.getId());
-//      session.write(packet);
 
+      ServerPongerImpl pinger = new ServerPongerImpl(this, keepAliveHandler, session, 0, connectionParams.getKeepAliveTimeout() * 1000, connectionParams.getKeepAliveInterval() * 1000);
+
+      getDispatcher().register(pinger);
+      if (connectionParams.getKeepAliveInterval() > 0)
+      {
+         scheduledExecutor = new ScheduledThreadPoolExecutor(1);
+         scheduledExecutor.scheduleAtFixedRate(pinger, 0, connectionParams.getKeepAliveInterval(), TimeUnit.SECONDS);
+      }
       return new MinaSession(session, handler);
    }
 
@@ -204,7 +215,10 @@
 
       connector = null;
       session = null;
-
+      if (scheduledExecutor != null && !scheduledExecutor.isShutdown())
+      {
+         scheduledExecutor.shutdown();
+      }
       return closed;
    }
 
@@ -248,6 +262,10 @@
 
    public synchronized void fireCleanup(long sessionID, MessagingException me)
    {
+      if (scheduledExecutor != null && !scheduledExecutor.isShutdown())
+      {
+         scheduledExecutor.shutdown();
+      }
       for (RemotingSessionListener listener: listeners)
       {
          listener.sessionDestroyed(sessionID, me);

Deleted: trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaKeepAliveFactory.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaKeepAliveFactory.java	2008-05-22 09:39:31 UTC (rev 4281)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaKeepAliveFactory.java	2008-05-22 12:29:03 UTC (rev 4282)
@@ -1,93 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- *
- * Distributable under LGPL license.
- * See terms of license at gnu.org.
- */
-package org.jboss.messaging.core.remoting.impl.mina;
-
-import org.apache.mina.common.IoSession;
-import org.apache.mina.filter.keepalive.KeepAliveMessageFactory;
-import org.jboss.messaging.core.exception.MessagingException;
-import org.jboss.messaging.core.logging.Logger;
-import org.jboss.messaging.core.remoting.KeepAliveFactory;
-import org.jboss.messaging.core.remoting.impl.wireformat.Ping;
-import org.jboss.messaging.core.remoting.impl.wireformat.Pong;
-
-/**
- * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>
- * 
- * @version <tt>$Revision$</tt>
- * 
- */
-public class MinaKeepAliveFactory implements KeepAliveMessageFactory
-{
-   // Constant ------------------------------------------------------
-
-   private static final Logger log = Logger
-         .getLogger(MinaKeepAliveFactory.class);
-
-   // Attributes ----------------------------------------------------
-
-   private KeepAliveFactory innerFactory;
-
-   private CleanUpNotifier notifier;
-
-   // Static --------------------------------------------------------
-
-   // Constructors --------------------------------------------------
-
-   public MinaKeepAliveFactory(KeepAliveFactory factory,
-         CleanUpNotifier notifier)
-   {
-      assert factory != null;
-
-      this.notifier = notifier;
-      this.innerFactory = factory;
-   }
-
-   // Public --------------------------------------------------------
-
-   // KeepAliveMessageFactory implementation ------------------------
-
-   public Object getRequest(IoSession session)
-   {
-      return innerFactory.ping(session.getId());
-   }
-
-   public Object getResponse(IoSession session, Object request)
-   {
-      return innerFactory.pong(session.getId(), (Ping) request);
-   }
-
-   public boolean isRequest(IoSession session, Object request)
-   {
-      return innerFactory.isPing(session.getId(), request);
-   }
-
-   public boolean isResponse(IoSession session, Object response)
-   {
-      if (response instanceof Pong)
-      {
-         Pong pong = (Pong) response;
-         if (pong.isSessionFailed() && notifier != null)
-         {
-            notifier.fireCleanup(session.getId(), new MessagingException(
-                  MessagingException.CONNECTION_TIMEDOUT,
-                  "Session has failed on the server"));
-         }
-         return true;
-      } else
-      {
-         return false;
-      }
-   }
-
-   // Package protected ---------------------------------------------
-
-   // Protected -----------------------------------------------------
-
-   // Private -------------------------------------------------------
-
-   // Inner classes -------------------------------------------------
-}

Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaService.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaService.java	2008-05-22 09:39:31 UTC (rev 4281)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaService.java	2008-05-22 12:29:03 UTC (rev 4282)
@@ -34,6 +34,8 @@
 import org.jboss.messaging.core.remoting.PacketDispatcher;
 import org.jboss.messaging.core.remoting.RemotingService;
 import org.jboss.messaging.core.remoting.impl.PacketDispatcherImpl;
+import org.jboss.messaging.core.server.ClientPinger;
+import org.jboss.messaging.core.server.impl.ClientPingerImpl;
 
 /**
  * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>
@@ -61,11 +63,11 @@
 
    private ExecutorService threadPool;
 
-   private final List<RemotingSessionListener> listeners = new ArrayList<RemotingSessionListener>();
+   private List<RemotingSessionListener> listeners = new ArrayList<RemotingSessionListener>();
 
-   private ServerKeepAliveFactory factory;
+   private List<Interceptor> filters = new CopyOnWriteArrayList<Interceptor>();
 
-   private final List<Interceptor> filters = new CopyOnWriteArrayList<Interceptor>();
+   private ClientPinger clientPinger;
 
    // Static --------------------------------------------------------
 
@@ -73,18 +75,11 @@
 
    public MinaService(Configuration config)
    {
-      this(config, new ServerKeepAliveFactory());
-   }
-
-   public MinaService(Configuration config, ServerKeepAliveFactory factory)
-   {
       assert config != null;
-      assert factory != null;
 
       validate(config);
 
       this.config = config;
-      this.factory = factory;
       dispatcher = new PacketDispatcherImpl(filters);
    }
 
@@ -114,6 +109,12 @@
       listeners.remove(listener);
    }
 
+   public void setClientPinger(ClientPinger clientPinger)
+   {
+      this.clientPinger = clientPinger;
+      clientPinger.registerCleanUpNotifier(this);
+   }
+
    // TransportService implementation -------------------------------
 
    public void start() throws Exception
@@ -228,26 +229,15 @@
 
    public void fireCleanup(long sessionID, MessagingException me)
    {
-      if (factory.getSessions().containsKey(sessionID))
+      for (RemotingSessionListener listener : listeners)
       {
-         long clientSessionID = factory.getSessions().containsKey(sessionID)?factory.getSessions().get(sessionID):0;
-         for (RemotingSessionListener listener : listeners)
-         {
-            listener.sessionDestroyed(clientSessionID, me);
-         }
-         factory.getSessions().remove(sessionID);
+         listener.sessionDestroyed(sessionID, me);
       }
    }
 
    // Public --------------------------------------------------------
 
-   public void setKeepAliveFactory(ServerKeepAliveFactory factory)
-   {
-      assert factory != null;
 
-      this.factory = factory;
-   }
-
    public void setRemotingConfiguration(Configuration remotingConfig)
    {
       assert started == false;

Deleted: trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/ServerKeepAliveFactory.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/ServerKeepAliveFactory.java	2008-05-22 09:39:31 UTC (rev 4281)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/ServerKeepAliveFactory.java	2008-05-22 12:29:03 UTC (rev 4282)
@@ -1,95 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- *
- * Distributable under LGPL license.
- * See terms of license at gnu.org.
- */
-package org.jboss.messaging.core.remoting.impl.mina;
-
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-
-import org.jboss.messaging.core.logging.Logger;
-import org.jboss.messaging.core.remoting.KeepAliveFactory;
-import org.jboss.messaging.core.remoting.impl.wireformat.Ping;
-import org.jboss.messaging.core.remoting.impl.wireformat.Pong;
-
-/**
- * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>
- * 
- * @version <tt>$Revision$</tt>
- * 
- */
-public class ServerKeepAliveFactory implements KeepAliveFactory
-{
-   // Constants -----------------------------------------------------
-
-   private static final Logger log = Logger
-         .getLogger(ServerKeepAliveFactory.class);
-
-   // Attributes ----------------------------------------------------
-
-   // FIXME session mapping must be cleaned when the server session is closed:
-   // either normally or exceptionally
-   /**
-    * Key = server session ID Value = client session ID
-    */
-   private Map<Long, Long> sessions = new ConcurrentHashMap<Long, Long>();
-
-   // Static --------------------------------------------------------
-
-   // Constructors --------------------------------------------------
-
-   // Public --------------------------------------------------------
-
-   // KeepAliveFactory implementation -------------------------------
-
-   public Ping ping(long sessionID)
-   {
-      return new Ping(sessionID);
-   }
-
-   public boolean isPing(long sessionID, Object message)
-   {
-      if (!(message instanceof Ping))
-      {
-         return false;
-      }
-      else
-      {
-         Ping ping = (Ping) message;
-         long clientSessionID = ping.getSessionID();
-         if (clientSessionID == sessionID)
-         {
-            return false;
-         }
-         else
-         {
-            if (log.isDebugEnabled())
-               log.debug("associated server session " + sessionID
-                     + " to client " + clientSessionID);
-            sessions.put(sessionID, clientSessionID);
-            return true;
-         }
-      }
-   }
-
-   public Pong pong(long sessionID, Ping ping)
-   {
-      long clientSessionID = ping.getSessionID();
-      return new Pong(sessionID, sessions.containsKey(clientSessionID));
-   }
-
-   public Map<Long, Long> getSessions()
-   {
-      return sessions;
-   }
-
-   // Package protected ---------------------------------------------
-
-   // Protected -----------------------------------------------------
-
-   // Private -------------------------------------------------------
-
-   // Inner classes -------------------------------------------------
-}

Added: trunk/src/main/org/jboss/messaging/core/server/ClientPinger.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/ClientPinger.java	                        (rev 0)
+++ trunk/src/main/org/jboss/messaging/core/server/ClientPinger.java	2008-05-22 12:29:03 UTC (rev 4282)
@@ -0,0 +1,45 @@
+package org.jboss.messaging.core.server;
+
+import org.jboss.messaging.core.remoting.impl.wireformat.Pong;
+import org.jboss.messaging.core.remoting.impl.mina.CleanUpNotifier;
+import org.jboss.messaging.core.remoting.PacketReturner;
+
+/**
+ * Used by a MessagingServer to detect that a client is still alive.
+ * @author <a href="ataylor at redhat.com">Andy Taylor</a>
+ */
+public interface ClientPinger extends Runnable
+{
+   /**
+    * this will be scheduled to run at the keep alive interval period
+    */
+   void run();
+
+   /**
+    * pong received from client
+    * @param pong the pong
+    */
+   void pong(Pong pong);
+
+   /**
+    * register a connection.
+    *
+    * @param remotingSessionID  the session id
+    * @param sender the sender
+    */
+   void registerConnection(long remotingSessionID, PacketReturner sender);
+
+   /**
+    * unregister a connection.
+    *
+    * @param remotingSessionID the session id
+    */
+   void unregister(long remotingSessionID);
+
+   /**
+    * register the cleanup notifier to use
+    *
+    * @param cleanUpNotifier the notifier
+    */
+   void registerCleanUpNotifier(CleanUpNotifier cleanUpNotifier);
+}

Modified: trunk/src/main/org/jboss/messaging/core/server/ServerConnection.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/ServerConnection.java	2008-05-22 09:39:31 UTC (rev 4281)
+++ trunk/src/main/org/jboss/messaging/core/server/ServerConnection.java	2008-05-22 12:29:03 UTC (rev 4282)
@@ -73,5 +73,7 @@
 
    long getCreated();
 
+   long getRemotingClientSessionID();
+
    Collection<ServerSession> getSessions();
 }

Added: trunk/src/main/org/jboss/messaging/core/server/impl/ClientPingerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/ClientPingerImpl.java	                        (rev 0)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/ClientPingerImpl.java	2008-05-22 12:29:03 UTC (rev 4282)
@@ -0,0 +1,236 @@
+/*
+   * JBoss, Home of Professional Open Source
+   * Copyright 2005, JBoss Inc., and individual contributors as indicated
+   * by the @authors tag. See the copyright.txt in the distribution for a
+   * full listing of individual contributors.
+   *
+   * This is free software; you can redistribute it and/or modify it
+   * under the terms of the GNU Lesser General Public License as
+   * published by the Free Software Foundation; either version 2.1 of
+   * the License, or (at your option) any later version.
+   *
+   * This software is distributed in the hope that it will be useful,
+   * but WITHOUT ANY WARRANTY; without even the implied warranty of
+   * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+   * Lesser General Public License for more details.
+   *
+   * You should have received a copy of the GNU Lesser General Public
+   * License along with this software; if not, write to the Free
+   * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+   * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+   */
+package org.jboss.messaging.core.server.impl;
+
+import org.jboss.messaging.core.remoting.impl.wireformat.Ping;
+import org.jboss.messaging.core.remoting.impl.wireformat.Pong;
+import org.jboss.messaging.core.remoting.impl.mina.CleanUpNotifier;
+import org.jboss.messaging.core.remoting.PacketReturner;
+import org.jboss.messaging.core.logging.Logger;
+import org.jboss.messaging.core.server.MessagingServer;
+import org.jboss.messaging.core.server.ServerConnection;
+import org.jboss.messaging.core.server.ClientPinger;
+import org.jboss.messaging.core.exception.MessagingException;
+
+import java.util.Map;
+import java.util.List;
+import java.util.ArrayList;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * @author <a href="ataylor at redhat.com">Andy Taylor</a>
+ */
+public class ClientPingerImpl implements ClientPinger
+{
+   private static Logger log = Logger.getLogger(ClientPingerImpl.class);
+
+   private static boolean isTraceEnabled = log.isTraceEnabled();
+   /**
+    * the current active connections
+    */
+   private Map<Long, ConnectionHolder> connections = new ConcurrentHashMap<Long, ConnectionHolder>();
+   /**
+    * holds connections we are waiting on for replies
+    */
+   List<Long> replies = new ArrayList<Long>();
+   /**
+    * the server
+    */
+   private MessagingServer server;
+   /**
+    * the cleanup notifier to use on failed pings
+    */
+   private CleanUpNotifier cleanUpNotifier;
+
+   public ClientPingerImpl(MessagingServer server)
+   {
+      this.server = server;
+   }
+
+   public void run()
+   {
+      try
+      {
+         synchronized (this)
+         {
+            replies.clear();
+            //ping all the sessions
+            for (Long sessionId : connections.keySet())
+            {
+               try
+               {
+                  Ping ping = new Ping(sessionId);
+                  ping.setTargetID(0);
+                  connections.get(sessionId).getPacketReturner().send(ping);
+                  replies.add(sessionId);
+                  if(isTraceEnabled)
+                  {
+                     log.trace("sending " + ping);
+                  }
+               }
+               catch (Exception e)
+               {
+                  e.printStackTrace();
+               }
+            }
+            //wait for the keep alive timeout period
+            try
+            {
+               wait(server.getConfiguration().getKeepAliveTimeout() * 1000);
+            }
+            catch (InterruptedException e)
+            {
+            }
+         }
+         //at this point clean up any connections whose replies we haven't received
+         for (Long reply : replies)
+         {
+            if(cleanUpNotifier != null)
+               cleanUpNotifier.fireCleanup(reply, new MessagingException(MessagingException.CONNECTION_TIMEDOUT, "unable to ping client"));
+            connections.remove(reply);
+         }
+      }
+      catch (Exception e)
+      {
+         e.printStackTrace();
+      }
+   }
+
+   /**
+    * pong received from client
+    * @param pong
+    */
+   public void pong(Pong pong)
+   {
+      if(isTraceEnabled)
+      {
+         log.trace("received reply" + pong);
+      }
+      replies.remove(pong.getSessionID());
+   }
+
+   /**
+    * register a connection.
+    *
+    * @param remotingSessionID
+    * @param sender
+    */
+   public void registerConnection(long remotingSessionID, PacketReturner sender)
+   {
+      if (connections.get(remotingSessionID) == null)
+      {
+         connections.put(remotingSessionID, new ConnectionHolder(remotingSessionID, sender));
+      }
+      else
+      {
+         connections.get(remotingSessionID).increment();
+      }
+
+   }
+
+   /**
+    * unregister a connection.
+    *
+    * @param remotingSessionID
+    */
+   public void unregister(long remotingSessionID)
+   {
+      ConnectionHolder connectionHolder = connections.get(remotingSessionID);
+      if(connectionHolder != null)
+      {
+         connectionHolder.decrement();
+         if(connectionHolder.get() == 0)
+         {
+            connections.remove(remotingSessionID);
+         }
+      }
+   }
+
+   /**
+    * register the cleanup notifier to use
+    *
+    * @param cleanUpNotifier
+    */
+   public void registerCleanUpNotifier(CleanUpNotifier cleanUpNotifier)
+   {
+      this.cleanUpNotifier = cleanUpNotifier;
+   }
+
+   /**
+    * simple holder class for sessions
+    */
+   class ConnectionHolder
+   {
+      AtomicInteger connectionCount = new AtomicInteger(1);
+      Long sessionId;
+      PacketReturner packetReturner;
+
+      public ConnectionHolder(Long sessionId, PacketReturner packetReturner)
+      {
+         this.sessionId = sessionId;
+         this.packetReturner = packetReturner;
+      }
+
+      public Integer increment()
+      {
+         return connectionCount.getAndIncrement();
+      }
+
+      public Integer decrement()
+      {
+         return connectionCount.getAndDecrement();
+      }
+
+      public Integer get()
+      {
+         return connectionCount.get();
+      }
+
+      public boolean equals(Object o)
+      {
+         if (this == o) return true;
+         if (o == null || getClass() != o.getClass()) return false;
+
+         ConnectionHolder that = (ConnectionHolder) o;
+
+         if (!sessionId.equals(that.sessionId)) return false;
+
+         return true;
+      }
+
+      public int hashCode()
+      {
+         return sessionId.hashCode();
+      }
+
+      public long getSessionId()
+      {
+         return sessionId;
+      }
+
+      public PacketReturner getPacketReturner()
+      {
+         return packetReturner;
+      }
+   }
+}

Modified: trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java	2008-05-22 09:39:31 UTC (rev 4281)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java	2008-05-22 12:29:03 UTC (rev 4282)
@@ -44,16 +44,14 @@
 import org.jboss.messaging.core.remoting.Interceptor;
 import org.jboss.messaging.core.remoting.RemotingService;
 import org.jboss.messaging.core.remoting.impl.mina.MinaService;
+import org.jboss.messaging.core.remoting.impl.mina.CleanUpNotifier;
 import org.jboss.messaging.core.remoting.impl.wireformat.CreateConnectionResponse;
 import org.jboss.messaging.core.security.JBMSecurityManager;
 import org.jboss.messaging.core.security.Role;
 import org.jboss.messaging.core.security.SecurityStore;
 import org.jboss.messaging.core.security.impl.JBMSecurityManagerImpl;
 import org.jboss.messaging.core.security.impl.SecurityStoreImpl;
-import org.jboss.messaging.core.server.ConnectionManager;
-import org.jboss.messaging.core.server.MessagingServer;
-import org.jboss.messaging.core.server.QueueFactory;
-import org.jboss.messaging.core.server.ServerConnection;
+import org.jboss.messaging.core.server.*;
 import org.jboss.messaging.core.settings.HierarchicalRepository;
 import org.jboss.messaging.core.settings.impl.HierarchicalObjectRepository;
 import org.jboss.messaging.core.settings.impl.QueueSettings;
@@ -98,6 +96,7 @@
    private Deployer queueSettingsDeployer;
    private JBMSecurityManager securityManager = new JBMSecurityManagerImpl(true);
    private DeploymentManager deploymentManager = new FileDeploymentManager();
+   private ClientPinger clientPinger;
 
    // plugins
 
@@ -112,6 +111,7 @@
    private QueueFactory queueFactory;
    private ResourceManager resourceManager = new ResourceManagerImpl(0);
    private ScheduledExecutorService scheduledExecutor;
+   private MessagingServerPacketHandler serverPacketHandler;
 
    // Constructors ---------------------------------------------------------------------------------
    /**
@@ -172,6 +172,8 @@
       }
       // Start the wired components
       securityDeployer.start();
+      clientPinger = new ClientPingerImpl(this);
+      remotingService.setClientPinger(clientPinger);
       remotingService.addRemotingSessionListener(connectionManager);
       memoryManager.start();
       deploymentManager.start(1);
@@ -179,10 +181,9 @@
       deploymentManager.registerDeployer(queueSettingsDeployer);
       postOffice.start();
       deploymentManager.start(2);
-
-      MessagingServerPacketHandler serverPacketHandler = new MessagingServerPacketHandler(this);
+      serverPacketHandler = new MessagingServerPacketHandler(this, clientPinger);
       getRemotingService().getDispatcher().register(serverPacketHandler);
-
+      serverPacketHandler.start();
       ClassLoader loader = Thread.currentThread().getContextClassLoader();
       for (String interceptorClass : configuration.getDefaultInterceptors())
       {
@@ -216,6 +217,7 @@
       queueSettingsDeployer.stop();
       deploymentManager.stop();
       remotingService.removeRemotingSessionListener(connectionManager);
+      serverPacketHandler.stop();
       connectionManager = null;
       memoryManager.stop();
       memoryManager = null;
@@ -292,6 +294,7 @@
       this.postOffice = postOffice;
    }
 
+
    public HierarchicalRepository<HashSet<Role>> getSecurityRepository()
    {
       return securityRepository;
@@ -345,7 +348,7 @@
                           queueSettingsRepository,
                           postOffice, securityStore, connectionManager);
 
-      remotingService.getDispatcher().register(new ServerConnectionPacketHandler(connection));
+      remotingService.getDispatcher().register(new ServerConnectionPacketHandler(connection, clientPinger));
 
       return new CreateConnectionResponse(connection.getID(), version);
    }

Modified: trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerPacketHandler.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerPacketHandler.java	2008-05-22 09:39:31 UTC (rev 4281)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerPacketHandler.java	2008-05-22 12:29:03 UTC (rev 4282)
@@ -27,10 +27,15 @@
 import org.jboss.messaging.core.exception.MessagingException;
 import org.jboss.messaging.core.remoting.Packet;
 import org.jboss.messaging.core.remoting.PacketReturner;
-import org.jboss.messaging.core.remoting.impl.wireformat.CreateConnectionRequest;
-import org.jboss.messaging.core.remoting.impl.wireformat.EmptyPacket;
+import org.jboss.messaging.core.remoting.impl.wireformat.*;
 import org.jboss.messaging.core.server.MessagingServer;
+import org.jboss.messaging.core.server.ClientPinger;
+import org.jboss.messaging.core.server.MessagingComponent;
 
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
 /**
  * A packet handler for all packets that need to be handled at the server level
  * 
@@ -38,17 +43,39 @@
  * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
  * @author <a href="ataylor at redhat.com">Andy Taylor</a>
  */
-public class MessagingServerPacketHandler extends ServerPacketHandlerSupport
+public class MessagingServerPacketHandler extends ServerPacketHandlerSupport implements MessagingComponent
 {
    private static final Logger log = Logger.getLogger(MessagingServerPacketHandler.class);
    
    private final MessagingServer server;
 
-   public MessagingServerPacketHandler(final MessagingServer server)
+   private final ClientPinger clientPinger;
+
+   private ScheduledExecutorService scheduledExecutor;
+
+   public MessagingServerPacketHandler(final MessagingServer server, ClientPinger clientPinger)
    {
       this.server = server;
+      this.clientPinger = clientPinger;
+
    }
-   
+
+   public void start() throws Exception
+   {
+      if (server.getConfiguration().getKeepAliveInterval() > 0)
+      {
+         scheduledExecutor = new ScheduledThreadPoolExecutor(1);
+         scheduledExecutor.scheduleAtFixedRate(clientPinger, 0, server.getConfiguration().getKeepAliveInterval(), TimeUnit.SECONDS);
+      }
+   }
+
+   public void stop() throws Exception
+   {
+      if (server.getConfiguration().getKeepAliveInterval() > 0)
+      {
+         scheduledExecutor.shutdownNow();
+      }
+   }
    /*
    * The advantage to use String as ID is that we can leverage Java 5 UUID to
    * generate these IDs. However theses IDs are 128 bite long and it increases
@@ -73,11 +100,19 @@
       {
          CreateConnectionRequest request = (CreateConnectionRequest) packet;
          
-         response = server.createConnection(request.getUsername(), request.getPassword(),
+         CreateConnectionResponse  createConnectionResponse = server.createConnection(request.getUsername(), request.getPassword(),
          		                             request.getRemotingSessionID(),
                                             sender.getRemoteAddress(),
                                             request.getVersion());
-      }     
+         clientPinger.registerConnection(request.getRemotingSessionID(), sender);
+         response = createConnectionResponse;
+         
+      }
+      else if(type == EmptyPacket.PONG)
+      {
+         Pong decodedPong = (Pong) packet;
+         clientPinger.pong(decodedPong);
+      }
       else
       {
          throw new MessagingException(MessagingException.UNSUPPORTED_PACKET,

Modified: trunk/src/main/org/jboss/messaging/core/server/impl/ServerConnectionImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/ServerConnectionImpl.java	2008-05-22 09:39:31 UTC (rev 4281)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/ServerConnectionImpl.java	2008-05-22 12:29:03 UTC (rev 4282)
@@ -283,6 +283,11 @@
       return createdTime;
    }
 
+   public long getRemotingClientSessionID()
+   {
+      return remotingClientSessionID;
+   }
+
    public Collection<ServerSession> getSessions()
    {
       return sessions;

Modified: trunk/src/main/org/jboss/messaging/core/server/impl/ServerConnectionPacketHandler.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/ServerConnectionPacketHandler.java	2008-05-22 09:39:31 UTC (rev 4281)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/ServerConnectionPacketHandler.java	2008-05-22 12:29:03 UTC (rev 4282)
@@ -27,6 +27,7 @@
 import org.jboss.messaging.core.remoting.impl.wireformat.ConnectionCreateSessionMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.EmptyPacket;
 import org.jboss.messaging.core.server.ServerConnection;
+import org.jboss.messaging.core.server.ClientPinger;
 
 /**
  * 
@@ -39,10 +40,12 @@
 public class ServerConnectionPacketHandler extends ServerPacketHandlerSupport
 {
 	private final ServerConnection connection;
+   final ClientPinger clientPinger;
 	
-   public ServerConnectionPacketHandler(final ServerConnection connection)
+   public ServerConnectionPacketHandler(final ServerConnection connection, final ClientPinger clientPinger)
    {
    	this.connection = connection;
+      this.clientPinger = clientPinger;
    }
 
    public long getID()
@@ -69,6 +72,7 @@
          connection.stop();
          break;
       case EmptyPacket.CLOSE:
+         clientPinger.unregister(connection.getRemotingClientSessionID());
          connection.close();
          break;
       default:

Modified: trunk/tests/jms-tests/src/org/jboss/test/messaging/JBMServerTestCase.java
===================================================================
--- trunk/tests/jms-tests/src/org/jboss/test/messaging/JBMServerTestCase.java	2008-05-22 09:39:31 UTC (rev 4281)
+++ trunk/tests/jms-tests/src/org/jboss/test/messaging/JBMServerTestCase.java	2008-05-22 12:29:03 UTC (rev 4282)
@@ -40,6 +40,7 @@
 import org.jboss.messaging.core.security.Role;
 import org.jboss.messaging.core.server.ConnectionManager;
 import org.jboss.messaging.core.server.MessagingServer;
+import org.jboss.messaging.core.remoting.TransportType;
 import org.jboss.messaging.jms.client.JBossConnectionFactory;
 import org.jboss.messaging.jms.server.JMSServerManager;
 import org.jboss.messaging.microcontainer.JBMBootstrapServer;
@@ -967,7 +968,7 @@
    protected void sleepIfRemoting(int time) throws Exception
    {
       Configuration config = servers.get(0).getMessagingServer().getRemotingService().getConfiguration();
-      if (config.isInvmDisabled())
+      if (config.getTransport() == TransportType.TCP)
       {
          Thread.sleep(time);
       }

Modified: trunk/tests/src/org/jboss/messaging/tests/integration/core/remoting/impl/ClientCrashTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/core/remoting/impl/ClientCrashTest.java	2008-05-22 09:39:31 UTC (rev 4281)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/core/remoting/impl/ClientCrashTest.java	2008-05-22 12:29:03 UTC (rev 4282)
@@ -132,7 +132,7 @@
 
          assertEquals(9, p.exitValue());
 
-         Thread.sleep(2000);
+         Thread.sleep(4000);
          // the crash must have been detected and the client resources cleaned
          // up only the local connection remains
          assertActiveConnections(1);
@@ -161,6 +161,8 @@
       super.setUp();
 
       ConfigurationImpl config = ConfigurationHelper.newTCPConfiguration("localhost", ConfigurationImpl.DEFAULT_REMOTING_PORT);
+      config.setKeepAliveInterval(2);
+      config.setKeepAliveTimeout(1);
       server = new MessagingServerImpl(config);
       server.start();
 

Modified: trunk/tests/src/org/jboss/messaging/tests/integration/core/remoting/mina/ClientKeepAliveTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/core/remoting/mina/ClientKeepAliveTest.java	2008-05-22 09:39:31 UTC (rev 4281)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/core/remoting/mina/ClientKeepAliveTest.java	2008-05-22 12:29:03 UTC (rev 4282)
@@ -7,6 +7,7 @@
 package org.jboss.messaging.tests.integration.core.remoting.mina;
 
 import static java.util.concurrent.TimeUnit.SECONDS;
+
 import static org.easymock.EasyMock.anyLong;
 import static org.easymock.EasyMock.createMock;
 import static org.easymock.EasyMock.expect;
@@ -21,25 +22,27 @@
 
 import junit.framework.TestCase;
 
-import org.jboss.messaging.core.client.RemotingSessionListener;
-import org.jboss.messaging.core.client.impl.LocationImpl;
+import org.jboss.messaging.core.client.*;
+import org.jboss.messaging.core.client.impl.*;
 import org.jboss.messaging.core.config.impl.ConfigurationImpl;
 import org.jboss.messaging.core.exception.MessagingException;
-import org.jboss.messaging.core.remoting.KeepAliveFactory;
-import org.jboss.messaging.core.remoting.NIOSession;
+import org.jboss.messaging.core.remoting.*;
 import org.jboss.messaging.core.remoting.impl.PacketDispatcherImpl;
-import org.jboss.messaging.core.remoting.impl.mina.ClientKeepAliveFactory;
+import org.jboss.messaging.core.remoting.impl.ClientKeepAliveHandler;
 import org.jboss.messaging.core.remoting.impl.mina.MinaConnector;
 import org.jboss.messaging.core.remoting.impl.mina.MinaService;
 import org.jboss.messaging.core.remoting.impl.wireformat.Ping;
 import org.jboss.messaging.core.remoting.impl.wireformat.Pong;
+import org.jboss.messaging.core.remoting.impl.wireformat.CreateConnectionRequest;
+import org.jboss.messaging.core.remoting.impl.wireformat.CreateConnectionResponse;
+import org.jboss.messaging.core.server.impl.MessagingServerPacketHandler;
+import org.jboss.messaging.core.server.impl.MessagingServerImpl;
+import org.jboss.messaging.core.server.MessagingServer;
 import org.jboss.messaging.tests.unit.core.remoting.impl.ConfigurationHelper;
 
 /**
  * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>
- * 
  * @version <tt>$Revision$</tt>
- * 
  */
 public class ClientKeepAliveTest extends TestCase
 {
@@ -47,7 +50,7 @@
 
    // Attributes ----------------------------------------------------
 
-   private MinaService service;
+   private MessagingServer messagingServer;
 
    // Static --------------------------------------------------------
 
@@ -61,141 +64,156 @@
       ConfigurationImpl config = ConfigurationHelper.newTCPConfiguration("localhost", TestSupport.PORT);
       config.setKeepAliveInterval(TestSupport.KEEP_ALIVE_INTERVAL);
       config.setKeepAliveTimeout(TestSupport.KEEP_ALIVE_TIMEOUT);
-      service = new MinaService(config);
-      service.start();
+      messagingServer = new MessagingServerImpl(config);
+      messagingServer.start();
    }
 
    @Override
    protected void tearDown() throws Exception
    {
-      service.stop();
-      service = null;
+      messagingServer.stop();
    }
 
    public void testKeepAliveWithClientOK() throws Exception
-   {  
-      KeepAliveFactory factory = createMock(KeepAliveFactory.class);
+   {
+      KeepAliveHandler factory = createMock(KeepAliveHandler.class);
 
       // client never send ping
-      expect(factory.ping(anyLong())).andStubReturn(null);
-      expect(factory.isPing(anyLong(), isA(Ping.class))).andStubReturn(true);
-      expect(factory.isPing(anyLong(), isA(Object.class))).andStubReturn(false);
-      // client is responding
-      expect(factory.pong(anyLong(), isA(Ping.class))).andReturn(new Pong(randomLong(), false)).atLeastOnce();
+      //expect(factory.isAlive(isA(Ping.class), isA(Pong.class))).andStubReturn(true);
+      //expect(factory.isAlive(isA(Ping.class), isA(Pong.class))).andStubReturn(false);
 
       replay(factory);
 
       final CountDownLatch latch = new CountDownLatch(1);
 
-      RemotingSessionListener listener = new RemotingSessionListener() {
+      RemotingSessionListener listener = new RemotingSessionListener()
+      {
          public void sessionDestroyed(long sessionID, MessagingException me)
          {
             latch.countDown();
          }
       };
-      service.addRemotingSessionListener(listener);
-
-      MinaConnector connector = new MinaConnector(new LocationImpl(TCP, "localhost", TestSupport.PORT), new PacketDispatcherImpl(null), factory);
+      messagingServer.getRemotingService().addRemotingSessionListener(listener);
+      ConnectionParams connectionParams = new ConnectionParamsImpl();
+      connectionParams.setKeepAliveInterval(TestSupport.KEEP_ALIVE_INTERVAL);
+      connectionParams.setKeepAliveTimeout(TestSupport.KEEP_ALIVE_TIMEOUT);
+      MinaConnector connector = new MinaConnector(new LocationImpl(TCP, "localhost", TestSupport.PORT), connectionParams, new PacketDispatcherImpl(null), factory);
       connector.connect();
 
       boolean firedKeepAliveNotification = latch.await(TestSupport.KEEP_ALIVE_INTERVAL
-            + TestSupport.KEEP_ALIVE_TIMEOUT + 2, SECONDS);
+              + TestSupport.KEEP_ALIVE_TIMEOUT + 2, SECONDS);
       assertFalse(firedKeepAliveNotification);
 
-      service.removeRemotingSessionListener(listener);
-      connector.disconnect();
+      messagingServer.getRemotingService().removeRemotingSessionListener(listener);
+      //connector.disconnect();
 
       verify(factory);
    }
-   
-   public void testKeepAliveWithClientNotResponding() throws Exception
+
+   public void testKeepAliveWithClientNotResponding() throws Throwable
    {
-      KeepAliveFactory factory = new ClientKeepAliveFactoryNotResponding();
+      final KeepAliveHandler factory = new ClientKeepAliveFactoryNotResponding();
 
       final long[] clientSessionIDNotResponding = new long[1];
       final CountDownLatch latch = new CountDownLatch(1);
 
-      RemotingSessionListener listener = new RemotingSessionListener() {
+      RemotingSessionListener listener = new RemotingSessionListener()
+      {
          public void sessionDestroyed(long sessionID, MessagingException me)
          {
             clientSessionIDNotResponding[0] = sessionID;
             latch.countDown();
          }
       };
-      service.addRemotingSessionListener(listener);
-      
-      MinaConnector connector = new MinaConnector(new LocationImpl(TCP, "localhost", TestSupport.PORT), new PacketDispatcherImpl(null), factory);
+      messagingServer.getRemotingService().addRemotingSessionListener(listener);
+      ConnectionParams connectionParams = new ConnectionParamsImpl();
+      connectionParams.setKeepAliveInterval(TestSupport.KEEP_ALIVE_INTERVAL);
+      connectionParams.setKeepAliveTimeout(TestSupport.KEEP_ALIVE_TIMEOUT);
 
+      LocationImpl location = new LocationImpl(TCP, "localhost", TestSupport.PORT);
+      MinaConnector connector = new MinaConnector(location, connectionParams, new PacketDispatcherImpl(null), factory);
+
       NIOSession session = connector.connect();
+      RemotingConnection remotingConnection =  new RemotingConnectionImpl(location, connectionParams, connector);
+      createConnection(messagingServer, remotingConnection);
       long clientSessionID = session.getID();
 
       boolean firedKeepAliveNotification = latch.await(TestSupport.KEEP_ALIVE_INTERVAL
-            + TestSupport.KEEP_ALIVE_TIMEOUT + 2, SECONDS);
+              + TestSupport.KEEP_ALIVE_TIMEOUT + 2, SECONDS);
       assertTrue("notification has not been received", firedKeepAliveNotification);
       assertNotNull(clientSessionIDNotResponding[0]);
       assertEquals(clientSessionID, clientSessionIDNotResponding[0]);
 
-      service.removeRemotingSessionListener(listener);
+      messagingServer.getRemotingService().removeRemotingSessionListener(listener);
       connector.disconnect();
    }
 
-   public void testKeepAliveWithClientTooLongToRespond() throws Exception
+   public void testKeepAliveWithClientTooLongToRespond() throws Throwable
    {
-      KeepAliveFactory factory = new KeepAliveFactory()
+      KeepAliveHandler factory = new KeepAliveHandler()
       {
-         public Ping ping(long sessionID)
+         public boolean isAlive(Ping ping, Pong pong)
          {
-            return null;
+            return false;  //todo
          }
-         
-         public boolean isPing(long sessionID, Object message)
+
+         public void handleDeath(long sessionId)
          {
-            return (message instanceof Ping);
+            //todo
          }
 
-         public synchronized Pong pong(long sessionID, Ping ping)
+         public Pong ping(Ping pong)
          {
-            // like a TCP timeout, there is no response in the next 2 hours
             try
             {
                wait(2 * 3600);
-            } catch (InterruptedException e)
+            }
+            catch (InterruptedException e)
             {
                e.printStackTrace();
             }
             return new Pong(randomLong(), false);
-         }         
+         }
       };
 
       try
       {
-         MinaConnector connector = new MinaConnector(new LocationImpl(TCP, "localhost", TestSupport.PORT),
-               new PacketDispatcherImpl(null), factory);
+         ConnectionParams connectionParams = new ConnectionParamsImpl();
+         connectionParams.setKeepAliveInterval(TestSupport.KEEP_ALIVE_INTERVAL);
+         connectionParams.setKeepAliveTimeout(TestSupport.KEEP_ALIVE_TIMEOUT);
+         LocationImpl location = new LocationImpl(TCP, "localhost", TestSupport.PORT);
+         MinaConnector connector = new MinaConnector(location, connectionParams,
+                 new PacketDispatcherImpl(null), factory);
 
          NIOSession session = connector.connect();
+         //create a connection properly to initiate ping
+         RemotingConnection remotingConnection =  new RemotingConnectionImpl(location, connectionParams, connector);
+         createConnection(messagingServer, remotingConnection);
          long clientSessionID = session.getID();
 
          final AtomicLong clientSessionIDNotResponding = new AtomicLong(-1);
          final CountDownLatch latch = new CountDownLatch(1);
 
-         RemotingSessionListener listener = new RemotingSessionListener() {
+         RemotingSessionListener listener = new RemotingSessionListener()
+         {
             public void sessionDestroyed(long sessionID, MessagingException me)
             {
                clientSessionIDNotResponding.set(sessionID);
                latch.countDown();
             }
          };
-         service.addRemotingSessionListener(listener);
+         messagingServer.getRemotingService().addRemotingSessionListener(listener);
 
          boolean firedKeepAliveNotification = latch.await(TestSupport.KEEP_ALIVE_INTERVAL
-               + TestSupport.KEEP_ALIVE_TIMEOUT + 2, SECONDS);
+                 + TestSupport.KEEP_ALIVE_TIMEOUT + 2, SECONDS);
          assertTrue("notification has not been received", firedKeepAliveNotification);
          assertEquals(clientSessionID, clientSessionIDNotResponding.longValue());
 
-         service.removeRemotingSessionListener(listener);
+         messagingServer.getRemotingService().removeRemotingSessionListener(listener);
          connector.disconnect();
 
-      } finally
+      }
+      finally
       {
          // test is done: wake up the factory
          synchronized (factory)
@@ -206,65 +224,77 @@
    }
 
    public void testKeepAliveWithClientRespondingAndClientNotResponding()
-         throws Exception
+           throws Throwable
    {
-      KeepAliveFactory notRespondingfactory = new ClientKeepAliveFactoryNotResponding();
-      KeepAliveFactory respondingfactory = new ClientKeepAliveFactory();
+      KeepAliveHandler notRespondingfactory = new ClientKeepAliveFactoryNotResponding();
+      KeepAliveHandler respondingfactory = new ClientKeepAliveHandler();
 
       final AtomicLong sessionIDNotResponding = new AtomicLong(-1);
       final CountDownLatch latch = new CountDownLatch(1);
 
-      RemotingSessionListener listener = new RemotingSessionListener() {
+      RemotingSessionListener listener = new RemotingSessionListener()
+      {
          public void sessionDestroyed(long sessionID, MessagingException me)
          {
             sessionIDNotResponding.set(sessionID);
             latch.countDown();
          }
       };
-      service.addRemotingSessionListener(listener);
-      
-      MinaConnector connectorNotResponding = new MinaConnector(new LocationImpl(TCP, "localhost", TestSupport.PORT), new PacketDispatcherImpl(null), notRespondingfactory);
-      MinaConnector connectorResponding = new MinaConnector(new LocationImpl(TCP, "localhost", TestSupport.PORT), new PacketDispatcherImpl(null), respondingfactory);
+      messagingServer.getRemotingService().addRemotingSessionListener(listener);
+      ConnectionParams connectionParams = new ConnectionParamsImpl();
+      connectionParams.setKeepAliveInterval(TestSupport.KEEP_ALIVE_INTERVAL);
+      connectionParams.setKeepAliveTimeout(TestSupport.KEEP_ALIVE_TIMEOUT);
+      LocationImpl location = new LocationImpl(TCP, "localhost", TestSupport.PORT);
+      MinaConnector connectorNotResponding = new MinaConnector(location, new PacketDispatcherImpl(null), notRespondingfactory);
+      MinaConnector connectorResponding = new MinaConnector(location, new PacketDispatcherImpl(null), respondingfactory);
 
       NIOSession sessionNotResponding = connectorNotResponding.connect();
+      //create a connection properly to initiate ping
+      RemotingConnection remotingConnection =  new RemotingConnectionImpl(location, connectionParams, connectorNotResponding);
+         createConnection(messagingServer, remotingConnection);
       long clientSessionIDNotResponding = sessionNotResponding.getID();
 
-      
+
       NIOSession sessionResponding = connectorResponding.connect();
+      RemotingConnection remotingConnection2 =  new RemotingConnectionImpl(location, connectionParams, connectorNotResponding);
+         createConnection(messagingServer, remotingConnection2);
       long clientSessionIDResponding = sessionResponding.getID();
 
       boolean firedKeepAliveNotification = latch.await(TestSupport.KEEP_ALIVE_INTERVAL
-            + TestSupport.KEEP_ALIVE_TIMEOUT + 2, SECONDS);
+              + TestSupport.KEEP_ALIVE_TIMEOUT + 2, SECONDS);
       assertTrue("notification has not been received", firedKeepAliveNotification);
 
       assertEquals(clientSessionIDNotResponding, sessionIDNotResponding.longValue());
       assertNotSame(clientSessionIDResponding, sessionIDNotResponding.longValue());
 
-      service.removeRemotingSessionListener(listener);
+      messagingServer.getRemotingService().removeRemotingSessionListener(listener);
       connectorNotResponding.disconnect();
       connectorResponding.disconnect();
    }
-   
+
    // Package protected ---------------------------------------------
 
    // Protected -----------------------------------------------------
 
    // Private -------------------------------------------------------
 
+   private void createConnection(MessagingServer server, RemotingConnection remotingConnection) throws Throwable
+   {
+      long sessionID = remotingConnection.getSessionID();
+
+      CreateConnectionRequest request =
+              new CreateConnectionRequest(server.getVersion().getIncrementingVersion(), sessionID, null, null);
+
+      CreateConnectionResponse response =
+              (CreateConnectionResponse) remotingConnection.sendBlocking(0, 0, request);
+   }
    // Inner classes -------------------------------------------------
-   
-   private class ClientKeepAliveFactoryNotResponding extends ClientKeepAliveFactory
+
+   private class ClientKeepAliveFactoryNotResponding extends ClientKeepAliveHandler
    {
-      @Override
-      public Ping ping(long clientSessionID)
+      public Pong ping(Ping ping)
       {
          return null;
       }
-
-      @Override
-      public Pong pong(long sessionID, Ping ping)
-      {
-         return null;
-      }
    }
 }
\ No newline at end of file

Modified: trunk/tests/src/org/jboss/messaging/tests/integration/core/remoting/mina/FilterChainSupportTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/core/remoting/mina/FilterChainSupportTest.java	2008-05-22 09:39:31 UTC (rev 4281)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/core/remoting/mina/FilterChainSupportTest.java	2008-05-22 12:29:03 UTC (rev 4282)
@@ -28,9 +28,6 @@
 import org.apache.mina.filter.ssl.SslFilter;
 import org.apache.mina.transport.socket.nio.NioSocketAcceptor;
 import org.apache.mina.transport.socket.nio.NioSocketConnector;
-import org.jboss.messaging.core.remoting.KeepAliveFactory;
-import org.jboss.messaging.core.remoting.impl.mina.CleanUpNotifier;
-import org.jboss.messaging.core.remoting.impl.mina.FilterChainSupport;
 import org.jboss.messaging.core.remoting.impl.wireformat.Ping;
 
 /**

Deleted: trunk/tests/src/org/jboss/messaging/tests/integration/core/remoting/mina/MinaKeepAliveFactoryTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/core/remoting/mina/MinaKeepAliveFactoryTest.java	2008-05-22 09:39:31 UTC (rev 4281)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/core/remoting/mina/MinaKeepAliveFactoryTest.java	2008-05-22 12:29:03 UTC (rev 4282)
@@ -1,100 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- *
- * Distributable under LGPL license.
- * See terms of license at gnu.org.
- */
-package org.jboss.messaging.tests.integration.core.remoting.mina;
-
-import static org.easymock.EasyMock.anyLong;
-import static org.easymock.EasyMock.createMock;
-import static org.easymock.EasyMock.expect;
-import static org.easymock.EasyMock.expectLastCall;
-import static org.easymock.EasyMock.isA;
-import static org.easymock.EasyMock.replay;
-import static org.easymock.EasyMock.verify;
-import static org.jboss.messaging.tests.util.RandomUtil.randomLong;
-import junit.framework.TestCase;
-
-import org.apache.mina.common.IoSession;
-import org.jboss.messaging.core.exception.MessagingException;
-import org.jboss.messaging.core.remoting.KeepAliveFactory;
-import org.jboss.messaging.core.remoting.impl.mina.CleanUpNotifier;
-import org.jboss.messaging.core.remoting.impl.mina.MinaKeepAliveFactory;
-import org.jboss.messaging.core.remoting.impl.wireformat.Pong;
-
-/**
- * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>
- * 
- * @version <tt>$Revision$</tt>
- * 
- */
-public class MinaKeepAliveFactoryTest extends TestCase
-{
-   // Constants -----------------------------------------------------
-
-   // Attributes ----------------------------------------------------
-
-   // Static --------------------------------------------------------
-
-   // Constructors --------------------------------------------------
-
-   // Public --------------------------------------------------------
-
-   public void testResponseIsNotAPong()
-   {
-      IoSession session = createMock(IoSession.class);
-      KeepAliveFactory factory = createMock(KeepAliveFactory.class);
-      CleanUpNotifier notifier = createMock(CleanUpNotifier.class);
-      replay(session, factory, notifier);
-
-      MinaKeepAliveFactory minaFactory = new MinaKeepAliveFactory(factory, notifier);
-
-      assertFalse(minaFactory.isResponse(session, new Object()));
-      
-      verify(session, factory, notifier);
-   }
-
-   public void testResponseIsAPongWithSessionNotFailed()
-   {
-      IoSession session = createMock(IoSession.class);
-      long sessionID = randomLong();
-      Pong pong = new Pong(sessionID, false);
-      KeepAliveFactory factory = createMock(KeepAliveFactory.class);
-      CleanUpNotifier notifier = createMock(CleanUpNotifier.class);
-      replay(session, factory, notifier);
-
-      MinaKeepAliveFactory minaFactory = new MinaKeepAliveFactory(factory, notifier);
-
-      assertTrue(minaFactory.isResponse(session, pong));
-
-      verify(session, factory, notifier);
-   }
-
-   public void testResponseIsAPongWithSessionFailed()
-   {
-      IoSession session = createMock(IoSession.class);
-      long sessionID = randomLong();
-      expect(session.getId()).andStubReturn(sessionID);
-      Pong pong = new Pong(sessionID, true);
-      KeepAliveFactory factory = createMock(KeepAliveFactory.class);
-      CleanUpNotifier notifier = createMock(CleanUpNotifier.class);
-      notifier.fireCleanup(anyLong(), isA(MessagingException.class));
-      expectLastCall().once();
-      replay(session, factory, notifier);
-      
-      MinaKeepAliveFactory minaFactory = new MinaKeepAliveFactory(factory, notifier);
-
-      assertTrue(minaFactory.isResponse(session, pong));
-
-      verify(session, factory, notifier);
-   }
-
-   // Package protected ---------------------------------------------
-
-   // Protected -----------------------------------------------------
-
-   // Private -------------------------------------------------------
-
-   // Inner classes -------------------------------------------------
-}

Modified: trunk/tests/src/org/jboss/messaging/tests/integration/core/remoting/mina/ServerKeepAliveTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/core/remoting/mina/ServerKeepAliveTest.java	2008-05-22 09:39:31 UTC (rev 4281)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/core/remoting/mina/ServerKeepAliveTest.java	2008-05-22 12:29:03 UTC (rev 4282)
@@ -14,15 +14,20 @@
 import junit.framework.TestCase;
 
 import org.jboss.messaging.core.client.RemotingSessionListener;
+import org.jboss.messaging.core.client.impl.RemotingConnection;
+import org.jboss.messaging.core.client.impl.RemotingConnectionImpl;
 import org.jboss.messaging.core.config.impl.ConfigurationImpl;
 import org.jboss.messaging.core.exception.MessagingException;
 import org.jboss.messaging.core.remoting.NIOSession;
 import org.jboss.messaging.core.remoting.impl.PacketDispatcherImpl;
 import org.jboss.messaging.core.remoting.impl.mina.MinaConnector;
 import org.jboss.messaging.core.remoting.impl.mina.MinaService;
-import org.jboss.messaging.core.remoting.impl.mina.ServerKeepAliveFactory;
 import org.jboss.messaging.core.remoting.impl.wireformat.Ping;
 import org.jboss.messaging.core.remoting.impl.wireformat.Pong;
+import org.jboss.messaging.core.remoting.impl.wireformat.CreateConnectionRequest;
+import org.jboss.messaging.core.remoting.impl.wireformat.CreateConnectionResponse;
+import org.jboss.messaging.core.server.MessagingServer;
+import org.jboss.messaging.core.server.impl.MessagingServerImpl;
 import org.jboss.messaging.tests.unit.core.remoting.impl.ConfigurationHelper;
 
 /**
@@ -37,7 +42,7 @@
 
    // Attributes ----------------------------------------------------
 
-   private MinaService service;
+   private MessagingServer messagingServer;
 
    // Static --------------------------------------------------------
 
@@ -53,93 +58,25 @@
    @Override
    protected void tearDown() throws Exception
    {
-      service.stop();
-      service = null;
+      messagingServer.stop();
+      messagingServer = null;
    }
 
-   public void testKeepAliveWithServerNotResponding() throws Exception
+   public void testKeepAliveWithServerNotResponding() throws Throwable
    {
-      ServerKeepAliveFactory factory = new ServerKeepAliveFactory()
-      {
-         // server does not send ping
-         @Override
-         public Ping ping(long sessionID)
-         {
-            return null;
-         }
-
-         @Override
-         public Pong pong(long sessionID, Ping ping)
-         {
-            // no pong -> server is not responding
-            super.pong(sessionID, ping);
-            return null;
-         }
-      };
-
+      //set the server timeouts to be twice that of the client to force failure
       ConfigurationImpl config = ConfigurationHelper.newTCPConfiguration(
             "localhost", TestSupport.PORT);
-      config.setKeepAliveInterval(TestSupport.KEEP_ALIVE_INTERVAL);
-      config.setKeepAliveTimeout(TestSupport.KEEP_ALIVE_TIMEOUT);
-      service = new MinaService(config, factory);
-      service.start();
-
-      MinaConnector connector = new MinaConnector(service
-            .getConfiguration().getLocation(), service.getConfiguration().getConnectionParams(), new PacketDispatcherImpl(null));
-
-      final AtomicLong sessionIDNotResponding = new AtomicLong(-1);
-      final CountDownLatch latch = new CountDownLatch(1);
-
-      RemotingSessionListener listener = new RemotingSessionListener()
-      {
-         public void sessionDestroyed(long sessionID, MessagingException me)
-         {
-            sessionIDNotResponding.set(sessionID);
-            latch.countDown();
-         }
-      };
-      connector.addSessionListener(listener);
-
-      NIOSession session = connector.connect();
-
-      boolean firedKeepAliveNotification = latch.await(TestSupport.KEEP_ALIVE_INTERVAL
-            + TestSupport.KEEP_ALIVE_TIMEOUT + 2, SECONDS);
-      assertTrue(firedKeepAliveNotification);
-      assertEquals(session.getID(), sessionIDNotResponding.longValue());
-
-      connector.removeSessionListener(listener);
-      connector.disconnect();
-   }
-
-   public void testKeepAliveWithServerSessionFailed() throws Exception
-   {
-      ServerKeepAliveFactory factory = new ServerKeepAliveFactory()
-      {
-         // server does not send ping
-         @Override
-         public Ping ping(long sessionID)
-         {
-            return null;
-         }
-
-         @Override
-         public Pong pong(long sessionID, Ping ping)
-         {
-            // no pong -> server is not responding
-            super.pong(sessionID, ping);
-            return new Pong(sessionID, true);
-         }
-      };
-
-      ConfigurationImpl config = ConfigurationHelper.newTCPConfiguration(
+      config.setKeepAliveInterval(TestSupport.KEEP_ALIVE_INTERVAL * 2);
+      config.setKeepAliveTimeout(TestSupport.KEEP_ALIVE_TIMEOUT * 2);
+      ConfigurationImpl clientConfig = ConfigurationHelper.newTCPConfiguration(
             "localhost", TestSupport.PORT);
-      config.setKeepAliveInterval(TestSupport.KEEP_ALIVE_INTERVAL);
-      config.setKeepAliveTimeout(TestSupport.KEEP_ALIVE_TIMEOUT);
-      service = new MinaService(config, factory);
-      service.start();
+      clientConfig.setKeepAliveInterval(TestSupport.KEEP_ALIVE_INTERVAL);
+      clientConfig.setKeepAliveTimeout(TestSupport.KEEP_ALIVE_TIMEOUT);
+      messagingServer = new MessagingServerImpl(config);
+      messagingServer.start();
 
-      MinaConnector connector = new MinaConnector(service
-            .getConfiguration().getLocation(), new PacketDispatcherImpl(null));
+      MinaConnector connector = new MinaConnector(clientConfig.getLocation(), clientConfig.getConnectionParams(), new PacketDispatcherImpl(null));
 
       final AtomicLong sessionIDNotResponding = new AtomicLong(-1);
       final CountDownLatch latch = new CountDownLatch(1);
@@ -155,7 +92,8 @@
       connector.addSessionListener(listener);
 
       NIOSession session = connector.connect();
-
+      RemotingConnection remotingConnection =  new RemotingConnectionImpl(config.getLocation(), config.getConnectionParams(), connector);
+      createConnection(messagingServer, remotingConnection);
       boolean firedKeepAliveNotification = latch.await(TestSupport.KEEP_ALIVE_INTERVAL
             + TestSupport.KEEP_ALIVE_TIMEOUT + 2, SECONDS);
       assertTrue(firedKeepAliveNotification);
@@ -165,11 +103,21 @@
       connector.disconnect();
    }
 
+
    // Package protected ---------------------------------------------
 
    // Protected -----------------------------------------------------
 
    // Private -------------------------------------------------------
+   private void createConnection(MessagingServer server, RemotingConnection remotingConnection) throws Throwable
+   {
+      long sessionID = remotingConnection.getSessionID();
 
+      CreateConnectionRequest request =
+              new CreateConnectionRequest(server.getVersion().getIncrementingVersion(), sessionID, null, null);
+
+      CreateConnectionResponse response =
+              (CreateConnectionResponse) remotingConnection.sendBlocking(0, 0, request);
+   }
    // Inner classes -------------------------------------------------
 }
\ No newline at end of file

Modified: trunk/tests/src/org/jboss/messaging/tests/unit/jms/network/ClientNetworkFailureTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/jms/network/ClientNetworkFailureTest.java	2008-05-22 09:39:31 UTC (rev 4281)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/jms/network/ClientNetworkFailureTest.java	2008-05-22 12:29:03 UTC (rev 4282)
@@ -130,6 +130,8 @@
       boolean gotExceptionsOnTheServerAndTheClient = exceptionLatch.await(
             KEEP_ALIVE_INTERVAL + KEEP_ALIVE_TIMEOUT + 2, SECONDS);
       assertTrue(gotExceptionsOnTheServerAndTheClient);
+      //now we need to wait for the server to detect the client failure
+      Thread.sleep((KEEP_ALIVE_INTERVAL + KEEP_ALIVE_TIMEOUT) * 1000);
       assertActiveConnectionsOnTheServer(0);
 
       try
@@ -163,6 +165,8 @@
       boolean gotExceptionOnTheServer = exceptionLatch.await(
             KEEP_ALIVE_INTERVAL + KEEP_ALIVE_TIMEOUT + 5, SECONDS);
       assertTrue(gotExceptionOnTheServer);
+      //now we need to wait for the server to detect the client failure
+      Thread.sleep((KEEP_ALIVE_INTERVAL + KEEP_ALIVE_TIMEOUT) * 1000);
       assertActiveConnectionsOnTheServer(0);
 
       try




More information about the jboss-cvs-commits mailing list