[jboss-cvs] JBoss Messaging SVN: r7332 - in trunk: src/main/org/jboss/messaging/core/remoting and 5 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Mon Jun 15 06:36:19 EDT 2009


Author: timfox
Date: 2009-06-15 06:36:18 -0400 (Mon, 15 Jun 2009)
New Revision: 7332

Modified:
   trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerImpl.java
   trunk/src/main/org/jboss/messaging/core/client/impl/ConnectionManagerImpl.java
   trunk/src/main/org/jboss/messaging/core/remoting/Channel.java
   trunk/src/main/org/jboss/messaging/core/remoting/RemotingConnection.java
   trunk/src/main/org/jboss/messaging/core/remoting/impl/Pinger.java
   trunk/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java
   trunk/src/main/org/jboss/messaging/core/remoting/impl/ssl/SSLSupport.java
   trunk/src/main/org/jboss/messaging/core/remoting/server/impl/RemotingServiceImpl.java
   trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java
   trunk/tests/src/org/jboss/messaging/tests/integration/remoting/PingTest.java
Log:
https://jira.jboss.org/jira/browse/JBMESSAGING-1656

Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerImpl.java	2009-06-15 09:55:22 UTC (rev 7331)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerImpl.java	2009-06-15 10:36:18 UTC (rev 7332)
@@ -585,10 +585,6 @@
     */
    private void sendCredits(final int credits)
    {
-      if (trace)
-      {
-         log.trace("Sending " + credits + " credits back", new Exception ("trace"));
-      }
       channel.send(new SessionConsumerFlowCreditMessage(id, credits));
    }
 

Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ConnectionManagerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ConnectionManagerImpl.java	2009-06-15 09:55:22 UTC (rev 7331)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ConnectionManagerImpl.java	2009-06-15 10:36:18 UTC (rev 7332)
@@ -23,7 +23,6 @@
 package org.jboss.messaging.core.client.impl;
 
 import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.EARLY_RESPONSE;
-import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.PING;
 
 import java.util.ArrayList;
 import java.util.Collections;
@@ -90,9 +89,9 @@
    // Attributes
    // -----------------------------------------------------------------------------------
 
-   //We need to keep the reference to prevent the factory getting gc'd before the sessions are finished being used
+   // We need to keep the reference to prevent the factory getting gc'd before the sessions are finished being used
    private final ClientSessionFactory factory;
-      
+
    private final TransportConfiguration connectorConfig;
 
    private final TransportConfiguration backupConfig;
@@ -151,10 +150,8 @@
 
    private Connector connector;
 
-   private Map<Object, FailedConnectionRunnable> failRunnables = new ConcurrentHashMap<Object, FailedConnectionRunnable>();
+   private Map<Object, Pinger> pingers = new ConcurrentHashMap<Object, Pinger>();
 
-   private Map<Object, Pinger> pingRunnables = new ConcurrentHashMap<Object, Pinger>();
-
    // debug
 
    private static Map<TransportConfiguration, Set<RemotingConnection>> debugConns;
@@ -189,7 +186,7 @@
                                 final ScheduledExecutorService scheduledThreadPool)
    {
       this.factory = factory;
-      
+
       this.connectorConfig = connectorConfig;
 
       this.backupConfig = backupConfig;
@@ -460,24 +457,23 @@
    {
       closed = true;
    }
-      
 
    // Public
    // ---------------------------------------------------------------------------------------
 
    public void cancelPingerForConnectionID(final Object connectionID)
    {
-      Pinger pinger = pingRunnables.get(connectionID);
+      Pinger pinger = pingers.get(connectionID);
 
       pinger.close();
    }
-   
+
    @Override
    protected void finalize() throws Throwable
    {
-      //In case user forgets to close it explicitly
+      // In case user forgets to close it explicitly
       close();
-      
+
       super.finalize();
    }
 
@@ -496,7 +492,7 @@
    }
 
    private boolean failoverOrReconnect(final MessagingException me, final Object connectionID)
