[jboss-cvs] JBoss Messaging SVN: r5491 - in trunk: src/main/org/jboss/messaging/integration/transports/netty and 2 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Tue Dec 9 12:09:56 EST 2008


Author: ataylor
Date: 2008-12-09 12:09:56 -0500 (Tue, 09 Dec 2008)
New Revision: 5491

Added:
   trunk/src/main/org/jboss/messaging/integration/transports/netty/HttpAcceptorHandler.java
   trunk/src/main/org/jboss/messaging/integration/transports/netty/HttpKeepAliveTask.java
   trunk/tests/src/org/jboss/messaging/tests/integration/http/NettyHttpTest.java
Modified:
   trunk/src/config/jbm-configuration.xml
   trunk/src/config/jbm-jms.xml
   trunk/src/main/org/jboss/messaging/integration/transports/netty/NettyAcceptor.java
   trunk/src/main/org/jboss/messaging/integration/transports/netty/NettyConnector.java
   trunk/src/main/org/jboss/messaging/integration/transports/netty/TransportConstants.java
   trunk/tests/src/org/jboss/messaging/tests/integration/basic/CoreClientTest.java
   trunk/tests/src/org/jboss/messaging/tests/integration/http/CoreClientOverHttpTest.java
Log:
more netty integration work plus tests

Modified: trunk/src/config/jbm-configuration.xml
===================================================================
--- trunk/src/config/jbm-configuration.xml	2008-12-09 16:49:14 UTC (rev 5490)
+++ trunk/src/config/jbm-configuration.xml	2008-12-09 17:09:56 UTC (rev 5491)
@@ -104,6 +104,15 @@
          <param key="jbm.remoting.netty.port" value="5500" type="Integer"/>	                     
          <param key="jbm.remoting.netty.sslenabled" value="true" type="Boolean"/>
       </connector>
+
+      <connector name="netty-http">
+         <factory-class>org.jboss.messaging.integration.transports.netty.NettyConnectorFactory</factory-class>
+         <param key="jbm.remoting.netty.host" value="localhost" type="String"/>
+         <param key="jbm.remoting.netty.port" value="6100" type="Integer"/>
+         <param key="jbm.remoting.netty.httpenabled" value="true" type="Boolean"/>
+         <param key="jbm.remoting.netty.httpclientidletime" value="500" type="Long"/>
+         <param key="jbm.remoting.netty.httpclientidlescanperiod" value="500" type="Long"/>
+      </connector>
       
       <connector name="in-vm">
          <factory-class>org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory</factory-class>
@@ -130,12 +139,14 @@
          <param key="jbm.remoting.netty.sslenabled" value="true" type="Boolean"/>
       </acceptor> 
       -->
-       <!-- Netty HTTP Acceptor
-      <acceptor>
+        <!--Netty HTTP Acceptor-->
+      <!--<acceptor>
          <factory-class>org.jboss.messaging.integration.transports.netty.NettyAcceptorFactory</factory-class>
          <param key="jbm.remoting.netty.host" value="localhost" type="String"/>
-         <param key="jbm.remoting.netty.port" value="8080" type="Integer"/>
-         <param key="jbm.remoting.netty.httpenabled" value="true" type="Boolean"/>  
+         <param key="jbm.remoting.netty.port" value="6100" type="Integer"/>
+         <param key="jbm.remoting.netty.httpenabled" value="true" type="Boolean"/>
+         <param key="jbm.remoting.netty.httpresponsetime" value="10000" type="Long"/>
+         <param key="jbm.remoting.netty.httpserverscanperiod" value="5000" type="Long"/>
       </acceptor>-->
 
       <!-- Mina Acceptor -->

Modified: trunk/src/config/jbm-jms.xml
===================================================================
--- trunk/src/config/jbm-jms.xml	2008-12-09 16:49:14 UTC (rev 5490)
+++ trunk/src/config/jbm-jms.xml	2008-12-09 17:09:56 UTC (rev 5491)
@@ -84,6 +84,11 @@
       <entry name="/TestSSLConnectionFactory"/>
    </connection-factory>
 
+   <connection-factory name="TestHttpConnectionFactory">
+      <connector-ref connector-name="netty-http"/>
+      <entry name="/TestHttpConnectionFactory"/>
+   </connection-factory>
+
    <queue name="MyQueue">
       <entry name="MyQueue"/>
    </queue>

