[jboss-cvs] JBoss Messaging SVN: r4289 - in trunk: src/config and 8 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Fri May 23 09:37:10 EDT 2008


Author: ataylor
Date: 2008-05-23 09:37:09 -0400 (Fri, 23 May 2008)
New Revision: 4289

Added:
   trunk/src/main/org/jboss/messaging/core/client/ServerPinger.java
   trunk/src/main/org/jboss/messaging/core/client/impl/ServerPingerImpl.java
   trunk/src/main/org/jboss/messaging/core/remoting/KeepAliveFactory.java
   trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/ServerKeepAliveFactory.java
Removed:
   trunk/src/main/org/jboss/messaging/core/client/impl/ServerPongerImpl.java
Modified:
   trunk/examples/jms/src/org/jboss/jms/example/QueueExample.java
   trunk/src/config/jbm-configuration.xml
   trunk/src/config/jbm-standalone-beans.xml
   trunk/src/main/org/jboss/messaging/core/config/impl/ConfigurationImpl.java
   trunk/src/main/org/jboss/messaging/core/remoting/RemotingService.java
   trunk/src/main/org/jboss/messaging/core/remoting/impl/ClientKeepAliveHandler.java
   trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaConnector.java
   trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaService.java
   trunk/src/main/org/jboss/messaging/core/server/ClientPinger.java
   trunk/src/main/org/jboss/messaging/core/server/MessagingServer.java
   trunk/src/main/org/jboss/messaging/core/server/impl/ClientPingerImpl.java
   trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java
   trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerPacketHandler.java
   trunk/src/main/org/jboss/messaging/core/server/impl/ServerConnectionPacketHandler.java
Log:
changed server side keepalive

Modified: trunk/examples/jms/src/org/jboss/jms/example/QueueExample.java
===================================================================
--- trunk/examples/jms/src/org/jboss/jms/example/QueueExample.java	2008-05-23 13:17:14 UTC (rev 4288)
+++ trunk/examples/jms/src/org/jboss/jms/example/QueueExample.java	2008-05-23 13:37:09 UTC (rev 4289)
@@ -48,7 +48,6 @@
          Queue queue = (Queue) initialContext.lookup("/queue/testQueue");
          ConnectionFactory cf = (ConnectionFactory) initialContext.lookup("/ConnectionFactory");
          connection = cf.createConnection();
-         connection2 = cf.createConnection();
          Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
          MessageProducer producer = session.createProducer(queue);
          Message message = session.createTextMessage("This is a text message!");
@@ -59,14 +58,6 @@
          TextMessage message2 = (TextMessage) messageConsumer.receive(5000);
          log.info("message received from queue");
          log.info("message = " + message2.getText());