-   {
+   {     
       // To prevent recursion
       if (inFailoverOrReconnect)
       {
@@ -543,7 +539,7 @@
          boolean attemptFailover = (backupConnectorFactory) != null && (failoverOnServerShutdown || me.getCode() != MessagingException.DISCONNECTED);
 
          boolean done = false;
-
+         
          if (attemptFailover || reconnectAttempts != 0)
          {
             lockAllChannel1s();
@@ -592,9 +588,9 @@
             {
                oldConnections.add(entry.connection);
             }
+            
+            closePingers();
 
-            closeScheduledRunnables();
-
             connections.clear();
 
             refCount = 0;
@@ -643,6 +639,7 @@
             else
             {
                // Fail the old connections so their listeners get called
+               
                for (RemotingConnection connection : oldConnections)
                {
                   connection.fail(me);
@@ -652,7 +649,9 @@
          else
          {
             // Just fail the connections
-
+            
+            closePingers();
+ 
             failConnection(me);
          }
 
@@ -662,28 +661,16 @@
       }
    }
 
-   private void closeScheduledRunnables()
+   private void closePingers()
    {
-      for (Object id : new HashSet<Object>(connections.keySet()))
+      for (Pinger pinger: pingers.values())
       {
-         connections.remove(id);
-
-         FailedConnectionRunnable runnable = failRunnables.remove(id);
-
-         if (runnable != null)
-         {
-            runnable.close();
-         }
-
-         Pinger pingRunnable = pingRunnables.remove(id);
-
-         if (pingRunnable != null)
-         {
-            pingRunnable.close();
-         }
+         pinger.close();         
       }
+      
+      pingers.clear();
    }
-
+    
    /*
     * Re-attach sessions all pre-existing sessions to new remoting connections
     */
@@ -832,7 +819,7 @@
 
          Set<ConnectionEntry> copy = new HashSet<ConnectionEntry>(connections.values());
 
-         closeScheduledRunnables();
+         closePingers();
 
          connections.clear();
 
@@ -951,37 +938,24 @@
          // Send the initial ping; we always do this because it contains connectionTTL and clientFailureCheckPeriod -
          // the server needs this in order to do pinging and failure checking
 
+         Pinger pinger = new Pinger(conn, clientFailureCheckPeriod, new Channel0Handler(conn), new FailedConnectionAction(conn), 0);
+         
+         pingers.put(conn.getID(), pinger);
+         
          Ping ping = new Ping(clientFailureCheckPeriod, connectionTTL);
 
          Channel channel0 = conn.getChannel(0, -1, false);
 
-         channel0.setHandler(new Channel0Handler(conn));
-
          channel0.send(ping);
 
          if (clientFailureCheckPeriod != -1)
          {
-            Pinger pinger = new Pinger(conn);
-
             Future<?> pingerFuture = scheduledThreadPool.scheduleAtFixedRate(pinger,
-                                                                             connectionTTL / 2,
-                                                                             connectionTTL / 2,
+                                                                             clientFailureCheckPeriod,
+                                                                             clientFailureCheckPeriod,
                                                                              TimeUnit.MILLISECONDS);
 
-            pinger.setFuture(pingerFuture);
-
-            pingRunnables.put(conn.getID(), pinger);
-
-            FailedConnectionRunnable fcRunnable = new FailedConnectionRunnable(conn);
-
-            Future<?> fcFuture = scheduledThreadPool.scheduleAtFixedRate(fcRunnable,
-                                                                         clientFailureCheckPeriod,
-                                                                         clientFailureCheckPeriod,
-                                                                         TimeUnit.MILLISECONDS);
-
-            fcRunnable.setFuture(fcFuture);
-
-            failRunnables.put(conn.getID(), fcRunnable);
+            pinger.setFuture(pingerFuture);            
          }
 
          if (debug)
@@ -1008,8 +982,6 @@
       return conn;
    }
 
-   
-
    private void returnConnection(final Object connectionID)
    {
       ConnectionEntry entry = connections.get(connectionID);
@@ -1093,7 +1065,7 @@
    }
 
    private void failConnection(final Object connectionID, final MessagingException me)
-   {
+   {            
       ConnectionEntry entry = connections.get(connectionID);
 
       if (entry != null)
@@ -1103,7 +1075,7 @@
          conn.fail(me);
       }
    }
-   
+
    private class Channel0Handler implements ChannelHandler
    {
       private final RemotingConnection conn;
@@ -1117,12 +1089,8 @@
       {
          final byte type = packet.getType();
 
-         if (type == PING)
+         if (type == PacketImpl.DISCONNECT)
          {
-            // Do nothing
-         }
-         else if (type == PacketImpl.DISCONNECT)
-         {
             threadPool.execute(new Runnable()
             {
                // Must be executed on new thread since cannot block the netty thread for a long time and fail can
@@ -1141,57 +1109,29 @@
       }
    }
 
-   private class FailedConnectionRunnable implements Runnable
+   private class FailedConnectionAction implements Runnable
    {
-      private boolean closed;
-
       private RemotingConnection conn;
 
-      private Future<?> future;
-
-      FailedConnectionRunnable(final RemotingConnection conn)
+      FailedConnectionAction(final RemotingConnection conn)
       {
          this.conn = conn;
       }
 
-      public synchronized void setFuture(final Future<?> future)
-      {
-         this.future = future;
-      }
-
       public synchronized void run()
       {
-         if (closed)
-         {
-            return;
-         }
+         final MessagingException me = new MessagingException(MessagingException.CONNECTION_TIMEDOUT,
+                                                              "Did not receive data from server (or ping).");
 
-         if (!conn.isDataReceived())
+         threadPool.execute(new Runnable()
          {
-            final MessagingException me = new MessagingException(MessagingException.CONNECTION_TIMEDOUT,
-                                                                 "Did not receive data from server (or ping).");
-
-            threadPool.execute(new Runnable()
+            // Must be executed on different thread
+            public void run()
             {
-               // Must be executed on different thread
-               public void run()
-               {
-                  conn.fail(me);
-               }
-            });
-         }
-         else
-         {
-            conn.clearDataReceived();
-         }
+               conn.fail(me);
+            }
+         });
       }
-
-      public synchronized void close()
-      {
-         future.cancel(false);
-
-         closed = true;
-      }
    }
 
    private static class ConnectionEntry

Modified: trunk/src/main/org/jboss/messaging/core/remoting/Channel.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/Channel.java	2009-06-15 09:55:22 UTC (rev 7331)
+++ trunk/src/main/org/jboss/messaging/core/remoting/Channel.java	2009-06-15 10:36:18 UTC (rev 7332)
@@ -33,6 +33,8 @@
    void replicatePacket(Packet packet, long replicatedChannelID, Runnable action);
    
    void setHandler(ChannelHandler handler);
+   
+   ChannelHandler getHandler();
 
    void close();
 

Modified: trunk/src/main/org/jboss/messaging/core/remoting/RemotingConnection.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/RemotingConnection.java	2009-06-15 09:55:22 UTC (rev 7331)
+++ trunk/src/main/org/jboss/messaging/core/remoting/RemotingConnection.java	2009-06-15 10:36:18 UTC (rev 7332)
@@ -62,12 +62,4 @@
    void freeze();
   
    Connection getTransportConnection();
-   
-   boolean isDataReceived();
-   
-   boolean isDataSent();
-   
-   void clearDataSent();
-   
-   void clearDataReceived();
 }

Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/Pinger.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/Pinger.java	2009-06-15 09:55:22 UTC (rev 7331)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/Pinger.java	2009-06-15 10:36:18 UTC (rev 7332)
@@ -27,7 +27,10 @@
 
 import org.jboss.messaging.core.logging.Logger;
 import org.jboss.messaging.core.remoting.Channel;
+import org.jboss.messaging.core.remoting.ChannelHandler;
+import org.jboss.messaging.core.remoting.Packet;
 import org.jboss.messaging.core.remoting.RemotingConnection;
+import org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl;
 import org.jboss.messaging.core.remoting.impl.wireformat.Ping;
 
 /**
@@ -37,58 +40,105 @@
  *
  *
  */
-public class Pinger implements Runnable
-{
+public class Pinger implements Runnable, ChannelHandler
+{   
    private static final Logger log = Logger.getLogger(Pinger.class);
    
-   private volatile boolean closed;
+   private boolean closed;
 
    private RemotingConnection conn;
 
-   private volatile Future<?> future;
+   private Future<?> future;
+   
+   private long lastPingReceived;
+   
+   private final long expiryPeriod;
+   
+   private final ChannelHandler extraHandler;
+   
+   private final Runnable connectionFailedAction;
+   
+   private final Channel channel0;
+   
+   private boolean first = true;
 
-   public Pinger(final RemotingConnection conn)
+   public Pinger(final RemotingConnection conn, final long expiryPeriod, final ChannelHandler extraHandler,
+                 final Runnable connectionFailedAction, final long lastPingReceived)
    {
       this.conn = conn;
+      
+      this.expiryPeriod = expiryPeriod;
+      
+      this.extraHandler = extraHandler;
+      
+      this.connectionFailedAction = connectionFailedAction;
+      
+      this.channel0 = conn.getChannel(0, -1, false); 
+      
+      this.lastPingReceived = lastPingReceived;
+      
+      channel0.setHandler(this);
    }
-
-   public void setFuture(final Future<?> future)
+      
+   public synchronized void setFuture(final Future<?> future)
    {
       this.future = future;
    }
-
-   public void run()
+   
+   public synchronized void handlePacket(final Packet packet)
    {
       if (closed)
       {
          return;
       }
       
-      //TODO - for now we *always* sent the ping otherwise.
-      //Checking dataSent does not work, for the following reason:
-      //If a packet is sent just after the last ping, then no ping will be sent the next time.
-      //Which means the amount of time between pings can approach 2 * ( 0.5 * client failure check period) = failure check period
-      //so, due to time taken to actually travel across network + scheduling difference the client failure checker
-      //can easily time out.
+      if (packet.getType() == PacketImpl.PING)
+      {
+         lastPingReceived = System.currentTimeMillis();
+      }
+      else if (extraHandler != null)
+      {
+         extraHandler.handlePacket(packet);
+      }
+      else
+      {
+         throw new IllegalStateException("Invalid packet " + packet.getType());
+      }
+   }
+   
+   public synchronized void run()
+   {
+      if (closed)
+      {
+         return;
+      }
       
-//      if (!conn.isDataSent())
-//      {
-         // We only send a ping if no data has been sent since last ping
-
-         Ping ping = new Ping();
-
-         Channel channel0 = conn.getChannel(0, -1, false);
-         
-         channel0.send(ping);
-    //  }
-
-      conn.clearDataSent();
+      if (!first && ( System.currentTimeMillis() - lastPingReceived > expiryPeriod))
+      {
+         connectionFailedAction.run();
+      }
+      else if (!stopPinging)
+      {      
+         channel0.send(new Ping());
+      }
+      
+      first = false;
    }
-
-   public void close()
+     
+   public synchronized void close()
    {
-      future.cancel(false);
+      if (future != null)
+      {        
+         future.cancel(false);
+      }
 
       closed = true;
    }
+   
+   private boolean stopPinging;
+   
+   public synchronized void stopPinging()
+   {
+      this.stopPinging = true;
+   }
 }

Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java	2009-06-15 09:55:22 UTC (rev 7331)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java	2009-06-15 10:36:18 UTC (rev 7332)
@@ -247,11 +247,7 @@
    private boolean frozen;
 
    private final Object failLock = new Object();
-   
-   private volatile boolean dataReceived;
-   
-   private volatile boolean dataSent;
-   
+    
    // debug only stuff
 
    private boolean createdActive;
@@ -473,8 +469,6 @@
 
    public void bufferReceived(final Object connectionID, final MessagingBuffer buffer)
    {
-      dataReceived = true;           
-      
       final Packet packet = decode(buffer);
       
       synchronized (transferLock)
@@ -1060,8 +1054,6 @@
                if (connection.active || packet.isWriteAlways())
                {                  
                   connection.transportConnection.write(buffer, flush);
-                                 
-                  connection.dataSent = true;
                }
             }
             finally
@@ -1131,8 +1123,6 @@
 
                connection.transportConnection.write(buffer);
                
-               connection.dataSent = true;
-
                long toWait = connection.blockingCallTimeout;
 
                long start = System.currentTimeMillis();
@@ -1213,8 +1203,6 @@
                packet.encode(buffer);
 
                connection.transportConnection.write(buffer);
-               
-               connection.dataSent = true;
             }
          }
 
@@ -1277,6 +1265,11 @@
       {
          this.handler = handler;
       }
+      
+      public ChannelHandler getHandler()
+      {
+         return handler;
+      }
 
       public void close()
       {
@@ -1626,24 +1619,4 @@
          }
       }
    }
