[jboss-cvs] JBoss Messaging SVN: r7558 - in trunk: src/main/org/jboss/messaging/core/remoting/impl and 7 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Fri Jul 10 08:19:50 EDT 2009


Author: jmesnil
Date: 2009-07-10 08:19:49 -0400 (Fri, 10 Jul 2009)
New Revision: 7558

Added:
   trunk/src/main/org/jboss/messaging/integration/transports/netty/HttpKeepAliveRunnable.java
Removed:
   trunk/src/main/org/jboss/messaging/integration/transports/netty/HttpKeepAliveTask.java
Modified:
   trunk/src/main/org/jboss/messaging/core/client/impl/ConnectionManagerImpl.java
   trunk/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java
   trunk/src/main/org/jboss/messaging/core/remoting/impl/invm/InVMAcceptorFactory.java
   trunk/src/main/org/jboss/messaging/core/remoting/impl/invm/InVMConnectorFactory.java
   trunk/src/main/org/jboss/messaging/core/remoting/server/impl/RemotingServiceImpl.java
   trunk/src/main/org/jboss/messaging/core/remoting/spi/AcceptorFactory.java
   trunk/src/main/org/jboss/messaging/core/remoting/spi/ConnectorFactory.java
   trunk/src/main/org/jboss/messaging/integration/transports/netty/HttpAcceptorHandler.java
   trunk/src/main/org/jboss/messaging/integration/transports/netty/NettyAcceptor.java
   trunk/src/main/org/jboss/messaging/integration/transports/netty/NettyAcceptorFactory.java
   trunk/src/main/org/jboss/messaging/integration/transports/netty/NettyConnector.java
   trunk/src/main/org/jboss/messaging/integration/transports/netty/NettyConnectorFactory.java
   trunk/tests/src/org/jboss/messaging/tests/integration/http/NettyHttpTest.java
   trunk/tests/src/org/jboss/messaging/tests/integration/largemessage/mock/MockConnectorFactory.java
   trunk/tests/src/org/jboss/messaging/tests/unit/core/remoting/impl/netty/NettyAcceptorFactoryTest.java
   trunk/tests/src/org/jboss/messaging/tests/unit/core/remoting/impl/netty/NettyAcceptorTest.java
   trunk/tests/src/org/jboss/messaging/tests/unit/core/remoting/impl/netty/NettyConnectorTest.java
