[jboss-cvs] JBoss Messaging SVN: r4417 - in trunk: src/config and 12 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Tue Jun 10 07:49:22 EDT 2008


Author: timfox
Date: 2008-06-10 07:49:22 -0400 (Tue, 10 Jun 2008)
New Revision: 4417

Added:
   trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/RemotingServiceImpl.java
Removed:
   trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaService.java
Modified:
   trunk/docs/userguide/en/modules/configuration.xml
   trunk/src/config/jbm-beans.xml
   trunk/src/config/jbm-configuration.xml
   trunk/src/config/jbm-standalone-beans.xml
   trunk/src/main/org/jboss/messaging/core/client/ConnectionParams.java
   trunk/src/main/org/jboss/messaging/core/client/impl/ClientConnectionFactoryImpl.java
   trunk/src/main/org/jboss/messaging/core/client/impl/ConnectionParamsImpl.java
   trunk/src/main/org/jboss/messaging/core/config/Configuration.java
   trunk/src/main/org/jboss/messaging/core/config/impl/ConfigurationImpl.java
   trunk/src/main/org/jboss/messaging/core/config/impl/FileConfiguration.java
   trunk/src/main/org/jboss/messaging/core/remoting/ConnectorRegistry.java
   trunk/src/main/org/jboss/messaging/core/remoting/Interceptor.java
   trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaConnector.java
   trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaHandler.java
   trunk/src/main/org/jboss/messaging/core/server/MessagingServer.java
   trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java
   trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java
   trunk/src/main/org/jboss/messaging/core/server/impl/ServerConnectionImpl.java
   trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java
   trunk/tests/jms-tests/src/org/jboss/test/messaging/jms/QueueTest.java
   trunk/tests/src/org/jboss/messaging/tests/integration/core/remoting/mina/MinaHandlerOrderingTest.java
   trunk/tests/src/org/jboss/messaging/tests/integration/core/remoting/mina/MinaHandlerTest.java
   trunk/tests/src/org/jboss/messaging/tests/integration/core/remoting/mina/MinaServiceTest.java
   trunk/tests/src/org/jboss/messaging/tests/integration/core/remoting/mina/MinaSessionTest.java
   trunk/tests/src/org/jboss/messaging/tests/integration/core/remoting/mina/ServerKeepAliveTest.java
   trunk/tests/src/org/jboss/messaging/tests/performance/remoting/impl/MeasureBase.java
   trunk/tests/src/org/jboss/messaging/tests/unit/core/remoting/network/ClientNetworkFailureTest.java
Log:
Some changes to pooling and queue locking


