[jboss-cvs] JBoss Messaging SVN: r7730 - in trunk: src/main/org/jboss/messaging/core/client/impl and 15 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Fri Aug 14 10:51:59 EDT 2009


Author: timfox
Date: 2009-08-14 10:51:58 -0400 (Fri, 14 Aug 2009)
New Revision: 7730

Added:
   trunk/src/main/org/jboss/messaging/core/server/MemoryManager.java
   trunk/src/main/org/jboss/messaging/core/server/impl/MemoryManagerImpl.java
Removed:
   trunk/src/main/org/jboss/messaging/core/remoting/impl/Pinger.java
Modified:
   trunk/src/config/common/schema/jbm-configuration.xsd
   trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionFactoryImpl.java
   trunk/src/main/org/jboss/messaging/core/client/impl/ConnectionManagerImpl.java
   trunk/src/main/org/jboss/messaging/core/config/Configuration.java
   trunk/src/main/org/jboss/messaging/core/config/impl/ConfigurationImpl.java
   trunk/src/main/org/jboss/messaging/core/config/impl/FileConfiguration.java
   trunk/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java
   trunk/src/main/org/jboss/messaging/core/remoting/RemotingConnection.java
   trunk/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java
   trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/Ping.java
   trunk/src/main/org/jboss/messaging/core/remoting/server/RemotingService.java
   trunk/src/main/org/jboss/messaging/core/remoting/server/impl/RemotingServiceImpl.java
   trunk/src/main/org/jboss/messaging/core/server/MessagingServer.java
   trunk/src/main/org/jboss/messaging/core/server/cluster/impl/ClusterConnectionImpl.java
   trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java
   trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerPacketHandler.java
   trunk/src/main/org/jboss/messaging/integration/transports/netty/NettyConnection.java
   trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/MultiThreadRandomFailoverTestBase.java
   trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/ReplicateConnectionFailureTest.java
   trunk/tests/src/org/jboss/messaging/tests/integration/jms/FloodServerTest.java
   trunk/tests/src/org/jboss/messaging/tests/integration/remoting/PingTest.java
Log:
multiple changes to pinging

Modified: trunk/src/config/common/schema/jbm-configuration.xsd
===================================================================
--- trunk/src/config/common/schema/jbm-configuration.xsd	2009-08-14 14:29:26 UTC (rev 7729)
+++ trunk/src/config/common/schema/jbm-configuration.xsd	2009-08-14 14:51:58 UTC (rev 7730)
@@ -47,6 +47,8 @@
             </xsd:element>
 				<xsd:element maxOccurs="1" minOccurs="0" name="connection-ttl-override" type="xsd:long">
 				</xsd:element>
+				<xsd:element maxOccurs="1" minOccurs="0" name="async-connection-execution-enabled" type="xsd:boolean">
+				</xsd:element>				
 				<xsd:element maxOccurs="1" minOccurs="0" name="transaction-timeout" type="xsd:long">
 				</xsd:element>
 				<xsd:element maxOccurs="1" minOccurs="0" name="transaction-timeout-scan-period" type="xsd:long">

Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionFactoryImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionFactoryImpl.java	2009-08-14 14:29:26 UTC (rev 7729)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionFactoryImpl.java	2009-08-14 14:51:58 UTC (rev 7730)
@@ -58,7 +58,7 @@
 
    public static final String DEFAULT_CONNECTION_LOAD_BALANCING_POLICY_CLASS_NAME = "org.jboss.messaging.core.client.impl.RoundRobinConnectionLoadBalancingPolicy";
 
-   public static final long DEFAULT_CLIENT_FAILURE_CHECK_PERIOD = 5000;
+   public static final long DEFAULT_CLIENT_FAILURE_CHECK_PERIOD = 30000;
 
    // 5 minutes - normally this should be much higher than ping period, this allows clients to re-attach on live
    // or backup without fear of session having already been closed when connection having timed out.

Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ConnectionManagerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ConnectionManagerImpl.java	2009-08-14 14:29:26 UTC (rev 7729)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ConnectionManagerImpl.java	2009-08-14 14:51:58 UTC (rev 7730)
@@ -51,7 +51,6 @@
 import org.jboss.messaging.core.remoting.Packet;
 import org.jboss.messaging.core.remoting.RemotingConnection;
 import org.jboss.messaging.core.remoting.impl.AbstractBufferHandler;
-import org.jboss.messaging.core.remoting.impl.Pinger;
 import org.jboss.messaging.core.remoting.impl.RemotingConnectionImpl;
 import org.jboss.messaging.core.remoting.impl.wireformat.CreateSessionMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.CreateSessionResponseMessage;
@@ -146,11 +145,13 @@
    private volatile boolean closed;
 
    private Set<FailureListener> listeners = new ConcurrentHashSet<FailureListener>();
-   
+
    private Connector connector;
 
-   private Map<Object, Pinger> pingers = new ConcurrentHashMap<Object, Pinger>();
+   private Future<?> pingerFuture;
 
+   private PingRunnable pingRunnable;
+
    // debug
 
    private static Map<TransportConfiguration, Set<RemotingConnection>> debugConns;
@@ -229,7 +230,7 @@
 
       this.orderedExecutorFactory = new OrderedExecutorFactory(threadPool);
    }
-   
+
    // ConnectionLifeCycleListener implementation --------------------------------------------------
 
    public void connectionCreated(final Connection connection)
@@ -450,7 +451,7 @@
    }
 
    public int numSessions()
-   {   
+   {
       return sessions.size();
    }
 
@@ -473,7 +474,7 @@
 
       closed = true;
    }
-   
+
    public void addFailureListener(FailureListener listener)
    {
       listeners.add(listener);
@@ -484,17 +485,9 @@
       return listeners.remove(listener);
    }
 
-
    // Public
    // ---------------------------------------------------------------------------------------
 
-   public void cancelPingerForConnectionID(final Object connectionID)
-   {
-      Pinger pinger = pingers.get(connectionID);
-
-      pinger.close();
-   }
-
    @Override
    protected void finalize() throws Throwable
    {
@@ -504,6 +497,13 @@
       super.finalize();
    }
 
+   private volatile boolean stopPingingAfterOne;
+
+   public void stopPingingAfterOne()
+   {
+      this.stopPingingAfterOne = true;
+   }
+
    // Protected
    // ------------------------------------------------------------------------------------
 
@@ -610,7 +610,7 @@
                oldConnections.add(entry.connection);
             }
 
-            closePingers();
+            // closePingers();
 
             connections.clear();
 
@@ -661,8 +661,8 @@
                {
                   connection.destroy();
                }
-               
-               closeConnectionsAndCallFailureListeners(me);               
+
+               closeConnectionsAndCallFailureListeners(me);
             }
          }
          else
@@ -671,24 +671,24 @@
          }
       }
    }
-   
+
    private void closeConnectionsAndCallFailureListeners(final MessagingException me)
    {
       refCount = 0;
       mapIterator = null;
       checkCloseConnections();
-      
-      //TODO (after beta5) should really execute on different thread then remove the async in JBossConnection
-      
-//      threadPool.execute(new Runnable()
-//      {
-//         public void run()
-//         {
-            callFailureListeners(me);
-//         }
-//      });      
+
+      // TODO (after beta5) should really execute on different thread then remove the async in JBossConnection
+
+      // threadPool.execute(new Runnable()
+      // {
+      // public void run()
+      // {
+      callFailureListeners(me);
+      // }
+      // });
    }
-   
+
    private void callFailureListeners(final MessagingException me)
    {
       final List<FailureListener> listenersClone = new ArrayList<FailureListener>(listeners);
@@ -709,16 +709,6 @@
       }
    }
 
-   private void closePingers()
-   {
-      for (Pinger pinger : pingers.values())
-      {
-         pinger.close();
-      }
-
-      pingers.clear();
-   }
-
    /*
     * Re-attach sessions all pre-existing sessions to new remoting connections
     */
@@ -867,8 +857,17 @@
 
          Set<ConnectionEntry> copy = new HashSet<ConnectionEntry>(connections.values());
 
-         closePingers();
+         if (pingerFuture != null)
+         {
+            pingRunnable.cancel();
 
+            pingerFuture.cancel(false);
+
+            pingRunnable = null;
+
+            pingerFuture = null;
+         }
+
          connections.clear();
 
          for (ConnectionEntry entry : copy)