Log:
* replaced the Timers used by the Netty HTTP transport with Runnables executed on a scheduled thread pool
* updated the remoting SPI so that a ScheduledExecutorService is passed in when creating a connector/acceptor
* removed unused static method RemotingConnectionImpl.createConnection()

Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ConnectionManagerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ConnectionManagerImpl.java	2009-07-10 11:40:31 UTC (rev 7557)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ConnectionManagerImpl.java	2009-07-10 12:19:49 UTC (rev 7558)
@@ -879,7 +879,7 @@
             {
                DelegatingBufferHandler handler = new DelegatingBufferHandler();
 
-               connector = connectorFactory.createConnector(transportParams, handler, this, threadPool);
+               connector = connectorFactory.createConnector(transportParams, handler, this, threadPool, scheduledThreadPool);
 
                if (connector != null)
                {

Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java	2009-07-10 11:40:31 UTC (rev 7557)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java	2009-07-10 12:19:49 UTC (rev 7558)
@@ -17,7 +17,6 @@
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.Executor;
 
 import org.jboss.messaging.core.exception.MessagingException;
 import org.jboss.messaging.core.logging.Logger;
@@ -28,9 +27,6 @@
 import org.jboss.messaging.core.remoting.Packet;
 import org.jboss.messaging.core.remoting.RemotingConnection;
 import org.jboss.messaging.core.remoting.spi.Connection;
-import org.jboss.messaging.core.remoting.spi.ConnectionLifeCycleListener;
-import org.jboss.messaging.core.remoting.spi.Connector;
-import org.jboss.messaging.core.remoting.spi.ConnectorFactory;
 import org.jboss.messaging.core.remoting.spi.MessagingBuffer;
 import org.jboss.messaging.utils.SimpleIDGenerator;
 
@@ -49,37 +45,6 @@
    // Static
    // ---------------------------------------------------------------------------------------
 
-   public static RemotingConnection createConnection(final ConnectorFactory connectorFactory,
-                                                     final Map<String, Object> params,
-                                                     final long callTimeout,
-                                                     final Executor threadPool,
-                                                     final ConnectionLifeCycleListener listener)
-   {
-      DelegatingBufferHandler handler = new DelegatingBufferHandler();
-
-      Connector connector = connectorFactory.createConnector(params, handler, listener, threadPool);
-
-      if (connector == null)
-      {
-         return null;
-      }
-
-      connector.start();
-
-      Connection tc = connector.createConnection();
-
-      if (tc == null)
-      {
-         return null;
-      }
-
-      RemotingConnection connection = new RemotingConnectionImpl(tc, callTimeout, null);
-
-      handler.conn = connection;
-
-      return connection;
-   }
-
    // Attributes
    // -----------------------------------------------------------------------------------
 

Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/invm/InVMAcceptorFactory.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/invm/InVMAcceptorFactory.java	2009-07-10 11:40:31 UTC (rev 7557)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/invm/InVMAcceptorFactory.java	2009-07-10 12:19:49 UTC (rev 7558)
@@ -23,6 +23,7 @@
 
 import java.util.Map;
 import java.util.concurrent.Executor;
+import java.util.concurrent.ScheduledExecutorService;
 
 import org.jboss.messaging.core.remoting.spi.Acceptor;
 import org.jboss.messaging.core.remoting.spi.AcceptorFactory;
@@ -40,7 +41,8 @@
 
    public Acceptor createAcceptor(final Map<String, Object> configuration,
             final BufferHandler handler, final ConnectionLifeCycleListener listener,
-            final Executor threadPool)
+            final Executor threadPool,
+            final ScheduledExecutorService scheduledThreadPool)
    {
       return new InVMAcceptor(configuration, handler, listener, threadPool);
    }

Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/invm/InVMConnectorFactory.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/invm/InVMConnectorFactory.java	2009-07-10 11:40:31 UTC (rev 7557)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/invm/InVMConnectorFactory.java	2009-07-10 12:19:49 UTC (rev 7558)
@@ -23,6 +23,7 @@
 
 import java.util.Map;
 import java.util.concurrent.Executor;
+import java.util.concurrent.ScheduledExecutorService;
 
 import org.jboss.messaging.core.remoting.spi.BufferHandler;
 import org.jboss.messaging.core.remoting.spi.ConnectionLifeCycleListener;
@@ -40,7 +41,8 @@
    public Connector createConnector(final Map<String, Object> configuration,
                                     final BufferHandler handler,
                                     final ConnectionLifeCycleListener listener,
-                                    final Executor threadPool)
+                                    final Executor threadPool, 
+                                    final ScheduledExecutorService scheduledThreadPool)
    {      
       InVMConnector connector = new InVMConnector(configuration, handler, listener, threadPool);
       

Modified: trunk/src/main/org/jboss/messaging/core/remoting/server/impl/RemotingServiceImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/server/impl/RemotingServiceImpl.java	2009-07-10 11:40:31 UTC (rev 7557)
+++ trunk/src/main/org/jboss/messaging/core/remoting/server/impl/RemotingServiceImpl.java	2009-07-10 12:19:49 UTC (rev 7558)
@@ -152,7 +152,7 @@
 
             AcceptorFactory factory = (AcceptorFactory)clazz.newInstance();
 
-            Acceptor acceptor = factory.createAcceptor(info.getParams(), bufferHandler, this, threadPool);
+            Acceptor acceptor = factory.createAcceptor(info.getParams(), bufferHandler, this, threadPool, scheduledThreadPool);
 
             acceptors.add(acceptor);
 
@@ -181,7 +181,7 @@
 
          AcceptorFactory factory = new InVMAcceptorFactory();
 
-         Acceptor acceptor = factory.createAcceptor(params, bufferHandler, this, threadPool);
+         Acceptor acceptor = factory.createAcceptor(params, bufferHandler, this, threadPool, scheduledThreadPool);
 
          acceptors.add(acceptor);
 

Modified: trunk/src/main/org/jboss/messaging/core/remoting/spi/AcceptorFactory.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/spi/AcceptorFactory.java	2009-07-10 11:40:31 UTC (rev 7557)
+++ trunk/src/main/org/jboss/messaging/core/remoting/spi/AcceptorFactory.java	2009-07-10 12:19:49 UTC (rev 7558)
@@ -24,15 +24,18 @@
 
 import java.util.Map;
 import java.util.concurrent.Executor;
+import java.util.concurrent.ScheduledExecutorService;
 
 /**
  * @author <a href="ataylor at redhat.com">Andy Taylor</a>
  * @author <a href="tim.fox at jboss.com">Tim Fox</a>
+ * @author <a href="jmesnil at redhat.com">Jeff Mesnil</a>
  */
 public interface AcceptorFactory
 {
    Acceptor createAcceptor(final Map<String, Object> configuration,
                            BufferHandler handler,                        
                            ConnectionLifeCycleListener listener,
-                           Executor threadPool);
+                           Executor threadPool,
+                           ScheduledExecutorService scheduledThreadPool);
 }

Modified: trunk/src/main/org/jboss/messaging/core/remoting/spi/ConnectorFactory.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/spi/ConnectorFactory.java	2009-07-10 11:40:31 UTC (rev 7557)
+++ trunk/src/main/org/jboss/messaging/core/remoting/spi/ConnectorFactory.java	2009-07-10 12:19:49 UTC (rev 7558)
@@ -24,6 +24,7 @@
 
 import java.util.Map;
 import java.util.concurrent.Executor;