Added: trunk/src/main/org/jboss/messaging/integration/transports/netty/HttpAcceptorHandler.java
===================================================================
--- trunk/src/main/org/jboss/messaging/integration/transports/netty/HttpAcceptorHandler.java	                        (rev 0)
+++ trunk/src/main/org/jboss/messaging/integration/transports/netty/HttpAcceptorHandler.java	2008-12-09 17:09:56 UTC (rev 5491)
@@ -0,0 +1,230 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.messaging.integration.transports.netty;
+
+import org.jboss.netty.channel.ChannelPipelineCoverage;
+import org.jboss.netty.channel.SimpleChannelHandler;
+import org.jboss.netty.channel.Channel;
+import org.jboss.netty.channel.ChannelHandlerContext;
+import org.jboss.netty.channel.ChannelStateEvent;
+import org.jboss.netty.channel.MessageEvent;
+import org.jboss.netty.channel.DefaultMessageEvent;
+import static org.jboss.netty.channel.Channels.write;
+import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.buffer.ChannelBuffers;
+import org.jboss.netty.handler.codec.http.HttpRequest;
+import org.jboss.netty.handler.codec.http.HttpMethod;
+import org.jboss.netty.handler.codec.http.HttpResponse;
+import org.jboss.netty.handler.codec.http.DefaultHttpResponse;
+import org.jboss.netty.handler.codec.http.HttpVersion;
+import org.jboss.netty.handler.codec.http.HttpResponseStatus;
+import org.jboss.netty.handler.codec.http.HttpHeaders;
+
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.concurrent.locks.Condition;
+import java.util.List;
+import java.util.ArrayList;
+import java.util.Iterator;
+
+/**
+ * takes care of making sure that every request has a response and also that any uninitiated responses always wait for a response.
+ * @author <a href="mailto:andy.taylor at jboss.org">Andy Taylor</a>
+ */
+ at ChannelPipelineCoverage("one")
+class HttpAcceptorHandler extends SimpleChannelHandler
+{
+   private final BlockingQueue<ResponseHolder> responses = new LinkedBlockingQueue<ResponseHolder>();
+
+   private final BlockingQueue<Runnable> delayedResponses = new LinkedBlockingQueue<Runnable>();
+
+   private final Executor executor = new ThreadPoolExecutor(1, 1, 0, TimeUnit.SECONDS, delayedResponses);
+
+   private final HttpKeepAliveTask httpKeepAliveTask;
+
+   private final long responseTime;
+
+   private Channel channel;
+
+   public HttpAcceptorHandler(final HttpKeepAliveTask httpKeepAliveTask, long responseTime)
+   {
+      super();
+      this.responseTime = responseTime;
+      this.httpKeepAliveTask = httpKeepAliveTask;
+   }
+
+   public void channelConnected(final ChannelHandlerContext ctx, final ChannelStateEvent e) throws Exception
+   {
+      super.channelConnected(ctx, e);
+      channel = e.getChannel();
+      httpKeepAliveTask.registerKeepAliveHandler(this);
+   }
+
+   public void channelDisconnected(final ChannelHandlerContext ctx, final ChannelStateEvent e) throws Exception
+   {
+      super.channelDisconnected(ctx, e);
+      httpKeepAliveTask.unregisterKeepAliveHandler(this);
+      channel = null;
+   }
+
+   @Override
+   public void messageReceived(final ChannelHandlerContext ctx, final MessageEvent e) throws Exception
+   {
+      HttpRequest request = (HttpRequest) e.getMessage();
+      HttpMethod method = request.getMethod();
+      //if we are a post then we send upstream, otherwise we are just being prompted for a response.
+      if (method.equals(HttpMethod.POST))
+      {
+         MessageEvent event = new DefaultMessageEvent(e.getChannel(), e.getFuture(), request.getContent(), e.getRemoteAddress());
+         ctx.sendUpstream(event);
+      }
+      //add a new response
+      responses.put(new ResponseHolder(System.currentTimeMillis() + responseTime, new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK)));
+   }
+
+   @Override
+   public void writeRequested(final ChannelHandlerContext ctx, final MessageEvent e) throws Exception
+   {
+      //we are either a channel buffer, which gets delayed until a response is available, or we are the actual response
+      if (e.getMessage() instanceof ChannelBuffer)
+      {
+         ChannelBuffer buf = (ChannelBuffer) e.getMessage();
+         executor.execute(new ResponseRunner(buf));
+      }
+      else
+      {
+         write(ctx, e.getChannel(), e.getFuture(), e.getMessage(), e.getRemoteAddress());
+      }
+   }
+
+   public void keepAlive(final long time)
+   {
+      //send some responses to catch up thus avoiding any timeout.
+      int lateResponses = 0;
+      for (ResponseHolder response : responses)
+      {
+         if (response.timeReceived < time)
+         {
+            lateResponses++;
+         }
+         else
+         {
+            break;
+         }
+      }
+      for (int i = 0; i < lateResponses; i++)
+      {
+         executor.execute(new ResponseRunner());
+      }
+   }
+
+   /**
+    * this is prompted to delivery when a response is available in the response queue.
+    */
+   class ResponseRunner implements Runnable
+   {
+      private final ChannelBuffer buffer;
+
+      private final boolean bogusResponse;
+      public ResponseRunner(final ChannelBuffer buffer)
+      {
+         this.buffer = buffer;
+         bogusResponse = false;
+      }
+
+      public ResponseRunner()
+      {
+         bogusResponse = true;
+         buffer = ChannelBuffers.buffer(0);
+      }
+      
+      public void run()
+      {
+         ResponseHolder responseHolder = null;
+         do
+         {
+            try
+            {
+               responseHolder = responses.take();
+            }
+            catch (InterruptedException e)
+            {
+               //ignore, we'll just try again
+            }
+         }
+         while (responseHolder == null);
+         if(!bogusResponse)
+         {
+            piggyBackResponses();
+         }
+         responseHolder.response.setContent(buffer);
+         responseHolder.response.addHeader(HttpHeaders.CONTENT_LENGTH, String.valueOf(buffer.writerIndex()));
+         channel.write(responseHolder.response);
+      }
+
+      private void piggyBackResponses()
+      {
+         //if we are the last available response then we have to piggy back any remaining responses
+         if(responses.isEmpty())
+         {
+            do
+            {
+               try
+               {
+                  ResponseRunner responseRunner = (ResponseRunner) delayedResponses.poll(0, TimeUnit.MILLISECONDS);
+                  if(responseRunner == null)
+                  {
+                     break;
+                  }
+                  buffer.writeBytes(responseRunner.buffer);
+               }
+               catch (InterruptedException e)
+               {
+                  break;
+               }
+            }
+            while (responses.isEmpty());
+         }
+      }
+   }
+
+   /**
+    * a holder class so we know what time  the request first arrived
+    */
+   private class ResponseHolder
+   {
+      final HttpResponse response;
+      final long timeReceived;
+
+      public ResponseHolder(long timeReceived, HttpResponse response)
+      {
+         this.timeReceived = timeReceived;
+         this.response = response;
+      }
+   }
+   
+}