@@ -989,33 +988,21 @@
 
          conn.addFailureListener(new DelegatingFailureListener(conn.getID()));
 
-         connections.put(conn.getID(), new ConnectionEntry(conn, connector));
+         conn.getChannel(0, -1, false).setHandler(new Channel0Handler(conn));
 
-         // Send the initial ping, we always do this it contains connectionTTL and clientFailureInterval -
-         // the server needs this in order to do pinging and failure checking
+         connections.put(conn.getID(), new ConnectionEntry(conn,
+                                                           connector,
+                                                           clientFailureCheckPeriod,
+                                                           System.currentTimeMillis()));
 
-         Pinger pinger = new Pinger(conn,
-                                    clientFailureCheckPeriod,
-                                    new Channel0Handler(conn),
-                                    new FailedConnectionAction(conn),
-                                    0);
-
-         pingers.put(conn.getID(), pinger);
-
-         Ping ping = new Ping(clientFailureCheckPeriod, connectionTTL);
-
-         Channel channel0 = conn.getChannel(0, -1, false);
-
-         channel0.send(ping);
-
-         if (clientFailureCheckPeriod != -1)
+         if (clientFailureCheckPeriod != -1 && pingerFuture == null)
          {
-            Future<?> pingerFuture = scheduledThreadPool.scheduleWithFixedDelay(pinger,
-                                                                                clientFailureCheckPeriod,
-                                                                                clientFailureCheckPeriod,
-                                                                                TimeUnit.MILLISECONDS);
+            pingRunnable = new PingRunnable();
 
-            pinger.setFuture(pingerFuture);
+            pingerFuture = scheduledThreadPool.scheduleWithFixedDelay(pingRunnable,
+                                                                      0,
+                                                                      clientFailureCheckPeriod,
+                                                                      TimeUnit.MILLISECONDS);
          }
 
          if (debug)
@@ -1144,51 +1131,33 @@
                                                    "The connection was closed by the server"));
                }
             });
-         }
-         else
-         {
-            throw new IllegalArgumentException("Invalid packet: " + packet);
-         }
+         }         
       }
    }
 
-   private class FailedConnectionAction implements Runnable
-   {
-      private RemotingConnection conn;
-
-      FailedConnectionAction(final RemotingConnection conn)
-      {
-         this.conn = conn;
-      }
-
-      public synchronized void run()
-      {
-         final MessagingException me = new MessagingException(MessagingException.CONNECTION_TIMEDOUT,
-                                                              "Did not receive ping from server for " + conn.getTransportConnection());
-
-         threadPool.execute(new Runnable()
-         {
-            // Must be executed on different thread
-            public void run()
-            {
-               conn.fail(me);
-            }
-         });
-      }
-   }
-
    private static class ConnectionEntry
    {
-      ConnectionEntry(final RemotingConnection connection, final Connector connector)
+      ConnectionEntry(final RemotingConnection connection,
+                      final Connector connector,
+                      final long expiryPeriod,
+                      final long createTime)
       {
          this.connection = connection;
 
          this.connector = connector;
+
+         this.expiryPeriod = expiryPeriod;
+
+         this.lastCheck = createTime;
       }
 
       final RemotingConnection connection;
 
       final Connector connector;
+
+      volatile long lastCheck;
+
+      final long expiryPeriod;
    }
 
    private class DelegatingBufferHandler extends AbstractBufferHandler
@@ -1263,4 +1232,86 @@
       }
    }
 
+   /**
+    * Scheduled task that pings the server on every connection held by this manager and, once a
+    * connection's expiry period has elapsed, verifies that data has been received on it.
+    * <p>
+    * A connection on which no data was received since the previous check is failed with
+    * {@link MessagingException#CONNECTION_TIMEDOUT}; the run then ends without visiting the
+    * remaining connections.
+    */
+   private class PingRunnable implements Runnable
+   {
+      private boolean cancelled;
+
+      // Must start out true: it is what lets exactly one run through when stopPingingAfterOne is set.
+      // Left at the default (false), "stopPingingAfterOne && !first" suppressed every run, including the first.
+      private boolean first = true;
+
+      /**
+       * Performs one check-and-ping pass. Does nothing once cancelled, or after the first pass when
+       * stopPingingAfterOne has been requested.
+       */
+      public synchronized void run()
+      {
+         if (cancelled || (stopPingingAfterOne && !first))
+         {
+            return;
+         }
+
+         first = false;
+
+         synchronized (connections)
+         {
+            long now = System.currentTimeMillis();
+
+            for (ConnectionEntry entry : connections.values())
+            {
+               final RemotingConnection connection = entry.connection;
+
+               // An expiryPeriod of -1 disables the data-received check for this connection
+               if (entry.expiryPeriod != -1 && now >= entry.lastCheck + entry.expiryPeriod)
+               {
+                  if (!connection.checkDataReceived())
+                  {
+                     final MessagingException me = new MessagingException(MessagingException.CONNECTION_TIMEDOUT,
+                                                                          "Did not receive data from server for " + connection.getTransportConnection());
+
+                     threadPool.execute(new Runnable()
+                     {
+                        // Must be executed on different thread
+                        public void run()
+                        {
+                           connection.fail(me);
+                        }
+                     });
+
+                     return;
+                  }
+                  else
+                  {
+                     entry.lastCheck = now;
+                  }
+               }
+
+               // Send a ping
+
+               Ping ping = new Ping(connectionTTL);
+
+               Channel channel0 = connection.getChannel(0, -1, false);
+
+               channel0.send(ping);
+            }
+         }
+      }
+
+      /**
+       * Marks this task as cancelled so that any later run returns immediately.
+       */
+      public synchronized void cancel()
+      {
+         cancelled = true;
+      }
+   }
+
 }

Modified: trunk/src/main/org/jboss/messaging/core/config/Configuration.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/config/Configuration.java	2009-08-14 14:29:26 UTC (rev 7729)
+++ trunk/src/main/org/jboss/messaging/core/config/Configuration.java	2009-08-14 14:51:58 UTC (rev 7730)
@@ -103,6 +103,10 @@
    long getConnectionTTLOverride();
 
    void setConnectionTTLOverride(long ttl);
+   
+   boolean isAsyncConnectionExecutionEnabled();
+   
+   void setEnabledAsyncConnectionExecution(boolean enabled);
 
    Set<TransportConfiguration> getAcceptorConfigurations();
 

Modified: trunk/src/main/org/jboss/messaging/core/config/impl/ConfigurationImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/config/impl/ConfigurationImpl.java	2009-08-14 14:29:26 UTC (rev 7729)
+++ trunk/src/main/org/jboss/messaging/core/config/impl/ConfigurationImpl.java	2009-08-14 14:51:58 UTC (rev 7730)
@@ -66,6 +66,8 @@
    public static final boolean DEFAULT_JMX_MANAGEMENT_ENABLED = true;
 
    public static final long DEFAULT_CONNECTION_TTL_OVERRIDE = -1;
+   
+   public static final boolean DEFAULT_ASYNC_CONNECTION_EXECUTION_ENABLED = true;
 
    public static final String DEFAULT_BINDINGS_DIRECTORY = "data/bindings";
 
@@ -182,7 +184,9 @@
    protected boolean jmxManagementEnabled = DEFAULT_JMX_MANAGEMENT_ENABLED;
 
    protected long connectionTTLOverride = DEFAULT_CONNECTION_TTL_OVERRIDE;
-
+   
+   protected boolean asyncConnectionExecutionEnabled = DEFAULT_ASYNC_CONNECTION_EXECUTION_ENABLED;
+   
    protected long messageExpiryScanPeriod = DEFAULT_MESSAGE_EXPIRY_SCAN_PERIOD;
 
    protected int messageExpiryThreadPriority = DEFAULT_MESSAGE_EXPIRY_THREAD_PRIORITY;
@@ -410,6 +414,16 @@
    {
       connectionTTLOverride = ttl;
    }