+import java.util.concurrent.ScheduledExecutorService;
 
 /**
  * 
@@ -36,5 +37,6 @@
 {
    Connector createConnector(Map<String, Object> configuration, BufferHandler handler,                           
                              ConnectionLifeCycleListener listener,
-                             Executor threadPool);
+                             Executor threadPool, 
+                             ScheduledExecutorService scheduledThreadPool);
 }

Modified: trunk/src/main/org/jboss/messaging/integration/transports/netty/HttpAcceptorHandler.java
===================================================================
--- trunk/src/main/org/jboss/messaging/integration/transports/netty/HttpAcceptorHandler.java	2009-07-10 11:40:31 UTC (rev 7557)
+++ trunk/src/main/org/jboss/messaging/integration/transports/netty/HttpAcceptorHandler.java	2009-07-10 12:19:49 UTC (rev 7558)
@@ -58,13 +58,13 @@
 
    private final Executor executor = new ThreadPoolExecutor(1, 1, 0, TimeUnit.SECONDS, delayedResponses);
 
-   private final HttpKeepAliveTask httpKeepAliveTask;
+   private final HttpKeepAliveRunnable httpKeepAliveTask;
 
    private final long responseTime;
 
    private Channel channel;
 
-   public HttpAcceptorHandler(final HttpKeepAliveTask httpKeepAliveTask, long responseTime)
+   public HttpAcceptorHandler(final HttpKeepAliveRunnable httpKeepAliveTask, long responseTime)
    {
       super();
       this.responseTime = responseTime;

Copied: trunk/src/main/org/jboss/messaging/integration/transports/netty/HttpKeepAliveRunnable.java (from rev 7551, trunk/src/main/org/jboss/messaging/integration/transports/netty/HttpKeepAliveTask.java)
===================================================================
--- trunk/src/main/org/jboss/messaging/integration/transports/netty/HttpKeepAliveRunnable.java	                        (rev 0)
+++ trunk/src/main/org/jboss/messaging/integration/transports/netty/HttpKeepAliveRunnable.java	2009-07-10 12:19:49 UTC (rev 7558)
@@ -0,0 +1,77 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.messaging.integration.transports.netty;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Future;
+
+/**
+ * A simple Runnable to allow HttpAcceptorHandlers to be called intermittently.
+ * @author <a href="mailto:andy.taylor at jboss.org">Andy Taylor</a>
+ * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>
+ */
+public class HttpKeepAliveRunnable implements Runnable
+{
+   private final List<HttpAcceptorHandler> handlers = new ArrayList<HttpAcceptorHandler>();
+   private boolean closed = false;
+   private Future<?> future;
+
+   public synchronized void run()
+   {
+      if (closed)
+      {
+         return;
+      }
+      
+      long time = System.currentTimeMillis();
+      for (HttpAcceptorHandler handler : handlers)
+      {
+         handler.keepAlive(time);
+      }
+   }
+
+   public synchronized void registerKeepAliveHandler(HttpAcceptorHandler httpAcceptorHandler)
+   {
+      handlers.add(httpAcceptorHandler);
+   }
+
+   public synchronized void unregisterKeepAliveHandler(HttpAcceptorHandler httpAcceptorHandler)
+   {
+      handlers.remove(httpAcceptorHandler);
+   }
+
+   public void close()
+   {
+      if (future != null)
+      {             
+         future.cancel(false);
+      }
+
+      closed  = true;
+   }
+
+   public synchronized void setFuture(Future<?> future)
+   {
+      this.future = future;
+   }
+}

Deleted: trunk/src/main/org/jboss/messaging/integration/transports/netty/HttpKeepAliveTask.java
===================================================================
--- trunk/src/main/org/jboss/messaging/integration/transports/netty/HttpKeepAliveTask.java	2009-07-10 11:40:31 UTC (rev 7557)
+++ trunk/src/main/org/jboss/messaging/integration/transports/netty/HttpKeepAliveTask.java	2009-07-10 12:19:49 UTC (rev 7558)
@@ -1,67 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
-package org.jboss.messaging.integration.transports.netty;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.TimerTask;
-
-/**
- * A simple Timer Task to allow HttpAcceptorHandlers to be called intermittently.
- * @author <a href="mailto:andy.taylor at jboss.org">Andy Taylor</a>
- */
-public class HttpKeepAliveTask extends TimerTask
-{
-   private final List<HttpAcceptorHandler> handlers = new ArrayList<HttpAcceptorHandler>();
-   private boolean closed = false;
-
-   public synchronized void run()
-   {
-      if (closed)
-      {
-         return;
-      }
-      
-      long time = System.currentTimeMillis();
-      for (HttpAcceptorHandler handler : handlers)
-      {
-         handler.keepAlive(time);
-      }
-   }
-
-   public synchronized void registerKeepAliveHandler(HttpAcceptorHandler httpAcceptorHandler)
-   {
-      handlers.add(httpAcceptorHandler);
-   }
-
-   public synchronized void unregisterKeepAliveHandler(HttpAcceptorHandler httpAcceptorHandler)
-   {
-      handlers.remove(httpAcceptorHandler);
-   }
-
-   public synchronized boolean cancel()
-   {
-      closed  = true;
-
-      return super.cancel();
-   }
-}