-
-   public boolean isDataReceived()
-   {
-      return dataReceived;
-   }
-   
-   public void clearDataReceived()
-   {
-      dataReceived = false;
-   }
-   
-   public boolean isDataSent()
-   {      
-      return dataSent;
-   }
-   
-   public void clearDataSent()
-   {     
-      dataSent = false;
-   }
 }

Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/ssl/SSLSupport.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/ssl/SSLSupport.java	2009-06-15 09:55:22 UTC (rev 7331)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/ssl/SSLSupport.java	2009-06-15 10:36:18 UTC (rev 7332)
@@ -18,7 +18,7 @@
  * License along with this software; if not, write to the Free
  * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
  * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */ 
+ */
 
 package org.jboss.messaging.core.remoting.impl.ssl;
 
@@ -59,41 +59,42 @@
    // Public --------------------------------------------------------
 
    public static SSLContext createServerContext(String keystorePath,
-         String keystorePassword, String trustStorePath,
-         String trustStorePassword) throws Exception
+                                                String keystorePassword,
+                                                String trustStorePath,
+                                                String trustStorePassword) throws Exception
    {
 
       // Initialize the SSLContext to work with our key managers.
       SSLContext sslContext = SSLContext.getInstance("TLS");
       KeyManager[] keyManagers = loadKeyManagers(keystorePath, keystorePassword);
-      TrustManager[] trustManagers = loadTrustManager(false, trustStorePath,
-            trustStorePassword);
+      TrustManager[] trustManagers = loadTrustManager(false, trustStorePath, trustStorePassword);
       sslContext.init(keyManagers, trustManagers, new SecureRandom());
 
       return sslContext;
    }
 