Added: trunk/src/main/org/jboss/messaging/integration/transports/netty/HttpKeepAliveTask.java
===================================================================
--- trunk/src/main/org/jboss/messaging/integration/transports/netty/HttpKeepAliveTask.java	                        (rev 0)
+++ trunk/src/main/org/jboss/messaging/integration/transports/netty/HttpKeepAliveTask.java	2008-12-09 17:09:56 UTC (rev 5491)
@@ -0,0 +1,54 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.messaging.integration.transports.netty;
+
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.List;
+import java.util.ArrayList;
+
+/**
+ * A simple Timer Task to allow HttpAcceptorHandlers to be called intermittently.
+ * @author <a href="mailto:andy.taylor at jboss.org">Andy Taylor</a>
+ */
+public class HttpKeepAliveTask extends TimerTask
+{
+   private final List<HttpAcceptorHandler> handlers = new ArrayList<HttpAcceptorHandler>();
+   public synchronized void run()
+   {
+      long time = System.currentTimeMillis();
+      for (HttpAcceptorHandler handler : handlers)
+      {
+         handler.keepAlive(time);
+      }
+   }
+
+   public synchronized void registerKeepAliveHandler(HttpAcceptorHandler httpAcceptorHandler)
+   {
+      handlers.add(httpAcceptorHandler);
+   }
+
+   public synchronized void unregisterKeepAliveHandler(HttpAcceptorHandler httpAcceptorHandler)
+   {
+      handlers.remove(httpAcceptorHandler);
+   }
+}

Modified: trunk/src/main/org/jboss/messaging/integration/transports/netty/NettyAcceptor.java
===================================================================
--- trunk/src/main/org/jboss/messaging/integration/transports/netty/NettyAcceptor.java	2008-12-09 16:49:14 UTC (rev 5490)
+++ trunk/src/main/org/jboss/messaging/integration/transports/netty/NettyAcceptor.java	2008-12-09 17:09:56 UTC (rev 5491)
@@ -31,7 +31,6 @@
 import org.jboss.messaging.util.ConfigurationHelper;
 import org.jboss.messaging.util.JBMThreadFactory;
 import org.jboss.netty.bootstrap.ServerBootstrap;
-import org.jboss.netty.buffer.ChannelBuffer;
 import org.jboss.netty.channel.Channel;
 import org.jboss.netty.channel.ChannelFactory;
 import org.jboss.netty.channel.ChannelFuture;
@@ -43,24 +42,16 @@
 import org.jboss.netty.channel.ChannelStateEvent;
 import static org.jboss.netty.channel.Channels.pipeline;
 import static org.jboss.netty.channel.Channels.write;
-import org.jboss.netty.channel.DefaultMessageEvent;
-import org.jboss.netty.channel.MessageEvent;
-import org.jboss.netty.channel.SimpleChannelHandler;
 import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
 import org.jboss.netty.channel.socket.oio.OioServerSocketChannelFactory;
-import org.jboss.netty.handler.codec.http.DefaultHttpResponse;
-import org.jboss.netty.handler.codec.http.HttpHeaders;
-import org.jboss.netty.handler.codec.http.HttpRequest;
 import org.jboss.netty.handler.codec.http.HttpRequestDecoder;
-import org.jboss.netty.handler.codec.http.HttpResponse;
 import org.jboss.netty.handler.codec.http.HttpResponseEncoder;
-import org.jboss.netty.handler.codec.http.HttpResponseStatus;
-import org.jboss.netty.handler.codec.http.HttpVersion;
 import org.jboss.netty.handler.ssl.SslHandler;
 
 import javax.net.ssl.SSLContext;
 import java.net.InetSocketAddress;
 import java.util.Map;
+import java.util.Timer;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
@@ -95,6 +86,10 @@
 
    private final boolean httpEnabled;
 
+   private final long httpServerScanPeriod;
+
+   private final long httpResponseTime;
+
    private final boolean useNio;
 
    private final String host;
@@ -115,6 +110,10 @@
 
    private final int tcpReceiveBufferSize;
 
+   private final Timer httpKeepAliveTimer;
+
+   private final HttpKeepAliveTask httpKeepAliveTask;
+
    public NettyAcceptor(final Map<String, Object> configuration, final BufferHandler handler,
                         final ConnectionLifeCycleListener listener)
    {
@@ -126,6 +125,22 @@
             ConfigurationHelper.getBooleanProperty(TransportConstants.SSL_ENABLED_PROP_NAME, TransportConstants.DEFAULT_SSL_ENABLED, configuration);
       this.httpEnabled =
             ConfigurationHelper.getBooleanProperty(TransportConstants.HTTP_ENABLED_PROP_NAME, TransportConstants.DEFAULT_HTTP_ENABLED, configuration);
+
+      if(httpEnabled)
+      {
+         httpServerScanPeriod = ConfigurationHelper.getLongProperty(TransportConstants.HTTP_SERVER_SCAN_PERIOD_PROP_NAME, TransportConstants.DEFAULT_HTTP_SERVER_SCAN_PERIOD, configuration);
+         httpResponseTime = ConfigurationHelper.getLongProperty(TransportConstants.HTTP_RESPONSE_TIME_PROP_NAME, TransportConstants.DEFAULT_HTTP_RESPONSE_TIME, configuration);
+         httpKeepAliveTimer = new Timer(true);
+         httpKeepAliveTask = new HttpKeepAliveTask();
+         httpKeepAliveTimer.schedule(httpKeepAliveTask, httpServerScanPeriod, httpServerScanPeriod);
+      }
+      else
+      {
+         httpServerScanPeriod = 0;
+         httpResponseTime = 0;
+         httpKeepAliveTimer = null;
+         httpKeepAliveTask = null;
+      }
       this.useNio =
             ConfigurationHelper.getBooleanProperty(TransportConstants.USE_NIO_PROP_NAME, TransportConstants.DEFAULT_USE_NIO, configuration);
       this.host =
@@ -167,7 +182,10 @@
          //Already started
          return;
       }
-
+      if(httpKeepAliveTimer != null)
+      {
+         httpKeepAliveTimer.cancel();
+      }
       bossExecutor = Executors.newCachedThreadPool(new JBMThreadFactory("jbm-netty-acceptor-boss-threads"));
       workerExecutor = Executors.newCachedThreadPool(new JBMThreadFactory("jbm-netty-acceptor-worker-threads"));
       if (useNio)