Modified: trunk/src/main/org/jboss/messaging/integration/transports/netty/NettyAcceptor.java
===================================================================
--- trunk/src/main/org/jboss/messaging/integration/transports/netty/NettyAcceptor.java	2009-07-10 11:40:31 UTC (rev 7557)
+++ trunk/src/main/org/jboss/messaging/integration/transports/netty/NettyAcceptor.java	2009-07-10 12:19:49 UTC (rev 7558)
@@ -28,10 +28,12 @@
 import java.net.SocketAddress;
 import java.util.Iterator;
 import java.util.Map;
-import java.util.Timer;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.Executor;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
 
 import javax.net.ssl.SSLContext;
 
@@ -119,25 +121,28 @@
 
    private final int tcpReceiveBufferSize;
 
-   private final Timer httpKeepAliveTimer;
+   private final HttpKeepAliveRunnable httpKeepAliveRunnable;
 
-   private final HttpKeepAliveTask httpKeepAliveTask;
-
    private ConcurrentMap<Object, Connection> connections = new ConcurrentHashMap<Object, Connection>();
    
    private boolean paused;
    
    private final Executor threadPool;
 
+   private final ScheduledExecutorService scheduledThreadPool;
+
    public NettyAcceptor(final Map<String, Object> configuration,
                         final BufferHandler handler,
                         final ConnectionLifeCycleListener listener,
-                        final Executor threadPool)
+                        final Executor threadPool,
+                        final ScheduledExecutorService scheduledThreadPool)
    {
       this.handler = handler;
 
       this.listener = listener;
 
+      this.scheduledThreadPool = scheduledThreadPool;
+      
       this.sslEnabled = ConfigurationHelper.getBooleanProperty(TransportConstants.SSL_ENABLED_PROP_NAME,
                                                                TransportConstants.DEFAULT_SSL_ENABLED,
                                                                configuration);
@@ -153,16 +158,15 @@
          httpResponseTime = ConfigurationHelper.getLongProperty(TransportConstants.HTTP_RESPONSE_TIME_PROP_NAME,
                                                                 TransportConstants.DEFAULT_HTTP_RESPONSE_TIME,
                                                                 configuration);
-         httpKeepAliveTimer = new Timer();
-         httpKeepAliveTask = new HttpKeepAliveTask();
-         httpKeepAliveTimer.schedule(httpKeepAliveTask, httpServerScanPeriod, httpServerScanPeriod);
+         httpKeepAliveRunnable = new HttpKeepAliveRunnable();
+         Future<?> future = this.scheduledThreadPool.scheduleAtFixedRate(httpKeepAliveRunnable, httpServerScanPeriod, httpServerScanPeriod, TimeUnit.MILLISECONDS);
+         httpKeepAliveRunnable.setFuture(future);
       }
       else
       {
          httpServerScanPeriod = 0;
          httpResponseTime = 0;
-         httpKeepAliveTimer = null;
-         httpKeepAliveTask = null;
+         httpKeepAliveRunnable = null;
       }
       this.useNio = ConfigurationHelper.getBooleanProperty(TransportConstants.USE_NIO_PROP_NAME,
                                                            TransportConstants.DEFAULT_USE_NIO,
@@ -269,7 +273,7 @@
             {
                pipeline.addLast("httpRequestDecoder", new HttpRequestDecoder());
                pipeline.addLast("httpResponseEncoder", new HttpResponseEncoder());
-               pipeline.addLast("httphandler", new HttpAcceptorHandler(httpKeepAliveTask, httpResponseTime));
+               pipeline.addLast("httphandler", new HttpAcceptorHandler(httpKeepAliveRunnable, httpResponseTime));
             }
 
             ChannelPipelineSupport.addCodecFilter(pipeline, handler);
@@ -385,11 +389,9 @@
          }
       }
 
-      if (httpKeepAliveTimer != null)
+      if (httpKeepAliveRunnable != null)
       {
-         httpKeepAliveTask.cancel();
-
-         httpKeepAliveTimer.cancel();
+         httpKeepAliveRunnable.close();
       }
       
       ChannelGroupFuture future = channelGroup.close().awaitUninterruptibly();

Modified: trunk/src/main/org/jboss/messaging/integration/transports/netty/NettyAcceptorFactory.java
===================================================================
--- trunk/src/main/org/jboss/messaging/integration/transports/netty/NettyAcceptorFactory.java	2009-07-10 11:40:31 UTC (rev 7557)
+++ trunk/src/main/org/jboss/messaging/integration/transports/netty/NettyAcceptorFactory.java	2009-07-10 12:19:49 UTC (rev 7558)
@@ -24,6 +24,7 @@
 
 import java.util.Map;
 import java.util.concurrent.Executor;
+import java.util.concurrent.ScheduledExecutorService;
 
 import org.jboss.messaging.core.remoting.spi.Acceptor;
 import org.jboss.messaging.core.remoting.spi.AcceptorFactory;