-         try
-         {
-            Thread.sleep(200000);
-         }
-         catch (InterruptedException e)
-         {
-            e.printStackTrace();
-         }
       }
       catch (NamingException e)
       {

Modified: trunk/src/config/jbm-configuration.xml
===================================================================
--- trunk/src/config/jbm-configuration.xml	2008-05-23 13:17:14 UTC (rev 4288)
+++ trunk/src/config/jbm-configuration.xml	2008-05-23 13:37:09 UTC (rev 4289)
@@ -22,7 +22,7 @@
       <remoting-host>localhost</remoting-host>
 
       <!--  timeout in seconds -->
-      <remoting-timeout>5</remoting-timeout>
+      <remoting-timeout>5000</remoting-timeout>
       
       <!-- true to disable invm communication when the client and the server are in the same JVM.     -->
       <!-- it is not allowed to disable invm communication when the remoting-transport is set to INVM -->

Modified: trunk/src/config/jbm-standalone-beans.xml
===================================================================
--- trunk/src/config/jbm-standalone-beans.xml	2008-05-23 13:17:14 UTC (rev 4288)
+++ trunk/src/config/jbm-standalone-beans.xml	2008-05-23 13:37:09 UTC (rev 4289)
@@ -45,6 +45,9 @@
       <property name="remotingService">
          <inject bean="RemotingService"/>
       </property>
+      <property name="cleanUpNotifier">
+         <inject bean="RemotingService"/>
+      </property>
       <property name="configuration">
          <inject bean="Configuration"/>
       </property>

Copied: trunk/src/main/org/jboss/messaging/core/client/ServerPinger.java (from rev 4282, trunk/src/main/org/jboss/messaging/core/client/ServerPonger.java)
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/ServerPinger.java	                        (rev 0)
+++ trunk/src/main/org/jboss/messaging/core/client/ServerPinger.java	2008-05-23 13:37:09 UTC (rev 4289)
@@ -0,0 +1,8 @@
+package org.jboss.messaging.core.client;
+
+/**
+ * @author <a href="mailto:ataylor at redhat.com">Andy Taylor</a>
+ */
+public interface ServerPinger extends Runnable
+{
+}

Copied: trunk/src/main/org/jboss/messaging/core/client/impl/ServerPingerImpl.java (from rev 4282, trunk/src/main/org/jboss/messaging/core/client/impl/ServerPongerImpl.java)
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ServerPingerImpl.java	                        (rev 0)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ServerPingerImpl.java	2008-05-23 13:37:09 UTC (rev 4289)
@@ -0,0 +1,78 @@
+/*
+   * JBoss, Home of Professional Open Source
+   * Copyright 2005, JBoss Inc., and individual contributors as indicated
+   * by the @authors tag. See the copyright.txt in the distribution for a
+   * full listing of individual contributors.
+   *
+   * This is free software; you can redistribute it and/or modify it
+   * under the terms of the GNU Lesser General Public License as
+   * published by the Free Software Foundation; either version 2.1 of
+   * the License, or (at your option) any later version.
+   *
+   * This software is distributed in the hope that it will be useful,
+   * but WITHOUT ANY WARRANTY; without even the implied warranty of
+   * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+   * Lesser General Public License for more details.
+   *
+   * You should have received a copy of the GNU Lesser General Public
+   * License along with this software; if not, write to the Free
+   * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+   * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+   */
+package org.jboss.messaging.core.client.impl;
+
+import org.jboss.messaging.core.remoting.impl.wireformat.Ping;
+import org.jboss.messaging.core.remoting.impl.wireformat.Pong;
+import org.jboss.messaging.core.remoting.impl.mina.CleanUpNotifier;
+import org.jboss.messaging.core.remoting.*;
+import org.jboss.messaging.core.logging.Logger;
+import org.jboss.messaging.core.client.ServerPinger;
+import org.jboss.messaging.core.exception.MessagingException;
+import org.apache.mina.common.IoSession;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * @author <a href="mailto:ataylor at redhat.com">Andy Taylor</a>
+ */
+public class ServerPingerImpl implements PacketHandler
+{
+   private static Logger log = Logger.getLogger(ServerPingerImpl.class);
+   private static boolean traceEnabled = log.isTraceEnabled();
+   private long id;
+   KeepAliveHandler keepAliveHandler;
+
+   public ServerPingerImpl(KeepAliveHandler keepAliveHandler, Long id)
+   {
+      this.keepAliveHandler = keepAliveHandler;
+      this.id = id;
+   }
+
+   public long getID()
+   {
+      return id;
+   }
+
+   public void handle(Packet packet, PacketReturner sender)
+   {
+      Ping ping = (Ping) packet;
+      if(traceEnabled)
+      {
+         log.trace("received ping:" + ping);
+      }
+      Pong pong = keepAliveHandler.ping(ping);
+
+      if(pong != null)
+      {
+         try
+         {
+            sender.send(pong);
+         }
+         catch (Exception e)
+         {
+            log.warn("error sending pong to server", e);
+         }
+      }
+   }
+}

Deleted: trunk/src/main/org/jboss/messaging/core/client/impl/ServerPongerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ServerPongerImpl.java	2008-05-23 13:17:14 UTC (rev 4288)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ServerPongerImpl.java	2008-05-23 13:37:09 UTC (rev 4289)
@@ -1,99 +0,0 @@
-/*
-   * JBoss, Home of Professional Open Source
-   * Copyright 2005, JBoss Inc., and individual contributors as indicated
-   * by the @authors tag. See the copyright.txt in the distribution for a
-   * full listing of individual contributors.
-   *
-   * This is free software; you can redistribute it and/or modify it
-   * under the terms of the GNU Lesser General Public License as
-   * published by the Free Software Foundation; either version 2.1 of
-   * the License, or (at your option) any later version.
-   *
-   * This software is distributed in the hope that it will be useful,
-   * but WITHOUT ANY WARRANTY; without even the implied warranty of
-   * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
-   * Lesser General Public License for more details.
-   *
-   * You should have received a copy of the GNU Lesser General Public
-   * License along with this software; if not, write to the Free
-   * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
-   * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
-   */
-package org.jboss.messaging.core.client.impl;
-
-import org.jboss.messaging.core.remoting.impl.wireformat.Ping;
-import org.jboss.messaging.core.remoting.impl.wireformat.Pong;
-import org.jboss.messaging.core.remoting.impl.mina.CleanUpNotifier;
-import org.jboss.messaging.core.remoting.*;
-import org.jboss.messaging.core.logging.Logger;
-import org.jboss.messaging.core.client.ServerPonger;
-import org.jboss.messaging.core.exception.MessagingException;
-import org.apache.mina.common.IoSession;
-
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-
-/**
- * @author <a href="ataylor at redhat.com">Andy Taylor</a>
- */
-public class ServerPongerImpl implements PacketHandler, ServerPonger
-{
-   private static Logger log = Logger.getLogger(ServerPongerImpl.class);
-   private static boolean traceEnabled = log.isTraceEnabled();
-   IoSession session;
-   private long id;
-   long interval;
-   long timeout;
-   CleanUpNotifier cleanUpNotifier;
-   KeepAliveHandler keepAliveHandler;
-   CountDownLatch latch = new CountDownLatch(1);
-
-   public ServerPongerImpl(CleanUpNotifier cleanUpNotifier, KeepAliveHandler keepAliveHandler, IoSession session, long id, long timeout, long interval)
-   {
-      this.cleanUpNotifier = cleanUpNotifier;
-      this.keepAliveHandler = keepAliveHandler;
-      this.session = session;
-      this.id = id;
-      this.timeout = timeout;
-      this.interval = interval;
-   }
-
-
-   public void run()
-   {
-      boolean pinged;
-      latch = new CountDownLatch(1);
-      try
-      {
-         pinged = latch.await(timeout + interval, TimeUnit.MILLISECONDS);
-         if(!pinged)
-         {
-            log.warn("no ping received from server, cleaning up connection.");
-            cleanUpNotifier.fireCleanup(session.getId(), new MessagingException(MessagingException.CONNECTION_TIMEDOUT, "no ping received from server"));
-         }
-      }
-      catch (InterruptedException e)
-      {
-      }
-   }
-
-   public long getID()
-   {
-      return id;
-   }
-
-   public void handle(Packet packet, PacketReturner sender)
-   {
-      Ping ping = (Ping) packet;
-      latch.countDown();
-      if(traceEnabled)
-      {
-         log.trace("received ping:" + ping);
-      }
-      Pong pong = keepAliveHandler.ping(ping);
-      if(pong != null)
-      {
-         session.write(pong);
-      }
-   }
-}

Modified: trunk/src/main/org/jboss/messaging/core/config/impl/ConfigurationImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/config/impl/ConfigurationImpl.java	2008-05-23 13:17:14 UTC (rev 4288)
+++ trunk/src/main/org/jboss/messaging/core/config/impl/ConfigurationImpl.java	2008-05-23 13:37:09 UTC (rev 4289)
@@ -46,7 +46,7 @@
    public static final String REMOTING_ENABLE_SSL_SYSPROP_KEY = "jbm.remoting.enable.ssl";
 
    public static final int DEFAULT_REMOTING_PORT = 5400;
-   public static final int DEFAULT_KEEP_ALIVE_INTERVAL = 0; // in seconds
+   public static final int DEFAULT_KEEP_ALIVE_INTERVAL = 10; // in seconds
    public static final int DEFAULT_KEEP_ALIVE_TIMEOUT = 5; // in seconds
    public static final int DEFAULT_REQRES_TIMEOUT = 5; // in seconds
    public static final boolean DEFAULT_INVM_DISABLED = false;

Added: trunk/src/main/org/jboss/messaging/core/remoting/KeepAliveFactory.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/KeepAliveFactory.java	                        (rev 0)
+++ trunk/src/main/org/jboss/messaging/core/remoting/KeepAliveFactory.java	2008-05-23 13:37:09 UTC (rev 4289)
@@ -0,0 +1,26 @@
+/*
+ * JBoss, Home of Professional Open Source
+ *
+ * Distributable under LGPL license.
+ * See terms of license at gnu.org.
+ */
+package org.jboss.messaging.core.remoting;
+
+import org.jboss.messaging.core.remoting.impl.wireformat.Ping;
+import org.jboss.messaging.core.remoting.impl.wireformat.Pong;
+
+/**
+ * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>
+ *
+ * @version <tt>$Revision$</tt>
+ *
+ */
+public interface KeepAliveFactory
+{
+
+   Ping ping(long sessionID);
+
+   Pong pong(long sessionID, Ping ping);
+
+   boolean isPinging(long sessionID);
+}

Modified: trunk/src/main/org/jboss/messaging/core/remoting/RemotingService.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/RemotingService.java	2008-05-23 13:17:14 UTC (rev 4288)
+++ trunk/src/main/org/jboss/messaging/core/remoting/RemotingService.java	2008-05-23 13:37:09 UTC (rev 4289)
@@ -11,6 +11,7 @@
 import org.jboss.messaging.core.server.MessagingComponent;
 import org.jboss.messaging.core.server.ClientPinger;
 import org.jboss.messaging.core.exception.MessagingException;
+import org.jboss.messaging.core.remoting.impl.mina.ServerKeepAliveFactory;
 
 /**
  * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>
@@ -23,6 +24,8 @@
    PacketDispatcher getDispatcher();
 
    Configuration getConfiguration();
+
+   ServerKeepAliveFactory getKeepAliveFactory();
    
    void addInterceptor(Interceptor interceptor);
 
@@ -31,6 +34,4 @@
    void addRemotingSessionListener(RemotingSessionListener listener);
 
    void removeRemotingSessionListener(RemotingSessionListener listener);
-
-   void setClientPinger(ClientPinger clientPinger);
 }

Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/ClientKeepAliveHandler.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/ClientKeepAliveHandler.java	2008-05-23 13:17:14 UTC (rev 4288)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/ClientKeepAliveHandler.java	2008-05-23 13:37:09 UTC (rev 4289)
@@ -37,7 +37,7 @@
    public Pong ping(Ping ping)
    {
       Pong pong = new Pong(ping.getSessionID(), false);
-      pong.setTargetID(0);
+      pong.setTargetID(ping.getResponseTargetID());
       return pong;
    }
 }

Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaConnector.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaConnector.java	2008-05-23 13:17:14 UTC (rev 4288)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaConnector.java	2008-05-23 13:37:09 UTC (rev 4289)
@@ -27,9 +27,8 @@
 import org.jboss.messaging.core.client.ConnectionParams;
 import org.jboss.messaging.core.client.Location;
 import org.jboss.messaging.core.client.RemotingSessionListener;
-import org.jboss.messaging.core.client.ServerPonger;
 import org.jboss.messaging.core.client.impl.ConnectionParamsImpl;
-import org.jboss.messaging.core.client.impl.ServerPongerImpl;
+import org.jboss.messaging.core.client.impl.ServerPingerImpl;
 import org.jboss.messaging.core.exception.MessagingException;
 import org.jboss.messaging.core.logging.Logger;
 import org.jboss.messaging.core.remoting.*;
@@ -70,9 +69,7 @@
 
    KeepAliveHandler keepAliveHandler;
 
-   private ScheduledExecutorService scheduledExecutor;
 
-
    // Static --------------------------------------------------------
 
    // Constructors --------------------------------------------------
@@ -172,14 +169,15 @@
       }
       session = future.getSession();
 
-      ServerPongerImpl pinger = new ServerPongerImpl(this, keepAliveHandler, session, 0, connectionParams.getKeepAliveTimeout() * 1000, connectionParams.getKeepAliveInterval() * 1000);
-
+      ServerPingerImpl pinger = new ServerPingerImpl(keepAliveHandler, (long) 0);
+      /*
       getDispatcher().register(pinger);
       if (connectionParams.getKeepAliveInterval() > 0)
       {
          scheduledExecutor = new ScheduledThreadPoolExecutor(1);
          scheduledExecutor.scheduleAtFixedRate(pinger, 0, connectionParams.getKeepAliveInterval(), TimeUnit.SECONDS);
-      }
+      }*/
+      dispatcher.register(pinger);
       return new MinaSession(session, handler);
    }
 