-   public static SSLContext createClientContext(String keystorePath,
-         String keystorePassword) throws Exception
+   public static SSLContext createClientContext(String keystorePath, String keystorePassword) throws Exception
    {
       SSLContext context = SSLContext.getInstance("TLS");
       KeyManager[] keyManagers = loadKeyManagers(keystorePath, keystorePassword);
-      TrustManager[] trustManagers = loadTrustManager(true,  null, null);
+      TrustManager[] trustManagers = loadTrustManager(true, null, null);
       context.init(keyManagers, trustManagers, new SecureRandom());
       return context;
    }
 
-   public static SSLContext getInstance(boolean client, String keystorePath,
-         String keystorePassword, String trustStorePath,
-         String trustStorePassword) throws GeneralSecurityException, Exception
+   public static SSLContext getInstance(boolean client,
+                                        String keystorePath,
+                                        String keystorePassword,
+                                        String trustStorePath,
+                                        String trustStorePassword) throws GeneralSecurityException, Exception
    {
       if (client)
       {
          return createClientContext(keystorePath, keystorePassword);
-      } else
+      }
+      else
       {
-         return createServerContext(keystorePath, keystorePassword,
-               trustStorePath, trustStorePassword);
+         return createServerContext(keystorePath, keystorePassword, trustStorePath, trustStorePassword);
       }
    }
 