+   
+   public boolean isAsyncConnectionExecutionEnabled()
+   {
+      return asyncConnectionExecutionEnabled;
+   }
+   
+   public void setEnabledAsyncConnectionExecution(final boolean enabled)
+   {
+      asyncConnectionExecutionEnabled = enabled;
+   }
 
    public List<String> getInterceptorClassNames()
    {

Modified: trunk/src/main/org/jboss/messaging/core/config/impl/FileConfiguration.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/config/impl/FileConfiguration.java	2009-08-14 14:29:26 UTC (rev 7729)
+++ trunk/src/main/org/jboss/messaging/core/config/impl/FileConfiguration.java	2009-08-14 14:51:58 UTC (rev 7730)
@@ -135,6 +135,8 @@
       securityInvalidationInterval = getLong(e, "security-invalidation-interval", securityInvalidationInterval, GT_ZERO);
 
       connectionTTLOverride = getLong(e, "connection-ttl-override", connectionTTLOverride, MINUS_ONE_OR_GT_ZERO);
+      
+      asyncConnectionExecutionEnabled = getBoolean(e, "async-connection-execution-enabled", asyncConnectionExecutionEnabled);
 
       transactionTimeout = getLong(e, "transaction-timeout", transactionTimeout, GT_ZERO);
 

Modified: trunk/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java	2009-08-14 14:29:26 UTC (rev 7729)
+++ trunk/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java	2009-08-14 14:51:58 UTC (rev 7730)
@@ -390,7 +390,7 @@
 
                if (info == null)
                {
-                  throw new IllegalStateException("Cannot find queue info for queue " + clusterName);
+                  return;
                }
 
                info.decrementConsumers();

Modified: trunk/src/main/org/jboss/messaging/core/remoting/RemotingConnection.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/RemotingConnection.java	2009-08-14 14:29:26 UTC (rev 7729)
+++ trunk/src/main/org/jboss/messaging/core/remoting/RemotingConnection.java	2009-08-14 14:51:58 UTC (rev 7730)
@@ -75,5 +75,7 @@
    
    long getBlockingCallTimeout();
    
-   Object getTransferLock();     
+   Object getTransferLock(); 
+   
+   boolean checkDataReceived();
 }

Deleted: trunk/src/main/org/jboss/messaging/core/remoting/impl/Pinger.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/Pinger.java	2009-08-14 14:29:26 UTC (rev 7729)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/Pinger.java	2009-08-14 14:51:58 UTC (rev 7730)
@@ -1,143 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2005-2009, Red Hat Middleware LLC, and individual contributors
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
-
-
-package org.jboss.messaging.core.remoting.impl;
-
-import java.util.concurrent.Future;
-
-import org.jboss.messaging.core.logging.Logger;
-import org.jboss.messaging.core.remoting.Channel;
-import org.jboss.messaging.core.remoting.ChannelHandler;
-import org.jboss.messaging.core.remoting.Packet;
-import org.jboss.messaging.core.remoting.RemotingConnection;
-import org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl;
-import org.jboss.messaging.core.remoting.impl.wireformat.Ping;
-
-/**
- * A Pinger
- *
- * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
- *
- *
- */
-public class Pinger implements Runnable, ChannelHandler
-{   
-   private static final Logger log = Logger.getLogger(Pinger.class);
-   
-   private volatile boolean closed;
-
-   private Future<?> future;
-   
-   private long lastPingReceived;
-   
-   private final long expiryPeriod;
-   
-   private final ChannelHandler extraHandler;
-   
-   private final Runnable connectionFailedAction;
-   
-   private final Channel channel0;
-   
-   private boolean first = true;
-   
-   private boolean stopPinging;   
-   
-   public Pinger(final RemotingConnection conn, final long expiryPeriod, final ChannelHandler extraHandler,
-                 final Runnable connectionFailedAction, final long lastPingReceived)
-   {
-      this.expiryPeriod = expiryPeriod;
-      
-      this.extraHandler = extraHandler;
-      
-      this.connectionFailedAction = connectionFailedAction;
-      
-      this.channel0 = conn.getChannel(0, -1, false); 
-      
-      this.lastPingReceived = lastPingReceived;
-      
-      channel0.setHandler(this);
-   }
-              
-   public synchronized void setFuture(final Future<?> future)
-   {
-      this.future = future;
-   }
-   
-   public synchronized void handlePacket(final Packet packet)
-   {
-      if (closed)
-      {
-         return;
-      }
-      
-      if (packet.getType() == PacketImpl.PING)
-      {
-         lastPingReceived = System.currentTimeMillis();
-      }
-      else if (extraHandler != null)
-      {
-         extraHandler.handlePacket(packet);
-      }
-      else
-      {
-         throw new IllegalStateException("Invalid packet " + packet.getType());
-      }
-   }
-   
-   public synchronized void run()
-   {
-      if (closed)
-      {
-         return;
-      }
-      
-      if (!first && ( System.currentTimeMillis() - lastPingReceived > expiryPeriod))
-      {
-         connectionFailedAction.run();
-      }
-      else if (!stopPinging)
-      {      
-         channel0.send(new Ping());
-      }
-      
-      first = false;
-   }
-     
-   public void close()
-   {
-      if (future != null)
-      {                      
-         future.cancel(false);
-      }
-      
-      channel0.setHandler(null);
-
-      closed = true;
-   }
-   
-   public synchronized void stopPinging()
-   {
-      this.stopPinging = true;
-   }
-      
-}

Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java	2009-08-14 14:29:26 UTC (rev 7729)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java	2009-08-14 14:51:58 UTC (rev 7730)
@@ -17,6 +17,7 @@
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.Executor;
 
 import org.jboss.messaging.core.exception.MessagingException;
 import org.jboss.messaging.core.logging.Logger;
@@ -26,6 +27,7 @@
 import org.jboss.messaging.core.remoting.Interceptor;
 import org.jboss.messaging.core.remoting.Packet;
 import org.jboss.messaging.core.remoting.RemotingConnection;
+import org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl;
 import org.jboss.messaging.core.remoting.spi.Connection;
 import org.jboss.messaging.core.remoting.spi.MessagingBuffer;
 import org.jboss.messaging.utils.SimpleIDGenerator;
@@ -79,9 +81,13 @@
    private boolean frozen;
 
    private final Object failLock = new Object();
-   
+
    private final PacketDecoder decoder = new PacketDecoder();
-   
+
+   private volatile boolean dataReceived;
+
+   private final Executor executor;
+
    // Constructors
    // ---------------------------------------------------------------------------------
 
@@ -92,7 +98,7 @@
                                  final long blockingCallTimeout,
                                  final List<Interceptor> interceptors)
    {
-      this(transportConnection, blockingCallTimeout, interceptors, true, true);
+      this(transportConnection, blockingCallTimeout, interceptors, true, true, null);
    }
 
    /*
@@ -100,17 +106,19 @@
     */
    public RemotingConnectionImpl(final Connection transportConnection,
                                  final List<Interceptor> interceptors,
-                                 final boolean active)
+                                 final boolean active,
+                                 final Executor executor)
 
    {
-      this(transportConnection, -1, interceptors, active, false);           
+      this(transportConnection, -1, interceptors, active, false, executor);
    }
 
    private RemotingConnectionImpl(final Connection transportConnection,
                                   final long blockingCallTimeout,
                                   final List<Interceptor> interceptors,
                                   final boolean active,
-                                  final boolean client)
+                                  final boolean client,
+                                  final Executor executor)
 
    {
       this.transportConnection = transportConnection;
@@ -122,6 +130,8 @@
       this.active = active;
 
       this.client = client;
+
+      this.executor = executor;
    }
 
    // RemotingConnection implementation
@@ -294,27 +304,36 @@
    {
       return transferLock;
    }
-   
+
    public boolean isActive()
    {
       return active;
    }
-   
+
    public boolean isClient()
    {
       return client;
    }
-   
+
    public boolean isDestroyed()
    {
       return destroyed;
    }
-   
+
    public long getBlockingCallTimeout()
    {
       return blockingCallTimeout;
    }
 
+   public boolean checkDataReceived()
+   {
+      boolean res = dataReceived;
+
+      dataReceived = false;
+
+      return res;
+   }
+
    // Buffer Handler implementation
    // ----------------------------------------------------
 