@@ -40,8 +41,9 @@
    public Acceptor createAcceptor(final Map<String, Object> configuration,
                                   final BufferHandler handler,
                                   final ConnectionLifeCycleListener listener,
-                                  final Executor threadPool)
+                                  final Executor threadPool,
+                                  final ScheduledExecutorService scheduledThreadPool)
    {
-      return new NettyAcceptor(configuration, handler, listener, threadPool);
+      return new NettyAcceptor(configuration, handler, listener, threadPool, scheduledThreadPool);
    }
 }

Modified: trunk/src/main/org/jboss/messaging/integration/transports/netty/NettyConnector.java
===================================================================
--- trunk/src/main/org/jboss/messaging/integration/transports/netty/NettyConnector.java	2009-07-10 11:40:31 UTC (rev 7557)
+++ trunk/src/main/org/jboss/messaging/integration/transports/netty/NettyConnector.java	2009-07-10 12:19:49 UTC (rev 7558)
@@ -31,11 +31,11 @@
 import java.net.URISyntaxException;
 import java.util.Map;
 import java.util.Set;
-import java.util.Timer;
-import java.util.TimerTask;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.Executor;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
 
 import javax.net.ssl.SSLContext;
 import javax.net.ssl.SSLException;
@@ -139,6 +139,8 @@
    
    private final VirtualExecutorService virtualExecutor;
 
+   private ScheduledExecutorService scheduledThreadPool;
+
    
    // Static --------------------------------------------------------
 
@@ -149,7 +151,8 @@
    public NettyConnector(final Map<String, Object> configuration,
                          final BufferHandler handler,
                          final ConnectionLifeCycleListener listener,
-                         final Executor threadPool)
+                         final Executor threadPool,
+                         final ScheduledExecutorService scheduledThreadPool)
    {
       if (listener == null)
       {
@@ -231,6 +234,8 @@
                                                                      configuration);
       
       virtualExecutor = new VirtualExecutorService(threadPool); 
+      
+      this.scheduledThreadPool = scheduledThreadPool;      
    }
 
    public synchronized void start()
@@ -439,10 +444,8 @@
 
       private boolean waitingGet = false;
 
-      private Timer idleClientTimer;
+      private HttpIdleTimer task;
 
-      private HttpIdleTimerTask task;
-
       private String url = "http://" + host + ":" + port + servletPath;
 
       private Future handShakeFuture = new Future();
@@ -463,19 +466,17 @@
          channel = e.getChannel();
          if (httpClientIdleScanPeriod > 0)
          {
-            idleClientTimer = new Timer("Http Idle Timer", true);
-            task = new HttpIdleTimerTask();
-            idleClientTimer.schedule(task, httpClientIdleScanPeriod, httpClientIdleScanPeriod);
+            task = new HttpIdleTimer();
+            java.util.concurrent.Future<?> future = scheduledThreadPool.scheduleAtFixedRate(task, httpClientIdleScanPeriod, httpClientIdleScanPeriod, TimeUnit.MILLISECONDS);
+            task.setFuture(future);
          }
       }
 
       public void channelClosed(final ChannelHandlerContext ctx, final ChannelStateEvent e) throws Exception
       {
-         if (idleClientTimer != null)
+         if (task != null)
          {
-            task.cancel();
-
-            idleClientTimer.cancel();
+            task.close();
          }
 
          super.channelClosed(ctx, e);
@@ -542,9 +543,10 @@
          }
       }
 
-      private class HttpIdleTimerTask extends TimerTask
+      private class HttpIdleTimer implements Runnable
       {
          private boolean closed = false;
+         private java.util.concurrent.Future<?> future;
 
          public synchronized void run()
          {
@@ -561,11 +563,19 @@
             }
          }
 
-         public synchronized boolean cancel()
+         public synchronized void setFuture(final java.util.concurrent.Future<?> future)
          {
+            this.future = future;
+         }
+         
+         public void close()
+         {
+            if (future != null)
+            {             
+               future.cancel(false);
+            }
+            
             closed  = true;
-
-            return super.cancel();
          }
       }
    }

Modified: trunk/src/main/org/jboss/messaging/integration/transports/netty/NettyConnectorFactory.java
===================================================================
--- trunk/src/main/org/jboss/messaging/integration/transports/netty/NettyConnectorFactory.java	2009-07-10 11:40:31 UTC (rev 7557)
+++ trunk/src/main/org/jboss/messaging/integration/transports/netty/NettyConnectorFactory.java	2009-07-10 12:19:49 UTC (rev 7558)
@@ -2,6 +2,7 @@
 
 import java.util.Map;
 import java.util.concurrent.Executor;
+import java.util.concurrent.ScheduledExecutorService;
 
 import org.jboss.messaging.core.remoting.spi.BufferHandler;
 import org.jboss.messaging.core.remoting.spi.ConnectionLifeCycleListener;