@@ -189,7 +187,6 @@
       {
          return false;
       }
-
       CloseFuture closeFuture = session.close().awaitUninterruptibly();
       boolean closed = closeFuture.isClosed();
 
@@ -215,10 +212,7 @@
 
       connector = null;
       session = null;
-      if (scheduledExecutor != null && !scheduledExecutor.isShutdown())
-      {
-         scheduledExecutor.shutdown();
-      }
+
       return closed;
    }
 
@@ -262,10 +256,6 @@
 
    public synchronized void fireCleanup(long sessionID, MessagingException me)
    {
-      if (scheduledExecutor != null && !scheduledExecutor.isShutdown())
-      {
-         scheduledExecutor.shutdown();
-      }
       for (RemotingSessionListener listener: listeners)
       {
          listener.sessionDestroyed(sessionID, me);

Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaService.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaService.java	2008-05-23 13:17:14 UTC (rev 4288)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaService.java	2008-05-23 13:37:09 UTC (rev 4289)
@@ -67,7 +67,7 @@
 
    private List<Interceptor> filters = new CopyOnWriteArrayList<Interceptor>();
 
-   private ClientPinger clientPinger;
+   private ServerKeepAliveFactory factory;
 
    // Static --------------------------------------------------------
 
@@ -75,11 +75,18 @@
 
    public MinaService(Configuration config)
    {
+      this(config, new ServerKeepAliveFactory());
+   }
+
+   public MinaService(Configuration config, ServerKeepAliveFactory factory)
+   {
       assert config != null;
+      assert factory != null;
 
       validate(config);
 
       this.config = config;
+      this.factory = factory;
       dispatcher = new PacketDispatcherImpl(filters);
    }
 
@@ -109,12 +116,6 @@
       listeners.remove(listener);
    }
 
-   public void setClientPinger(ClientPinger clientPinger)
-   {
-      this.clientPinger = clientPinger;
-      clientPinger.registerCleanUpNotifier(this);
-   }
-
    // TransportService implementation -------------------------------
 
    public void start() throws Exception
@@ -213,6 +214,11 @@
       return config;
    }
 