@@ -218,7 +236,7 @@
             {
                pipeline.addLast("httpRequestDecoder", new HttpRequestDecoder());
                pipeline.addLast("httpResponseEncoder", new HttpResponseEncoder());
-               pipeline.addLast("httphandler", new HttpHandler());
+               pipeline.addLast("httphandler", new HttpAcceptorHandler(httpKeepAliveTask, httpResponseTime));
             }
 
             ChannelPipelineSupport.addCodecFilter(pipeline, handler);
@@ -313,26 +331,4 @@
          }
       }
    }
-
-   @ChannelPipelineCoverage("all")
-   class HttpHandler extends SimpleChannelHandler
-   {
-      @Override
-      public void messageReceived(final ChannelHandlerContext ctx,final MessageEvent e) throws Exception
-      {
-         HttpRequest request = (HttpRequest) e.getMessage();
-         MessageEvent event = new DefaultMessageEvent(e.getChannel(), e.getFuture(), request.getContent(), e.getRemoteAddress());
-         ctx.sendUpstream(event);
-      }
-
-      @Override
-      public void writeRequested(final ChannelHandlerContext ctx,final MessageEvent e) throws Exception
-      {
-         HttpResponse response = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK);
-         ChannelBuffer buf = (ChannelBuffer) e.getMessage();
-         response.setContent(buf);
-         response.addHeader(HttpHeaders.CONTENT_LENGTH, String.valueOf(buf.writerIndex()));
-         write(ctx, e.getChannel(), e.getFuture(), response, e.getRemoteAddress());
-      }
-   }
 }

Modified: trunk/src/main/org/jboss/messaging/integration/transports/netty/NettyConnector.java
===================================================================
--- trunk/src/main/org/jboss/messaging/integration/transports/netty/NettyConnector.java	2008-12-09 16:49:14 UTC (rev 5490)
+++ trunk/src/main/org/jboss/messaging/integration/transports/netty/NettyConnector.java	2008-12-09 17:09:56 UTC (rev 5491)
@@ -43,6 +43,7 @@
 import org.jboss.netty.channel.DefaultMessageEvent;
 import org.jboss.netty.channel.MessageEvent;
 import org.jboss.netty.channel.SimpleChannelHandler;
+import org.jboss.netty.channel.ChannelStateEvent;
 import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
 import org.jboss.netty.channel.socket.oio.OioClientSocketChannelFactory;
 import org.jboss.netty.handler.codec.http.DefaultHttpRequest;
@@ -59,12 +60,13 @@
 import javax.net.ssl.SSLException;
 import java.net.InetSocketAddress;
 import java.util.Map;