@@ -18,8 +19,9 @@
    public Connector createConnector(final Map<String, Object> configuration,
                                     final BufferHandler handler,
                                     final ConnectionLifeCycleListener listener,
-                                    final Executor threadPool)
+                                    final Executor threadPool, 
+                                    final ScheduledExecutorService scheduledThreadPool)
    {
-      return new NettyConnector(configuration, handler, listener, threadPool);
+      return new NettyConnector(configuration, handler, listener, threadPool, scheduledThreadPool);
    }
 }

Modified: trunk/tests/src/org/jboss/messaging/tests/integration/http/NettyHttpTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/http/NettyHttpTest.java	2009-07-10 11:40:31 UTC (rev 7557)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/http/NettyHttpTest.java	2009-07-10 12:19:49 UTC (rev 7558)
@@ -26,8 +26,10 @@
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 
+import org.jboss.messaging.core.config.impl.ConfigurationImpl;
 import org.jboss.messaging.core.exception.MessagingException;
 import org.jboss.messaging.core.remoting.impl.AbstractBufferHandler;
 import org.jboss.messaging.core.remoting.spi.BufferHandler;
@@ -49,7 +51,9 @@
    private NettyConnector connector;
    
    private ExecutorService threadPool;
-   
+
+   private ScheduledExecutorService scheduledThreadPool;
+
    @Override
    protected void setUp() throws Exception
    {
@@ -58,6 +62,8 @@
       checkFreePort(TransportConstants.DEFAULT_PORT);
 
       threadPool = Executors.newCachedThreadPool();
+
+      scheduledThreadPool = Executors.newScheduledThreadPool(ConfigurationImpl.DEFAULT_SCHEDULED_THREAD_POOL_MAX_SIZE);
    }
    
    @Override
@@ -76,6 +82,8 @@
       
       threadPool.shutdownNow();
 
+      scheduledThreadPool.shutdownNow();
+
       checkFreePort(TransportConstants.DEFAULT_PORT);
 
       super.tearDown();
@@ -94,11 +102,11 @@
       conf.put(TransportConstants.HTTP_CLIENT_IDLE_SCAN_PERIOD, -1l);
       DummyConnectionLifeCycleListener acceptorListener = new DummyConnectionLifeCycleListener(connCreatedLatch);
       SimpleBufferHandler acceptorHandler = new SimpleBufferHandler(acceptorLatch);
-      acceptor = new NettyAcceptor(conf, acceptorHandler, acceptorListener, threadPool);
+      acceptor = new NettyAcceptor(conf, acceptorHandler, acceptorListener, threadPool, scheduledThreadPool);
       acceptor.start();
 
       SimpleBufferHandler2 connectorHandler = new SimpleBufferHandler2(connectorLatch);
-      connector = new NettyConnector(conf, connectorHandler, new DummyConnectionLifeCycleListener(null), threadPool);
+      connector = new NettyConnector(conf, connectorHandler, new DummyConnectionLifeCycleListener(null), threadPool, scheduledThreadPool);
       connector.start();
       Connection conn = connector.createConnection();
       connCreatedLatch.await(5, TimeUnit.SECONDS);
@@ -144,11 +152,11 @@
       conf.put(TransportConstants.HTTP_CLIENT_IDLE_SCAN_PERIOD, -1l);
       DummyConnectionLifeCycleListener acceptorListener = new DummyConnectionLifeCycleListener(connCreatedLatch);
       SimpleBufferHandler acceptorHandler = new SimpleBufferHandler(acceptorLatch);
-      acceptor = new NettyAcceptor(conf, acceptorHandler, acceptorListener, threadPool);
+      acceptor = new NettyAcceptor(conf, acceptorHandler, acceptorListener, threadPool, scheduledThreadPool);
       acceptor.start();
 
       SimpleBufferHandler2 connectorHandler = new SimpleBufferHandler2(connectorLatch);
-      connector = new NettyConnector(conf, connectorHandler, new DummyConnectionLifeCycleListener(null), threadPool);
+      connector = new NettyConnector(conf, connectorHandler, new DummyConnectionLifeCycleListener(null), threadPool, scheduledThreadPool);
       connector.start();
       Connection conn = connector.createConnection();
       connCreatedLatch.await(5, TimeUnit.SECONDS);
@@ -197,11 +205,11 @@
       conf.put(TransportConstants.HTTP_CLIENT_IDLE_SCAN_PERIOD, -1l);
       DummyConnectionLifeCycleListener acceptorListener = new DummyConnectionLifeCycleListener(connCreatedLatch);
       SimpleBufferHandler acceptorHandler = new SimpleBufferHandler(acceptorLatch);
-      acceptor = new NettyAcceptor(conf, acceptorHandler, acceptorListener, threadPool);
+      acceptor = new NettyAcceptor(conf, acceptorHandler, acceptorListener, threadPool, scheduledThreadPool);
       acceptor.start();
 
       SimpleBufferHandler connectorHandler = new SimpleBufferHandler(connectorLatch);