@@ -103,8 +104,7 @@
 
    // Private -------------------------------------------------------
 
-   private static TrustManager[] loadTrustManager(boolean clientMode,
-         String trustStorePath, String trustStorePassword) throws Exception
+   private static TrustManager[] loadTrustManager(boolean clientMode, String trustStorePath, String trustStorePassword) throws Exception
    {
       if (clientMode)
       {
@@ -113,13 +113,11 @@
          // return a trust manager that trusts all certs
          return new TrustManager[] { new X509TrustManager()
          {
-            public void checkClientTrusted(X509Certificate[] chain,
-                  String authType)
+            public void checkClientTrusted(X509Certificate[] chain, String authType)
             {
             }
 
-            public void checkServerTrusted(X509Certificate[] chain,
-                  String authType)
+            public void checkServerTrusted(X509Certificate[] chain, String authType)
             {
             }
 
@@ -128,38 +126,38 @@
                return null;
             }
          } };
-      } else
+      }
+      else
       {
          TrustManagerFactory trustMgrFactory;
-         KeyStore trustStore = SSLSupport.loadKeystore(trustStorePath,
-               trustStorePassword);
-         trustMgrFactory = TrustManagerFactory.getInstance(TrustManagerFactory
-               .getDefaultAlgorithm());
+         KeyStore trustStore = SSLSupport.loadKeystore(trustStorePath, trustStorePassword);
+         trustMgrFactory = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
          trustMgrFactory.init(trustStore);
          return trustMgrFactory.getTrustManagers();
       }
    }
 
-   private static KeyStore loadKeystore(String keystorePath,
-         String keystorePassword) throws Exception
+   private static KeyStore loadKeystore(String keystorePath, String keystorePassword) throws Exception
    {
       assert keystorePath != null;
       assert keystorePassword != null;
-      
+
       KeyStore ks = KeyStore.getInstance("JKS");
       InputStream in = null;
       try
       {
          URL keystoreURL = validateStoreURL(keystorePath);
          ks.load(keystoreURL.openStream(), keystorePassword.toCharArray());
-      } finally
+      }
+      finally
       {
          if (in != null)
          {
             try
             {
                in.close();
-            } catch (IOException ignored)
+            }
+            catch (IOException ignored)
             {
             }
          }
@@ -167,11 +165,9 @@
       return ks;
    }
 