Modified: trunk/docs/userguide/en/modules/configuration.xml
===================================================================
--- trunk/docs/userguide/en/modules/configuration.xml	2008-06-09 20:50:57 UTC (rev 4416)
+++ trunk/docs/userguide/en/modules/configuration.xml	2008-06-10 11:49:22 UTC (rev 4417)
@@ -867,7 +867,7 @@
             <programlisting>
                <![CDATA[
    <bean name="RemotingService"
-      class="org.jboss.messaging.core.remoting.impl.mina.MinaService">
+      class="org.jboss.messaging.core.remoting.impl.mina.RemotingServiceImpl">
       <constructor>
          <parameter>
             <inject bean="Configuration"/>

Modified: trunk/src/config/jbm-beans.xml
===================================================================
--- trunk/src/config/jbm-beans.xml	2008-06-09 20:50:57 UTC (rev 4416)
+++ trunk/src/config/jbm-beans.xml	2008-06-10 11:49:22 UTC (rev 4417)
@@ -51,7 +51,7 @@
       </constructor>
    </bean>
       
-   <bean name="RemotingService" class="org.jboss.messaging.core.remoting.impl.mina.MinaService">
+   <bean name="RemotingService" class="org.jboss.messaging.core.remoting.impl.mina.RemotingServiceImpl">
       <constructor>
          <parameter>
             <inject bean="Configuration"/>

Modified: trunk/src/config/jbm-configuration.xml
===================================================================
--- trunk/src/config/jbm-configuration.xml	2008-06-09 20:50:57 UTC (rev 4416)
+++ trunk/src/config/jbm-configuration.xml	2008-06-10 11:49:22 UTC (rev 4417)
@@ -3,8 +3,12 @@
 
       <clustered>false</clustered>
 
-      <scheduled-executor-max-pool-size>30</scheduled-executor-max-pool-size>
+      <!-- Maximum number of threads to use for scheduled deliveries -->
+      <scheduled-max-pool-size>30</scheduled-max-pool-size>
 
+      <!-- Maximum number of threads to use for sessions on the server side -->      
+      <max-pool-size>30</max-pool-size>
+
       <require-destinations>true</require-destinations>
 
       <!-- Remoting configuration -->

Modified: trunk/src/config/jbm-standalone-beans.xml
===================================================================
--- trunk/src/config/jbm-standalone-beans.xml	2008-06-09 20:50:57 UTC (rev 4416)
+++ trunk/src/config/jbm-standalone-beans.xml	2008-06-10 11:49:22 UTC (rev 4417)
@@ -64,7 +64,7 @@
       </constructor>
    </bean>
 
-   <bean name="RemotingService" class="org.jboss.messaging.core.remoting.impl.mina.MinaService">
+   <bean name="RemotingService" class="org.jboss.messaging.core.remoting.impl.mina.RemotingServiceImpl">
       <constructor>
          <parameter>
             <inject bean="Configuration"/>

Modified: trunk/src/main/org/jboss/messaging/core/client/ConnectionParams.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/ConnectionParams.java	2008-06-09 20:50:57 UTC (rev 4416)
+++ trunk/src/main/org/jboss/messaging/core/client/ConnectionParams.java	2008-06-10 11:49:22 UTC (rev 4417)
@@ -42,19 +42,7 @@
 
    boolean isTcpNoDelay();
 
-   void setTcpNoDelay(boolean tcpNoDelay);
-   
-   long getWriteQueueMaxBytes();
-   
-   void setWriteQueueMaxBytes(long maxBytes);
-   
-   long getWriteQueueMinBytes();
-   
-   void setWriteQueueMinBytes(long minBytes);
-   
-   long getWriteQueueBlockTimeout();
-   
-   void setWriteQueueBlockTimeout(long timeout);
+   void setTcpNoDelay(boolean tcpNoDelay);   
 
    int getTcpReceiveBufferSize();
 

Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ClientConnectionFactoryImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ClientConnectionFactoryImpl.java	2008-06-09 20:50:57 UTC (rev 4416)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ClientConnectionFactoryImpl.java	2008-06-10 11:49:22 UTC (rev 4417)
@@ -80,9 +80,7 @@
    private boolean defaultBlockOnPersistentSend;
    
    private boolean defaultBlockOnNonPersistentSend;
-   
-   
-      
+        
    // Static ---------------------------------------------------------------------------------------
    
    public static final int DEFAULT_CONSUMER_WINDOW_SIZE = 1024 * 1024;
@@ -98,7 +96,7 @@
    public static final boolean DEFAULT_BLOCK_ON_PERSISTENT_SEND = false;
    
    public static final boolean DEFAULT_BLOCK_ON_NON_PERSISTENT_SEND = false;
-    
+   
    // Constructors ---------------------------------------------------------------------------------
 
    /**
@@ -153,7 +151,7 @@
       defaultProducerMaxRate = DEFAULT_PRODUCER_MAX_RATE;
       defaultBlockOnAcknowledge = DEFAULT_BLOCK_ON_ACKNOWLEDGE;
       defaultBlockOnPersistentSend = DEFAULT_BLOCK_ON_PERSISTENT_SEND;
-      defaultBlockOnNonPersistentSend = DEFAULT_BLOCK_ON_NON_PERSISTENT_SEND;
+      defaultBlockOnNonPersistentSend = DEFAULT_BLOCK_ON_NON_PERSISTENT_SEND;      
       this.location = location;
       this.connectionParams = connectionParams;
       this.remotingConnectionFactory = new RemotingConnectionFactoryImpl();

Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ConnectionParamsImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ConnectionParamsImpl.java	2008-06-09 20:50:57 UTC (rev 4416)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ConnectionParamsImpl.java	2008-06-10 11:49:22 UTC (rev 4417)
@@ -44,9 +44,6 @@
    protected String keyStorePassword;
    protected String trustStorePath;
    protected String trustStorePassword;
-   protected long writeQueueBlockTimeout = 10000;
-   protected long writeQueueMinBytes = 32 * 1024L;
-   protected long writeQueueMaxBytes = 64 * 1024L;
    
    public long getTimeout()
    {
@@ -128,36 +125,6 @@
       this.tcpSendBufferSize = tcpSendBufferSize;
    }
 
-   public long getWriteQueueBlockTimeout()
-   {
-      return writeQueueBlockTimeout;
-   }
-
-   public long getWriteQueueMaxBytes()
-   {
-      return writeQueueMaxBytes;
-   }
-
-   public long getWriteQueueMinBytes()
-   {
-      return writeQueueMinBytes;
-   }
-
-   public void setWriteQueueBlockTimeout(final long timeout)
-   {
-      this.writeQueueBlockTimeout = timeout;
-   }
-
-   public void setWriteQueueMaxBytes(final long bytes)
-   {
-      this.writeQueueMaxBytes = bytes;
-   }
-
-   public void setWriteQueueMinBytes(final long bytes)
-   {
-      this.writeQueueMinBytes = bytes;
-   }
-
    public boolean isSSLEnabled()
    {
       String sslEnabledProperty = System.getProperty(REMOTING_ENABLE_SSL);
@@ -258,9 +225,6 @@
              cp.getTcpReceiveBufferSize() == this.getTcpReceiveBufferSize() &&
              cp.getTcpSendBufferSize() == this.getTcpSendBufferSize() &&
              cp.isSSLEnabled() == this.isSSLEnabled() &&
-             cp.isSSLEnabledModified() == this.isSSLEnabledModified() &&
-             cp.getWriteQueueBlockTimeout() == this.getWriteQueueBlockTimeout() &&
-             cp.getWriteQueueMinBytes() == this.getWriteQueueMinBytes() &&
-             cp.getWriteQueueMaxBytes() == this.getWriteQueueMaxBytes();
+             cp.isSSLEnabledModified() == this.isSSLEnabledModified();
    }
 }

Modified: trunk/src/main/org/jboss/messaging/core/config/Configuration.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/config/Configuration.java	2008-06-09 20:50:57 UTC (rev 4416)
+++ trunk/src/main/org/jboss/messaging/core/config/Configuration.java	2008-06-10 11:49:22 UTC (rev 4417)
@@ -42,6 +42,8 @@
    Boolean isClustered();
 
    Integer getScheduledThreadPoolMaxSize();
+   
+   Integer getThreadPoolMaxSize();
 
    long getSecurityInvalidationInterval();
 
@@ -71,12 +73,6 @@
 
    long getTimeout();
    
-   long getWriteQueueMaxBytes();
-
-   long getWriteQueueMinBytes();
-   
-   long getWriteQueueBlockTimeout();
-      
    boolean isSecurityEnabled();
 
    String getKeyStorePath();

Modified: trunk/src/main/org/jboss/messaging/core/config/impl/ConfigurationImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/config/impl/ConfigurationImpl.java	2008-06-09 20:50:57 UTC (rev 4416)
+++ trunk/src/main/org/jboss/messaging/core/config/impl/ConfigurationImpl.java	2008-06-10 11:49:22 UTC (rev 4417)
@@ -57,6 +57,8 @@
    
    protected int scheduledThreadPoolMaxSize = 30;
    
+   protected int threadPoolMaxSize = 30;
+   
    protected long securityInvalidationInterval = 10000;
 
    protected boolean requireDestinations;
@@ -102,10 +104,7 @@
    protected boolean invmDisabled = DEFAULT_INVM_DISABLED;
    protected boolean invmDisabledModified = false;
    protected boolean tcpNoDelay;
-   protected long writeQueueBlockTimeout = 5000;
-   protected long writeQueueMinBytes = 65536;
-   protected long writeQueueMaxBytes = 1048576;
-   
+
    protected int tcpReceiveBufferSize = -1;
    protected int tcpSendBufferSize = -1;
    protected boolean sslEnabled = DEFAULT_SSL_ENABLED;
@@ -129,6 +128,11 @@
    {
    	return scheduledThreadPoolMaxSize;
    }
+   
+   public Integer getThreadPoolMaxSize()
+   {
+      return threadPoolMaxSize;
+   }
       
    public long getSecurityInvalidationInterval()
    {
@@ -320,37 +324,7 @@
    {
       this.tcpSendBufferSize = size;
    }
-     
-   public long getWriteQueueBlockTimeout()
-   {
-      return writeQueueBlockTimeout;
-   }
-
-   public long getWriteQueueMaxBytes()
-   {
-      return writeQueueMaxBytes;
-   }
-
-   public long getWriteQueueMinBytes()
-   {
-      return writeQueueMinBytes;
-   }
-   
-   public void setWriteQueueBlockTimeout(final long timeout)
-   {
-      this.writeQueueBlockTimeout = timeout;
-   }
-
-   public void setWriteQueueMaxBytes(final long bytes)
-   {
-      this.writeQueueMaxBytes = bytes;
-   }
-
-   public void setWriteQueueMinBytes(final long bytes)
-   {
-      this.writeQueueMinBytes = bytes;
-   }
-
+  
    public String getURI()
    {
       StringBuffer buff = new StringBuffer();
@@ -461,9 +435,6 @@
       connectionParams.setTcpReceiveBufferSize(tcpReceiveBufferSize);
       connectionParams.setTcpSendBufferSize(tcpSendBufferSize);
       connectionParams.setTimeout(timeout);
-      connectionParams.setWriteQueueBlockTimeout(writeQueueBlockTimeout);
-      connectionParams.setWriteQueueMinBytes(writeQueueMinBytes);
-      connectionParams.setWriteQueueMaxBytes(writeQueueMaxBytes);
       return connectionParams;
    }
   

Modified: trunk/src/main/org/jboss/messaging/core/config/impl/FileConfiguration.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/config/impl/FileConfiguration.java	2008-06-09 20:50:57 UTC (rev 4416)
+++ trunk/src/main/org/jboss/messaging/core/config/impl/FileConfiguration.java	2008-06-10 11:49:22 UTC (rev 4417)
@@ -58,7 +58,9 @@
 
       clustered = getBoolean(e, "clustered", clustered);
 
-      scheduledThreadPoolMaxSize = getInteger(e, "scheduled-executor-max-pool-size", scheduledThreadPoolMaxSize);
+      scheduledThreadPoolMaxSize = getInteger(e, "scheduled-max-pool-size", scheduledThreadPoolMaxSize);
+      
+      threadPoolMaxSize = getInteger(e, "max-pool-size", threadPoolMaxSize);
 
       transport = TransportType.valueOf(getString(e, "remoting-transport", TCP.name()));
 
@@ -83,12 +85,6 @@
 
       keepAliveTimeout = getInteger(e, "remoting-keep-alive-timeout", ConnectionParams.DEFAULT_KEEP_ALIVE_TIMEOUT);
 
-      writeQueueBlockTimeout = getLong(e, "remoting-writequeue-block-timeout", 10000L);
-
-      writeQueueMinBytes = getLong(e, "remoting-writequeue-minbytes", 32 * 1024L);
-
-      writeQueueMaxBytes = getLong(e, "remoting-writequeue-maxbytes", 64 * 1024L);
-
       sslEnabled = getBoolean(e, "remoting-enable-ssl", false);
 
       keyStorePath = getString(e, "remoting-ssl-keystore-path", null);

Modified: trunk/src/main/org/jboss/messaging/core/remoting/ConnectorRegistry.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/ConnectorRegistry.java	2008-06-09 20:50:57 UTC (rev 4416)
+++ trunk/src/main/org/jboss/messaging/core/remoting/ConnectorRegistry.java	2008-06-10 11:49:22 UTC (rev 4417)
@@ -8,7 +8,7 @@
 
 import org.jboss.messaging.core.config.Configuration;
 import org.jboss.messaging.core.remoting.impl.invm.INVMConnector;
-import org.jboss.messaging.core.remoting.impl.mina.MinaService;
+import org.jboss.messaging.core.remoting.impl.mina.RemotingServiceImpl;
 import org.jboss.messaging.core.client.Location;
 import org.jboss.messaging.core.client.ConnectionParams;
 
@@ -16,7 +16,7 @@
 /**
  * The ConnectorRegistry keeps track of Configurations and NIOConnectors.
  * 
- * When a {@link MinaService} is started, it register its {@link Configuration}.
+ * When a {@link RemotingServiceImpl} is started, it registers its {@link Configuration}.
  * 
  * When a client is created, it gets its {@link NIOConnector} from the
  * ConnectorRegistry using the {@link Configuration} corresponding to the server

Modified: trunk/src/main/org/jboss/messaging/core/remoting/Interceptor.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/Interceptor.java	2008-06-09 20:50:57 UTC (rev 4416)
+++ trunk/src/main/org/jboss/messaging/core/remoting/Interceptor.java	2008-06-10 11:49:22 UTC (rev 4417)
@@ -13,9 +13,9 @@
  *
  * This is class is a simple way to intercepting server calls on JBoss Messaging.
  * 
- * To Add this interceptor, you have to modify jbm-configuration.xml, or call MinaService.addInterceptor manually.
+ * To add this interceptor, you have to modify jbm-configuration.xml, or call RemotingServiceImpl.addInterceptor manually.
  * 
- * If you deploy any Interceptor as a POJO on the Microcontainer, MinaService.addInterceptor is called automagically.
+ * If you deploy any Interceptor as a POJO on the Microcontainer, RemotingServiceImpl.addInterceptor is called automagically.
  *  
  * @author clebert.suconic at jboss.com
  */

Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaConnector.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaConnector.java	2008-06-09 20:50:57 UTC (rev 4416)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaConnector.java	2008-06-10 11:49:22 UTC (rev 4417)
@@ -149,10 +149,7 @@
       threadPool = Executors.newCachedThreadPool();
       //We don't order executions in the handler for messages received - this is done in the ClientConsumeImpl
       //since they are put on the queue in order
-      handler = new MinaHandler(dispatcher, threadPool, this, false, false,
-              connectionParams.getWriteQueueBlockTimeout(),
-              connectionParams.getWriteQueueMinBytes(),
-              connectionParams.getWriteQueueMaxBytes());
+      handler = new MinaHandler(dispatcher, threadPool, this, false, false);
       connector.setHandler(handler);
       InetSocketAddress address = new InetSocketAddress(location.getHost(), location.getPort());
       ConnectFuture future = connector.connect(address);

Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaHandler.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaHandler.java	2008-06-09 20:50:57 UTC (rev 4416)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaHandler.java	2008-06-10 11:49:22 UTC (rev 4417)
@@ -36,8 +36,6 @@
 
    private static final Logger log = Logger.getLogger(MinaHandler.class);
 
-   private static final AttributeKey BLOCKED = new AttributeKey(MinaHandler.class, "blocked");
-
    private static boolean trace = log.isTraceEnabled();
 
    // Attributes ----------------------------------------------------
@@ -53,16 +51,6 @@
    // Note! must use ConcurrentMap here to avoid race condition
    private final ConcurrentMap<Long, Executor> executors = new ConcurrentHashMap<Long, Executor>();
 
-   private final long blockTimeout;
-
-   //TODO - this is screwed - I want this to be zero, but unfortunately in messageSent, the current
-   //messages bytes haven't been subtracted so this won't work!!
-   private final long bytesLow;
-
-   private final long bytesHigh;
-   
-   private boolean blocked;
-
    // Static --------------------------------------------------------
 
    // Constructors --------------------------------------------------
@@ -70,18 +58,11 @@
                       final ExecutorService executorService,
                       final CleanUpNotifier failureNotifier,
                       final boolean closeSessionOnExceptionCaught,
-                      final boolean useExecutor,
-                      final long blockTimeout,
-                      final long bytesLow,
-                      final long bytesHigh)
+                      final boolean useExecutor)
    {
       assert dispatcher != null;
       assert executorService != null;
 
-      this.blockTimeout = blockTimeout;
-      this.bytesLow = bytesLow;
-      this.bytesHigh = bytesHigh;
-
       this.dispatcher = dispatcher;
       this.failureNotifier = failureNotifier;
       this.closeSessionOnExceptionCaught = closeSessionOnExceptionCaught;
@@ -134,7 +115,7 @@
 
    @Override
    public void messageReceived(final IoSession session, final Object message)
-   throws Exception
+      throws Exception
    {
       final Packet packet = (Packet) message;
 
@@ -177,58 +158,6 @@
       }
    }
 