-      connector = new NettyConnector(conf, connectorHandler, new DummyConnectionLifeCycleListener(null), threadPool);
+      connector = new NettyConnector(conf, connectorHandler, new DummyConnectionLifeCycleListener(null), threadPool, scheduledThreadPool);
       connector.start();
       Connection conn = connector.createConnection();
       connCreatedLatch.await(5, TimeUnit.SECONDS);
@@ -251,11 +259,11 @@
       conf.put(TransportConstants.HTTP_CLIENT_IDLE_SCAN_PERIOD, -1l);
       DummyConnectionLifeCycleListener acceptorListener = new DummyConnectionLifeCycleListener(connCreatedLatch);
       SimpleBufferHandler acceptorHandler = new SimpleBufferHandler(acceptorLatch);
-      acceptor = new NettyAcceptor(conf, acceptorHandler, acceptorListener, threadPool);
+      acceptor = new NettyAcceptor(conf, acceptorHandler, acceptorListener, threadPool, scheduledThreadPool);
       acceptor.start();
 
       SimpleBufferHandler connectorHandler = new SimpleBufferHandler(connectorLatch);
-      connector = new NettyConnector(conf, connectorHandler, new DummyConnectionLifeCycleListener(null), threadPool);
+      connector = new NettyConnector(conf, connectorHandler, new DummyConnectionLifeCycleListener(null), threadPool, scheduledThreadPool);
       connector.start();
       Connection conn = connector.createConnection();
       connCreatedLatch.await(5, TimeUnit.SECONDS);
@@ -304,11 +312,11 @@
       conf.put(TransportConstants.HTTP_CLIENT_IDLE_PROP_NAME, 500l);
       DummyConnectionLifeCycleListener acceptorListener = new DummyConnectionLifeCycleListener(connCreatedLatch);
       SimpleBufferHandler acceptorHandler = new SimpleBufferHandler(acceptorLatch);
-      acceptor = new NettyAcceptor(conf, acceptorHandler, acceptorListener, threadPool);
+      acceptor = new NettyAcceptor(conf, acceptorHandler, acceptorListener, threadPool, scheduledThreadPool);
       acceptor.start();
 
       SimpleBufferHandler connectorHandler = new SimpleBufferHandler(connectorLatch);
-      connector = new NettyConnector(conf, connectorHandler, new DummyConnectionLifeCycleListener(null), threadPool);
+      connector = new NettyConnector(conf, connectorHandler, new DummyConnectionLifeCycleListener(null), threadPool, scheduledThreadPool);
       connector.start();
       Connection conn = connector.createConnection();
       connCreatedLatch.await(5, TimeUnit.SECONDS);
@@ -353,11 +361,11 @@
       conf.put(TransportConstants.HTTP_SERVER_SCAN_PERIOD_PROP_NAME, 5000l);
       DummyConnectionLifeCycleListener acceptorListener = new DummyConnectionLifeCycleListener(connCreatedLatch);
       SimpleBufferHandler acceptorHandler = new SimpleBufferHandler(acceptorLatch);
-      acceptor = new NettyAcceptor(conf, acceptorHandler, acceptorListener, threadPool);
+      acceptor = new NettyAcceptor(conf, acceptorHandler, acceptorListener, threadPool, scheduledThreadPool);
       acceptor.start();
 
       BogusResponseHandler connectorHandler = new BogusResponseHandler(connectorLatch);
-      connector = new NettyConnector(conf, connectorHandler, new DummyConnectionLifeCycleListener(null), threadPool);
+      connector = new NettyConnector(conf, connectorHandler, new DummyConnectionLifeCycleListener(null), threadPool, scheduledThreadPool);
       connector.start();
       Connection conn = connector.createConnection();
       connCreatedLatch.await(5, TimeUnit.SECONDS);
@@ -395,11 +403,11 @@
       conf.put(TransportConstants.HTTP_SERVER_SCAN_PERIOD_PROP_NAME, 5000l);
       DummyConnectionLifeCycleListener acceptorListener = new DummyConnectionLifeCycleListener(connCreatedLatch);
       SimpleBufferHandler2 acceptorHandler = new SimpleBufferHandler2(acceptorLatch);
-      acceptor = new NettyAcceptor(conf, acceptorHandler, acceptorListener, threadPool);
+      acceptor = new NettyAcceptor(conf, acceptorHandler, acceptorListener, threadPool, scheduledThreadPool);
       acceptor.start();
 
       BogusResponseHandler connectorHandler = new BogusResponseHandler(connectorLatch);
-      connector = new NettyConnector(conf, connectorHandler, new DummyConnectionLifeCycleListener(null), threadPool);
+      connector = new NettyConnector(conf, connectorHandler, new DummyConnectionLifeCycleListener(null), threadPool, scheduledThreadPool);
       connector.start();
       Connection conn = connector.createConnection();
       connCreatedLatch.await(5, TimeUnit.SECONDS);