+import java.util.Timer;
+import java.util.TimerTask;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 
 /**
- *
  * A NettyConnector
  *
  * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
@@ -94,6 +96,10 @@
 
    private final boolean httpEnabled;
 
+   private final long httpMaxClientIdleTime;
+
+   private final long httpClientIdleScanPeriod;
+
    private final boolean useNio;
 
    private final String host;
@@ -138,8 +144,21 @@
                                                                TransportConstants.DEFAULT_SSL_ENABLED,
                                                                configuration);
       this.httpEnabled =
-                  ConfigurationHelper.getBooleanProperty(TransportConstants.HTTP_ENABLED_PROP_NAME, TransportConstants.DEFAULT_HTTP_ENABLED, configuration);
+            ConfigurationHelper.getBooleanProperty(TransportConstants.HTTP_ENABLED_PROP_NAME, TransportConstants.DEFAULT_HTTP_ENABLED, configuration);
 
+      if(httpEnabled)
+      {
+         this.httpMaxClientIdleTime = ConfigurationHelper.getLongProperty(TransportConstants.HTTP_CLIENT_IDLE_PROP_NAME,
+                                                                      TransportConstants.DEFAULT_HTTP_CLIENT_IDLE_TIME, configuration);
+         this.httpClientIdleScanPeriod = ConfigurationHelper.getLongProperty(TransportConstants.HTTP_CLIENT_IDLE_SCAN_PERIOD,
+                                                                      TransportConstants.DEFAULT_HTTP_CLIENT_SCAN_PERIOD, configuration);
+      }
+      else
+      {
+         this.httpMaxClientIdleTime = 0;
+         this.httpClientIdleScanPeriod = -1;
+      }
+
       this.useNio = ConfigurationHelper.getBooleanProperty(TransportConstants.USE_NIO_PROP_NAME,
                                                            TransportConstants.DEFAULT_USE_NIO,
                                                            configuration);
@@ -354,23 +373,74 @@
    @ChannelPipelineCoverage("all")
    class HttpHandler extends SimpleChannelHandler
    {
+      private Channel channel;
+      private long lastSendTime = 0;
+      private boolean waitingGet = false;
+
+      private Timer idleClientTimer;
+
+      public void channelConnected(final ChannelHandlerContext ctx, final ChannelStateEvent e) throws Exception
+      {
+         super.channelConnected(ctx, e);
+         channel = e.getChannel();
+         if (httpClientIdleScanPeriod > 0)
+         {
+            idleClientTimer = new Timer("Http Idle Timer", true);
+            idleClientTimer.schedule(new HttpIdleTimerTask(), httpClientIdleScanPeriod, httpClientIdleScanPeriod);
+         }
+      }
+
+
+      public void channelClosed(final ChannelHandlerContext ctx, final ChannelStateEvent e) throws Exception
+      {
+         if (idleClientTimer != null)
+         {
+            idleClientTimer.cancel();
+         }
+         super.channelClosed(ctx, e);
+      }
+
       @Override
       public void messageReceived(final ChannelHandlerContext ctx, final MessageEvent e) throws Exception
       {
          HttpResponse response = (HttpResponse) e.getMessage();
          MessageEvent event = new DefaultMessageEvent(e.getChannel(), e.getFuture(), response.getContent(), e.getRemoteAddress());
+         waitingGet = false;
          ctx.sendUpstream(event);
       }
 
       @Override
       public void writeRequested(final ChannelHandlerContext ctx, final MessageEvent e) throws Exception
       {
-         HttpRequest httpRequest = new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.POST, "/jbm/");
-         ChannelBuffer buf = (ChannelBuffer) e.getMessage();
-         httpRequest.setContent(buf);
-         httpRequest.addHeader(HttpHeaders.CONTENT_LENGTH, String.valueOf(buf.writerIndex()));
-         write(ctx, e.getChannel(), e.getFuture(), httpRequest, e.getRemoteAddress());
+         if (e.getMessage() instanceof ChannelBuffer)
+         {
+            HttpRequest httpRequest = new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.POST, "/jbm/");
+            ChannelBuffer buf = (ChannelBuffer) e.getMessage();
+            httpRequest.setContent(buf);
+            httpRequest.addHeader(HttpHeaders.CONTENT_LENGTH, String.valueOf(buf.writerIndex()));
+            write(ctx, e.getChannel(), e.getFuture(), httpRequest, e.getRemoteAddress());
+            lastSendTime = System.currentTimeMillis();
+         }
+         else
+         {
+            write(ctx, e.getChannel(), e.getFuture(), e.getMessage(), e.getRemoteAddress());
+            lastSendTime = System.currentTimeMillis();
+         }
       }
+
+      private class HttpIdleTimerTask extends TimerTask
+      {
+         long currentTime = System.currentTimeMillis();
+         public void run()
+         {
+            if(!waitingGet && System.currentTimeMillis() > lastSendTime + httpMaxClientIdleTime)
+            {
+               HttpRequest httpRequest = new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, "/jbm/");
+               waitingGet = true;
+               channel.write(httpRequest);
+            }
+         }
+      }
    }
 
 

Modified: trunk/src/main/org/jboss/messaging/integration/transports/netty/TransportConstants.java
===================================================================
--- trunk/src/main/org/jboss/messaging/integration/transports/netty/TransportConstants.java	2008-12-09 16:49:14 UTC (rev 5490)
+++ trunk/src/main/org/jboss/messaging/integration/transports/netty/TransportConstants.java	2008-12-09 17:09:56 UTC (rev 5491)
@@ -32,6 +32,14 @@
    public static final String SSL_ENABLED_PROP_NAME = "jbm.remoting.netty.sslenabled";
    
    public static final String HTTP_ENABLED_PROP_NAME = "jbm.remoting.netty.httpenabled";
+
+   public static final String HTTP_CLIENT_IDLE_PROP_NAME = "jbm.remoting.netty.httpclientidletime";
+
+   public static final String HTTP_CLIENT_IDLE_SCAN_PERIOD = "jbm.remoting.netty.httpclientidlescanperiod";
+
+   public static final String HTTP_RESPONSE_TIME_PROP_NAME = "jbm.remoting.netty.httpresponsetime";
+
+   public static final String HTTP_SERVER_SCAN_PERIOD_PROP_NAME = "jbm.remoting.netty.httpserverscanperiod";
    
    public static final String USE_NIO_PROP_NAME = "jbm.remoting.netty.usenio";
    
@@ -76,4 +84,13 @@
    public static final int DEFAULT_TCP_RECEIVEBUFFER_SIZE = 32768;
 
    public static final boolean DEFAULT_HTTP_ENABLED = false;
+
+   public static final long DEFAULT_HTTP_CLIENT_IDLE_TIME = 500;
+
+   public static final long DEFAULT_HTTP_CLIENT_SCAN_PERIOD = 500;
+
+   public static final long DEFAULT_HTTP_RESPONSE_TIME = 10000;
+
+   public static final long DEFAULT_HTTP_SERVER_SCAN_PERIOD = 5000;
+
 }

Modified: trunk/tests/src/org/jboss/messaging/tests/integration/basic/CoreClientTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/basic/CoreClientTest.java	2008-12-09 16:49:14 UTC (rev 5490)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/basic/CoreClientTest.java	2008-12-09 17:09:56 UTC (rev 5491)
@@ -84,7 +84,7 @@
       
       ClientProducer producer = session.createProducer(QUEUE);     
       
-      final int numMessages = 100;
+      final int numMessages = 10000;
       
       for (int i = 0; i < numMessages; i++)
       {

Modified: trunk/tests/src/org/jboss/messaging/tests/integration/http/CoreClientOverHttpTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/http/CoreClientOverHttpTest.java	2008-12-09 16:49:14 UTC (rev 5490)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/http/CoreClientOverHttpTest.java	2008-12-09 17:09:56 UTC (rev 5491)
@@ -95,4 +95,35 @@
 
       messagingService.stop();
    }
+
+   public void testCoreHttpClientIdle() throws Exception
+   {
+      final SimpleString QUEUE = new SimpleString("CoreClientOverHttpTestQueue");
+
+      Configuration conf = new ConfigurationImpl();
+
+      conf.setSecurityEnabled(false);
+
+      HashMap<String, Object> params = new HashMap<String, Object>();
+      params.put("jbm.remoting.netty.httpenabled", true);
+      conf.getAcceptorConfigurations().add(new TransportConfiguration(NETTY_ACCEPTOR_FACTORY, params));
+
+      MessagingService messagingService = MessagingServiceImpl.newNullStorageMessagingServer(conf);
+
+      messagingService.start();
+
+      ClientSessionFactory sf = new ClientSessionFactoryImpl(new TransportConfiguration(NETTY_CONNECTOR_FACTORY, params));
+
+      ClientSession session = sf.createSession(false, true, true);
+
+      session.createQueue(QUEUE, QUEUE, null, false, false, true);
+
+      ClientProducer producer = session.createProducer(QUEUE);
+
+      Thread.sleep(messagingService.getServer().getConfiguration().getConnectionScanPeriod() * 5);
+
+      session.close();
+
+      messagingService.stop();
+   }
 }

Added: trunk/tests/src/org/jboss/messaging/tests/integration/http/NettyHttpTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/http/NettyHttpTest.java	                        (rev 0)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/http/NettyHttpTest.java	2008-12-09 17:09:56 UTC (rev 5491)
@@ -0,0 +1,447 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.messaging.tests.integration.http;
+
+import org.jboss.messaging.tests.util.UnitTestCase;
+import org.jboss.messaging.integration.transports.netty.NettyAcceptor;
+import org.jboss.messaging.integration.transports.netty.TransportConstants;
+import org.jboss.messaging.integration.transports.netty.NettyConnector;
+import org.jboss.messaging.core.remoting.spi.BufferHandler;
+import org.jboss.messaging.core.remoting.spi.MessagingBuffer;
+import org.jboss.messaging.core.remoting.spi.Connection;
+import org.jboss.messaging.core.remoting.spi.ConnectionLifeCycleListener;
+import org.jboss.messaging.core.remoting.impl.AbstractBufferHandler;
+import org.jboss.messaging.core.exception.MessagingException;
+
+import java.util.HashMap;
+import java.util.ArrayList;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * @author <a href="mailto:andy.taylor at jboss.org">Andy Taylor</a>
+ */
+public class NettyHttpTest extends UnitTestCase
+{
+   private NettyAcceptor acceptor;
+
+   public void testSendAndReceiveAtSameTime() throws Exception
+   {
+
+      int numPackets = 1000;
+      CountDownLatch connCreatedLatch = new CountDownLatch(1);
+      CountDownLatch acceptorLatch = new CountDownLatch(numPackets);
+      CountDownLatch connectorLatch = new CountDownLatch(numPackets);
+      HashMap<String, Object> conf = new HashMap<String, Object>();
+      conf.put(TransportConstants.HTTP_ENABLED_PROP_NAME, true);
+      conf.put(TransportConstants.HTTP_CLIENT_IDLE_SCAN_PERIOD, -1l);
+      DummyConnectionLifeCycleListener acceptorListener = new DummyConnectionLifeCycleListener(connCreatedLatch);
+      SimpleBufferHandler acceptorHandler = new SimpleBufferHandler(acceptorLatch);
+      acceptor = new NettyAcceptor(conf, acceptorHandler, acceptorListener);
+      acceptor.start();
+
+      SimpleBufferHandler connectorHandler = new SimpleBufferHandler(connectorLatch);
+      NettyConnector connector = new NettyConnector(conf, connectorHandler, new DummyConnectionLifeCycleListener(null));
+      connector.start();
+      Connection conn = connector.createConnection();
+      connCreatedLatch.await(5, TimeUnit.SECONDS);
+      for (int i = 0; i < numPackets; i++)
+      {
+         MessagingBuffer buff = conn.createBuffer(8);
+         buff.putInt(4);
+         buff.putInt(i);
+         buff.flip();
+         conn.write(buff);
+         buff = conn.createBuffer(8);
+         buff.putInt(4);
+         buff.putInt(i);
+         buff.flip();
+         acceptorListener.connection.write(buff);
+      }
+      acceptorLatch.await(10, TimeUnit.SECONDS);
+      connectorLatch.await(10, TimeUnit.SECONDS);
+      conn.close();
+      assertEquals(acceptorHandler.messagesReceieved, numPackets);
+      assertEquals(connectorHandler.messagesReceieved, numPackets);
+      int i = 0;
+      for (Integer j : acceptorHandler.messages)
+      {
+         assertTrue(i == j);
+         i++;
+      }
+       i = 0;
+      for (Integer j : connectorHandler.messages)
+      {
+         assertTrue(i == j);
+         i++;
+      }
+   }
+
+   public void testSendThenReceive() throws Exception
+   {
+
+      int numPackets = 1000;
+      CountDownLatch connCreatedLatch = new CountDownLatch(1);
+      CountDownLatch acceptorLatch = new CountDownLatch(numPackets);
+      CountDownLatch connectorLatch = new CountDownLatch(numPackets);
+      HashMap<String, Object> conf = new HashMap<String, Object>();
+      conf.put(TransportConstants.HTTP_ENABLED_PROP_NAME, true);
+      conf.put(TransportConstants.HTTP_CLIENT_IDLE_SCAN_PERIOD, -1l);
+      DummyConnectionLifeCycleListener acceptorListener = new DummyConnectionLifeCycleListener(connCreatedLatch);
+      SimpleBufferHandler acceptorHandler = new SimpleBufferHandler(acceptorLatch);
+      acceptor = new NettyAcceptor(conf, acceptorHandler, acceptorListener);
+      acceptor.start();
+
+      SimpleBufferHandler connectorHandler = new SimpleBufferHandler(connectorLatch);
+      NettyConnector connector = new NettyConnector(conf, connectorHandler, new DummyConnectionLifeCycleListener(null));
+      connector.start();
+      Connection conn = connector.createConnection();
+      connCreatedLatch.await(5, TimeUnit.SECONDS);
+      for (int i = 0; i < numPackets; i++)
+      {
+         MessagingBuffer buff = conn.createBuffer(8);
+         buff.putInt(4);
+         buff.putInt(i);
+         buff.flip();
+         conn.write(buff);
+      }
+      for (int i = 0; i < numPackets; i++)
+      {
+         MessagingBuffer buff = conn.createBuffer(8);
+         buff.putInt(4);
+         buff.putInt(i);
+         buff.flip();
+         acceptorListener.connection.write(buff);
+      }
+      acceptorLatch.await(10, TimeUnit.SECONDS);
+      connectorLatch.await(10, TimeUnit.SECONDS);
+      conn.close();
+      assertEquals(acceptorHandler.messagesReceieved, numPackets);
+      assertEquals(connectorHandler.messagesReceieved, numPackets);
+      int i = 0;
+      for (Integer j : acceptorHandler.messages)
+      {
+         assertTrue(i == j);
+         i++;
+      }
+       i = 0;
+      for (Integer j : connectorHandler.messages)
+      {
+         assertTrue(i == j);
+         i++;
+      }
+   }
+
+   public void testReceiveThenSend() throws Exception
+   {
+
+      int numPackets = 1000;
+      CountDownLatch connCreatedLatch = new CountDownLatch(1);
+      CountDownLatch acceptorLatch = new CountDownLatch(numPackets);
+      CountDownLatch connectorLatch = new CountDownLatch(numPackets);
+      HashMap<String, Object> conf = new HashMap<String, Object>();
+      conf.put(TransportConstants.HTTP_ENABLED_PROP_NAME, true);
+      conf.put(TransportConstants.HTTP_CLIENT_IDLE_SCAN_PERIOD, -1l);
+      DummyConnectionLifeCycleListener acceptorListener = new DummyConnectionLifeCycleListener(connCreatedLatch);
+      SimpleBufferHandler acceptorHandler = new SimpleBufferHandler(acceptorLatch);
+      acceptor = new NettyAcceptor(conf, acceptorHandler, acceptorListener);
+      acceptor.start();
+
+      SimpleBufferHandler connectorHandler = new SimpleBufferHandler(connectorLatch);
+      NettyConnector connector = new NettyConnector(conf, connectorHandler, new DummyConnectionLifeCycleListener(null));
+      connector.start();
+      Connection conn = connector.createConnection();
+      connCreatedLatch.await(5, TimeUnit.SECONDS);
+      for (int i = 0; i < numPackets; i++)
+      {
+         MessagingBuffer buff = conn.createBuffer(8);
+         buff.putInt(4);
+         buff.putInt(i);
+         buff.flip();
+         acceptorListener.connection.write(buff);
+      }
+
+      for (int i = 0; i < numPackets; i++)
+      {
+         MessagingBuffer buff = conn.createBuffer(8);
+         buff.putInt(4);
+         buff.putInt(i);
+         buff.flip();
+         conn.write(buff);
+      }
+      acceptorLatch.await(10, TimeUnit.SECONDS);
+      connectorLatch.await(10, TimeUnit.SECONDS);
+      conn.close();
+      assertEquals(acceptorHandler.messagesReceieved, numPackets);
+      assertEquals(connectorHandler.messagesReceieved, numPackets);
+      int i = 0;
+      for (Integer j : acceptorHandler.messages)
+      {
+         assertTrue(i == j);
+         i++;
+      }
+       i = 0;
+      for (Integer j : connectorHandler.messages)
+      {
+         assertTrue(i == j);
+         i++;
+      }
+   }
+
+   public void testReceivePiggyBackOnOneResponse() throws Exception
+   {
+
+      int numPackets = 1000;
+      CountDownLatch connCreatedLatch = new CountDownLatch(1);
+      CountDownLatch acceptorLatch = new CountDownLatch(1);
+      CountDownLatch connectorLatch = new CountDownLatch(numPackets);
+      HashMap<String, Object> conf = new HashMap<String, Object>();
+      conf.put(TransportConstants.HTTP_ENABLED_PROP_NAME, true);
+      conf.put(TransportConstants.HTTP_CLIENT_IDLE_SCAN_PERIOD, -1l);
+      DummyConnectionLifeCycleListener acceptorListener = new DummyConnectionLifeCycleListener(connCreatedLatch);
+      SimpleBufferHandler acceptorHandler = new SimpleBufferHandler(acceptorLatch);
+      acceptor = new NettyAcceptor(conf, acceptorHandler, acceptorListener);
+      acceptor.start();
+
+      SimpleBufferHandler connectorHandler = new SimpleBufferHandler(connectorLatch);
+      NettyConnector connector = new NettyConnector(conf, connectorHandler, new DummyConnectionLifeCycleListener(null));
+      connector.start();
+      Connection conn = connector.createConnection();
+      connCreatedLatch.await(5, TimeUnit.SECONDS);
+      for (int i = 0; i < numPackets; i++)
+      {
+         MessagingBuffer buff = conn.createBuffer(8);
+         buff.putInt(4);
+         buff.putInt(i);
+         buff.flip();
+         acceptorListener.connection.write(buff);
+      }
+
+
+      MessagingBuffer buff = conn.createBuffer(8);
+      buff.putInt(4);
+      buff.putInt(0);
+      buff.flip();
+      conn.write(buff);
+
+      acceptorLatch.await(10, TimeUnit.SECONDS);
+      connectorLatch.await(10, TimeUnit.SECONDS);
+      conn.close();
+      assertEquals(acceptorHandler.messagesReceieved, 1);
+      assertEquals(connectorHandler.messagesReceieved, numPackets);
+      int i = 0;
+      for (Integer j : acceptorHandler.messages)
+      {
+         assertTrue(i == j);
+         i++;
+      }
+       i = 0;
+      for (Integer j : connectorHandler.messages)
+      {
+         assertTrue(i == j);
+         i++;
+      }
+   }
+
+   public void testReceivePiggyBackOnIdleClient() throws Exception
+   {
+
+      int numPackets = 1000;
+      CountDownLatch connCreatedLatch = new CountDownLatch(1);
+      CountDownLatch acceptorLatch = new CountDownLatch(0);
+      CountDownLatch connectorLatch = new CountDownLatch(numPackets);
+      HashMap<String, Object> conf = new HashMap<String, Object>();
+      conf.put(TransportConstants.HTTP_ENABLED_PROP_NAME, true);
+      conf.put(TransportConstants.HTTP_CLIENT_IDLE_SCAN_PERIOD, 500l);
+      conf.put(TransportConstants.HTTP_CLIENT_IDLE_PROP_NAME, 500l);
+      DummyConnectionLifeCycleListener acceptorListener = new DummyConnectionLifeCycleListener(connCreatedLatch);
+      SimpleBufferHandler acceptorHandler = new SimpleBufferHandler(acceptorLatch);
+      acceptor = new NettyAcceptor(conf, acceptorHandler, acceptorListener);
+      acceptor.start();
+
+      SimpleBufferHandler connectorHandler = new SimpleBufferHandler(connectorLatch);
+      NettyConnector connector = new NettyConnector(conf, connectorHandler, new DummyConnectionLifeCycleListener(null));
+      connector.start();
+      Connection conn = connector.createConnection();
+      connCreatedLatch.await(5, TimeUnit.SECONDS);
+      for (int i = 0; i < numPackets; i++)
+      {
+         MessagingBuffer buff = conn.createBuffer(8);
+         buff.putInt(4);
+         buff.putInt(i);
+         buff.flip();
+         acceptorListener.connection.write(buff);
+      }
+
+      acceptorLatch.await(10, TimeUnit.SECONDS);
+      connectorLatch.await(10, TimeUnit.SECONDS);
+      conn.close();
+      assertEquals(acceptorHandler.messagesReceieved, 0);
+      assertEquals(connectorHandler.messagesReceieved, numPackets);
+      int i = 0;
+      for (Integer j : acceptorHandler.messages)
+      {
+         assertTrue(i == j);
+         i++;
+      }
+       i = 0;
+      for (Integer j : connectorHandler.messages)
+      {
+         assertTrue(i == j);
+         i++;
+      }
+   }
+
+   public void testSendWithNoReceive() throws Exception
+   {
+
+      int numPackets = 1000;
+      CountDownLatch connCreatedLatch = new CountDownLatch(1);
+      CountDownLatch acceptorLatch = new CountDownLatch(numPackets);
+      CountDownLatch connectorLatch = new CountDownLatch(numPackets);
+      HashMap<String, Object> conf = new HashMap<String, Object>();
+      conf.put(TransportConstants.HTTP_ENABLED_PROP_NAME, true);
+      conf.put(TransportConstants.HTTP_CLIENT_IDLE_SCAN_PERIOD, -1l);
+      conf.put(TransportConstants.HTTP_RESPONSE_TIME_PROP_NAME, 500l);
+      conf.put(TransportConstants.HTTP_SERVER_SCAN_PERIOD_PROP_NAME, 5000l);
+      DummyConnectionLifeCycleListener acceptorListener = new DummyConnectionLifeCycleListener(connCreatedLatch);
+      SimpleBufferHandler acceptorHandler = new SimpleBufferHandler(acceptorLatch);
+      acceptor = new NettyAcceptor(conf, acceptorHandler, acceptorListener);
+      acceptor.start();
+
+      BogusResponseHandler connectorHandler = new BogusResponseHandler(connectorLatch);
+      NettyConnector connector = new NettyConnector(conf, connectorHandler, new DummyConnectionLifeCycleListener(null));
+      connector.start();
+      Connection conn = connector.createConnection();
+      connCreatedLatch.await(5, TimeUnit.SECONDS);
+      for (int i = 0; i < numPackets; i++)
+      {
+         MessagingBuffer buff = conn.createBuffer(8);
+         buff.putInt(4);
+         buff.putInt(i);
+         buff.flip();
+         conn.write(buff);
+      }
+      acceptorLatch.await(100, TimeUnit.SECONDS);
+      connectorLatch.await(0, TimeUnit.SECONDS);
+      conn.close();
+      assertEquals(acceptorHandler.messagesReceieved, numPackets);
+      assertEquals(connectorHandler.messagesReceieved, 0);
+      int i = 0;
+      for (Integer j : acceptorHandler.messages)
+      {
+         assertTrue(i == j);
+         i++;
+      }
+   }
+   protected void setUp() throws Exception
+   {
+   }
+
+   @Override
+   protected void tearDown() throws Exception
+   {
+      
+      if(acceptor != null)
+      {
+         acceptor.stop();
+         acceptor = null;
+      }
+   }
+
+   class SimpleBufferHandler extends AbstractBufferHandler
+   {
+      int messagesReceieved = 0;
+
+      ArrayList<Integer> messages = new ArrayList<Integer>();
+      private CountDownLatch latch;
+
+      public SimpleBufferHandler(CountDownLatch latch)
+      {
+         this.latch = latch;
+      }
+
+
+      public void bufferReceived(Object connectionID, MessagingBuffer buffer)
+      {
+         int i = buffer.getInt();
+         messages.add(i);
+         messagesReceieved++;
+         latch.countDown();
+      }
+   }
+
+   class BogusResponseHandler implements BufferHandler
+   {
+      int messagesReceieved = 0;
+
+      ArrayList<Integer> messages = new ArrayList<Integer>();
+      private CountDownLatch latch;
+
+      public BogusResponseHandler(CountDownLatch latch)
+      {
+         this.latch = latch;
+      }
+
+      public int isReadyToHandle(MessagingBuffer buffer)
+      {
+         return 0;
+      }
+
+      public void bufferReceived(Object connectionID, MessagingBuffer buffer)
+      {
+         int i = buffer.getInt();
+         messages.add(i);
+         messagesReceieved++;
+         latch.countDown();
+      }
+   }
+
+   class DummyConnectionLifeCycleListener implements ConnectionLifeCycleListener
+   {
+      Connection connection;
+
+      private CountDownLatch latch;
+
+      public DummyConnectionLifeCycleListener(CountDownLatch connCreatedLatch)
+      {
+         this.latch = connCreatedLatch;
+      }
+
+      public void connectionCreated(Connection connection)
+      {
+        this.connection = connection;
+         if (latch != null)
+         {
+            latch.countDown();
+         }
+      }
+
+      public void connectionDestroyed(Object connectionID)
+      {
+      }
+
+      public void connectionException(Object connectionID, MessagingException me)
+      {
+      }
+   }
+}




More information about the jboss-cvs-commits mailing list