@@ -322,6 +341,28 @@
    {
       final Packet packet = decoder.decode(buffer);
 
+      if (executor == null || packet.getType() == PacketImpl.PING)
+      {
+         // Pings must always be handled out of band so we can send pings back to the client quickly
+         // otherwise they would get in the queue with everything else which might give an intolerable delay
+         doBufferReceived(packet);
+      }
+      else
+      {
+         executor.execute(new Runnable()
+         {
+            public void run()
+            {
+               doBufferReceived(packet);
+            }
+         });
+      }
+
+      dataReceived = true;
+   }
+
+   private void doBufferReceived(final Packet packet)
+   {
       synchronized (transferLock)
       {
          if (!frozen)
@@ -432,5 +473,5 @@
          channel.close();
       }
    }
-   
+
 }

Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/Ping.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/Ping.java	2009-08-14 14:29:26 UTC (rev 7729)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/Ping.java	2009-08-14 14:51:58 UTC (rev 7730)
@@ -40,19 +40,15 @@
 
    private long connectionTTL;
 
-   private long clientFailureCheckPeriod;
-
    // Static --------------------------------------------------------
 
    // Constructors --------------------------------------------------
 
-   public Ping(final long clientFailureCheckPeriod, final long connectionTTL)
+   public Ping(final long connectionTTL)
    {
       super(PING);
 
       this.connectionTTL = connectionTTL;
-
-      this.clientFailureCheckPeriod = clientFailureCheckPeriod;
    }
 
    public Ping()
@@ -66,32 +62,25 @@
    {
       return true;
    }
-   
+
    public long getConnectionTTL()
    {
       return connectionTTL;
    }
 
-   public long getClientFailureCheckPeriod()
-   {
-      return clientFailureCheckPeriod;
-   }
-
    public int getRequiredBufferSize()
    {
-      return BASIC_PACKET_SIZE + 2 * DataConstants.SIZE_LONG;
+      return BASIC_PACKET_SIZE + DataConstants.SIZE_LONG;
    }
 
    public void encodeBody(final MessagingBuffer buffer)
    {
       buffer.writeLong(connectionTTL);
-      buffer.writeLong(clientFailureCheckPeriod);
    }
 
    public void decodeBody(final MessagingBuffer buffer)
    {
       connectionTTL = buffer.readLong();
-      clientFailureCheckPeriod = buffer.readLong();
    }
 
    @Override
@@ -99,7 +88,6 @@
    {
       StringBuffer buf = new StringBuffer(getParentString());
       buf.append(", connectionTTL=" + connectionTTL);
-      buf.append(", clientFailureCheckPeriod=" + clientFailureCheckPeriod);
       buf.append("]");
       return buf.toString();
    }
@@ -113,8 +101,7 @@
 
       Ping r = (Ping)other;
 
-      return super.equals(other) && this.connectionTTL == r.connectionTTL &&
-             this.clientFailureCheckPeriod == r.clientFailureCheckPeriod;
+      return super.equals(other) && this.connectionTTL == r.connectionTTL;
    }
 
    public final boolean isRequiresConfirmations()