-   private static KeyManager[] loadKeyManagers(String keystorePath,
-         String keystorePassword) throws Exception
+   private static KeyManager[] loadKeyManagers(String keystorePath, String keystorePassword) throws Exception
    {
-      KeyManagerFactory kmf = KeyManagerFactory.getInstance(KeyManagerFactory
-            .getDefaultAlgorithm());
+      KeyManagerFactory kmf = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm());
       KeyStore ks = loadKeystore(keystorePath, keystorePassword);
       kmf.init(ks, keystorePassword.toCharArray());
 
@@ -181,21 +177,22 @@
    private static URL validateStoreURL(String storePath) throws Exception
    {
       assert storePath != null;
-      
+
       // First see if this is a URL
       try
       {
          return new URL(storePath);
-      } catch (MalformedURLException e)
+      }
+      catch (MalformedURLException e)
       {
          File file = new File(storePath);
          if (file.exists() == true && file.isFile())
          {
             return file.toURI().toURL();
-         } else
+         }
+         else
          {
-            URL url = Thread.currentThread().getContextClassLoader()
-                  .getResource(storePath);
+            URL url = Thread.currentThread().getContextClassLoader().getResource(storePath);
             if (url != null)
                return url;
          }

Modified: trunk/src/main/org/jboss/messaging/core/remoting/server/impl/RemotingServiceImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/server/impl/RemotingServiceImpl.java	2009-06-15 09:55:22 UTC (rev 7331)
+++ trunk/src/main/org/jboss/messaging/core/remoting/server/impl/RemotingServiceImpl.java	2009-06-15 10:36:18 UTC (rev 7332)
@@ -41,7 +41,6 @@
 import org.jboss.messaging.core.remoting.impl.Pinger;
 import org.jboss.messaging.core.remoting.impl.RemotingConnectionImpl;
 import org.jboss.messaging.core.remoting.impl.invm.InVMAcceptorFactory;
-import org.jboss.messaging.core.remoting.impl.invm.InVMRegistry;
 import org.jboss.messaging.core.remoting.impl.invm.TransportConstants;
 import org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl;
 import org.jboss.messaging.core.remoting.impl.wireformat.Ping;
@@ -95,10 +94,8 @@
 
    private final ScheduledExecutorService scheduledThreadPool;
 
-   private Map<Object, FailedConnectionRunnable> connectionTTLRunnables = new ConcurrentHashMap<Object, FailedConnectionRunnable>();
+   private Map<Object, Pinger> pingers = new ConcurrentHashMap<Object, Pinger>();
 
-   private Map<Object, Pinger> pingRunnables = new ConcurrentHashMap<Object, Pinger>();
-
    private final int managementConnectorID;
 
    // Static --------------------------------------------------------
@@ -144,7 +141,7 @@
       {
          return;
       }
-      
+
       ClassLoader loader = Thread.currentThread().getContextClassLoader();
 
       for (TransportConfiguration info : transportConfigs)
@@ -169,31 +166,31 @@
             log.warn("Error instantiating acceptor \"" + info.getFactoryClassName() + "\"", e);
          }
       }
-      
-      //We now create a "special" acceptor used by management to send/receive management messages - this is an invm
-      //acceptor with a -ve server id
-      //TODO this is not the best solution, management should send/receive management messages direct.
-      //Remove this code when this is implemented without having to require a special acceptor
-      //https://jira.jboss.org/jira/browse/JBMESSAGING-1649
-      
+
+      // We now create a "special" acceptor used by management to send/receive management messages - this is an invm
+      // acceptor with a -ve server id
+      // TODO this is not the best solution, management should send/receive management messages direct.
+      // Remove this code when this is implemented without having to require a special acceptor
+      // https://jira.jboss.org/jira/browse/JBMESSAGING-1649
+
       if (config.isJMXManagementEnabled())
       {
          Map<String, Object> params = new HashMap<String, Object>();
-         
+
          params.put(TransportConstants.SERVER_ID_PROP_NAME, managementConnectorID);
-         
+
          AcceptorFactory factory = new InVMAcceptorFactory();
-         
+
          Acceptor acceptor = factory.createAcceptor(params, bufferHandler, this, threadPool);
-         
+
          acceptors.add(acceptor);
-         
+
          if (managementService != null)
          {
             TransportConfiguration info = new TransportConfiguration(InVMAcceptorFactory.class.getName(), params);
-            
+
             managementService.registerAcceptor(acceptor, info);
-         }         
+         }
       }
 
       for (Acceptor a : acceptors)
@@ -239,16 +236,11 @@
 
       acceptors.clear();
 
-      for (FailedConnectionRunnable runnable : connectionTTLRunnables.values())
+      for (Pinger runnable : pingers.values())
       {
          runnable.close();
       }
 