+   public ServerKeepAliveFactory getKeepAliveFactory()
+   {
+      return factory;
+   }
+
    /**
     * This method must only be called by tests which requires
     * to insert Filters (e.g. to simulate network failures)
@@ -229,15 +235,28 @@
 
    public void fireCleanup(long sessionID, MessagingException me)
    {
-      for (RemotingSessionListener listener : listeners)
+      if (factory.getSessions().contains(sessionID))
       {
-         listener.sessionDestroyed(sessionID, me);
+         for (RemotingSessionListener listener : listeners)
+         {
+            listener.sessionDestroyed(sessionID, me);
+         }
+         factory.getSessions().remove(sessionID);
       }
    }
 
    // Public --------------------------------------------------------
 
+   public void setKeepAliveFactory(ServerKeepAliveFactory factory)
+   {
+      assert factory != null;
 
+      this.factory = factory;
+   }
+
+   // Public --------------------------------------------------------
+
+
    public void setRemotingConfiguration(Configuration remotingConfig)
    {
       assert started == false;

Added: trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/ServerKeepAliveFactory.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/ServerKeepAliveFactory.java	                        (rev 0)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/ServerKeepAliveFactory.java	2008-05-23 13:37:09 UTC (rev 4289)
@@ -0,0 +1,77 @@
+/*
+ * JBoss, Home of Professional Open Source
+ *
+ * Distributable under LGPL license.
+ * See terms of license at gnu.org.
+ */
+package org.jboss.messaging.core.remoting.impl.mina;
+
+import java.util.Map;
+import java.util.List;
+import java.util.ArrayList;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.jboss.messaging.core.logging.Logger;
+import org.jboss.messaging.core.remoting.KeepAliveFactory;
+import org.jboss.messaging.core.remoting.impl.wireformat.Ping;
+import org.jboss.messaging.core.remoting.impl.wireformat.Pong;
+
+/**
+ * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>
+ *
+ * @version <tt>$Revision$</tt>
+ *
+ */
+public class ServerKeepAliveFactory implements KeepAliveFactory
+{
+   // Constants -----------------------------------------------------
+
+   private static final Logger log = Logger
+         .getLogger(ServerKeepAliveFactory.class);
+
+   // Attributes ----------------------------------------------------
+
+   // FIXME session mapping must be cleaned when the server session is closed:
+   // either normally or exceptionally
+   /**
+    * IDs of the sessions this factory reports as pinging (see isPinging)
+    */
+   private List<Long> sessions = new ArrayList<Long>();
+
+   // Static --------------------------------------------------------
+
+   // Constructors --------------------------------------------------
+
+   // Public --------------------------------------------------------
+
+   // KeepAliveFactory implementation -------------------------------
+
+   public Ping ping(long sessionID)
+   {
+      return new Ping(sessionID);
+   }
+
+   public boolean isPinging(long sessionID)
+   {
+      return sessions.contains(sessionID);
+   }
+
+   public Pong pong(long sessionID, Ping ping)
+   {
+      long clientSessionID = ping.getSessionID();
+      return new Pong(sessionID, sessions.contains(clientSessionID));
+   }
+
+   public List<Long> getSessions()
+   {
+      return sessions;
+   }
+
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+
+   // Private -------------------------------------------------------
+
+   // Inner classes -------------------------------------------------
+}