Modified: trunk/src/main/org/jboss/messaging/core/remoting/server/RemotingService.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/server/RemotingService.java	2009-08-14 14:29:26 UTC (rev 7729)
+++ trunk/src/main/org/jboss/messaging/core/remoting/server/RemotingService.java	2009-08-14 14:51:58 UTC (rev 7730)
@@ -36,8 +36,6 @@
  */
 public interface RemotingService extends MessagingComponent
 {
-   RemotingConnection getConnection(Object remotingConnectionID);
-
    /**
     * Remove a connection from the connections held by the remoting service.
     * <strong>This method must be used only from the management API.

Modified: trunk/src/main/org/jboss/messaging/core/remoting/server/impl/RemotingServiceImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/server/impl/RemotingServiceImpl.java	2009-08-14 14:29:26 UTC (rev 7729)
+++ trunk/src/main/org/jboss/messaging/core/remoting/server/impl/RemotingServiceImpl.java	2009-08-14 14:51:58 UTC (rev 7730)
@@ -13,7 +13,6 @@
 package org.jboss.messaging.core.remoting.server.impl;
 
 import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.DISCONNECT;
-import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.PING;
 
 import java.util.ArrayList;
 import java.util.HashMap;
@@ -23,9 +22,7 @@
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Executor;
-import java.util.concurrent.Future;
 import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
 
 import org.jboss.messaging.core.config.Configuration;
 import org.jboss.messaging.core.config.TransportConfiguration;
@@ -38,7 +35,6 @@
 import org.jboss.messaging.core.remoting.Packet;
 import org.jboss.messaging.core.remoting.RemotingConnection;
 import org.jboss.messaging.core.remoting.impl.AbstractBufferHandler;
-import org.jboss.messaging.core.remoting.impl.Pinger;
 import org.jboss.messaging.core.remoting.impl.RemotingConnectionImpl;
 import org.jboss.messaging.core.remoting.impl.invm.InVMAcceptorFactory;
 import org.jboss.messaging.core.remoting.impl.invm.TransportConstants;
@@ -66,7 +62,7 @@
 
    private static final Logger log = Logger.getLogger(RemotingServiceImpl.class);
 
-   private static final long INITIAL_PING_TIMEOUT = 10000;
+   private static final long CONNECTION_TTL_CHECK_INTERVAL = 2000;
 
    // Attributes ----------------------------------------------------
 
@@ -78,7 +74,7 @@
 
    private final Set<Acceptor> acceptors = new HashSet<Acceptor>();
 
-   private final Map<Object, RemotingConnection> connections = new ConcurrentHashMap<Object, RemotingConnection>();
+   private final Map<Object, ConnectionEntry> connections = new ConcurrentHashMap<Object, ConnectionEntry>();
 
    private final BufferHandler bufferHandler = new DelegatingBufferHandler();
 
@@ -94,10 +90,10 @@
 
    private final ScheduledExecutorService scheduledThreadPool;
 
-   private Map<Object, Pinger> pingers = new ConcurrentHashMap<Object, Pinger>();
-
    private final int managementConnectorID;
 
+   private FailureCheckThread failureCheckThread;
+
    // Static --------------------------------------------------------
 
    // Constructors --------------------------------------------------
@@ -152,7 +148,11 @@
 
             AcceptorFactory factory = (AcceptorFactory)clazz.newInstance();
 
-            Acceptor acceptor = factory.createAcceptor(info.getParams(), bufferHandler, this, threadPool, scheduledThreadPool);
+            Acceptor acceptor = factory.createAcceptor(info.getParams(),
+                                                       bufferHandler,
+                                                       this,
+                                                       threadPool,
+                                                       scheduledThreadPool);
 
             acceptors.add(acceptor);
 
@@ -198,6 +198,10 @@
          a.start();
       }
 
+      failureCheckThread = new FailureCheckThread(CONNECTION_TTL_CHECK_INTERVAL);
+
+      failureCheckThread.start();
+
       started = true;
    }
 
@@ -224,32 +228,22 @@
       {
          return;
       }
-      
+
+      failureCheckThread.close();
+
       // We need to stop them accepting first so no new connections are accepted after we send the disconnect message
       for (Acceptor acceptor : acceptors)
       {
          acceptor.stop();
       }
 
-      for (RemotingConnection connection : connections.values())
+      for (ConnectionEntry entry : connections.values())
       {
-         connection.getChannel(0, -1, false).sendAndFlush(new PacketImpl(DISCONNECT));
+         entry.connection.getChannel(0, -1, false).sendAndFlush(new PacketImpl(DISCONNECT));
       }
 
-//      for (Acceptor acceptor : acceptors)
-//      {
-//         acceptor.stop();
-//      }
-
       acceptors.clear();
 
-      for (Pinger pinger : pingers.values())
-      {
-         pinger.close();
-      }
-      
-      pingers.clear();
-
       connections.clear();
 
       started = false;
@@ -260,19 +254,30 @@
       return started;
    }
 
-   public RemotingConnection getConnection(final Object remotingConnectionID)
+   public RemotingConnection removeConnection(final Object remotingConnectionID)
    {
-      return connections.get(remotingConnectionID);
-   }
+      ConnectionEntry entry = this.connections.remove(remotingConnectionID);
 
-   public RemotingConnection removeConnection(final Object remotingConnectionID)
-   {
-      return closeConnection(remotingConnectionID);
+      if (entry != null)
+      {
+         return entry.connection;
+      }
+      else
+      {
+         return null;
+      }
    }
 
    public synchronized Set<RemotingConnection> getConnections()
    {
-      return new HashSet<RemotingConnection>(connections.values());
+      Set<RemotingConnection> conns = new HashSet<RemotingConnection>();
+
+      for (ConnectionEntry entry : this.connections.values())
+      {
+         conns.add(entry.connection);
+      }
+
+      return conns;
    }
 
    public RemotingConnection getServerSideReplicatingConnection()
@@ -288,25 +293,48 @@
       {
          throw new IllegalStateException("Unable to create connection, server hasn't finished starting up");
       }
+      
+      RemotingConnection rc = new RemotingConnectionImpl(connection,
+                                                         interceptors,
+                                                         !config.isBackup(),
+                                                         server.getConfiguration().isAsyncConnectionExecutionEnabled() ? server.getExecutorFactory()
+                                                                                                                               .getExecutor()
+                                                                                                                      : null);
 
-      RemotingConnection rc = new RemotingConnectionImpl(connection, interceptors, !config.isBackup());
-
       Channel channel1 = rc.getChannel(1, -1, false);
 
       ChannelHandler handler = new MessagingServerPacketHandler(server, channel1, rc);
 
       channel1.setHandler(handler);
 
-      connections.put(connection.getID(), rc);
+      final ConnectionEntry entry = new ConnectionEntry(rc,
+                                                        System.currentTimeMillis(),
+                                                        config.getConnectionTTLOverride());
 
-      InitialPingTimeout runnable = new InitialPingTimeout(rc);
+      connections.put(connection.getID(), entry);
 
-      // We schedule an initial ping timeout. An inital ping is always sent from the client as the first thing it
-      // does after creating a connection, this contains the ping period and connection TTL, if it doesn't
-      // arrive the connection will get closed
+      final Channel channel0 = rc.getChannel(0, -1, false);
 
-      scheduledThreadPool.schedule(runnable, INITIAL_PING_TIMEOUT, TimeUnit.MILLISECONDS);
-      
+      channel0.setHandler(new ChannelHandler()
+      {
+         public void handlePacket(final Packet packet)
+         {
+            if (packet.getType() == PacketImpl.PING)
+            {
+               Ping ping = (Ping)packet;
+
+               if (config.getConnectionTTLOverride() == -1)
+               {
+                  // Allow clients to specify connection ttl
+                  entry.ttl = ping.getConnectionTTL();
+               }
+
+               // Just send a ping back
+               channel0.send(packet);
+            }
+         }
+      });
+
       if (config.isBackup())
       {
          serverSideReplicatingConnection = rc;
@@ -315,19 +343,19 @@
 
    public void connectionDestroyed(final Object connectionID)
    {
-      RemotingConnection conn = connections.get(connectionID);
-      
+      ConnectionEntry conn = connections.get(connectionID);
+
       if (conn != null)
       {
          // if the connection has no failure listeners it means the sesssions etc were already closed so this is a clean
          // shutdown, therefore we can destroy the connection
          // otherwise client might have crashed/exited without closing connections so we leave them for connection TTL
 
-         if (conn.getFailureListeners().isEmpty())
+         if (conn.connection.getFailureListeners().isEmpty())
          {
-            closeConnection(connectionID);
+            connections.remove(connectionID);
 
-            conn.destroy();
+            conn.connection.destroy();
          }
       }
    }
@@ -356,142 +384,137 @@
 
    // Public --------------------------------------------------------
 
-   public void stopPingingForConnectionID(final Object connectionID)
-   {
-      Pinger pinger = pingers.get(connectionID);
-
-      pinger.stopPinging();
-   }
-
    // Package protected ---------------------------------------------
 
    // Protected -----------------------------------------------------
 
    // Private -------------------------------------------------------
 
-   private void setupPinger(final RemotingConnection conn,
-                                        final long clientFailureCheckPeriod,
-                                        final long connectionTTL)
+   // Inner classes -------------------------------------------------
+
+   private final class DelegatingBufferHandler extends AbstractBufferHandler
    {
-      if ((connectionTTL <= 0 || clientFailureCheckPeriod <= 0) && connectionTTL != -1 &&
-          clientFailureCheckPeriod != -1)
+      public void bufferReceived(final Object connectionID, final MessagingBuffer buffer)
       {
-         log.warn("Invalid values of connectionTTL/clientFailureCheckPeriod");
+         ConnectionEntry conn = connections.get(connectionID);
 
-         closeConnection(conn.getID());
-
-         return;
+         if (conn != null)
+         {
+            conn.connection.bufferReceived(connectionID, buffer);
+         }
       }
+   }
 
-      long connectionTTLToUse = config.getConnectionTTLOverride() != -1 ? config.getConnectionTTLOverride()
-                                                                       : connectionTTL;
+   private static final class ConnectionEntry
+   {
+      final RemotingConnection connection;
 
-      long pingPeriod = clientFailureCheckPeriod == -1 ? -1 : clientFailureCheckPeriod / 2;
+      volatile long lastCheck;
 
-      Pinger pingRunnable = new Pinger(conn, connectionTTLToUse, null, new FailedConnectionAction(conn), System.currentTimeMillis());
+      volatile long ttl;
 
-      Future<?> pingFuture = scheduledThreadPool.scheduleWithFixedDelay(pingRunnable, 0, pingPeriod, TimeUnit.MILLISECONDS);
+      ConnectionEntry(final RemotingConnection connection, final long lastCheck, final long ttl)
+      {
+         this.connection = connection;
 
-      pingRunnable.setFuture(pingFuture);
+         this.lastCheck = lastCheck;
 
-      pingers.put(conn.getID(), pingRunnable);           
-   }
-
-   private RemotingConnection closeConnection(final Object connectionID)
-   {
-      RemotingConnection connection = connections.remove(connectionID);
-      
-      Pinger pinger = pingers.remove(connectionID);
-      
-      if (pinger != null)
-      {
-         pinger.close();
+         this.ttl = ttl;
       }
-      
-      return connection;
    }
 
-   // Inner classes -------------------------------------------------
-
-   private class InitialPingTimeout implements Runnable, ChannelHandler
+   private final class FailureCheckThread extends Thread
    {
-      private final RemotingConnection conn;
+      private long pauseInterval;
 
-      private boolean gotInitialPing;
+      private volatile boolean closed;
 
-      private InitialPingTimeout(final RemotingConnection conn)
+      FailureCheckThread(final long pauseInterval)
       {
-         this.conn = conn;
-         
-         conn.getChannel(0, -1, false).setHandler(this);
+         this.pauseInterval = pauseInterval;
       }
-      
-      public synchronized void handlePacket(final Packet packet)
+
+      public synchronized void close()
       {
-         final byte type = packet.getType();
+         closed = true;
 
-         if (type == PING)
+         synchronized (this)
          {
-            if (!gotInitialPing)
-            {
-               Ping ping = (Ping)packet;
+            notify();
+         }
 
-               setupPinger(conn, ping.getClientFailureCheckPeriod(), ping.getConnectionTTL());
-
-               gotInitialPing = true;
-            }
+         try
+         {
+            join();
          }
-         else
+         catch (InterruptedException ignore)
          {
-            throw new IllegalArgumentException("Invalid packet: " + packet);
          }
       }
 
-      public synchronized void run()
+      public void run()
       {
-         if (!gotInitialPing)
-         {            
-            // Never received initial ping
-            log.warn("Did not receive initial ping from " + conn.getRemoteAddress() + ", connection will be closed");
+         while (!closed)
+         {
+            long now = System.currentTimeMillis();
 
-            closeConnection(conn);
+            Set<Object> idsToRemove = new HashSet<Object>();
 
-            conn.destroy();
-         }
-      }
-   }
+            for (ConnectionEntry entry : connections.values())
+            {
+               if (entry.ttl != -1)
+               {
+                  if (now >= entry.lastCheck + entry.ttl)
+                  {
+                     RemotingConnection conn = entry.connection;
 
-   private class FailedConnectionAction implements Runnable
-   {
-      private RemotingConnection conn;
+                     if (!conn.checkDataReceived())
+                     {
+                        idsToRemove.add(conn.getID());
+                     }
+                     else
+                     {
+                        entry.lastCheck = now;
+                     }
+                  }
+               }
+            }
 
-      FailedConnectionAction(final RemotingConnection conn)
-      {
-         this.conn = conn;
-      }
+            for (Object id : idsToRemove)
+            {
+               RemotingConnection conn = removeConnection(id);
 
-      public synchronized void run()
-      {
-         removeConnection(conn.getID());
+               MessagingException me = new MessagingException(MessagingException.CONNECTION_TIMEDOUT,
+                                                              "Did not receive ping from " + conn.getRemoteAddress() +
+                                                                       ". It is likely the client has exited or crashed without " +
+                                                                       "closing its connection, or the network between the server and client has failed. The connection will now be closed.");
+               conn.fail(me);
+            }
 
-         MessagingException me = new MessagingException(MessagingException.CONNECTION_TIMEDOUT,
-                                                        "Did not receive ping from " + conn.getRemoteAddress() + ". It is likely the client has exited or crashed without " + "closing its connection, or the network between the server and client has failed. The connection will now be closed.");
+            synchronized (this)
+            {
+               long toWait = pauseInterval;
 
-         conn.fail(me);
-      }
-   }
+               long start = System.currentTimeMillis();
 
-   private class DelegatingBufferHandler extends AbstractBufferHandler
-   {
-      public void bufferReceived(final Object connectionID, final MessagingBuffer buffer)
-      {
-         RemotingConnection conn = connections.get(connectionID);
+               while (!closed && toWait > 0)
+               {
+                  try
+                  {
+                     wait(toWait);
+                  }
+                  catch (InterruptedException e)
+                  {
+                  }
 
-         if (conn != null)
-         {
-            conn.bufferReceived(connectionID, buffer);
+                  now = System.currentTimeMillis();
+
+                  toWait -= now - start;
+
+                  start = now;
+               }
+            }
          }
       }
    }
-
 }
\ No newline at end of file

Added: trunk/src/main/org/jboss/messaging/core/server/MemoryManager.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/MemoryManager.java	                        (rev 0)
+++ trunk/src/main/org/jboss/messaging/core/server/MemoryManager.java	2009-08-14 14:51:58 UTC (rev 7730)
@@ -0,0 +1,37 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.messaging.core.server;
+
+
+/**
+ * Reports, via {@link #isMemoryLow()}, whether the server is running low on memory.
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ * @version <tt>$Revision: 2796 $</tt>
+ *
+ * $Id: MemoryManager.java 2796 2007-06-25 22:24:41Z timfox $
+ *
+ */
+public interface MemoryManager extends MessagingComponent
+{
+   boolean isMemoryLow();    
+}

Modified: trunk/src/main/org/jboss/messaging/core/server/MessagingServer.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/MessagingServer.java	2009-08-14 14:29:26 UTC (rev 7729)
+++ trunk/src/main/org/jboss/messaging/core/server/MessagingServer.java	2009-08-14 14:51:58 UTC (rev 7730)
@@ -18,7 +18,6 @@
 import javax.management.MBeanServer;
 
 import org.jboss.messaging.core.config.Configuration;
-import org.jboss.messaging.core.deployers.DeploymentManager;
 import org.jboss.messaging.core.management.ManagementService;
 import org.jboss.messaging.core.management.impl.MessagingServerControlImpl;
 import org.jboss.messaging.core.persistence.StorageManager;
@@ -35,6 +34,7 @@
 import org.jboss.messaging.core.settings.impl.AddressSettings;
 import org.jboss.messaging.core.transaction.ResourceManager;
 import org.jboss.messaging.core.version.Version;
+import org.jboss.messaging.utils.ExecutorFactory;
 import org.jboss.messaging.utils.SimpleString;
 import org.jboss.messaging.utils.UUID;
 
@@ -63,11 +63,11 @@
    Version getVersion();
 
    MessagingServerControlImpl getMessagingServerControl();
-   
+
    void registerActivateCallback(ActivateCallback callback);
-   
+
    void unregisterActivateCallback(ActivateCallback callback);
-    
+
    ReattachSessionResponseMessage reattachSession(RemotingConnection connection, String name, int lastReceivedCommandID) throws Exception;
 
    CreateSessionResponseMessage createSession(String name,
@@ -141,9 +141,10 @@
                      SimpleString filterString,
                      boolean durable,
                      boolean temporary) throws Exception;
-   
+
    void destroyQueue(SimpleString queueName, ServerSession session) throws Exception;
 
    void handleReplicateRedistribution(final SimpleString queueName, final long messageID) throws Exception;
 
+   ExecutorFactory getExecutorFactory();
 }