-//   @Override
-//   public synchronized void messageSent(final IoSession session, final Object message) throws Exception
-//   {
-//      if (blocked)
-//      {
-//         long bytes = session.getScheduledWriteBytes();
-//
-//         if (bytes <= bytesLow)
-//         {
-//            blocked = false;
-//
-//            //Note that we need to notify all since there may be more than one thread waiting on this
-//            //E.g. the response from a blocking acknowledge and a delivery
-//            notifyAll();
-//         }
-//      }
-//   }
-//
-//   public synchronized void checkWrite(final IoSession session) throws Exception
-//   {
-//      while (session.getScheduledWriteBytes() >= bytesHigh)
-//      {
-//         blocked = true;
-//
-//         long start = System.currentTimeMillis();
-//
-//         long toWait = blockTimeout;
-//
-//         do
-//         {
-//            wait(toWait);
-//
-//            if (session.getScheduledWriteBytes() < bytesHigh)
-//            {
-//               break;
-//            }
-//
-//            long now = System.currentTimeMillis();
-//
-//            toWait -= now - start;
-//
-//            start = now;
-//         }
-//         while (toWait > 0);
-//
-//         if (toWait <= 0)
-//         {
-//            throw new IllegalStateException("Timed out waiting for MINA queue to free");
-//         }
-//      }
-//   }
-
    // Package protected ---------------------------------------------
 
    // Protected -----------------------------------------------------