-      for (Pinger runnable : pingRunnables.values())
-      {
-         runnable.close();
-      }
-
       connections.clear();
 
       started = false;
@@ -296,21 +288,14 @@
 
       channel1.setHandler(handler);
 
-      Channel channel0 = rc.getChannel(0, -1, false);
+      connections.put(connection.getID(), rc);
 
-      Channel0Handler channel0Handler = new Channel0Handler(rc);
+      InitialPingTimeout runnable = new InitialPingTimeout(rc);
 
-      channel0.setHandler(channel0Handler);
-
-      Object id = connection.getID();
-
-      connections.put(id, rc);
-
-      InitialPingTimeout runnable = new InitialPingTimeout(rc, channel0Handler);
-
       // We schedule an initial ping timeout. An initial ping is always sent from the client as the first thing it
       // does after creating a connection, this contains the ping period and connection TTL, if it doesn't
       // arrive the connection will get closed
+
       scheduledThreadPool.schedule(runnable, INITIAL_PING_TIMEOUT, TimeUnit.MILLISECONDS);
 
       if (config.isBackup())
@@ -322,7 +307,7 @@
    public void connectionDestroyed(final Object connectionID)
    {
       RemotingConnection conn = connections.get(connectionID);
-
+      
       if (conn != null)
       {
          // if the connection has no failure listeners it means the sessions etc were already closed so this is a clean
@@ -362,11 +347,11 @@
 
    // Public --------------------------------------------------------
 
-   public void cancelPingerForConnectionID(final Object connectionID)
+   public void stopPingingForConnectionID(final Object connectionID)
    {
-      Pinger pinger = pingRunnables.get(connectionID);
+      Pinger pinger = pingers.get(connectionID);
 
-      pinger.close();
+      pinger.stopPinging();
    }
 
    // Package protected ---------------------------------------------
@@ -375,7 +360,7 @@
 
    // Private -------------------------------------------------------
 
-   private void setupScheduledRunnables(final RemotingConnection conn,
+   private void setupPinger(final RemotingConnection conn,
                                         final long clientFailureCheckPeriod,
                                         final long connectionTTL)
    {
@@ -392,72 +377,47 @@
       long connectionTTLToUse = config.getConnectionTTLOverride() != -1 ? config.getConnectionTTLOverride()
                                                                        : connectionTTL;
 
-      if (connectionTTLToUse != -1)
-      {
-         FailedConnectionRunnable runnable = new FailedConnectionRunnable(conn);
-
-         Future<?> connectionTTLFuture = scheduledThreadPool.scheduleAtFixedRate(runnable,
-                                                                                 connectionTTLToUse,
-                                                                                 connectionTTLToUse,
-                                                                                 TimeUnit.MILLISECONDS);
-
-         runnable.setFuture(connectionTTLFuture);
-
-         connectionTTLRunnables.put(conn.getID(), runnable);
-      }
-
       long pingPeriod = clientFailureCheckPeriod == -1 ? -1 : clientFailureCheckPeriod / 2;
 
-      if (pingPeriod != -1)
-      {
-         Pinger pingRunnable = new Pinger(conn);
+      Pinger pingRunnable = new Pinger(conn, connectionTTLToUse, null, new FailedConnectionAction(conn), System.currentTimeMillis());
 
-         Future<?> pingFuture = scheduledThreadPool.scheduleAtFixedRate(pingRunnable,
-                                                                        0,
-                                                                        pingPeriod,
-                                                                        TimeUnit.MILLISECONDS);
+      Future<?> pingFuture = scheduledThreadPool.scheduleAtFixedRate(pingRunnable, 0, pingPeriod, TimeUnit.MILLISECONDS);
 
-         pingRunnable.setFuture(pingFuture);
+      pingRunnable.setFuture(pingFuture);
 
-         pingRunnables.put(conn.getID(), pingRunnable);
-      }
+      pingers.put(conn.getID(), pingRunnable);
    }
 
    private RemotingConnection closeConnection(final Object connectionID)
    {
       RemotingConnection connection = connections.remove(connectionID);
+      
+      Pinger pinger = pingers.remove(connectionID);
 
-      FailedConnectionRunnable runnable = connectionTTLRunnables.remove(connectionID);
-
-      if (runnable != null)
+      if (pinger != null)
       {
-         runnable.close();
+         pinger.close();
       }
 
-      Pinger pingRunnable = pingRunnables.remove(connectionID);
-
-      if (pingRunnable != null)
-      {
-         pingRunnable.close();
-      }
-
       return connection;
    }
 
    // Inner classes -------------------------------------------------
 
-   private class Channel0Handler implements ChannelHandler
+   private class InitialPingTimeout implements Runnable, ChannelHandler
    {
       private final RemotingConnection conn;
 
-      private volatile boolean gotInitialPing;
+      private boolean gotInitialPing;
 
-      private Channel0Handler(final RemotingConnection conn)
+      private InitialPingTimeout(final RemotingConnection conn)
       {
          this.conn = conn;
+         
+         conn.getChannel(0, -1, false).setHandler(this);
       }
-
-      public void handlePacket(final Packet packet)
+      
+      public synchronized void handlePacket(final Packet packet)
       {
          final byte type = packet.getType();
 
@@ -467,7 +427,7 @@
             {
                Ping ping = (Ping)packet;
 
-               setupScheduledRunnables(conn, ping.getClientFailureCheckPeriod(), ping.getConnectionTTL());
+               setupPinger(conn, ping.getClientFailureCheckPeriod(), ping.getConnectionTTL());
 
                gotInitialPing = true;
             }
@@ -478,28 +438,9 @@
          }
       }
 
-      private boolean isGotInitialPing()
+      public synchronized void run()
       {
-         return gotInitialPing;
-      }
-   }
-
-   private class InitialPingTimeout implements Runnable
-   {
-      private final RemotingConnection conn;
-
-      private final Channel0Handler handler;
-
-      private InitialPingTimeout(final RemotingConnection conn, final Channel0Handler handler)
-      {
-         this.conn = conn;
-
-         this.handler = handler;
-      }
-
-      public void run()
-      {
-         if (!handler.isGotInitialPing())
+         if (!gotInitialPing)
          {
             // Never received initial ping
             log.warn("Did not receive initial ping for connection, it will be closed");
@@ -511,52 +452,24 @@
       }
    }
 
-   private class FailedConnectionRunnable implements Runnable
+   private class FailedConnectionAction implements Runnable
    {
-      private boolean closed;
-
       private RemotingConnection conn;
 
-      private Future<?> future;
-
-      FailedConnectionRunnable(final RemotingConnection conn)
+      FailedConnectionAction(final RemotingConnection conn)
       {
          this.conn = conn;
       }
 
-      public synchronized void setFuture(final Future<?> future)
-      {
-         this.future = future;
-      }
-
       public synchronized void run()
       {
-         if (closed)
-         {
-            return;
-         }
+         removeConnection(conn.getID());
 
-         if (!conn.isDataReceived())
-         {
-            removeConnection(conn.getID());
+         MessagingException me = new MessagingException(MessagingException.CONNECTION_TIMEDOUT,
+                                                        "Did not receive ping on connection. It is likely a client has exited or crashed without " + "closing its connection, or the network between the server and client has failed. The connection will now be closed.");
 
-            MessagingException me = new MessagingException(MessagingException.CONNECTION_TIMEDOUT,
-                                                           "Did not receive ping on connection. It is likely a client has exited or crashed without " + "closing its connection, or the network between the server and client has failed. The connection will now be closed.");
-
-            conn.fail(me);
-         }
-         else
-         {
-            conn.clearDataReceived();
-         }
+         conn.fail(me);
       }
-
-      public synchronized void close()
-      {
-         future.cancel(false);
-
-         closed = true;
-      }
    }
 
    private class DelegatingBufferHandler extends AbstractBufferHandler

Modified: trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java	2009-06-15 09:55:22 UTC (rev 7331)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java	2009-06-15 10:36:18 UTC (rev 7332)
@@ -1160,7 +1160,7 @@
    {
       try
       {
-         log.warn("Client connection failed, clearing up resources for session " + name, new Exception());
+         log.warn("Client connection failed, clearing up resources for session " + name);
 
          for (Runnable runner : failureRunners)
          {

Modified: trunk/tests/src/org/jboss/messaging/tests/integration/remoting/PingTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/remoting/PingTest.java	2009-06-15 09:55:22 UTC (rev 7331)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/remoting/PingTest.java	2009-06-15 10:36:18 UTC (rev 7332)
@@ -314,7 +314,7 @@
 
       serverConn.addFailureListener(serverListener);
 
-      ((RemotingServiceImpl)server.getRemotingService()).cancelPingerForConnectionID(serverConn.getID());
+      ((RemotingServiceImpl)server.getRemotingService()).stopPingingForConnectionID(serverConn.getID());
 
       for (int i = 0; i < 1000; i++)
       {




More information about the jboss-cvs-commits mailing list