Modified: trunk/src/main/org/jboss/messaging/core/server/cluster/impl/ClusterConnectionImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/cluster/impl/ClusterConnectionImpl.java	2009-08-14 14:29:26 UTC (rev 7729)
+++ trunk/src/main/org/jboss/messaging/core/server/cluster/impl/ClusterConnectionImpl.java	2009-08-14 14:51:58 UTC (rev 7730)
@@ -565,13 +565,6 @@
          this.bridge = bridge;
       }
 
-      // public synchronized void reset() throws Exception
-      // {
-      // clearBindings();
-      //
-      // firstReset = false;
-      // }
-
       public synchronized void onMessage(final ClientMessage message)
       {
          try

Added: trunk/src/main/org/jboss/messaging/core/server/impl/MemoryManagerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/MemoryManagerImpl.java	                        (rev 0)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/MemoryManagerImpl.java	2009-08-14 14:51:58 UTC (rev 7730)
@@ -0,0 +1,171 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.messaging.core.server.impl;
+
+import org.jboss.messaging.core.logging.Logger;
+import org.jboss.messaging.core.server.MemoryManager;
+
+/**
+ * Samples JVM heap usage on a background daemon thread and flags when free memory runs low.
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ * @version <tt>$Revision$</tt>
+ *
+ * $Id$
+ *
+ */
+public class MemoryManagerImpl implements MemoryManager
+{
+   private static final Logger log = Logger.getLogger(MemoryManagerImpl.class);
+
+   // Milliseconds between two heap measurements
+   private static final long DEFAULT_MEASURE_INTERVAL = 3000;
+
+   // Free-heap percentage at or below which memory is reported as low
+   private static final int DEFAULT_FREE_MEMORY_PERCENT = 25;
+
+   private Runtime runtime;
+
+   //TODO Should be configurable
+   private long measureInterval;
+
+   //TODO Should be configurable
+   private int freeMemoryPercent;
+
+   private volatile boolean started;
+   private Thread thread;
+
+   private volatile boolean low;
+
+   public MemoryManagerImpl()
+   {
+      runtime = Runtime.getRuntime();
+      this.measureInterval = DEFAULT_MEASURE_INTERVAL;
+      this.freeMemoryPercent = DEFAULT_FREE_MEMORY_PERCENT;
+   }
+
+   /** @return true if the latest measurement found free heap at or below the threshold */
+   public boolean isMemoryLow()
+   {
+      return low;
+   }
+
+   public synchronized boolean isStarted()
+   {
+      return started;
+   }
+
+   /** Starts the measuring daemon thread; does nothing if already started. */
+   public synchronized void start()
+   {
+      if (started)
+      {
+         //Already started
+         return;
+      }
+
+      log.debug("Starting MemoryManager with MEASURE_INTERVAL: " + measureInterval
+               + " FREE_MEMORY_PERCENT: " + freeMemoryPercent);
+      started = true;
+
+      thread = new Thread(new MemoryRunnable());
+      thread.setDaemon(true);
+      thread.start();
+   }
+
+   /** Stops the measuring thread and waits for it to exit; does nothing if not started. */
+   public synchronized void stop()
+   {
+      if (!started)
+      {
+         //Already stopped
+         return;
+      }
+
+      started = false;
+      thread.interrupt();
+
+      try
+      {
+         thread.join();
+      }
+      catch (InterruptedException e)
+      {
+         // Keep the interrupt visible to the caller instead of swallowing it
+         Thread.currentThread().interrupt();
+      }
+   }
+
+   /** Measures heap usage every measureInterval ms and updates the low flag until stopped. */
+   private class MemoryRunnable implements Runnable
+   {
+      public void run()
+      {
+         while (true)
+         {
+            try
+            {
+               if (thread.isInterrupted() && !started)
+               {
+                  break;
+               }
+
+               Thread.sleep(measureInterval);
+            }
+            catch (InterruptedException ignore)
+            {
+               if (!started)
+               {
+                  break;
+               }
+            }
+
+            long maxMemory = runtime.maxMemory();
+            long totalMemory = runtime.totalMemory();
+
+            // Free space inside the reserved heap counts as available, plus heap not yet reserved
+            long availableMemory = runtime.freeMemory() + maxMemory - totalMemory;
+
+            double currentFreeMemoryPercent = 100 * (double)availableMemory / maxMemory;
+
+            // Routine measurement: debug level so the log is not flooded every interval
+            log.debug("Free memory " + currentFreeMemoryPercent + "% " +
+                     " Calculated available memory " + availableMemory +
+                     " Total mem " + totalMemory);
+
+            if (currentFreeMemoryPercent <= freeMemoryPercent)
+            {
+               log.warn("Less than " + freeMemoryPercent + "% (" + currentFreeMemoryPercent + "% total: " + totalMemory +
+                        " available " + availableMemory
+                        + ") of total available memory free. " +
+                        "You are in danger of running out of RAM. Have you set paging parameters " +
+                        "on your addresses? (See user manual \"Paging\" chapter)");
+               low = true;
+            }
+            else
+            {
+               low = false;
+            }
+         }
+      }
+   }
+}

Modified: trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java	2009-08-14 14:29:26 UTC (rev 7729)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java	2009-08-14 14:51:58 UTC (rev 7730)
@@ -82,6 +82,7 @@
 import org.jboss.messaging.core.security.impl.SecurityStoreImpl;
 import org.jboss.messaging.core.server.ActivateCallback;
 import org.jboss.messaging.core.server.Divert;
+import org.jboss.messaging.core.server.MemoryManager;
 import org.jboss.messaging.core.server.MessageReference;
 import org.jboss.messaging.core.server.MessagingServer;
 import org.jboss.messaging.core.server.Queue;
@@ -173,9 +174,11 @@
    private RemotingService remotingService;
 
    private ManagementService managementService;
+   
+   private MemoryManager memoryManager;
 
    private DeploymentManager deploymentManager;
-
+      
    private Deployer basicUserCredentialsDeployer;
 
    private Deployer addressSettingsDeployer;
@@ -382,6 +385,8 @@
       threadPool = null;
 
       pagingManager.stop();
+      
+      memoryManager.stop();
 
       pagingManager = null;
       securityStore = null;
@@ -392,6 +397,7 @@
       queueFactory = null;
       resourceManager = null;
       messagingServerControl = null;
+      memoryManager = null;
 
       sessions.clear();
 
@@ -747,6 +753,11 @@
    {
       activateCallbacks.remove(callback);
    }
+   
+   public ExecutorFactory getExecutorFactory()
+   {
+      return executorFactory;
+   }
 
    // Public
    // ---------------------------------------------------------------------------------------
@@ -874,6 +885,10 @@
       managementService = new ManagementServiceImpl(mbeanServer, configuration, managementConnectorID);
       
       remotingService = new RemotingServiceImpl(configuration, this, managementService, threadPool, scheduledPool, managementConnectorID);      
+      
+      memoryManager = new MemoryManagerImpl();
+      
+      memoryManager.start();
    }
    
    private void initialisePart2() throws Exception