Modified: trunk/tests/src/org/jboss/messaging/tests/integration/largemessage/mock/MockConnectorFactory.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/largemessage/mock/MockConnectorFactory.java	2009-07-10 11:40:31 UTC (rev 7557)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/largemessage/mock/MockConnectorFactory.java	2009-07-10 12:19:49 UTC (rev 7558)
@@ -24,6 +24,7 @@
 
 import java.util.Map;
 import java.util.concurrent.Executor;
+import java.util.concurrent.ScheduledExecutorService;
 
 import org.jboss.messaging.core.remoting.spi.BufferHandler;
 import org.jboss.messaging.core.remoting.spi.ConnectionLifeCycleListener;
@@ -57,7 +58,7 @@
    public Connector createConnector(final Map<String, Object> configuration,
                                     final BufferHandler handler,
                                     final ConnectionLifeCycleListener listener,
-                                    final Executor executor)
+                                    final Executor executor, ScheduledExecutorService scheduledThreadPool)
    {
       return new MockConnector(configuration, handler, listener);
    }

Modified: trunk/tests/src/org/jboss/messaging/tests/unit/core/remoting/impl/netty/NettyAcceptorFactoryTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/remoting/impl/netty/NettyAcceptorFactoryTest.java	2009-07-10 11:40:31 UTC (rev 7557)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/core/remoting/impl/netty/NettyAcceptorFactoryTest.java	2009-07-10 12:19:49 UTC (rev 7558)
@@ -25,6 +25,7 @@
 import java.util.Map;
 import java.util.concurrent.Executors;
 
+import org.jboss.messaging.core.config.impl.ConfigurationImpl;
 import org.jboss.messaging.core.exception.MessagingException;
 import org.jboss.messaging.core.remoting.impl.AbstractBufferHandler;
 import org.jboss.messaging.core.remoting.spi.Acceptor;
@@ -74,7 +75,9 @@
          }
       };
            
-      Acceptor acceptor = factory.createAcceptor(params, handler, listener, Executors.newCachedThreadPool());
+      Acceptor acceptor = factory.createAcceptor(params, handler, listener, 
+                                                 Executors.newCachedThreadPool(),
+                                                 Executors.newScheduledThreadPool(ConfigurationImpl.DEFAULT_SCHEDULED_THREAD_POOL_MAX_SIZE));
 
       assertTrue(acceptor instanceof NettyAcceptor);
    }

Modified: trunk/tests/src/org/jboss/messaging/tests/unit/core/remoting/impl/netty/NettyAcceptorTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/remoting/impl/netty/NettyAcceptorTest.java	2009-07-10 11:40:31 UTC (rev 7557)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/core/remoting/impl/netty/NettyAcceptorTest.java	2009-07-10 12:19:49 UTC (rev 7558)
@@ -26,6 +26,7 @@
 import java.util.Map;
 import java.util.concurrent.Executors;
 
+import org.jboss.messaging.core.config.impl.ConfigurationImpl;
 import org.jboss.messaging.core.exception.MessagingException;
 import org.jboss.messaging.core.remoting.impl.AbstractBufferHandler;
 import org.jboss.messaging.core.remoting.spi.BufferHandler;
@@ -70,7 +71,9 @@
          {
          }
       };
-      NettyAcceptor acceptor = new NettyAcceptor(params, handler, listener, Executors.newCachedThreadPool());
+      NettyAcceptor acceptor = new NettyAcceptor(params, handler, listener, 
+                                                 Executors.newCachedThreadPool(), 
+                                                 Executors.newScheduledThreadPool(ConfigurationImpl.DEFAULT_SCHEDULED_THREAD_POOL_MAX_SIZE));
 
       acceptor.start();
       assertTrue(acceptor.isStarted());

Modified: trunk/tests/src/org/jboss/messaging/tests/unit/core/remoting/impl/netty/NettyConnectorTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/remoting/impl/netty/NettyConnectorTest.java	2009-07-10 11:40:31 UTC (rev 7557)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/core/remoting/impl/netty/NettyConnectorTest.java	2009-07-10 12:19:49 UTC (rev 7558)
@@ -78,7 +78,7 @@
          }
       };
       
-      NettyConnector connector = new NettyConnector(params, handler, listener, Executors.newCachedThreadPool());
+      NettyConnector connector = new NettyConnector(params, handler, listener, Executors.newCachedThreadPool(), Executors.newScheduledThreadPool(5));
       
       connector.start();
       assertTrue(connector.isStarted());
@@ -112,7 +112,7 @@
 
       try
       {
-         new NettyConnector(params, null, listener, Executors.newCachedThreadPool());
+         new NettyConnector(params, null, listener, Executors.newCachedThreadPool(), Executors.newScheduledThreadPool(5));
          
          fail("Should throw Exception");
       }
@@ -123,7 +123,7 @@
       
       try
       {
-         new NettyConnector(params, handler, null, Executors.newCachedThreadPool());
+         new NettyConnector(params, handler, null, Executors.newCachedThreadPool(), Executors.newScheduledThreadPool(5));
          
          fail("Should throw Exception");
       }




More information about the jboss-cvs-commits mailing list