[jboss-cvs] JBoss Messaging SVN: r5491 - in trunk: src/main/org/jboss/messaging/integration/transports/netty and 2 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Tue Dec 9 12:09:56 EST 2008
Author: ataylor
Date: 2008-12-09 12:09:56 -0500 (Tue, 09 Dec 2008)
New Revision: 5491
Added:
trunk/src/main/org/jboss/messaging/integration/transports/netty/HttpAcceptorHandler.java
trunk/src/main/org/jboss/messaging/integration/transports/netty/HttpKeepAliveTask.java
trunk/tests/src/org/jboss/messaging/tests/integration/http/NettyHttpTest.java
Modified:
trunk/src/config/jbm-configuration.xml
trunk/src/config/jbm-jms.xml
trunk/src/main/org/jboss/messaging/integration/transports/netty/NettyAcceptor.java
trunk/src/main/org/jboss/messaging/integration/transports/netty/NettyConnector.java
trunk/src/main/org/jboss/messaging/integration/transports/netty/TransportConstants.java
trunk/tests/src/org/jboss/messaging/tests/integration/basic/CoreClientTest.java
trunk/tests/src/org/jboss/messaging/tests/integration/http/CoreClientOverHttpTest.java
Log:
more netty integration work plus tests
Modified: trunk/src/config/jbm-configuration.xml
===================================================================
--- trunk/src/config/jbm-configuration.xml 2008-12-09 16:49:14 UTC (rev 5490)
+++ trunk/src/config/jbm-configuration.xml 2008-12-09 17:09:56 UTC (rev 5491)
@@ -104,6 +104,15 @@
<param key="jbm.remoting.netty.port" value="5500" type="Integer"/>
<param key="jbm.remoting.netty.sslenabled" value="true" type="Boolean"/>
</connector>
+
+ <connector name="netty-http">
+ <factory-class>org.jboss.messaging.integration.transports.netty.NettyConnectorFactory</factory-class>
+ <param key="jbm.remoting.netty.host" value="localhost" type="String"/>
+ <param key="jbm.remoting.netty.port" value="6100" type="Integer"/>
+ <param key="jbm.remoting.netty.httpenabled" value="true" type="Boolean"/>
+ <param key="jbm.remoting.netty.httpclientidletime" value="500" type="Long"/>
+ <param key="jbm.remoting.netty.httpclientidlescanperiod" value="500" type="Long"/>
+ </connector>
<connector name="in-vm">
<factory-class>org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory</factory-class>
@@ -130,12 +139,14 @@
<param key="jbm.remoting.netty.sslenabled" value="true" type="Boolean"/>
</acceptor>
-->
- <!-- Netty HTTP Acceptor
- <acceptor>
+ <!--Netty HTTP Acceptor-->
+ <!--<acceptor>
<factory-class>org.jboss.messaging.integration.transports.netty.NettyAcceptorFactory</factory-class>
<param key="jbm.remoting.netty.host" value="localhost" type="String"/>
- <param key="jbm.remoting.netty.port" value="8080" type="Integer"/>
- <param key="jbm.remoting.netty.httpenabled" value="true" type="Boolean"/>
+ <param key="jbm.remoting.netty.port" value="6100" type="Integer"/>
+ <param key="jbm.remoting.netty.httpenabled" value="true" type="Boolean"/>
+ <param key="jbm.remoting.netty.httpresponsetime" value="10000" type="Long"/>
+ <param key="jbm.remoting.netty.httpserverscanperiod" value="5000" type="Long"/>
</acceptor>-->
<!-- Mina Acceptor -->
Modified: trunk/src/config/jbm-jms.xml
===================================================================
--- trunk/src/config/jbm-jms.xml 2008-12-09 16:49:14 UTC (rev 5490)
+++ trunk/src/config/jbm-jms.xml 2008-12-09 17:09:56 UTC (rev 5491)
@@ -84,6 +84,11 @@
<entry name="/TestSSLConnectionFactory"/>
</connection-factory>
+ <connection-factory name="TestHttpConnectionFactory">
+ <connector-ref connector-name="netty-http"/>
+ <entry name="/TestHttpConnectionFactory"/>
+ </connection-factory>
+
<queue name="MyQueue">
<entry name="MyQueue"/>
</queue>
Added: trunk/src/main/org/jboss/messaging/integration/transports/netty/HttpAcceptorHandler.java
===================================================================
--- trunk/src/main/org/jboss/messaging/integration/transports/netty/HttpAcceptorHandler.java (rev 0)
+++ trunk/src/main/org/jboss/messaging/integration/transports/netty/HttpAcceptorHandler.java 2008-12-09 17:09:56 UTC (rev 5491)
@@ -0,0 +1,230 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.messaging.integration.transports.netty;
+
+import org.jboss.netty.channel.ChannelPipelineCoverage;
+import org.jboss.netty.channel.SimpleChannelHandler;
+import org.jboss.netty.channel.Channel;
+import org.jboss.netty.channel.ChannelHandlerContext;
+import org.jboss.netty.channel.ChannelStateEvent;
+import org.jboss.netty.channel.MessageEvent;
+import org.jboss.netty.channel.DefaultMessageEvent;
+import static org.jboss.netty.channel.Channels.write;
+import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.buffer.ChannelBuffers;
+import org.jboss.netty.handler.codec.http.HttpRequest;
+import org.jboss.netty.handler.codec.http.HttpMethod;
+import org.jboss.netty.handler.codec.http.HttpResponse;
+import org.jboss.netty.handler.codec.http.DefaultHttpResponse;
+import org.jboss.netty.handler.codec.http.HttpVersion;
+import org.jboss.netty.handler.codec.http.HttpResponseStatus;
+import org.jboss.netty.handler.codec.http.HttpHeaders;
+
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.concurrent.locks.Condition;
+import java.util.List;
+import java.util.ArrayList;
+import java.util.Iterator;
+
+/**
+ * Makes sure that every HTTP request received on a channel is eventually answered. Each request
+ * received queues a pending response; outgoing buffers wait on a single threaded executor until a
+ * pending response is available to carry them, and {@link #keepAlive(long)} answers any request
+ * whose deadline has passed with an empty body.
+ *
+ * @author <a href="mailto:andy.taylor at jboss.org">Andy Taylor</a>
+ */
+@ChannelPipelineCoverage("one")
+class HttpAcceptorHandler extends SimpleChannelHandler
+{
+   /** responses that are waiting for something to send, one is added for every request received */
+   private final BlockingQueue<ResponseHolder> responses = new LinkedBlockingQueue<ResponseHolder>();
+
+   /** the executor's work queue, exposed so a runner can piggy back the buffers of the runners queued behind it */
+   private final BlockingQueue<Runnable> delayedResponses = new LinkedBlockingQueue<Runnable>();
+
+   /**
+    * single threaded so that buffers are written in the order they were requested. The work queue is
+    * unbounded so the discard policy only ever applies once the executor has been shut down on
+    * disconnect, at which point there is no channel left to write to.
+    */
+   private final ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 1, 0, TimeUnit.SECONDS, delayedResponses, new ThreadPoolExecutor.DiscardPolicy());
+
+   private final HttpKeepAliveTask httpKeepAliveTask;
+
+   /** how long, in milliseconds, a request may wait before keepAlive considers it late */
+   private final long responseTime;
+
+   /** set and cleared by the channel events but read by the executor thread, hence volatile */
+   private volatile Channel channel;
+
+   /**
+    * @param httpKeepAliveTask the task this handler registers itself with while it is connected
+    * @param responseTime      added to the arrival time of each request to give its response deadline
+    */
+   public HttpAcceptorHandler(final HttpKeepAliveTask httpKeepAliveTask, final long responseTime)
+   {
+      super();
+      this.responseTime = responseTime;
+      this.httpKeepAliveTask = httpKeepAliveTask;
+   }
+
+   /**
+    * remembers the channel and registers this handler for keep alive callbacks.
+    */
+   @Override
+   public void channelConnected(final ChannelHandlerContext ctx, final ChannelStateEvent e) throws Exception
+   {
+      super.channelConnected(ctx, e);
+      channel = e.getChannel();
+      httpKeepAliveTask.registerKeepAliveHandler(this);
+   }
+
+   /**
+    * unregisters from the keep alive task, forgets the channel and stops the executor so that its
+    * worker thread is not leaked for every connection that comes and goes.
+    */
+   @Override
+   public void channelDisconnected(final ChannelHandlerContext ctx, final ChannelStateEvent e) throws Exception
+   {
+      super.channelDisconnected(ctx, e);
+      httpKeepAliveTask.unregisterKeepAliveHandler(this);
+      channel = null;
+      //interrupts a runner blocked waiting for a response and drops anything still queued
+      executor.shutdownNow();
+   }
+
+   /**
+    * forwards the content of a POST upstream and, whatever the method, queues a new pending response.
+    */
+   @Override
+   public void messageReceived(final ChannelHandlerContext ctx, final MessageEvent e) throws Exception
+   {
+      HttpRequest request = (HttpRequest) e.getMessage();
+      HttpMethod method = request.getMethod();
+      //if we are a post then we send upstream, otherwise we are just being prompted for a response.
+      if (method.equals(HttpMethod.POST))
+      {
+         MessageEvent event = new DefaultMessageEvent(e.getChannel(), e.getFuture(), request.getContent(), e.getRemoteAddress());
+         ctx.sendUpstream(event);
+      }
+      //add a new response, stamped with the time by which it should have been sent
+      responses.put(new ResponseHolder(System.currentTimeMillis() + responseTime, new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK)));
+   }
+
+   /**
+    * delays channel buffers until a response is available to carry them; anything else (the response
+    * itself) is written straight through.
+    */
+   @Override
+   public void writeRequested(final ChannelHandlerContext ctx, final MessageEvent e) throws Exception
+   {
+      //we are either a channel buffer, which gets delayed until a response is available, or we are the actual response
+      if (e.getMessage() instanceof ChannelBuffer)
+      {
+         ChannelBuffer buf = (ChannelBuffer) e.getMessage();
+         //NOTE(review): the future of this event is not passed on, so it is never completed by this path - confirm callers do not wait on it
+         executor.execute(new ResponseRunner(buf));
+      }
+      else
+      {
+         write(ctx, e.getChannel(), e.getFuture(), e.getMessage(), e.getRemoteAddress());
+      }
+   }
+
+   /**
+    * schedules an empty response for every pending response whose deadline is before the given time.
+    *
+    * @param time the current time in milliseconds
+    */
+   public void keepAlive(final long time)
+   {
+      //send some responses to catch up thus avoiding any timeout.
+      int lateResponses = 0;
+      for (ResponseHolder response : responses)
+      {
+         //the queue is in arrival order so we can stop at the first response that is not late
+         if (response.deadline < time)
+         {
+            lateResponses++;
+         }
+         else
+         {
+            break;
+         }
+      }
+      for (int i = 0; i < lateResponses; i++)
+      {
+         executor.execute(new ResponseRunner());
+      }
+   }
+
+   /**
+    * waits for a response to become available in the response queue and then uses it to deliver a buffer.
+    */
+   class ResponseRunner implements Runnable
+   {
+      private final ChannelBuffer buffer;
+
+      /** true when this runner only exists to answer a late request and carries no data */
+      private final boolean bogusResponse;
+
+      public ResponseRunner(final ChannelBuffer buffer)
+      {
+         this.buffer = buffer;
+         bogusResponse = false;
+      }
+
+      public ResponseRunner()
+      {
+         bogusResponse = true;
+         buffer = ChannelBuffers.buffer(0);
+      }
+
+      public void run()
+      {
+         final ResponseHolder responseHolder;
+         try
+         {
+            responseHolder = responses.take();
+         }
+         catch (InterruptedException e)
+         {
+            //we are only interrupted when the executor is shut down on disconnect, so give up
+            //rather than retrying and blocking the worker thread forever
+            Thread.currentThread().interrupt();
+            return;
+         }
+         if (!bogusResponse)
+         {
+            piggyBackResponses();
+         }
+         responseHolder.response.setContent(buffer);
+         responseHolder.response.addHeader(HttpHeaders.CONTENT_LENGTH, String.valueOf(buffer.writerIndex()));
+         //take a copy as the channel is cleared by another thread when we disconnect
+         final Channel currentChannel = channel;
+         if (currentChannel != null)
+         {
+            currentChannel.write(responseHolder.response);
+         }
+      }
+
+      private void piggyBackResponses()
+      {
+         //if we are the last available response then we have to piggy back any remaining responses
+         while (responses.isEmpty())
+         {
+            //everything in the executors queue was submitted by this handler so is always a ResponseRunner
+            ResponseRunner responseRunner = (ResponseRunner) delayedResponses.poll();
+            if (responseRunner == null)
+            {
+               break;
+            }
+            //NOTE(review): assumes this buffer has room for the extra bytes - confirm the buffers passed down are dynamic
+            buffer.writeBytes(responseRunner.buffer);
+         }
+      }
+   }
+
+   /**
+    * a holder class so we know the time by which the response should have been sent
+    */
+   private static final class ResponseHolder
+   {
+      final HttpResponse response;
+
+      /** arrival time of the request plus the configured response time, in milliseconds */
+      final long deadline;
+
+      ResponseHolder(final long deadline, final HttpResponse response)
+      {
+         this.deadline = deadline;
+         this.response = response;
+      }
+   }
+
+}
Added: trunk/src/main/org/jboss/messaging/integration/transports/netty/HttpKeepAliveTask.java
===================================================================
--- trunk/src/main/org/jboss/messaging/integration/transports/netty/HttpKeepAliveTask.java (rev 0)
+++ trunk/src/main/org/jboss/messaging/integration/transports/netty/HttpKeepAliveTask.java 2008-12-09 17:09:56 UTC (rev 5491)
@@ -0,0 +1,54 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.messaging.integration.transports.netty;
+
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.List;
+import java.util.ArrayList;
+
+/**
+ * A simple Timer Task to allow HttpAcceptorHandlers to be called intermittently.
+ * @author <a href="mailto:andy.taylor at jboss.org">Andy Taylor</a>
+ */
+public class HttpKeepAliveTask extends TimerTask
+{
+ private final List<HttpAcceptorHandler> handlers = new ArrayList<HttpAcceptorHandler>();
+ public synchronized void run()
+ {
+ long time = System.currentTimeMillis();
+ for (HttpAcceptorHandler handler : handlers)
+ {
+ handler.keepAlive(time);
+ }
+ }
+
+ public synchronized void registerKeepAliveHandler(HttpAcceptorHandler httpAcceptorHandler)
+ {
+ handlers.add(httpAcceptorHandler);
+ }
+
+ public synchronized void unregisterKeepAliveHandler(HttpAcceptorHandler httpAcceptorHandler)
+ {
+ handlers.remove(httpAcceptorHandler);
+ }
+}
Modified: trunk/src/main/org/jboss/messaging/integration/transports/netty/NettyAcceptor.java
===================================================================
--- trunk/src/main/org/jboss/messaging/integration/transports/netty/NettyAcceptor.java 2008-12-09 16:49:14 UTC (rev 5490)
+++ trunk/src/main/org/jboss/messaging/integration/transports/netty/NettyAcceptor.java 2008-12-09 17:09:56 UTC (rev 5491)
@@ -31,7 +31,6 @@
import org.jboss.messaging.util.ConfigurationHelper;
import org.jboss.messaging.util.JBMThreadFactory;
import org.jboss.netty.bootstrap.ServerBootstrap;
-import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelFactory;
import org.jboss.netty.channel.ChannelFuture;
@@ -43,24 +42,16 @@
import org.jboss.netty.channel.ChannelStateEvent;
import static org.jboss.netty.channel.Channels.pipeline;
import static org.jboss.netty.channel.Channels.write;
-import org.jboss.netty.channel.DefaultMessageEvent;
-import org.jboss.netty.channel.MessageEvent;
-import org.jboss.netty.channel.SimpleChannelHandler;
import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
import org.jboss.netty.channel.socket.oio.OioServerSocketChannelFactory;
-import org.jboss.netty.handler.codec.http.DefaultHttpResponse;
-import org.jboss.netty.handler.codec.http.HttpHeaders;
-import org.jboss.netty.handler.codec.http.HttpRequest;
import org.jboss.netty.handler.codec.http.HttpRequestDecoder;
-import org.jboss.netty.handler.codec.http.HttpResponse;
import org.jboss.netty.handler.codec.http.HttpResponseEncoder;
-import org.jboss.netty.handler.codec.http.HttpResponseStatus;
-import org.jboss.netty.handler.codec.http.HttpVersion;
import org.jboss.netty.handler.ssl.SslHandler;
import javax.net.ssl.SSLContext;
import java.net.InetSocketAddress;
import java.util.Map;
+import java.util.Timer;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
@@ -95,6 +86,10 @@
private final boolean httpEnabled;
+ private final long httpServerScanPeriod;
+
+ private final long httpResponseTime;
+
private final boolean useNio;
private final String host;
@@ -115,6 +110,10 @@
private final int tcpReceiveBufferSize;
+ private final Timer httpKeepAliveTimer;
+
+ private final HttpKeepAliveTask httpKeepAliveTask;
+
public NettyAcceptor(final Map<String, Object> configuration, final BufferHandler handler,
final ConnectionLifeCycleListener listener)
{
@@ -126,6 +125,22 @@
ConfigurationHelper.getBooleanProperty(TransportConstants.SSL_ENABLED_PROP_NAME, TransportConstants.DEFAULT_SSL_ENABLED, configuration);
this.httpEnabled =
ConfigurationHelper.getBooleanProperty(TransportConstants.HTTP_ENABLED_PROP_NAME, TransportConstants.DEFAULT_HTTP_ENABLED, configuration);
+
+ if(httpEnabled)
+ {
+ httpServerScanPeriod = ConfigurationHelper.getLongProperty(TransportConstants.HTTP_SERVER_SCAN_PERIOD_PROP_NAME, TransportConstants.DEFAULT_HTTP_SERVER_SCAN_PERIOD, configuration);
+ httpResponseTime = ConfigurationHelper.getLongProperty(TransportConstants.HTTP_RESPONSE_TIME_PROP_NAME, TransportConstants.DEFAULT_HTTP_RESPONSE_TIME, configuration);
+ httpKeepAliveTimer = new Timer(true);
+ httpKeepAliveTask = new HttpKeepAliveTask();
+ httpKeepAliveTimer.schedule(httpKeepAliveTask, httpServerScanPeriod, httpServerScanPeriod);
+ }
+ else
+ {
+ httpServerScanPeriod = 0;
+ httpResponseTime = 0;
+ httpKeepAliveTimer = null;
+ httpKeepAliveTask = null;
+ }
this.useNio =
ConfigurationHelper.getBooleanProperty(TransportConstants.USE_NIO_PROP_NAME, TransportConstants.DEFAULT_USE_NIO, configuration);
this.host =
@@ -167,7 +182,10 @@
//Already started
return;
}
-
+ if(httpKeepAliveTimer != null)
+ {
+ httpKeepAliveTimer.cancel();
+ }
bossExecutor = Executors.newCachedThreadPool(new JBMThreadFactory("jbm-netty-acceptor-boss-threads"));
workerExecutor = Executors.newCachedThreadPool(new JBMThreadFactory("jbm-netty-acceptor-worker-threads"));
if (useNio)
@@ -218,7 +236,7 @@
{
pipeline.addLast("httpRequestDecoder", new HttpRequestDecoder());
pipeline.addLast("httpResponseEncoder", new HttpResponseEncoder());
- pipeline.addLast("httphandler", new HttpHandler());
+ pipeline.addLast("httphandler", new HttpAcceptorHandler(httpKeepAliveTask, httpResponseTime));
}
ChannelPipelineSupport.addCodecFilter(pipeline, handler);
@@ -313,26 +331,4 @@
}
}
}
-
- @ChannelPipelineCoverage("all")
- class HttpHandler extends SimpleChannelHandler
- {
- @Override
- public void messageReceived(final ChannelHandlerContext ctx,final MessageEvent e) throws Exception
- {
- HttpRequest request = (HttpRequest) e.getMessage();
- MessageEvent event = new DefaultMessageEvent(e.getChannel(), e.getFuture(), request.getContent(), e.getRemoteAddress());
- ctx.sendUpstream(event);
- }
-
- @Override
- public void writeRequested(final ChannelHandlerContext ctx,final MessageEvent e) throws Exception
- {
- HttpResponse response = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK);
- ChannelBuffer buf = (ChannelBuffer) e.getMessage();
- response.setContent(buf);
- response.addHeader(HttpHeaders.CONTENT_LENGTH, String.valueOf(buf.writerIndex()));
- write(ctx, e.getChannel(), e.getFuture(), response, e.getRemoteAddress());
- }
- }
}
Modified: trunk/src/main/org/jboss/messaging/integration/transports/netty/NettyConnector.java
===================================================================
--- trunk/src/main/org/jboss/messaging/integration/transports/netty/NettyConnector.java 2008-12-09 16:49:14 UTC (rev 5490)
+++ trunk/src/main/org/jboss/messaging/integration/transports/netty/NettyConnector.java 2008-12-09 17:09:56 UTC (rev 5491)
@@ -43,6 +43,7 @@
import org.jboss.netty.channel.DefaultMessageEvent;
import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.channel.SimpleChannelHandler;
+import org.jboss.netty.channel.ChannelStateEvent;
import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
import org.jboss.netty.channel.socket.oio.OioClientSocketChannelFactory;
import org.jboss.netty.handler.codec.http.DefaultHttpRequest;
@@ -59,12 +60,13 @@
import javax.net.ssl.SSLException;
import java.net.InetSocketAddress;
import java.util.Map;
+import java.util.Timer;
+import java.util.TimerTask;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
/**
- *
* A NettyConnector
*
* @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
@@ -94,6 +96,10 @@
private final boolean httpEnabled;
+ private final long httpMaxClientIdleTime;
+
+ private final long httpClientIdleScanPeriod;
+
private final boolean useNio;
private final String host;
@@ -138,8 +144,21 @@
TransportConstants.DEFAULT_SSL_ENABLED,
configuration);
this.httpEnabled =
- ConfigurationHelper.getBooleanProperty(TransportConstants.HTTP_ENABLED_PROP_NAME, TransportConstants.DEFAULT_HTTP_ENABLED, configuration);
+ ConfigurationHelper.getBooleanProperty(TransportConstants.HTTP_ENABLED_PROP_NAME, TransportConstants.DEFAULT_HTTP_ENABLED, configuration);
+ if(httpEnabled)
+ {
+ this.httpMaxClientIdleTime = ConfigurationHelper.getLongProperty(TransportConstants.HTTP_CLIENT_IDLE_PROP_NAME,
+ TransportConstants.DEFAULT_HTTP_CLIENT_IDLE_TIME, configuration);
+ this.httpClientIdleScanPeriod = ConfigurationHelper.getLongProperty(TransportConstants.HTTP_CLIENT_IDLE_SCAN_PERIOD,
+ TransportConstants.DEFAULT_HTTP_CLIENT_SCAN_PERIOD, configuration);
+ }
+ else
+ {
+ this.httpMaxClientIdleTime = 0;
+ this.httpClientIdleScanPeriod = -1;
+ }
+
this.useNio = ConfigurationHelper.getBooleanProperty(TransportConstants.USE_NIO_PROP_NAME,
TransportConstants.DEFAULT_USE_NIO,
configuration);
@@ -354,23 +373,74 @@
@ChannelPipelineCoverage("all")
class HttpHandler extends SimpleChannelHandler
{
+ private Channel channel;
+ private long lastSendTime = 0;
+ private boolean waitingGet = false;
+
+ private Timer idleClientTimer;
+
+ public void channelConnected(final ChannelHandlerContext ctx, final ChannelStateEvent e) throws Exception
+ {
+ super.channelConnected(ctx, e);
+ channel = e.getChannel();
+ if (httpClientIdleScanPeriod > 0)
+ {
+ idleClientTimer = new Timer("Http Idle Timer", true);
+ idleClientTimer.schedule(new HttpIdleTimerTask(), httpClientIdleScanPeriod, httpClientIdleScanPeriod);
+ }
+ }
+
+
+ public void channelClosed(final ChannelHandlerContext ctx, final ChannelStateEvent e) throws Exception
+ {
+ if (idleClientTimer != null)
+ {
+ idleClientTimer.cancel();
+ }
+ super.channelClosed(ctx, e);
+ }
+
@Override
public void messageReceived(final ChannelHandlerContext ctx, final MessageEvent e) throws Exception
{
HttpResponse response = (HttpResponse) e.getMessage();
MessageEvent event = new DefaultMessageEvent(e.getChannel(), e.getFuture(), response.getContent(), e.getRemoteAddress());
+ waitingGet = false;
ctx.sendUpstream(event);
}
@Override
public void writeRequested(final ChannelHandlerContext ctx, final MessageEvent e) throws Exception
{
- HttpRequest httpRequest = new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.POST, "/jbm/");
- ChannelBuffer buf = (ChannelBuffer) e.getMessage();
- httpRequest.setContent(buf);
- httpRequest.addHeader(HttpHeaders.CONTENT_LENGTH, String.valueOf(buf.writerIndex()));
- write(ctx, e.getChannel(), e.getFuture(), httpRequest, e.getRemoteAddress());
+ if (e.getMessage() instanceof ChannelBuffer)
+ {
+ HttpRequest httpRequest = new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.POST, "/jbm/");
+ ChannelBuffer buf = (ChannelBuffer) e.getMessage();
+ httpRequest.setContent(buf);
+ httpRequest.addHeader(HttpHeaders.CONTENT_LENGTH, String.valueOf(buf.writerIndex()));
+ write(ctx, e.getChannel(), e.getFuture(), httpRequest, e.getRemoteAddress());
+ lastSendTime = System.currentTimeMillis();
+ }
+ else
+ {
+ write(ctx, e.getChannel(), e.getFuture(), e.getMessage(), e.getRemoteAddress());
+ lastSendTime = System.currentTimeMillis();
+ }
}
+
+ private class HttpIdleTimerTask extends TimerTask
+ {
+ long currentTime = System.currentTimeMillis();
+ public void run()
+ {
+ if(!waitingGet && System.currentTimeMillis() > lastSendTime + httpMaxClientIdleTime)
+ {
+ HttpRequest httpRequest = new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, "/jbm/");
+ waitingGet = true;
+ channel.write(httpRequest);
+ }
+ }
+ }
}
Modified: trunk/src/main/org/jboss/messaging/integration/transports/netty/TransportConstants.java
===================================================================
--- trunk/src/main/org/jboss/messaging/integration/transports/netty/TransportConstants.java 2008-12-09 16:49:14 UTC (rev 5490)
+++ trunk/src/main/org/jboss/messaging/integration/transports/netty/TransportConstants.java 2008-12-09 17:09:56 UTC (rev 5491)
@@ -32,6 +32,14 @@
public static final String SSL_ENABLED_PROP_NAME = "jbm.remoting.netty.sslenabled";
public static final String HTTP_ENABLED_PROP_NAME = "jbm.remoting.netty.httpenabled";
+
+ public static final String HTTP_CLIENT_IDLE_PROP_NAME = "jbm.remoting.netty.httpclientidletime";
+
+ public static final String HTTP_CLIENT_IDLE_SCAN_PERIOD = "jbm.remoting.netty.httpclientidlescanperiod";
+
+ public static final String HTTP_RESPONSE_TIME_PROP_NAME = "jbm.remoting.netty.httpresponsetime";
+
+ public static final String HTTP_SERVER_SCAN_PERIOD_PROP_NAME = "jbm.remoting.netty.httpserverscanperiod";
public static final String USE_NIO_PROP_NAME = "jbm.remoting.netty.usenio";
@@ -76,4 +84,13 @@
public static final int DEFAULT_TCP_RECEIVEBUFFER_SIZE = 32768;
public static final boolean DEFAULT_HTTP_ENABLED = false;
+
+ public static final long DEFAULT_HTTP_CLIENT_IDLE_TIME = 500;
+
+ public static final long DEFAULT_HTTP_CLIENT_SCAN_PERIOD = 500;
+
+ public static final long DEFAULT_HTTP_RESPONSE_TIME = 10000;
+
+ public static final long DEFAULT_HTTP_SERVER_SCAN_PERIOD = 5000;
+
}
Modified: trunk/tests/src/org/jboss/messaging/tests/integration/basic/CoreClientTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/basic/CoreClientTest.java 2008-12-09 16:49:14 UTC (rev 5490)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/basic/CoreClientTest.java 2008-12-09 17:09:56 UTC (rev 5491)
@@ -84,7 +84,7 @@
ClientProducer producer = session.createProducer(QUEUE);
- final int numMessages = 100;
+ final int numMessages = 10000;
for (int i = 0; i < numMessages; i++)
{
Modified: trunk/tests/src/org/jboss/messaging/tests/integration/http/CoreClientOverHttpTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/http/CoreClientOverHttpTest.java 2008-12-09 16:49:14 UTC (rev 5490)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/http/CoreClientOverHttpTest.java 2008-12-09 17:09:56 UTC (rev 5491)
@@ -95,4 +95,35 @@
messagingService.stop();
}
+
+ public void testCoreHttpClientIdle() throws Exception
+ {
+    final SimpleString QUEUE = new SimpleString("CoreClientOverHttpTestQueue");
+
+    Configuration conf = new ConfigurationImpl();
+
+    conf.setSecurityEnabled(false);
+
+    // the same parameters are used for the acceptor and the connector so both sides talk http
+    HashMap<String, Object> params = new HashMap<String, Object>();
+    params.put("jbm.remoting.netty.httpenabled", true);
+    conf.getAcceptorConfigurations().add(new TransportConfiguration(NETTY_ACCEPTOR_FACTORY, params));
+
+    MessagingService messagingService = MessagingServiceImpl.newNullStorageMessagingServer(conf);
+
+    messagingService.start();
+
+    // always stop the server, even when a step below fails, so the acceptor is not left
+    // running for the tests that follow
+    try
+    {
+       ClientSessionFactory sf = new ClientSessionFactoryImpl(new TransportConfiguration(NETTY_CONNECTOR_FACTORY, params));
+
+       ClientSession session = sf.createSession(false, true, true);
+
+       session.createQueue(QUEUE, QUEUE, null, false, false, true);
+
+       // the producer is created but never used to send anything; the client just sits idle
+       session.createProducer(QUEUE);
+
+       // stay idle for five connection scan periods
+       // NOTE(review): presumably the point is that the idle http client is not timed out by the server - confirm
+       Thread.sleep(messagingService.getServer().getConfiguration().getConnectionScanPeriod() * 5);
+
+       session.close();
+    }
+    finally
+    {
+       messagingService.stop();
+    }
+ }
}
Added: trunk/tests/src/org/jboss/messaging/tests/integration/http/NettyHttpTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/http/NettyHttpTest.java (rev 0)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/http/NettyHttpTest.java 2008-12-09 17:09:56 UTC (rev 5491)
@@ -0,0 +1,447 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.messaging.tests.integration.http;
+
+import org.jboss.messaging.tests.util.UnitTestCase;
+import org.jboss.messaging.integration.transports.netty.NettyAcceptor;
+import org.jboss.messaging.integration.transports.netty.TransportConstants;
+import org.jboss.messaging.integration.transports.netty.NettyConnector;
+import org.jboss.messaging.core.remoting.spi.BufferHandler;
+import org.jboss.messaging.core.remoting.spi.MessagingBuffer;
+import org.jboss.messaging.core.remoting.spi.Connection;
+import org.jboss.messaging.core.remoting.spi.ConnectionLifeCycleListener;
+import org.jboss.messaging.core.remoting.impl.AbstractBufferHandler;
+import org.jboss.messaging.core.exception.MessagingException;
+
+import java.util.HashMap;
+import java.util.ArrayList;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Transport-level tests for the Netty integration with HTTP enabled. Each test starts a
+ * {@link NettyAcceptor} and a {@link NettyConnector} from the same parameter map, writes 8-byte
+ * packets (two ints: the constant 4 followed by a sequence number) in one or both directions and
+ * verifies that every sequence number arrives, in the order it was written.
+ * <p>
+ * NOTE(review): the leading int is presumably a length prefix consumed by the transport before
+ * the handler is invoked, since the handlers read only one int and expect the sequence number.
+ *
+ * @author <a href="mailto:andy.taylor at jboss.org">Andy Taylor</a>
+ */
+public class NettyHttpTest extends UnitTestCase
+{
+   /** Number of packets written per direction in every test. */
+   private static final int NUM_PACKETS = 1000;
+
+   private NettyAcceptor acceptor;
+
+   private NettyConnector connector;
+
+   /**
+    * Alternates client-to-server and server-to-client writes and expects both sides to receive
+    * all packets in order.
+    */
+   public void testSendAndReceiveAtSameTime() throws Exception
+   {
+      CountDownLatch connCreatedLatch = new CountDownLatch(1);
+      CountDownLatch acceptorLatch = new CountDownLatch(NUM_PACKETS);
+      CountDownLatch connectorLatch = new CountDownLatch(NUM_PACKETS);
+      DummyConnectionLifeCycleListener acceptorListener = new DummyConnectionLifeCycleListener(connCreatedLatch);
+      SimpleBufferHandler acceptorHandler = new SimpleBufferHandler(acceptorLatch);
+      SimpleBufferHandler connectorHandler = new SimpleBufferHandler(connectorLatch);
+
+      Connection conn = openConnection(createHttpConf(-1L), acceptorHandler, acceptorListener, connectorHandler, connCreatedLatch);
+      try
+      {
+         for (int i = 0; i < NUM_PACKETS; i++)
+         {
+            writePacket(conn, conn, i);
+            writePacket(conn, acceptorListener.connection, i);
+         }
+         awaitPackets("acceptor", acceptorLatch, 10);
+         awaitPackets("connector", connectorLatch, 10);
+      }
+      finally
+      {
+         conn.close();
+      }
+      assertReceived("acceptor", NUM_PACKETS, acceptorHandler.messagesReceieved, acceptorHandler.messages);
+      assertReceived("connector", NUM_PACKETS, connectorHandler.messagesReceieved, connectorHandler.messages);
+   }
+
+   /**
+    * Writes all client-to-server packets first, then all server-to-client packets, and expects
+    * both sides to receive everything in order.
+    */
+   public void testSendThenReceive() throws Exception
+   {
+      CountDownLatch connCreatedLatch = new CountDownLatch(1);
+      CountDownLatch acceptorLatch = new CountDownLatch(NUM_PACKETS);
+      CountDownLatch connectorLatch = new CountDownLatch(NUM_PACKETS);
+      DummyConnectionLifeCycleListener acceptorListener = new DummyConnectionLifeCycleListener(connCreatedLatch);
+      SimpleBufferHandler acceptorHandler = new SimpleBufferHandler(acceptorLatch);
+      SimpleBufferHandler connectorHandler = new SimpleBufferHandler(connectorLatch);
+
+      Connection conn = openConnection(createHttpConf(-1L), acceptorHandler, acceptorListener, connectorHandler, connCreatedLatch);
+      try
+      {
+         writePackets(conn, conn, NUM_PACKETS);
+         writePackets(conn, acceptorListener.connection, NUM_PACKETS);
+         awaitPackets("acceptor", acceptorLatch, 10);
+         awaitPackets("connector", connectorLatch, 10);
+      }
+      finally
+      {
+         conn.close();
+      }
+      assertReceived("acceptor", NUM_PACKETS, acceptorHandler.messagesReceieved, acceptorHandler.messages);
+      assertReceived("connector", NUM_PACKETS, connectorHandler.messagesReceieved, connectorHandler.messages);
+   }
+
+   /**
+    * Writes all server-to-client packets first, then all client-to-server packets, and expects
+    * both sides to receive everything in order.
+    */
+   public void testReceiveThenSend() throws Exception
+   {
+      CountDownLatch connCreatedLatch = new CountDownLatch(1);
+      CountDownLatch acceptorLatch = new CountDownLatch(NUM_PACKETS);
+      CountDownLatch connectorLatch = new CountDownLatch(NUM_PACKETS);
+      DummyConnectionLifeCycleListener acceptorListener = new DummyConnectionLifeCycleListener(connCreatedLatch);
+      SimpleBufferHandler acceptorHandler = new SimpleBufferHandler(acceptorLatch);
+      SimpleBufferHandler connectorHandler = new SimpleBufferHandler(connectorLatch);
+
+      Connection conn = openConnection(createHttpConf(-1L), acceptorHandler, acceptorListener, connectorHandler, connCreatedLatch);
+      try
+      {
+         writePackets(conn, acceptorListener.connection, NUM_PACKETS);
+         writePackets(conn, conn, NUM_PACKETS);
+         awaitPackets("acceptor", acceptorLatch, 10);
+         awaitPackets("connector", connectorLatch, 10);
+      }
+      finally
+      {
+         conn.close();
+      }
+      assertReceived("acceptor", NUM_PACKETS, acceptorHandler.messagesReceieved, acceptorHandler.messages);
+      assertReceived("connector", NUM_PACKETS, connectorHandler.messagesReceieved, connectorHandler.messages);
+   }
+
+   /**
+    * Queues all server-to-client packets, then sends a single client packet and expects the
+    * client to receive every queued packet while the server receives exactly one.
+    */
+   public void testReceivePiggyBackOnOneResponse() throws Exception
+   {
+      CountDownLatch connCreatedLatch = new CountDownLatch(1);
+      CountDownLatch acceptorLatch = new CountDownLatch(1);
+      CountDownLatch connectorLatch = new CountDownLatch(NUM_PACKETS);
+      DummyConnectionLifeCycleListener acceptorListener = new DummyConnectionLifeCycleListener(connCreatedLatch);
+      SimpleBufferHandler acceptorHandler = new SimpleBufferHandler(acceptorLatch);
+      SimpleBufferHandler connectorHandler = new SimpleBufferHandler(connectorLatch);
+
+      Connection conn = openConnection(createHttpConf(-1L), acceptorHandler, acceptorListener, connectorHandler, connCreatedLatch);
+      try
+      {
+         writePackets(conn, acceptorListener.connection, NUM_PACKETS);
+         // One request from the client, carrying sequence number 0.
+         writePacket(conn, conn, 0);
+         awaitPackets("acceptor", acceptorLatch, 10);
+         awaitPackets("connector", connectorLatch, 10);
+      }
+      finally
+      {
+         conn.close();
+      }
+      assertReceived("acceptor", 1, acceptorHandler.messagesReceieved, acceptorHandler.messages);
+      assertReceived("connector", NUM_PACKETS, connectorHandler.messagesReceieved, connectorHandler.messages);
+   }
+
+   /**
+    * Queues all server-to-client packets without the client ever writing, with the client idle
+    * scan period and idle time both set to 500, and expects the client to still receive them all.
+    */
+   public void testReceivePiggyBackOnIdleClient() throws Exception
+   {
+      CountDownLatch connCreatedLatch = new CountDownLatch(1);
+      // The server is expected to receive nothing, so its latch starts already released.
+      CountDownLatch acceptorLatch = new CountDownLatch(0);
+      CountDownLatch connectorLatch = new CountDownLatch(NUM_PACKETS);
+      HashMap<String, Object> conf = createHttpConf(500L);
+      conf.put(TransportConstants.HTTP_CLIENT_IDLE_PROP_NAME, 500L);
+      DummyConnectionLifeCycleListener acceptorListener = new DummyConnectionLifeCycleListener(connCreatedLatch);
+      SimpleBufferHandler acceptorHandler = new SimpleBufferHandler(acceptorLatch);
+      SimpleBufferHandler connectorHandler = new SimpleBufferHandler(connectorLatch);
+
+      Connection conn = openConnection(conf, acceptorHandler, acceptorListener, connectorHandler, connCreatedLatch);
+      try
+      {
+         writePackets(conn, acceptorListener.connection, NUM_PACKETS);
+         awaitPackets("connector", connectorLatch, 10);
+      }
+      finally
+      {
+         conn.close();
+      }
+      assertReceived("acceptor", 0, acceptorHandler.messagesReceieved, acceptorHandler.messages);
+      assertReceived("connector", NUM_PACKETS, connectorHandler.messagesReceieved, connectorHandler.messages);
+   }
+
+   /**
+    * Sends all client-to-server packets while the client uses a {@link BogusResponseHandler},
+    * and expects the server to receive everything while the client handler receives nothing.
+    */
+   public void testSendWithNoReceive() throws Exception
+   {
+      CountDownLatch connCreatedLatch = new CountDownLatch(1);
+      CountDownLatch acceptorLatch = new CountDownLatch(NUM_PACKETS);
+      CountDownLatch connectorLatch = new CountDownLatch(NUM_PACKETS);
+      HashMap<String, Object> conf = createHttpConf(-1L);
+      conf.put(TransportConstants.HTTP_RESPONSE_TIME_PROP_NAME, 500L);
+      conf.put(TransportConstants.HTTP_SERVER_SCAN_PERIOD_PROP_NAME, 5000L);
+      DummyConnectionLifeCycleListener acceptorListener = new DummyConnectionLifeCycleListener(connCreatedLatch);
+      SimpleBufferHandler acceptorHandler = new SimpleBufferHandler(acceptorLatch);
+      BogusResponseHandler connectorHandler = new BogusResponseHandler(connectorLatch);
+
+      Connection conn = openConnection(conf, acceptorHandler, acceptorListener, connectorHandler, connCreatedLatch);
+      try
+      {
+         writePackets(conn, conn, NUM_PACKETS);
+         // Generous timeout: delivery here depends on the configured response time and scan period.
+         awaitPackets("acceptor", acceptorLatch, 100);
+      }
+      finally
+      {
+         conn.close();
+      }
+      assertReceived("acceptor", NUM_PACKETS, acceptorHandler.messagesReceieved, acceptorHandler.messages);
+      assertEquals("connector handler must not receive any packet", 0, connectorHandler.messagesReceieved);
+   }
+
+   @Override
+   protected void setUp() throws Exception
+   {
+      super.setUp();
+   }
+
+   /**
+    * Closes the connector and stops the acceptor created by the test, even when the test failed,
+    * so that later tests start from a clean state.
+    */
+   @Override
+   protected void tearDown() throws Exception
+   {
+      try
+      {
+         if (connector != null)
+         {
+            connector.close();
+            connector = null;
+         }
+      }
+      finally
+      {
+         try
+         {
+            if (acceptor != null)
+            {
+               acceptor.stop();
+               acceptor = null;
+            }
+         }
+         finally
+         {
+            super.tearDown();
+         }
+      }
+   }
+
+   /**
+    * Builds the parameter map shared by acceptor and connector: HTTP enabled plus the given
+    * client idle scan period.
+    *
+    * @param clientIdleScanPeriod value stored under {@link TransportConstants#HTTP_CLIENT_IDLE_SCAN_PERIOD}
+    * @return a new, mutable parameter map
+    */
+   private static HashMap<String, Object> createHttpConf(long clientIdleScanPeriod)
+   {
+      HashMap<String, Object> conf = new HashMap<String, Object>();
+      conf.put(TransportConstants.HTTP_ENABLED_PROP_NAME, true);
+      conf.put(TransportConstants.HTTP_CLIENT_IDLE_SCAN_PERIOD, clientIdleScanPeriod);
+      return conf;
+   }
+
+   /**
+    * Starts the acceptor and the connector, creates the client connection and waits up to five
+    * seconds for the acceptor side to report the new connection.
+    *
+    * @return the client-side connection, never {@code null}
+    */
+   private Connection openConnection(HashMap<String, Object> conf,
+                                     SimpleBufferHandler acceptorHandler,
+                                     DummyConnectionLifeCycleListener acceptorListener,
+                                     BufferHandler connectorHandler,
+                                     CountDownLatch connCreatedLatch) throws Exception
+   {
+      acceptor = new NettyAcceptor(conf, acceptorHandler, acceptorListener);
+      acceptor.start();
+
+      connector = new NettyConnector(conf, connectorHandler, new DummyConnectionLifeCycleListener(null));
+      connector.start();
+
+      Connection conn = connector.createConnection();
+      assertNotNull("connector failed to create a connection", conn);
+      // Without this check a failed connect would only show up later as a NullPointerException
+      // on acceptorListener.connection.
+      assertTrue("acceptor was not notified of the connection within 5 seconds",
+                 connCreatedLatch.await(5, TimeUnit.SECONDS));
+      return conn;
+   }
+
+   /**
+    * Writes one packet (the int 4 followed by {@code value}) to {@code target}. The buffer is
+    * always allocated from {@code bufferFactory}, which in every test is the client connection.
+    */
+   private static void writePacket(Connection bufferFactory, Connection target, int value)
+   {
+      MessagingBuffer buff = bufferFactory.createBuffer(8);
+      buff.putInt(4);
+      buff.putInt(value);
+      buff.flip();
+      target.write(buff);
+   }
+
+   /** Writes {@code count} packets with sequence numbers 0..count-1 to {@code target}. */
+   private static void writePackets(Connection bufferFactory, Connection target, int count)
+   {
+      for (int i = 0; i < count; i++)
+      {
+         writePacket(bufferFactory, target, i);
+      }
+   }
+
+   /** Fails the test if {@code latch} is not released within {@code timeoutSeconds}. */
+   private static void awaitPackets(String side, CountDownLatch latch, long timeoutSeconds) throws InterruptedException
+   {
+      assertTrue(side + " did not receive all expected packets within " + timeoutSeconds + " seconds",
+                 latch.await(timeoutSeconds, TimeUnit.SECONDS));
+   }
+
+   /**
+    * Asserts that {@code actualCount} packets were received and that the payloads are the
+    * sequence 0, 1, 2, ... in arrival order.
+    */
+   private static void assertReceived(String side, int expectedCount, int actualCount, ArrayList<Integer> payloads)
+   {
+      assertEquals(side + " received an unexpected number of packets", expectedCount, actualCount);
+      int expected = 0;
+      for (int actual : payloads)
+      {
+         assertEquals(side + " received a packet out of order", expected, actual);
+         expected++;
+      }
+   }
+
+   /**
+    * Records the first int of every buffer it is handed and counts the latch down once per buffer.
+    * The fields are read by the test thread only after waiting on the latch or closing the connection.
+    */
+   static class SimpleBufferHandler extends AbstractBufferHandler
+   {
+      int messagesReceieved = 0;
+
+      final ArrayList<Integer> messages = new ArrayList<Integer>();
+
+      private final CountDownLatch latch;
+
+      public SimpleBufferHandler(CountDownLatch latch)
+      {
+         this.latch = latch;
+      }
+
+      public void bufferReceived(Object connectionID, MessagingBuffer buffer)
+      {
+         int i = buffer.getInt();
+         messages.add(i);
+         messagesReceieved++;
+         latch.countDown();
+      }
+   }
+
+   /**
+    * Handler whose {@link #isReadyToHandle(MessagingBuffer)} always returns 0.
+    * NOTE(review): presumably 0 tells the transport that no complete packet is available, so
+    * {@link #bufferReceived(Object, MessagingBuffer)} is never invoked - confirm against the
+    * BufferHandler contract.
+    */
+   static class BogusResponseHandler implements BufferHandler
+   {
+      int messagesReceieved = 0;
+
+      final ArrayList<Integer> messages = new ArrayList<Integer>();
+
+      private final CountDownLatch latch;
+
+      public BogusResponseHandler(CountDownLatch latch)
+      {
+         this.latch = latch;
+      }
+
+      public int isReadyToHandle(MessagingBuffer buffer)
+      {
+         return 0;
+      }
+
+      public void bufferReceived(Object connectionID, MessagingBuffer buffer)
+      {
+         int i = buffer.getInt();
+         messages.add(i);
+         messagesReceieved++;
+         latch.countDown();
+      }
+   }
+
+   /**
+    * Remembers the most recently created connection and, when a latch was supplied, counts it
+    * down on creation. Destruction and exception callbacks are ignored.
+    */
+   static class DummyConnectionLifeCycleListener implements ConnectionLifeCycleListener
+   {
+      // Written by a transport callback and read by the test thread.
+      volatile Connection connection;
+
+      private final CountDownLatch latch;
+
+      /**
+       * @param connCreatedLatch latch to count down when a connection is created, or {@code null}
+       */
+      public DummyConnectionLifeCycleListener(CountDownLatch connCreatedLatch)
+      {
+         this.latch = connCreatedLatch;
+      }
+
+      public void connectionCreated(Connection connection)
+      {
+         this.connection = connection;
+         if (latch != null)
+         {
+            latch.countDown();
+         }
+      }
+
+      public void connectionDestroyed(Object connectionID)
+      {
+      }
+
+      public void connectionException(Object connectionID, MessagingException me)
+      {
+      }
+   }
+}
More information about the jboss-cvs-commits
mailing list