@@ -246,15 +175,6 @@
          {
             public void send(Packet p) throws Exception
             {
-//               try
-//               {
-//                  checkWrite(session);
-//               }
-//               catch (Exception e)
-//               {
-//                  log.error("Failed to acquire sem", e);
-//               }
-
                dispatcher.callFilters(p);
 
                session.write(p);
@@ -276,7 +196,8 @@
          returner = null;
       }
 
-      if (trace) {
+      if (trace)
+      {
          log.trace("received packet " + packet);
       }
 

Deleted: trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaService.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaService.java	2008-06-09 20:50:57 UTC (rev 4416)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaService.java	2008-06-10 11:49:22 UTC (rev 4417)
@@ -1,312 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- *
- * Distributable under LGPL license.
- * See terms of license at gnu.org.
- */
-package org.jboss.messaging.core.remoting.impl.mina;
-
-import org.apache.mina.common.*;
-import org.apache.mina.transport.socket.nio.NioSocketAcceptor;
-import org.jboss.beans.metadata.api.annotations.Install;
-import org.jboss.beans.metadata.api.annotations.Uninstall;
-import org.jboss.messaging.core.client.RemotingSessionListener;
-import org.jboss.messaging.core.config.Configuration;
-import org.jboss.messaging.core.exception.MessagingException;
-import org.jboss.messaging.core.logging.Logger;
-import org.jboss.messaging.core.ping.Pinger;
-import org.jboss.messaging.core.ping.impl.PingerImpl;
-import static org.jboss.messaging.core.remoting.ConnectorRegistrySingleton.REGISTRY;
-import org.jboss.messaging.core.remoting.Interceptor;
-import org.jboss.messaging.core.remoting.PacketDispatcher;
-import org.jboss.messaging.core.remoting.RemotingService;
-import static org.jboss.messaging.core.remoting.TransportType.INVM;
-import org.jboss.messaging.core.remoting.impl.PacketDispatcherImpl;
-import static org.jboss.messaging.core.remoting.impl.RemotingConfigurationValidator.validate;
-import static org.jboss.messaging.core.remoting.impl.mina.FilterChainSupport.addCodecFilter;
-import static org.jboss.messaging.core.remoting.impl.mina.FilterChainSupport.addSSLFilter;
-
-import java.net.InetSocketAddress;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.*;
-
-/**
- * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>
- * @version <tt>$Revision$</tt>
- */
-public class MinaService implements RemotingService, CleanUpNotifier
-{
-   // Constants -----------------------------------------------------
-
-   private static final Logger log = Logger.getLogger(MinaService.class);
-
-   // Attributes ----------------------------------------------------
-
-   private boolean started = false;
-
-   private Configuration config;
-
-   private NioSocketAcceptor acceptor;
-
-   private IoServiceListener acceptorListener;
-
-   private final PacketDispatcher dispatcher;
-
-   private ExecutorService threadPool;
-
-   private List<RemotingSessionListener> listeners = new ArrayList<RemotingSessionListener>();
-
-   private List<Interceptor> filters = new CopyOnWriteArrayList<Interceptor>();
-
-   private ServerKeepAliveFactory factory;
-
-   private ScheduledExecutorService scheduledExecutor;
-   private Map<IoSession, ScheduledFuture> currentScheduledPingers;
-   private Map<IoSession, Pinger> currentPingers;
-
-   // Static --------------------------------------------------------
-
-   // Constructors --------------------------------------------------
-
-   public MinaService(Configuration config)
-   {
-      this(config, new ServerKeepAliveFactory());
-   }
-
-   public MinaService(Configuration config, ServerKeepAliveFactory factory)
-   {
-      assert config != null;
-      assert factory != null;
-
-      validate(config);
-
-      this.config = config;
-      this.factory = factory;
-      dispatcher = new PacketDispatcherImpl(filters);
-
-      scheduledExecutor = new ScheduledThreadPoolExecutor(config.getScheduledThreadPoolMaxSize());
-      currentScheduledPingers = new ConcurrentHashMap<IoSession, ScheduledFuture>();
-      currentPingers = new ConcurrentHashMap<IoSession, Pinger>();
-   }
-
-   @Install
-   public void addInterceptor(Interceptor filter)
-   {
-      filters.add(filter);
-   }
-
-   @Uninstall
-   public void removeInterceptor(Interceptor filter)
-   {
-      filters.remove(filter);
-   }
-
-   public void addRemotingSessionListener(RemotingSessionListener listener)
-   {
-      assert listener != null;
-
-      listeners.add(listener);
-   }
-
-   public void removeRemotingSessionListener(RemotingSessionListener listener)
-   {
-      assert listener != null;
-
-      listeners.remove(listener);
-   }
-
-   // TransportService implementation -------------------------------
-
-   public void start() throws Exception
-   {
-      if (log.isDebugEnabled())
-      {
-         log.debug("Start MinaService with configuration:" + config);
-      }
-
-      // if INVM transport is set, we bypass MINA setup
-      if (config.getTransport() != INVM
-              && acceptor == null)
-      {
-         acceptor = new NioSocketAcceptor();
-
-         acceptor.setSessionDataStructureFactory(new MessagingIOSessionDataStructureFactory());
-
-         DefaultIoFilterChainBuilder filterChain = acceptor.getFilterChain();
-
-         // addMDCFilter(filterChain);
-         if (config.isSSLEnabled())
-         {
-            addSSLFilter(filterChain, false, config.getKeyStorePath(),
-                    config.getKeyStorePassword(), config
-                    .getTrustStorePath(), config
-                    .getTrustStorePassword());
-         }
-         addCodecFilter(filterChain);
-
-         // Bind
-         acceptor.setDefaultLocalAddress(new InetSocketAddress(config.getHost(), config.getPort()));
-         acceptor.getSessionConfig().setTcpNoDelay(config.isTcpNoDelay());
-         int receiveBufferSize = config.getTcpReceiveBufferSize();
-         if (receiveBufferSize != -1)
-         {
-            acceptor.getSessionConfig().setReceiveBufferSize(receiveBufferSize);
-         }
-         int sendBufferSize = config.getTcpSendBufferSize();
-         if (sendBufferSize != -1)
-         {
-            acceptor.getSessionConfig().setSendBufferSize(sendBufferSize);
-         }
-         acceptor.setReuseAddress(true);
-         acceptor.getSessionConfig().setReuseAddress(true);
-         acceptor.getSessionConfig().setKeepAlive(true);
-         acceptor.setCloseOnDeactivation(false);
-
-         threadPool = Executors.newCachedThreadPool();
-         acceptor.setHandler(new MinaHandler(dispatcher, threadPool,
-                 this, true, true,
-                 config.getWriteQueueBlockTimeout(),
-                 config.getWriteQueueMinBytes(),
-                 config.getWriteQueueMaxBytes()));
-         acceptor.bind();
-         acceptorListener = new MinaSessionListener();
-         acceptor.addListener(acceptorListener);
-      }
-
-      // TODO reenable invm transport
-//      boolean disableInvm = config.isInvmDisabled();
-//      if (log.isDebugEnabled())
-//         log.debug("invm optimization for remoting is " + (disableInvm ? "disabled" : "enabled"));
-      // if (!disableInvm)
-
-      log.info("Registering:" + config.getLocation());
-      REGISTRY.register(config.getLocation(), dispatcher);
-
-      started = true;
-   }
-
-   public void stop()
-   {
-      if (acceptor != null)
-      {
-         // remove the listener before disposing the acceptor
-         // so that we're not notified when the sessions are destroyed
-         acceptor.removeListener(acceptorListener);
-         acceptor.unbind();
-         acceptor.dispose();
-         acceptor = null;
-         threadPool.shutdown();
-      }
-
-      REGISTRY.unregister(config.getLocation());
-
-      started = false;
-   }
-
-   public PacketDispatcher getDispatcher()
-   {
-      return dispatcher;
-   }
-
-   public Configuration getConfiguration()
-   {
-      return config;
-   }
-
-   public ServerKeepAliveFactory getKeepAliveFactory()
-   {
-      return factory;
-   }
-
-   /**
-    * This method must only be called by tests which requires
-    * to insert Filters (e.g. to simulate network failures)
-    */
-   public DefaultIoFilterChainBuilder getFilterChain()
-   {
-      assert started == true;
-      assert acceptor != null;
-
-      return acceptor.getFilterChain();
-   }
-
-   // FailureNotifier implementation -------------------------------
-
-   public void fireCleanup(long sessionID, MessagingException me)
-   {
-      if (factory.getSessions().contains(sessionID))
-      {
-         for (RemotingSessionListener listener : listeners)
-         {
-            listener.sessionDestroyed(sessionID, me);
-         }
-         factory.getSessions().remove(sessionID);
-      }
-   }
-
-   // Public --------------------------------------------------------
-
-   // Package protected ---------------------------------------------
-
-   // Protected -----------------------------------------------------
-
-   // Private -------------------------------------------------------
-
-   // Inner classes -------------------------------------------------
-
-   private final class MinaSessionListener implements IoServiceListener
-   {
-
-      public void serviceActivated(IoService service)
-      {
-      }
-
-      public void serviceDeactivated(IoService service)
-      {
-      }
-
-      public void serviceIdle(IoService service, IdleStatus idleStatus)
-      {
-      }
-
-      /**
-       * register a pinger for the new client
-       *
-       * @param session
-       */
-      public void sessionCreated(IoSession session)
-      {
-         //register pinger
-         if (config.getKeepAliveInterval() > 0)
-         {
-            Pinger pinger = new PingerImpl(getDispatcher(), new MinaSession(session, null), config.getKeepAliveTimeout(), MinaService.this);
-            ScheduledFuture future = scheduledExecutor.scheduleAtFixedRate(pinger, config.getKeepAliveInterval(), config.getKeepAliveInterval(), TimeUnit.MILLISECONDS);
-            currentScheduledPingers.put(session, future);
-            currentPingers.put(session, pinger);
-            factory.getSessions().add(session.getId());
-         }
-      }
-
-      /**
-       * destry th epinger and stop
-       *
-       * @param session
-       */
-      public void sessionDestroyed(IoSession session)
-      {
-         ScheduledFuture future = currentScheduledPingers.remove(session);
-         if (future != null)
-         {
-            future.cancel(true);
-         }
-         Pinger pinger = currentPingers.remove(session);
-         if (pinger != null)
-         {
-            pinger.close();
-         }
-         fireCleanup(session.getId(), null);
-      }
-   }
-}
\ No newline at end of file

Copied: trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/RemotingServiceImpl.java (from rev 4415, trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaService.java)
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/RemotingServiceImpl.java	                        (rev 0)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/RemotingServiceImpl.java	2008-06-10 11:49:22 UTC (rev 4417)
@@ -0,0 +1,320 @@
+/*
+ * JBoss, Home of Professional Open Source
+ *
+ * Distributable under LGPL license.
+ * See terms of license at gnu.org.
+ */
+package org.jboss.messaging.core.remoting.impl.mina;
+
+import static org.jboss.messaging.core.remoting.ConnectorRegistrySingleton.REGISTRY;
+import static org.jboss.messaging.core.remoting.TransportType.INVM;
+import static org.jboss.messaging.core.remoting.impl.RemotingConfigurationValidator.validate;
+import static org.jboss.messaging.core.remoting.impl.mina.FilterChainSupport.addCodecFilter;
+import static org.jboss.messaging.core.remoting.impl.mina.FilterChainSupport.addSSLFilter;
+
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.mina.common.DefaultIoFilterChainBuilder;
+import org.apache.mina.common.IdleStatus;
+import org.apache.mina.common.IoService;
+import org.apache.mina.common.IoServiceListener;
+import org.apache.mina.common.IoSession;
+import org.apache.mina.transport.socket.nio.NioSocketAcceptor;
+import org.jboss.beans.metadata.api.annotations.Install;
+import org.jboss.beans.metadata.api.annotations.Uninstall;
+import org.jboss.messaging.core.client.RemotingSessionListener;
+import org.jboss.messaging.core.config.Configuration;
+import org.jboss.messaging.core.exception.MessagingException;
+import org.jboss.messaging.core.logging.Logger;
+import org.jboss.messaging.core.ping.Pinger;
+import org.jboss.messaging.core.ping.impl.PingerImpl;
+import org.jboss.messaging.core.remoting.Interceptor;
+import org.jboss.messaging.core.remoting.PacketDispatcher;
+import org.jboss.messaging.core.remoting.RemotingService;
+import org.jboss.messaging.core.remoting.impl.PacketDispatcherImpl;
+
+/**
+ * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>
+ * @version <tt>$Revision$</tt>
+ */
+public class RemotingServiceImpl implements RemotingService, CleanUpNotifier
+{
+   // Constants -----------------------------------------------------
+
+   private static final Logger log = Logger.getLogger(RemotingServiceImpl.class);
+
+   // Attributes ----------------------------------------------------
+
+   private boolean started = false;
+
+   private Configuration config;
+
+   private NioSocketAcceptor acceptor;
+
+   private IoServiceListener acceptorListener;
+
+   private final PacketDispatcher dispatcher;
+
+   private ExecutorService threadPool;
+
+   private List<RemotingSessionListener> listeners = new ArrayList<RemotingSessionListener>();
+
+   private List<Interceptor> filters = new CopyOnWriteArrayList<Interceptor>();
+
+   private ServerKeepAliveFactory factory;
+
+   private ScheduledExecutorService scheduledExecutor;
+   private Map<IoSession, ScheduledFuture<?>> currentScheduledPingers;
+   private Map<IoSession, Pinger> currentPingers;
+
+   // Static --------------------------------------------------------
+
+   // Constructors --------------------------------------------------
+
+   public RemotingServiceImpl(Configuration config)
+   {
+      this(config, new ServerKeepAliveFactory());
+   }
+
+   public RemotingServiceImpl(Configuration config, ServerKeepAliveFactory factory)
+   {
+      assert config != null;
+      assert factory != null;
+
+      validate(config);
+
+      this.config = config;
+      this.factory = factory;
+      dispatcher = new PacketDispatcherImpl(filters);
+
+      scheduledExecutor = new ScheduledThreadPoolExecutor(config.getScheduledThreadPoolMaxSize());
+      currentScheduledPingers = new ConcurrentHashMap<IoSession, ScheduledFuture<?>>();
+      currentPingers = new ConcurrentHashMap<IoSession, Pinger>();
+   }
+
+   @Install
+   public void addInterceptor(Interceptor filter)
+   {
+      filters.add(filter);
+   }
+
+   @Uninstall
+   public void removeInterceptor(Interceptor filter)
+   {
+      filters.remove(filter);
+   }
+
+   public void addRemotingSessionListener(RemotingSessionListener listener)
+   {
+      assert listener != null;
+
+      listeners.add(listener);
+   }
+
+   public void removeRemotingSessionListener(RemotingSessionListener listener)
+   {
+      assert listener != null;
+
+      listeners.remove(listener);
+   }
+
+   // RemotingService implementation --------------------------------
+
+   public void start() throws Exception
+   {
+      if (log.isDebugEnabled())
+      {
+         log.debug("Start RemotingServiceImpl with configuration:" + config);
+      }
+
+      // if INVM transport is set, we bypass MINA setup
+      if (config.getTransport() != INVM
+              && acceptor == null)
+      {
+         acceptor = new NioSocketAcceptor();
+
+         acceptor.setSessionDataStructureFactory(new MessagingIOSessionDataStructureFactory());
+
+         DefaultIoFilterChainBuilder filterChain = acceptor.getFilterChain();
+
+         // addMDCFilter(filterChain);
+         if (config.isSSLEnabled())
+         {
+            addSSLFilter(filterChain, false, config.getKeyStorePath(),
+                    config.getKeyStorePassword(), config
+                    .getTrustStorePath(), config
+                    .getTrustStorePassword());
+         }
+         addCodecFilter(filterChain);
+
+         // Bind
+         acceptor.setDefaultLocalAddress(new InetSocketAddress(config.getHost(), config.getPort()));
+         acceptor.getSessionConfig().setTcpNoDelay(config.isTcpNoDelay());
+         int receiveBufferSize = config.getTcpReceiveBufferSize();
+         if (receiveBufferSize != -1)
+         {
+            acceptor.getSessionConfig().setReceiveBufferSize(receiveBufferSize);
+         }
+         int sendBufferSize = config.getTcpSendBufferSize();
+         if (sendBufferSize != -1)
+         {
+            acceptor.getSessionConfig().setSendBufferSize(sendBufferSize);
+         }
+         acceptor.setReuseAddress(true);
+         acceptor.getSessionConfig().setReuseAddress(true);
+         acceptor.getSessionConfig().setKeepAlive(true);
+         acceptor.setCloseOnDeactivation(false);
+
+         threadPool = Executors.newCachedThreadPool();
+         acceptor.setHandler(new MinaHandler(dispatcher, threadPool, this, true, true));
+         acceptor.bind();
+         acceptorListener = new MinaSessionListener();
+         acceptor.addListener(acceptorListener);
+      }
+
+      // TODO reenable invm transport
+//      boolean disableInvm = config.isInvmDisabled();
+//      if (log.isDebugEnabled())
+//         log.debug("invm optimization for remoting is " + (disableInvm ? "disabled" : "enabled"));
+      // if (!disableInvm)
+
+      log.info("Registering:" + config.getLocation());
+      REGISTRY.register(config.getLocation(), dispatcher);
+
+      started = true;
+   }
+
+   public void stop()
+   {
+      if (acceptor != null)
+      {
+         // remove the listener before disposing the acceptor
+         // so that we're not notified when the sessions are destroyed
+         acceptor.removeListener(acceptorListener);
+         acceptor.unbind();
+         acceptor.dispose();
+         acceptor = null;
+         threadPool.shutdown();
+      }
+
+      REGISTRY.unregister(config.getLocation());
+
+      started = false;
+   }
+
+   public PacketDispatcher getDispatcher()
+   {
+      return dispatcher;
+   }
+
+   public Configuration getConfiguration()
+   {
+      return config;
+   }
+
+   public ServerKeepAliveFactory getKeepAliveFactory()
+   {
+      return factory;
+   }
+
+   /**
+    * This method must only be called by tests which need
+    * to insert Filters (e.g. to simulate network failures)
+    */
+   public DefaultIoFilterChainBuilder getFilterChain()
+   {
+      assert started == true;
+      assert acceptor != null;
+
+      return acceptor.getFilterChain();
+   }
+
+   // CleanUpNotifier implementation -------------------------------
+
+   public void fireCleanup(long sessionID, MessagingException me)
+   {
+      if (factory.getSessions().contains(sessionID))
+      {
+         for (RemotingSessionListener listener : listeners)
+         {
+            listener.sessionDestroyed(sessionID, me);
+         }
+         factory.getSessions().remove(sessionID);
+      }
+   }
+
+   // Public --------------------------------------------------------
+
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+
+   // Private -------------------------------------------------------
+
+   // Inner classes -------------------------------------------------
+
+   private final class MinaSessionListener implements IoServiceListener
+   {
+
+      public void serviceActivated(IoService service)
+      {
+      }
+
+      public void serviceDeactivated(IoService service)
+      {
+      }
+
+      public void serviceIdle(IoService service, IdleStatus idleStatus)
+      {
+      }
+
+      /**
+       * register a pinger for the new client
+       *
+       * @param session
+       */
+      public void sessionCreated(IoSession session)
+      {
+         //register pinger
+         if (config.getKeepAliveInterval() > 0)
+         {
+            Pinger pinger = new PingerImpl(getDispatcher(), new MinaSession(session, null), config.getKeepAliveTimeout(), RemotingServiceImpl.this);
+            ScheduledFuture<?> future = scheduledExecutor.scheduleAtFixedRate(pinger, config.getKeepAliveInterval(), config.getKeepAliveInterval(), TimeUnit.MILLISECONDS);
+            currentScheduledPingers.put(session, future);
+            currentPingers.put(session, pinger);
+            factory.getSessions().add(session.getId());
+         }
+      }
+
+      /**
+       * destroy the pinger and stop
+       *
+       * @param session
+       */
+      public void sessionDestroyed(IoSession session)
+      {
+         ScheduledFuture<?> future = currentScheduledPingers.remove(session);
+         if (future != null)
+         {
+            future.cancel(true);
+         }
+         Pinger pinger = currentPingers.remove(session);
+         if (pinger != null)
+         {
+            pinger.close();
+         }
+         fireCleanup(session.getId(), null);
+      }
+   }
+}
\ No newline at end of file

Modified: trunk/src/main/org/jboss/messaging/core/server/MessagingServer.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/MessagingServer.java	2008-06-09 20:50:57 UTC (rev 4416)
+++ trunk/src/main/org/jboss/messaging/core/server/MessagingServer.java	2008-06-10 11:49:22 UTC (rev 4417)
@@ -27,15 +27,16 @@
 import org.jboss.messaging.core.deployers.DeploymentManager;
 import org.jboss.messaging.core.persistence.StorageManager;
 import org.jboss.messaging.core.postoffice.PostOffice;
+import org.jboss.messaging.core.remoting.PacketReturner;
 import org.jboss.messaging.core.remoting.RemotingService;
-import org.jboss.messaging.core.remoting.PacketReturner;
 import org.jboss.messaging.core.remoting.impl.wireformat.CreateConnectionResponse;
+import org.jboss.messaging.core.security.JBMSecurityManager;
 import org.jboss.messaging.core.security.Role;
 import org.jboss.messaging.core.security.SecurityStore;
-import org.jboss.messaging.core.security.JBMSecurityManager;
 import org.jboss.messaging.core.settings.HierarchicalRepository;
 import org.jboss.messaging.core.settings.impl.QueueSettings;
 import org.jboss.messaging.core.version.Version;
+import org.jboss.messaging.util.OrderedExecutorFactory;
 
 /**
  * This interface defines the internal interface of the Messaging Server exposed
@@ -96,4 +97,6 @@
                                              PacketReturner sender) throws Exception;
 
    DeploymentManager getDeploymentManager();
+   
+   OrderedExecutorFactory getOrderedExecutorFactory();
 }

Modified: trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java	2008-06-09 20:50:57 UTC (rev 4416)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java	2008-06-10 11:49:22 UTC (rev 4417)
@@ -22,8 +22,11 @@
 package org.jboss.messaging.core.server.impl;
 
 import java.util.HashSet;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.ThreadFactory;
 
 import org.jboss.messaging.core.config.Configuration;
 import org.jboss.messaging.core.config.impl.ConfigurationImpl;
@@ -45,7 +48,7 @@
 import org.jboss.messaging.core.remoting.PacketReturner;
 import org.jboss.messaging.core.remoting.RemotingService;
 import org.jboss.messaging.core.remoting.impl.mina.CleanUpNotifier;
-import org.jboss.messaging.core.remoting.impl.mina.MinaService;
+import org.jboss.messaging.core.remoting.impl.mina.RemotingServiceImpl;
 import org.jboss.messaging.core.remoting.impl.wireformat.CreateConnectionResponse;
 import org.jboss.messaging.core.security.JBMSecurityManager;
 import org.jboss.messaging.core.security.Role;
@@ -62,8 +65,10 @@
 import org.jboss.messaging.core.transaction.ResourceManager;
 import org.jboss.messaging.core.transaction.impl.ResourceManagerImpl;
 import org.jboss.messaging.core.version.Version;
+import org.jboss.messaging.util.OrderedExecutorFactory;
 import org.jboss.messaging.util.VersionLoader;
 
+
 /**
  * A Messaging Server
  *
@@ -100,6 +105,8 @@
    private Deployer queueSettingsDeployer;
    private JBMSecurityManager securityManager = new JBMSecurityManagerImpl(true);
    private DeploymentManager deploymentManager = new FileDeploymentManager();
+   private OrderedExecutorFactory orderedExecutorFactory;
+   private ExecutorService threadPool;
 
    // plugins
 
@@ -115,7 +122,6 @@
    private ResourceManager resourceManager = new ResourceManagerImpl(0);
    private ScheduledExecutorService scheduledExecutor;
    private MessagingServerPacketHandler serverPacketHandler;
-   private CleanUpNotifier cleanUpNotifier = null;
 
    // Constructors ---------------------------------------------------------------------------------
    /**
@@ -140,11 +146,12 @@
       this();
       this.configuration = configuration;
       createTransport = true;
-      remotingService = new MinaService(configuration);
-      cleanUpNotifier = (CleanUpNotifier) remotingService;
+      remotingService = new RemotingServiceImpl(configuration);      
    }
    // lifecycle methods ----------------------------------------------------------------
 
+   
+   
    public synchronized void start() throws Exception
    {
       log.debug("starting MessagingServer");
@@ -164,12 +171,14 @@
       securityStore.setSecurityManager(securityManager);
       securityDeployer = new SecurityDeployer(securityRepository);
       queueSettingsRepository.setDefault(new QueueSettings());
-      scheduledExecutor = new ScheduledThreadPoolExecutor(configuration.getScheduledThreadPoolMaxSize());
+      scheduledExecutor = new ScheduledThreadPoolExecutor(configuration.getScheduledThreadPoolMaxSize(), new JBMThreadFactory("JBM-scheduled-threads"));
       queueFactory = new QueueFactoryImpl(scheduledExecutor, queueSettingsRepository);
       connectionManager = new ConnectionManagerImpl();
       memoryManager = new SimpleMemoryManager();
       postOffice = new PostOfficeImpl(storageManager, queueFactory, configuration.isRequireDestinations());
-      queueSettingsDeployer = new QueueSettingsDeployer(queueSettingsRepository);
+      queueSettingsDeployer = new QueueSettingsDeployer(queueSettingsRepository);      
+      threadPool = Executors.newFixedThreadPool(configuration.getThreadPoolMaxSize(), new JBMThreadFactory("JBM-session-threads"));
+      orderedExecutorFactory = new OrderedExecutorFactory(threadPool);
 
       if (createTransport)
       {
@@ -226,6 +235,9 @@
       postOffice = null;
       scheduledExecutor.shutdown();
       scheduledExecutor = null;
+      threadPool.shutdown();
+      threadPool = null;
+      orderedExecutorFactory = null;
       if (createTransport)
       {
          remotingService.stop();
@@ -285,11 +297,6 @@
       this.storageManager = storageManager;
    }
 
-   public void setCleanUpNotifier(CleanUpNotifier cleanUpNotifier)
-   {
-      this.cleanUpNotifier = cleanUpNotifier;
-   }
-
    public PostOffice getPostOffice()
    {
       return postOffice;
@@ -300,7 +307,6 @@
       this.postOffice = postOffice;
    }
 
-
    public HierarchicalRepository<HashSet<Role>> getSecurityRepository()
    {
       return securityRepository;
@@ -353,12 +359,17 @@
                       sender.getSessionID(), clientAddress,
                       remotingService.getDispatcher(), resourceManager, storageManager,
                       queueSettingsRepository,
-                      postOffice, securityStore, connectionManager);
+                      postOffice, securityStore, connectionManager, orderedExecutorFactory);
 
       remotingService.getDispatcher().register(new ServerConnectionPacketHandler(connection));
 
       return new CreateConnectionResponse(connection.getID(), version);
    }
+   
+   public OrderedExecutorFactory getOrderedExecutorFactory()
+   {
+      return orderedExecutorFactory;
+   }
 
    // Public ---------------------------------------------------------------------------------------
 
@@ -367,5 +378,22 @@
    // Protected ------------------------------------------------------------------------------------
 
    // Private --------------------------------------------------------------------------------------
+   
+   // Inner classes --------------------------------------------------------------------------------
+   
+   private static class JBMThreadFactory implements ThreadFactory
+   {
+      private ThreadGroup group;
+      
+      JBMThreadFactory(final String groupName)
+      {
+         this.group = new ThreadGroup(groupName);         
+      }
+      
+      public Thread newThread(Runnable command)
+      {        
+         return new Thread(group, command);
+      }         
+   }
 
 }

Modified: trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java	2008-06-09 20:50:57 UTC (rev 4416)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java	2008-06-10 11:49:22 UTC (rev 4417)
@@ -98,6 +98,8 @@
 
    private int pos;
    
+   private boolean locked;
+   
    private AtomicInteger sizeBytes = new AtomicInteger(0);
 
    private AtomicInteger messagesAdded = new AtomicInteger(0);
@@ -159,52 +161,56 @@
       return name;
    }
    
-   public HandleStatus addLast(final MessageReference ref)
+   public synchronized HandleStatus addLast(final MessageReference ref)
    {
-      lock.lock();
+      if (locked)
+      {
+         lock.lock();
+      }
       try
       {
          return add(ref, false);
       }
       finally
       {
-         lock.unlock();
+         if (locked)
+         {
+            lock.unlock();
+         }
       }
    }
 
-   public HandleStatus addFirst(final MessageReference ref)
+   public synchronized HandleStatus addFirst(final MessageReference ref)
    {
-      lock.lock();
+      if (locked)
+      {
+         lock.lock();
+      }      
       try
       {
          return add(ref, true);
       }
       finally
       {
-         lock.unlock();
+         if (locked)
+         {
+            lock.unlock();
+         }
       }
    }
 
    public void addListFirst(final LinkedList<MessageReference> list)
    {
-      lock.lock();
-      try
+      ListIterator<MessageReference> iter = list.listIterator(list.size());
+
+      while (iter.hasPrevious())
       {
-         ListIterator<MessageReference> iter = list.listIterator(list.size());
-   
-         while (iter.hasPrevious())
-         {
-            MessageReference ref = iter.previous();
-   
-            messageReferences.addFirst(ref, ref.getMessage().getPriority());
-         }
-   
-         deliver();
+         MessageReference ref = iter.previous();
+
+         messageReferences.addFirst(ref, ref.getMessage().getPriority());
       }
-      finally
-      {
-         lock.unlock();
-      }
+
+      deliver();
    }
  
    public void deliverAsync(final Executor executor)
@@ -222,75 +228,67 @@
     * 
     * @see org.jboss.messaging.newcore.intf.Queue#deliver()
     */
-   public void deliver()
+   public synchronized void deliver()
    {
-      lock.lock();
-      try
+      MessageReference reference;
+
+      ListIterator<MessageReference> iterator = null;
+
+      while (true)
       {
-         MessageReference reference;
-   
-         ListIterator<MessageReference> iterator = null;
-   
-         while (true)
+         if (iterator == null)
          {
-            if (iterator == null)
+            reference = messageReferences.peekFirst();
+         }
+         else
+         {
+            if (iterator.hasNext())
             {
-               reference = messageReferences.peekFirst();
+               reference = iterator.next();
             }
             else
             {
-               if (iterator.hasNext())
-               {
-                  reference = iterator.next();
-               }
-               else
-               {
-                  reference = null;
-               }
+               reference = null;
             }
-   
-            if (reference == null)
+         }
+
+         if (reference == null)
+         {
+            if (iterator == null)
             {
-               if (iterator == null)
-               {
-                  // We delivered all the messages - go into direct delivery
-                  direct = true;
-                  
-                  promptDelivery = false;
-               }
-               return;
+               // We delivered all the messages - go into direct delivery
+               direct = true;
+               
+               promptDelivery = false;
             }
-   
-            HandleStatus status = deliver(reference);
-   
-            if (status == HandleStatus.HANDLED)
+            return;
+         }
+
+         HandleStatus status = deliver(reference);
+
+         if (status == HandleStatus.HANDLED)
+         {
+            if (iterator == null)
             {
-               if (iterator == null)
-               {
-                  messageReferences.removeFirst();              
-               }
-               else
-               {
-                  iterator.remove();
-               }
+               messageReferences.removeFirst();              
             }
-            else if (status == HandleStatus.BUSY)
+            else
             {
-               // All consumers busy - give up
-               break;
+               iterator.remove();
             }
-            else if (status == HandleStatus.NO_MATCH && iterator == null)
-            {
-               // Consumers not all busy - but filter not accepting - iterate back
-               // through the queue
-               iterator = messageReferences.iterator();
-            }
          }
+         else if (status == HandleStatus.BUSY)
+         {
+            // All consumers busy - give up
+            break;
+         }
+         else if (status == HandleStatus.NO_MATCH && iterator == null)
+         {
+            // Consumers not all busy - but filter not accepting - iterate back
+            // through the queue
+            iterator = messageReferences.iterator();
+         }
       }
-      finally
-      {
-         lock.unlock();
-      }
    }
 
    public synchronized void addConsumer(final Consumer consumer)
@@ -502,14 +500,18 @@
       tx.commit();
    }
    
-   public void lock()
+   public synchronized void lock()
    {
       lock.lock();
+      
+      locked = true;
    }
    
-   public void unlock()
-   {
-      lock.unlock();
+   public synchronized void unlock()
+   {            
+      lock.unlock();          
+      
+      locked = false;
    }
 
    // Public

Modified: trunk/src/main/org/jboss/messaging/core/server/impl/ServerConnectionImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/ServerConnectionImpl.java	2008-06-09 20:50:57 UTC (rev 4416)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/ServerConnectionImpl.java	2008-06-10 11:49:22 UTC (rev 4417)
@@ -41,6 +41,7 @@
 import org.jboss.messaging.core.settings.impl.QueueSettings;
 import org.jboss.messaging.core.transaction.ResourceManager;
 import org.jboss.messaging.util.ConcurrentHashSet;
+import org.jboss.messaging.util.OrderedExecutorFactory;
 import org.jboss.messaging.util.SimpleString;
 
 /**
@@ -89,6 +90,8 @@
    private final SecurityStore securityStore;
    
    private final ConnectionManager connectionManager;
+   
+   private final OrderedExecutorFactory orderedExecutorFactory;
 
    private final long createdTime;
          
@@ -111,7 +114,8 @@
    		                      final StorageManager persistenceManager,
    		                      final HierarchicalRepository<QueueSettings> queueSettingsRepository,
    		                      final PostOffice postOffice, final SecurityStore securityStore,
-   		                      final ConnectionManager connectionManager)
+   		                      final ConnectionManager connectionManager,
+   		                      final OrderedExecutorFactory orderedExecutorFactory)
    {
    	this.id = id;
       
@@ -137,6 +141,8 @@
       
       this.connectionManager = connectionManager;
       
+      this.orderedExecutorFactory = orderedExecutorFactory;
+      
       started = false;
       
       createdTime = System.currentTimeMillis();
@@ -158,7 +164,8 @@
       long id = dispatcher.generateID();
       ServerSession session =
          new ServerSessionImpl(id, autoCommitSends, autoCommitAcks, xa, this, resourceManager,
-         		sender, dispatcher, persistenceManager, queueSettingsRepository, postOffice, securityStore);
+         		sender, dispatcher, persistenceManager, queueSettingsRepository, postOffice, securityStore,
+         		orderedExecutorFactory.getOrderedExecutor());
 
       sessions.add(session);
       

Modified: trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java	2008-06-09 20:50:57 UTC (rev 4416)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java	2008-06-10 11:49:22 UTC (rev 4417)
@@ -27,8 +27,7 @@
 import java.util.List;
 import java.util.Set;
 import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
+import java.util.concurrent.Executor;
 import java.util.concurrent.atomic.AtomicLong;
 
 import javax.transaction.xa.XAException;
@@ -131,12 +130,10 @@
 
    private final AtomicLong deliveryIDSequence = new AtomicLong(0);
 
-   private final ExecutorService executor = Executors.newSingleThreadExecutor();
+   private final Executor executor;
 
    private Transaction tx;
 
-  // private final Object rollbackCancelLock = new Object();
-
    // Constructors
    // ---------------------------------------------------------------------------------
 
@@ -146,7 +143,8 @@
                             final ResourceManager resourceManager, final PacketReturner sender,
                             final PacketDispatcher dispatcher, final StorageManager persistenceManager,
                             final HierarchicalRepository<QueueSettings> queueSettingsRepository,
-                            final PostOffice postOffice, final SecurityStore securityStore) throws Exception
+                            final PostOffice postOffice, final SecurityStore securityStore,
+                            final Executor executor) throws Exception
    {
       this.id = id;
 
@@ -174,6 +172,8 @@
       this.postOffice = postOffice;
 
       this.securityStore = securityStore;
+      
+      this.executor = executor;
 
       if (log.isTraceEnabled())
       {
@@ -278,8 +278,6 @@
 
       rollback();
 
-      executor.shutdown();
-
       deliveries.clear();
 
       connection.removeSession(this);

Modified: trunk/tests/jms-tests/src/org/jboss/test/messaging/jms/QueueTest.java
===================================================================
--- trunk/tests/jms-tests/src/org/jboss/test/messaging/jms/QueueTest.java	2008-06-09 20:50:57 UTC (rev 4416)
+++ trunk/tests/jms-tests/src/org/jboss/test/messaging/jms/QueueTest.java	2008-06-10 11:49:22 UTC (rev 4417)
@@ -172,7 +172,7 @@
 
          	conn2 = cf.createConnection();
 
-         	Session s = conn1.createSession(true, Session.AUTO_ACKNOWLEDGE);
+         	Session s = conn1.createSession(true, Session.SESSION_TRANSACTED);
 
          	MessageProducer p = s.createProducer(queue1);
 
@@ -183,7 +183,7 @@
 
             s.commit();
 
-            Session s2 = conn2.createSession(true, Session.AUTO_ACKNOWLEDGE);
+            Session s2 = conn2.createSession(true, Session.SESSION_TRANSACTED);
 
             // Create a consumer, start the session, close the consumer..
             // This shouldn't cause any message to be lost

Modified: trunk/tests/src/org/jboss/messaging/tests/integration/core/remoting/mina/MinaHandlerOrderingTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/core/remoting/mina/MinaHandlerOrderingTest.java	2008-06-09 20:50:57 UTC (rev 4416)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/core/remoting/mina/MinaHandlerOrderingTest.java	2008-06-10 11:49:22 UTC (rev 4417)
@@ -103,7 +103,7 @@
    {
       clientDispatcher = new PacketDispatcherImpl(null);
       threadPool = Executors.newCachedThreadPool();
-      handler = new MinaHandler(clientDispatcher, threadPool, null, true, true, 5000, 1 * 204 * 1024, 5 * 1024 * 1024);
+      handler = new MinaHandler(clientDispatcher, threadPool, null, true, true);
 
       handler_1 = new TestPacketHandler(23);
       clientDispatcher.register(handler_1);

Modified: trunk/tests/src/org/jboss/messaging/tests/integration/core/remoting/mina/MinaHandlerTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/core/remoting/mina/MinaHandlerTest.java	2008-06-09 20:50:57 UTC (rev 4416)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/core/remoting/mina/MinaHandlerTest.java	2008-06-10 11:49:22 UTC (rev 4417)
@@ -79,7 +79,7 @@
    {
       clientDispatcher = new PacketDispatcherImpl(null);
       threadPool = Executors.newCachedThreadPool();
-      handler = new MinaHandler(clientDispatcher, threadPool, null, true, true, 5000, 1 * 204 * 1024, 5 * 1024 * 1024);
+      handler = new MinaHandler(clientDispatcher, threadPool, null, true, true);
 
       packetHandler = new TestPacketHandler(23);
       clientDispatcher.register(packetHandler);

Modified: trunk/tests/src/org/jboss/messaging/tests/integration/core/remoting/mina/MinaServiceTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/core/remoting/mina/MinaServiceTest.java	2008-06-09 20:50:57 UTC (rev 4416)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/core/remoting/mina/MinaServiceTest.java	2008-06-10 11:49:22 UTC (rev 4417)
@@ -15,7 +15,7 @@
 import org.jboss.messaging.core.remoting.impl.PacketDispatcherImpl;
 import org.jboss.messaging.core.remoting.impl.invm.INVMConnector;
 import org.jboss.messaging.core.remoting.impl.mina.MinaConnector;
-import org.jboss.messaging.core.remoting.impl.mina.MinaService;
+import org.jboss.messaging.core.remoting.impl.mina.RemotingServiceImpl;
 
 public class MinaServiceTest extends TestCase
 {
@@ -66,7 +66,7 @@
 
       ConfigurationImpl config = new ConfigurationImpl();
       config.setTransport(INVM);
-      invmService = new MinaService(config);
+      invmService = new RemotingServiceImpl(config);
       invmService.start();
    }
 

Modified: trunk/tests/src/org/jboss/messaging/tests/integration/core/remoting/mina/MinaSessionTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/core/remoting/mina/MinaSessionTest.java	2008-06-09 20:50:57 UTC (rev 4416)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/core/remoting/mina/MinaSessionTest.java	2008-06-10 11:49:22 UTC (rev 4417)
@@ -13,7 +13,7 @@
 import org.jboss.messaging.core.remoting.NIOConnector;
 import org.jboss.messaging.core.remoting.PacketDispatcher;
 import org.jboss.messaging.core.remoting.impl.mina.MinaConnector;
-import org.jboss.messaging.core.remoting.impl.mina.MinaService;
+import org.jboss.messaging.core.remoting.impl.mina.RemotingServiceImpl;
 import org.jboss.messaging.tests.unit.core.remoting.impl.ConfigurationHelper;
 import org.jboss.messaging.tests.unit.core.remoting.impl.SessionTestBase;
 
@@ -26,7 +26,7 @@
 public class MinaSessionTest extends SessionTestBase
 {
 
-   private MinaService service;
+   private RemotingServiceImpl service;
 
    // Constants -----------------------------------------------------
 
@@ -55,7 +55,7 @@
    @Override
    protected PacketDispatcher startServer() throws Exception
    {
-      service = new MinaService(createRemotingConfiguration());
+      service = new RemotingServiceImpl(createRemotingConfiguration());
       service.start();
       return service.getDispatcher();
    }

Modified: trunk/tests/src/org/jboss/messaging/tests/integration/core/remoting/mina/ServerKeepAliveTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/core/remoting/mina/ServerKeepAliveTest.java	2008-06-09 20:50:57 UTC (rev 4416)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/core/remoting/mina/ServerKeepAliveTest.java	2008-06-10 11:49:22 UTC (rev 4417)
@@ -13,7 +13,7 @@
 import org.jboss.messaging.core.remoting.NIOSession;
 import org.jboss.messaging.core.remoting.impl.PacketDispatcherImpl;
 import org.jboss.messaging.core.remoting.impl.mina.MinaConnector;
-import org.jboss.messaging.core.remoting.impl.mina.MinaService;
+import org.jboss.messaging.core.remoting.impl.mina.RemotingServiceImpl;
 import org.jboss.messaging.core.remoting.impl.mina.ServerKeepAliveFactory;
 import org.jboss.messaging.core.remoting.impl.wireformat.Ping;
 import org.jboss.messaging.core.remoting.impl.wireformat.Pong;
@@ -33,7 +33,7 @@
 
    // Attributes ----------------------------------------------------
 
-   private MinaService service;
+   private RemotingServiceImpl service;
 
    // Static --------------------------------------------------------
 
@@ -64,7 +64,7 @@
               "localhost", TestSupport.PORT);
       clientConfig.setKeepAliveInterval(TestSupport.KEEP_ALIVE_INTERVAL);
       clientConfig.setKeepAliveTimeout(TestSupport.KEEP_ALIVE_TIMEOUT);
-      service = new MinaService(config, new DummyServerKeepAliveFactory());
+      service = new RemotingServiceImpl(config, new DummyServerKeepAliveFactory());
       service.start();
 
       MinaConnector connector = new MinaConnector(clientConfig.getLocation(), clientConfig.getConnectionParams(), new PacketDispatcherImpl(null));

Modified: trunk/tests/src/org/jboss/messaging/tests/performance/remoting/impl/MeasureBase.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/performance/remoting/impl/MeasureBase.java	2008-06-09 20:50:57 UTC (rev 4416)
+++ trunk/tests/src/org/jboss/messaging/tests/performance/remoting/impl/MeasureBase.java	2008-06-10 11:49:22 UTC (rev 4417)
@@ -17,7 +17,7 @@
 import org.jboss.messaging.core.remoting.PacketHandler;
 import org.jboss.messaging.core.remoting.PacketReturner;
 import org.jboss.messaging.core.remoting.impl.RemotingConnectionImpl;
-import org.jboss.messaging.core.remoting.impl.mina.MinaService;
+import org.jboss.messaging.core.remoting.impl.mina.RemotingServiceImpl;
 import org.jboss.messaging.core.remoting.impl.wireformat.EmptyPacket;
 
 
@@ -28,7 +28,7 @@
  */
 public abstract class MeasureBase extends TestCase
 {
-   protected MinaService service;
+   protected RemotingServiceImpl service;
    protected PacketDispatcher serverDispatcher;
 
    @Override
@@ -146,7 +146,7 @@
 
    protected void startServer() throws Exception
    {
-      service = new MinaService(createConfiguration());
+      service = new RemotingServiceImpl(createConfiguration());
       service.start();
       serverDispatcher = service.getDispatcher();
       System.out.println("Server Dispatcher = " + serverDispatcher);

Modified: trunk/tests/src/org/jboss/messaging/tests/unit/core/remoting/network/ClientNetworkFailureTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/remoting/network/ClientNetworkFailureTest.java	2008-06-09 20:50:57 UTC (rev 4416)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/core/remoting/network/ClientNetworkFailureTest.java	2008-06-10 11:49:22 UTC (rev 4417)
@@ -32,7 +32,7 @@
 import org.jboss.messaging.core.logging.Logger;
 import org.jboss.messaging.core.remoting.TransportType;
 import static org.jboss.messaging.core.remoting.TransportType.TCP;
-import org.jboss.messaging.core.remoting.impl.mina.MinaService;
+import org.jboss.messaging.core.remoting.impl.mina.RemotingServiceImpl;
 import org.jboss.messaging.core.server.ConnectionManager;
 import org.jboss.messaging.core.server.MessagingServer;
 import org.jboss.messaging.core.server.impl.MessagingServerImpl;
@@ -53,7 +53,7 @@
    // Constants -----------------------------------------------------
    Logger log = Logger.getLogger(ClientNetworkFailureTest.class);
    private MessagingServer server;
-   private MinaService minaService;
+   private RemotingServiceImpl minaService;
    private NetworkFailureFilter networkFailureFilter;
 
    // Static --------------------------------------------------------
@@ -80,7 +80,7 @@
       newConfig.setKeepAliveTimeout(KEEP_ALIVE_TIMEOUT);
       server = new MessagingServerImpl(newConfig);
       server.start();
-      minaService = (MinaService) server.getRemotingService();
+      minaService = (RemotingServiceImpl) server.getRemotingService();
       networkFailureFilter = new NetworkFailureFilter();
       minaService.getFilterChain().addFirst("network-failure",
               networkFailureFilter);




More information about the jboss-cvs-commits mailing list