Modified: trunk/src/main/org/jboss/messaging/core/server/ClientPinger.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/ClientPinger.java	2008-05-23 13:17:14 UTC (rev 4288)
+++ trunk/src/main/org/jboss/messaging/core/server/ClientPinger.java	2008-05-23 13:37:09 UTC (rev 4289)
@@ -15,31 +15,5 @@
     */
    void run();
 
-   /**
-    * pong received from client
-    * @param pong the pong
-    */
-   void pong(Pong pong);
 
-   /**
-    * register a connection.
-    *
-    * @param remotingSessionID  the session id
-    * @param sender the sender
-    */
-   void registerConnection(long remotingSessionID, PacketReturner sender);
-
-   /**
-    * unregister a connection.
-    *
-    * @param remotingSessionID the session id
-    */
-   void unregister(long remotingSessionID);
-
-   /**
-    * register the cleanup notifier to use
-    *
-    * @param cleanUpNotifier the notifier
-    */
-   void registerCleanUpNotifier(CleanUpNotifier cleanUpNotifier);
 }

Modified: trunk/src/main/org/jboss/messaging/core/server/MessagingServer.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/MessagingServer.java	2008-05-23 13:17:14 UTC (rev 4288)
+++ trunk/src/main/org/jboss/messaging/core/server/MessagingServer.java	2008-05-23 13:37:09 UTC (rev 4289)
@@ -28,6 +28,7 @@
 import org.jboss.messaging.core.persistence.StorageManager;
 import org.jboss.messaging.core.postoffice.PostOffice;
 import org.jboss.messaging.core.remoting.RemotingService;
+import org.jboss.messaging.core.remoting.PacketReturner;
 import org.jboss.messaging.core.remoting.impl.wireformat.CreateConnectionResponse;
 import org.jboss.messaging.core.security.Role;
 import org.jboss.messaging.core.security.SecurityStore;
@@ -91,7 +92,8 @@
    
    CreateConnectionResponse createConnection(String username, String password,
                                              long remotingClientSessionID, String clientAddress,
-                                             int incrementVersion) throws Exception;
+                                             int incrementVersion,
+                                             PacketReturner sender) throws Exception;
 
    DeploymentManager getDeploymentManager();
 }