@@ -1349,11 +1364,11 @@
    {
       if (version.getIncrementingVersion() != incrementingVersion)
       {
-         throw new MessagingException(MessagingException.INCOMPATIBLE_CLIENT_SERVER_VERSIONS,
-                                      "Client with version " + incrementingVersion + " is not compatible with server version " +
+         log.warn("Client with version " + incrementingVersion + " is not compatible with server version " +
                                       version.getFullVersion()  + ". " +
                                       "Please ensure all clients and servers are upgraded to the same version for them to " +
                                       "interoperate");
+         return null;
       }
 
       // Is this comment relevant any more ?

Modified: trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerPacketHandler.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerPacketHandler.java	2009-08-14 14:29:26 UTC (rev 7729)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerPacketHandler.java	2009-08-14 14:51:58 UTC (rev 7730)
@@ -212,6 +212,11 @@
                                          request.isPreAcknowledge(),
                                          request.isXA(),
                                          request.getWindowSize());
+         
+         if (response == null)
+         {
+            response = new MessagingExceptionMessage(new MessagingException(MessagingException.INCOMPATIBLE_CLIENT_SERVER_VERSIONS));
+         }
       }
       catch (Exception e)
       {

Modified: trunk/src/main/org/jboss/messaging/integration/transports/netty/NettyConnection.java
===================================================================
--- trunk/src/main/org/jboss/messaging/integration/transports/netty/NettyConnection.java	2009-08-14 14:29:26 UTC (rev 7729)
+++ trunk/src/main/org/jboss/messaging/integration/transports/netty/NettyConnection.java	2009-08-14 14:51:58 UTC (rev 7730)
@@ -125,7 +125,7 @@
    
    public void write(final MessagingBuffer buffer, final boolean flush)
    {
-      ChannelFuture future = channel.write(buffer.getUnderlyingBuffer());
+      ChannelFuture future = channel.write(buffer.getUnderlyingBuffer());      
       
       if (flush)
       {

Modified: trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/MultiThreadRandomFailoverTestBase.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/MultiThreadRandomFailoverTestBase.java	2009-08-14 14:29:26 UTC (rev 7729)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/MultiThreadRandomFailoverTestBase.java	2009-08-14 14:51:58 UTC (rev 7730)
@@ -40,7 +40,6 @@
 import org.jboss.messaging.core.config.TransportConfiguration;
 import org.jboss.messaging.core.exception.MessagingException;
 import org.jboss.messaging.core.logging.Logger;
-import org.jboss.messaging.core.remoting.impl.Pinger;
 import org.jboss.messaging.core.remoting.impl.invm.InVMRegistry;
 import org.jboss.messaging.core.server.MessagingServer;
 import org.jboss.messaging.jms.client.JBossBytesMessage;

Modified: trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/ReplicateConnectionFailureTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/ReplicateConnectionFailureTest.java	2009-08-14 14:29:26 UTC (rev 7729)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/ReplicateConnectionFailureTest.java	2009-08-14 14:51:58 UTC (rev 7730)
@@ -107,7 +107,7 @@
 
       final RemotingConnectionImpl conn1 = (RemotingConnectionImpl)((ClientSessionImpl)session1).getConnection();
 
-      ((ConnectionManagerImpl)sf1.getConnectionManagers()[0]).cancelPingerForConnectionID(conn1.getID());
+      ((ConnectionManagerImpl)sf1.getConnectionManagers()[0]).stopPingingAfterOne();
 
       for (int i = 0; i < 1000; i++)
       {

Modified: trunk/tests/src/org/jboss/messaging/tests/integration/jms/FloodServerTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/jms/FloodServerTest.java	2009-08-14 14:29:26 UTC (rev 7729)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/jms/FloodServerTest.java	2009-08-14 14:51:58 UTC (rev 7730)
@@ -24,14 +24,12 @@
 import static org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl.DEFAULT_ACK_BATCH_SIZE;
 import static org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl.DEFAULT_AUTO_GROUP;
 import static org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl.DEFAULT_CACHE_LARGE_MESSAGE_CLIENT;
-import static org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD;
 import static org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl.DEFAULT_CONNECTION_LOAD_BALANCING_POLICY_CLASS_NAME;
 import static org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl.DEFAULT_CONNECTION_TTL;
 import static org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl.DEFAULT_CONSUMER_MAX_RATE;
 import static org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl.DEFAULT_CONSUMER_WINDOW_SIZE;
 import static org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl.DEFAULT_MAX_CONNECTIONS;
 import static org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl.DEFAULT_MIN_LARGE_MESSAGE_SIZE;
-import static org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl.DEFAULT_PRE_ACKNOWLEDGE;
 import static org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl.DEFAULT_PRODUCER_MAX_RATE;
 import static org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl.DEFAULT_PRODUCER_WINDOW_SIZE;
 import static org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl.DEFAULT_SCHEDULED_THREAD_POOL_MAX_SIZE;
@@ -49,7 +47,6 @@
 import javax.jms.MessageConsumer;
 import javax.jms.MessageProducer;
 import javax.jms.Session;
-import javax.jms.Topic;
 
 import org.jboss.messaging.core.config.Configuration;
 import org.jboss.messaging.core.config.TransportConfiguration;
@@ -157,7 +154,7 @@
       serverManager.createConnectionFactory("ManualReconnectionToSingleServerTest",
                                             connectorConfigs,
                                             null,
-                                            DEFAULT_CLIENT_FAILURE_CHECK_PERIOD,
+                                            1000,
                                             DEFAULT_CONNECTION_TTL,
                                             callTimeout,
                                             DEFAULT_MAX_CONNECTIONS,
@@ -185,116 +182,100 @@
                                             jndiBindings);
    }
 
-   
-   public void testFoo()
-   {      
-   }
-   
-   public void _testFlood() throws Exception
-   {      
-      Connection connection = null;
       
-      try
+    public void testFoo()
+    {
+    }
+
+   public void _testFlood() throws Exception
+   {
+      ConnectionFactory cf = (ConnectionFactory)initialContext.lookup("/cf");
+
+      final int numProducers = 20;
+
+      final int numConsumers = 20;
+
+      final int numMessages = 10000;
+
+      ProducerThread[] producers = new ProducerThread[numProducers];
+
+      for (int i = 0; i < numProducers; i++)
       {
-         ConnectionFactory cf = (ConnectionFactory)initialContext.lookup("/cf");
-         
-         connection = cf.createConnection();
-         
-         final int numProducers = 20;
-         
-         final int numConsumers = 20;
-         
-         final int numMessages = 100000;
-         
-         ProducerThread[] producers = new ProducerThread[numProducers];
-         
-         for (int i = 0; i < numProducers; i++)
-         {
-            producers[i] = new ProducerThread(cf, numMessages);
-         }
-         
-         ConsumerThread[] consumers = new ConsumerThread[numConsumers];
-         
-         for (int i = 0; i < numConsumers; i++)
-         {
-            consumers[i] = new ConsumerThread(cf, numMessages);
-         }
-         
-         
-         for (int i = 0; i < numConsumers; i++)
-         {
-            consumers[i].start();
-         }
-         
-         for (int i = 0; i < numProducers; i++)
-         {
-            producers[i].start();
-         }
-         
-         for (int i = 0; i < numConsumers; i++)
-         {
-            consumers[i].join();
-         }
-         
-         for (int i = 0; i < numProducers; i++)
-         {
-            producers[i].join();
-         }
-         
+         producers[i] = new ProducerThread(cf, numMessages);
       }
-      finally
+
+      ConsumerThread[] consumers = new ConsumerThread[numConsumers];
+
+      for (int i = 0; i < numConsumers; i++)
       {
-         if (connection != null)
-         {
-            connection.close();
-         }
+         consumers[i] = new ConsumerThread(cf, numMessages);
       }
-    
+
+      for (int i = 0; i < numConsumers; i++)
+      {
+         consumers[i].start();
+      }
+
+      for (int i = 0; i < numProducers; i++)
+      {
+         producers[i].start();
+      }
+
+      for (int i = 0; i < numConsumers; i++)
+      {
+         consumers[i].join();
+      }
+
+      for (int i = 0; i < numProducers; i++)
+      {
+         producers[i].join();
+      }
+
    }
-   
+
    class ProducerThread extends Thread
    {
       private Connection connection;
-      
+
       private Session session;
-      
+
       private MessageProducer producer;
-      
+
       private int numMessages;
-                 
+
       ProducerThread(ConnectionFactory cf, int numMessages) throws Exception
       {
          connection = cf.createConnection();
-         
+
          session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-         
+
          producer = session.createProducer(new JBossTopic("my-topic"));
-         
+
          producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
-         
+
          this.numMessages = numMessages;
       }
-      
+
       public void run()
       {
          try
          {
             byte[] bytes = new byte[1000];
-            
+
             BytesMessage message = session.createBytesMessage();
-            
+
             message.writeBytes(bytes);
-            
+
             for (int i = 0; i < numMessages; i++)
             {
                producer.send(message);
-               
-               if (i % 1000 == 0)
-               {
-                  log.info("Producer " + this + " sent " + i);
-               }
+
+//               if (i % 1000 == 0)
+//               {
+//                  log.info("Producer " + this + " sent " + i);
+//               }
             }
-            
+
             connection.close();
          }
          catch (Exception e)
@@ -303,50 +284,50 @@
          }
       }
    }
-   
+
    class ConsumerThread extends Thread
    {
       private Connection connection;
-      
+
       private Session session;
-      
+
       private MessageConsumer consumer;
-      
+
       private int numMessages;
-                 
+
       ConsumerThread(ConnectionFactory cf, int numMessages) throws Exception
       {
          connection = cf.createConnection();
-         
+
          connection.start();
-         
+
          session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-         
+
          consumer = session.createConsumer(new JBossTopic("my-topic"));
-         
+
          this.numMessages = numMessages;
       }
-      
+
       public void run()
       {
          try
-         {                        
+         {
             for (int i = 0; i < numMessages; i++)
             {
                Message msg = consumer.receive();
-               
+
                if (msg == null)
                {
                   log.error("message is null");
                   break;
                }
-               
-               if (i % 1000 == 0)
-               {
-                  log.info("Consumer " + this + " received " + i);
-               }
+
+//               if (i % 1000 == 0)
+//               {
+//                  log.info("Consumer " + this + " received " + i);
+//               }
             }
-            
+
             connection.close();
          }
          catch (Exception e)
@@ -355,5 +336,5 @@
          }
       }
    }
-   
+
 }

Modified: trunk/tests/src/org/jboss/messaging/tests/integration/remoting/PingTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/remoting/PingTest.java	2009-08-14 14:29:26 UTC (rev 7729)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/remoting/PingTest.java	2009-08-14 14:51:58 UTC (rev 7730)
@@ -28,7 +28,6 @@
 import org.jboss.messaging.core.client.ClientSessionFactory;
 import org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl;
 import org.jboss.messaging.core.client.impl.ClientSessionFactoryInternal;
-import org.jboss.messaging.core.client.impl.ClientSessionInternal;
 import org.jboss.messaging.core.client.impl.ConnectionManagerImpl;
 import org.jboss.messaging.core.config.Configuration;
 import org.jboss.messaging.core.config.TransportConfiguration;
@@ -36,8 +35,6 @@
 import org.jboss.messaging.core.logging.Logger;
 import org.jboss.messaging.core.remoting.FailureListener;
 import org.jboss.messaging.core.remoting.RemotingConnection;
-import org.jboss.messaging.core.remoting.impl.RemotingConnectionImpl;
-import org.jboss.messaging.core.remoting.server.impl.RemotingServiceImpl;
 import org.jboss.messaging.core.server.MessagingServer;
 import org.jboss.messaging.tests.util.ServiceTestBase;
 
@@ -111,6 +108,8 @@
       csf.setConnectionTTL(CLIENT_FAILURE_CHECK_PERIOD * 2);
 
       ClientSession session = csf.createSession(false, true, true);
+      
+      log.info("Created session");
 
       assertEquals(1, ((ClientSessionFactoryInternal)csf).numConnections());
 
@@ -231,12 +230,10 @@
 
       session.addFailureListener(clientListener);
 
-      RemotingConnectionImpl conn = (RemotingConnectionImpl)((ClientSessionInternal)session).getConnection();
+      // We need to get it to stop pinging after one ping
 
-      // We need to get it to stop pinging
+      ((ConnectionManagerImpl)csf.getConnectionManagers()[0]).stopPingingAfterOne();
 
-      ((ConnectionManagerImpl)csf.getConnectionManagers()[0]).cancelPingerForConnectionID(conn.getID());
-
       RemotingConnection serverConn = null;
 
       while (serverConn == null)
@@ -330,7 +327,10 @@
 
       serverConn.addFailureListener(serverListener);
 
-      ((RemotingServiceImpl)server.getRemotingService()).stopPingingForConnectionID(serverConn.getID());
+      //((RemotingServiceImpl)server.getRemotingService()).stopPingingForConnectionID(serverConn.getID());
+      
+      // Setting the handler to null prevents the server from sending pings back to the client
+      serverConn.getChannel(0, -1, false).setHandler(null);
 
       for (int i = 0; i < 1000; i++)
       {
@@ -344,6 +344,7 @@
       }
             
       assertNotNull(clientListener.getException());
+      
       //Server connection will be closed too, when client closes client side connection after failure is detected
       assertTrue(server.getRemotingService().getConnections().isEmpty());
 




More information about the jboss-cvs-commits mailing list