Modified: trunk/src/main/org/jboss/messaging/core/server/impl/ClientPingerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/ClientPingerImpl.java	2008-05-23 13:17:14 UTC (rev 4288)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/ClientPingerImpl.java	2008-05-23 13:37:09 UTC (rev 4289)
@@ -25,6 +25,9 @@
 import org.jboss.messaging.core.remoting.impl.wireformat.Pong;
 import org.jboss.messaging.core.remoting.impl.mina.CleanUpNotifier;
 import org.jboss.messaging.core.remoting.PacketReturner;
+import org.jboss.messaging.core.remoting.KeepAliveFactory;
+import org.jboss.messaging.core.remoting.PacketHandler;
+import org.jboss.messaging.core.remoting.Packet;
 import org.jboss.messaging.core.logging.Logger;
 import org.jboss.messaging.core.server.MessagingServer;
 import org.jboss.messaging.core.server.ServerConnection;
@@ -40,7 +43,7 @@
 /**
  * @author <a href="ataylor at redhat.com">Andy Taylor</a>
  */
-public class ClientPingerImpl implements ClientPinger
+public class ClientPingerImpl implements ClientPinger, PacketHandler
 {
    private static Logger log = Logger.getLogger(ClientPingerImpl.class);
 
@@ -61,122 +64,76 @@
     * the cleanupnotifier to use on failed pings
     */
    private CleanUpNotifier cleanUpNotifier;
+   private KeepAliveFactory keepAliveFactory;
+   private PacketReturner sender;
+   long id = 0;
+   private Pong pong = null;
 
-   public ClientPingerImpl(MessagingServer server)
+   public ClientPingerImpl(MessagingServer server, KeepAliveFactory keepAliveFactory, CleanUpNotifier cleanUpNotifier, final PacketReturner sender)
    {
       this.server = server;
+      this.keepAliveFactory = keepAliveFactory;
+      this.cleanUpNotifier = cleanUpNotifier;
+      this.sender = sender;
    }
 
    public void run()
    {
-      try
+      id = server.getRemotingService().getDispatcher().generateID();
+      server.getRemotingService().getDispatcher().register(this);
+      Ping ping = keepAliveFactory.ping(sender.getSessionID());
+      ping.setTargetID(0);
+      ping.setResponseTargetID(id);
+      while(keepAliveFactory.isPinging(sender.getSessionID()))
       {
          synchronized (this)
          {
-            replies.clear();
-            //ping all the sessions
-            for (Long sessionId : connections.keySet())
+            try
             {
-               try
-               {
-                  Ping ping = new Ping(sessionId);
-                  ping.setTargetID(0);
-                  connections.get(sessionId).getPacketReturner().send(ping);
-                  replies.add(sessionId);
-                  if(isTraceEnabled)
-                  {
-                     log.trace("sending " + ping);
-                  }
-               }
-               catch (Exception e)
-               {
-                  e.printStackTrace();
-               }
+               wait(server.getConfiguration().getKeepAliveInterval() * 1000);
             }
-            //wait for the keep alive timeout period
-            try
+            catch (InterruptedException e)
             {
+            }
+         }
+         pong = null;
+         try
+         {
+            sender.send(ping);
+            synchronized (this)
+            {
                wait(server.getConfiguration().getKeepAliveTimeout() * 1000);
             }
-            catch (InterruptedException e)
+            if(pong == null)
             {
+               cleanUpNotifier.fireCleanup(sender.getSessionID(), new MessagingException(MessagingException.CONNECTION_TIMEDOUT, "unable to ping client"));
+               break;
             }
          }
-         //at this point cleanup any replies we havent received
-         for (Long reply : replies)
+         catch (Exception e)
          {
-            if(cleanUpNotifier != null)
-               cleanUpNotifier.fireCleanup(reply, new MessagingException(MessagingException.CONNECTION_TIMEDOUT, "unable to ping client"));
-            connections.remove(reply);
+            log.warn("problem cleaning up session: " + sender.getSessionID(), e);
          }
       }
-      catch (Exception e)
-      {
-         e.printStackTrace();
-      }
+      server.getRemotingService().getDispatcher().unregister(id);
    }
 
-   /**
-    * pong received from client
-    * @param pong
-    */
-   public void pong(Pong pong)
+   public long getID()
    {
-      if(isTraceEnabled)
-      {
-         log.trace("received reply" + pong);
-      }
-      replies.remove(pong.getSessionID());
+      return id;
    }
 
-   /**
-    * register a connection.
-    *
-    * @param remotingSessionID
-    * @param sender
-    */
-   public void registerConnection(long remotingSessionID, PacketReturner sender)
+   public void handle(Packet packet, PacketReturner sender)
    {
-      if (connections.get(remotingSessionID) == null)
+      Pong pong = (Pong) packet;
+      if(isTraceEnabled)
       {
-         connections.put(remotingSessionID, new ConnectionHolder(remotingSessionID, sender));
+         log.trace("received reply" + pong);
       }
-      else
-      {
-         connections.get(remotingSessionID).increment();
-      }
-
+      this.pong = pong;
    }
 
    /**
-    * unregister a connection.
-    *
-    * @param remotingSessionID
-    */
-   public void unregister(long remotingSessionID)
-   {
-      ConnectionHolder connectionHolder = connections.get(remotingSessionID);
-      if(connectionHolder != null)
-      {
-         connectionHolder.decrement();
-         if(connectionHolder.get() == 0)
-         {
-            connections.remove(remotingSessionID);
-         }
-      }
-   }
-
-   /**
-    * register the cleanup notifier to use
-    *
-    * @param cleanUpNotifier
-    */
-   public void registerCleanUpNotifier(CleanUpNotifier cleanUpNotifier)
-   {
-      this.cleanUpNotifier = cleanUpNotifier;
-   }
-
-   /**
     * simple holder class for sessions
     */
    class ConnectionHolder

Modified: trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java	2008-05-23 13:17:14 UTC (rev 4288)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java	2008-05-23 13:37:09 UTC (rev 4289)
@@ -40,12 +40,12 @@
 import org.jboss.messaging.core.persistence.impl.nullpm.NullStorageManager;
 import org.jboss.messaging.core.postoffice.PostOffice;
 import org.jboss.messaging.core.postoffice.impl.PostOfficeImpl;
-import org.jboss.messaging.core.remoting.ConnectorRegistrySingleton;
-import org.jboss.messaging.core.remoting.Interceptor;
-import org.jboss.messaging.core.remoting.RemotingService;
+import org.jboss.messaging.core.remoting.*;
 import org.jboss.messaging.core.remoting.impl.mina.MinaService;
 import org.jboss.messaging.core.remoting.impl.mina.CleanUpNotifier;
+import org.jboss.messaging.core.remoting.impl.mina.ServerKeepAliveFactory;
 import org.jboss.messaging.core.remoting.impl.wireformat.CreateConnectionResponse;
+import org.jboss.messaging.core.remoting.impl.wireformat.Ping;
 import org.jboss.messaging.core.security.JBMSecurityManager;
 import org.jboss.messaging.core.security.Role;
 import org.jboss.messaging.core.security.SecurityStore;
@@ -96,7 +96,6 @@
    private Deployer queueSettingsDeployer;
    private JBMSecurityManager securityManager = new JBMSecurityManagerImpl(true);
    private DeploymentManager deploymentManager = new FileDeploymentManager();
-   private ClientPinger clientPinger;
 
    // plugins
 
@@ -112,6 +111,7 @@
    private ResourceManager resourceManager = new ResourceManagerImpl(0);
    private ScheduledExecutorService scheduledExecutor;
    private MessagingServerPacketHandler serverPacketHandler;
+   private CleanUpNotifier cleanUpNotifier = null;
 
    // Constructors ---------------------------------------------------------------------------------
    /**
@@ -137,6 +137,7 @@
       this.configuration = configuration;
       createTransport = true;
       remotingService = new MinaService(configuration);
+      cleanUpNotifier = (CleanUpNotifier) remotingService;
    }
    // lifecycle methods ----------------------------------------------------------------
 
@@ -172,8 +173,6 @@
       }
       // Start the wired components
       securityDeployer.start();
-      clientPinger = new ClientPingerImpl(this);
-      remotingService.setClientPinger(clientPinger);
       remotingService.addRemotingSessionListener(connectionManager);
       memoryManager.start();
       deploymentManager.start(1);
@@ -181,9 +180,8 @@
       deploymentManager.registerDeployer(queueSettingsDeployer);
       postOffice.start();
       deploymentManager.start(2);
-      serverPacketHandler = new MessagingServerPacketHandler(this, clientPinger);
+      serverPacketHandler = new MessagingServerPacketHandler(this);
       getRemotingService().getDispatcher().register(serverPacketHandler);
-      serverPacketHandler.start();
       ClassLoader loader = Thread.currentThread().getContextClassLoader();
       for (String interceptorClass : configuration.getDefaultInterceptors())
       {
@@ -217,7 +215,6 @@
       queueSettingsDeployer.stop();
       deploymentManager.stop();
       remotingService.removeRemotingSessionListener(connectionManager);
-      serverPacketHandler.stop();
       connectionManager = null;
       memoryManager.stop();
       memoryManager = null;
@@ -283,7 +280,12 @@
    {
       this.storageManager = storageManager;
    }
-   
+
+   public void setCleanUpNotifier(CleanUpNotifier cleanUpNotifier)
+   {
+      this.cleanUpNotifier = cleanUpNotifier;
+   }
+
    public PostOffice getPostOffice()
    {
       return postOffice;
@@ -322,7 +324,8 @@
 
    public CreateConnectionResponse createConnection(final String username, final String password,
                                                     final long remotingClientSessionID, final String clientAddress,
-                                                    final int incrementVersion)
+                                                    final int incrementVersion,
+                                                    final PacketReturner sender)
       throws Exception
    {
       log.trace("creating a new connection for user " + username);
@@ -348,9 +351,20 @@
                           queueSettingsRepository,
                           postOffice, securityStore, connectionManager);
 
-      remotingService.getDispatcher().register(new ServerConnectionPacketHandler(connection, clientPinger));
+      remotingService.getDispatcher().register(new ServerConnectionPacketHandler(connection));
 
-      return new CreateConnectionResponse(connection.getID(), version);
+      CreateConnectionResponse createConnectionResponse = new CreateConnectionResponse(connection.getID(), version);
+      if(cleanUpNotifier != null)
+      {
+         if(!getRemotingService().getKeepAliveFactory().isPinging(sender.getSessionID()))
+         {
+            getRemotingService().getKeepAliveFactory().getSessions().add(sender.getSessionID());
+            ClientPinger clientPinger = new ClientPingerImpl(this, getRemotingService().getKeepAliveFactory(), cleanUpNotifier, sender);
+            new Thread(clientPinger).start();
+         }
+      }
+
+      return createConnectionResponse;
    }
    
    // Public ---------------------------------------------------------------------------------------

Modified: trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerPacketHandler.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerPacketHandler.java	2008-05-23 13:17:14 UTC (rev 4288)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerPacketHandler.java	2008-05-23 13:37:09 UTC (rev 4289)
@@ -43,39 +43,20 @@
  * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
  * @author <a href="ataylor at redhat.com">Andy Taylor</a>
  */
-public class MessagingServerPacketHandler extends ServerPacketHandlerSupport implements MessagingComponent
+public class MessagingServerPacketHandler extends ServerPacketHandlerSupport 
 {
    private static final Logger log = Logger.getLogger(MessagingServerPacketHandler.class);
    
    private final MessagingServer server;
 
-   private final ClientPinger clientPinger;
 
    private ScheduledExecutorService scheduledExecutor;
 
-   public MessagingServerPacketHandler(final MessagingServer server, ClientPinger clientPinger)
+   public MessagingServerPacketHandler(final MessagingServer server)
    {
       this.server = server;
-      this.clientPinger = clientPinger;
 
    }
-
-   public void start() throws Exception
-   {
-      if (server.getConfiguration().getKeepAliveInterval() > 0)
-      {
-         scheduledExecutor = new ScheduledThreadPoolExecutor(1);
-         scheduledExecutor.scheduleAtFixedRate(clientPinger, 0, server.getConfiguration().getKeepAliveInterval(), TimeUnit.SECONDS);
-      }
-   }
-
-   public void stop() throws Exception
-   {
-      if (server.getConfiguration().getKeepAliveInterval() > 0)
-      {
-         scheduledExecutor.shutdownNow();
-      }
-   }
    /*
    * The advantage to use String as ID is that we can leverage Java 5 UUID to
    * generate these IDs. However theses IDs are 128 bite long and it increases
@@ -103,15 +84,14 @@
          CreateConnectionResponse  createConnectionResponse = server.createConnection(request.getUsername(), request.getPassword(),
          		                             request.getRemotingSessionID(),
                                             sender.getRemoteAddress(),
-                                            request.getVersion());
-         clientPinger.registerConnection(request.getRemotingSessionID(), sender);
+                                            request.getVersion(),
+                                            sender);
          response = createConnectionResponse;
          
       }
       else if(type == EmptyPacket.PONG)
       {
          Pong decodedPong = (Pong) packet;
-         clientPinger.pong(decodedPong);
       }
       else
       {

Modified: trunk/src/main/org/jboss/messaging/core/server/impl/ServerConnectionPacketHandler.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/ServerConnectionPacketHandler.java	2008-05-23 13:17:14 UTC (rev 4288)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/ServerConnectionPacketHandler.java	2008-05-23 13:37:09 UTC (rev 4289)
@@ -40,12 +40,10 @@
 public class ServerConnectionPacketHandler extends ServerPacketHandlerSupport
 {
 	private final ServerConnection connection;
-   final ClientPinger clientPinger;
 	
-   public ServerConnectionPacketHandler(final ServerConnection connection, final ClientPinger clientPinger)
+   public ServerConnectionPacketHandler(final ServerConnection connection)
    {
    	this.connection = connection;
-      this.clientPinger = clientPinger;
    }
 
    public long getID()
@@ -72,7 +70,7 @@
          connection.stop();
          break;
       case EmptyPacket.CLOSE:
-         clientPinger.unregister(connection.getRemotingClientSessionID());
+         //clientPinger.unregister(connection.getRemotingClientSessionID());
          connection.close();
          break;
       default:




More information about the jboss-cvs-commits mailing list