[jboss-cvs] JBoss Messaging SVN: r5499 - trunk/src/main/org/jboss/messaging/integration/transports/netty.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Wed Dec 10 05:49:10 EST 2008
Author: timfox
Date: 2008-12-10 05:49:10 -0500 (Wed, 10 Dec 2008)
New Revision: 5499
Modified:
trunk/src/main/org/jboss/messaging/integration/transports/netty/ChannelBufferWrapper.java
trunk/src/main/org/jboss/messaging/integration/transports/netty/ChannelPipelineSupport.java
trunk/src/main/org/jboss/messaging/integration/transports/netty/HttpAcceptorHandler.java
trunk/src/main/org/jboss/messaging/integration/transports/netty/HttpKeepAliveTask.java
trunk/src/main/org/jboss/messaging/integration/transports/netty/MessagingChannelHandler.java
trunk/src/main/org/jboss/messaging/integration/transports/netty/MessagingFrameDecoder.java
trunk/src/main/org/jboss/messaging/integration/transports/netty/NettyAcceptor.java
trunk/src/main/org/jboss/messaging/integration/transports/netty/NettyConnection.java
trunk/src/main/org/jboss/messaging/integration/transports/netty/NettyConnector.java
trunk/src/main/org/jboss/messaging/integration/transports/netty/TransportConstants.java
Log:
Some cosmetics and tweaks
Modified: trunk/src/main/org/jboss/messaging/integration/transports/netty/ChannelBufferWrapper.java
===================================================================
--- trunk/src/main/org/jboss/messaging/integration/transports/netty/ChannelBufferWrapper.java 2008-12-10 10:35:21 UTC (rev 5498)
+++ trunk/src/main/org/jboss/messaging/integration/transports/netty/ChannelBufferWrapper.java 2008-12-10 10:49:10 UTC (rev 5499)
@@ -192,59 +192,77 @@
public void putChar(final char chr)
{
- putShort((short) chr);
+ putShort((short)chr);
}
public byte getByte()
{
- try {
+ try
+ {
return buffer.readByte();
- } catch (IndexOutOfBoundsException e) {
+ }
+ catch (IndexOutOfBoundsException e)
+ {
throw new BufferUnderflowException();
}
}
public short getUnsignedByte()
{
- try {
+ try
+ {
return buffer.readUnsignedByte();
- } catch (IndexOutOfBoundsException e) {
+ }
+ catch (IndexOutOfBoundsException e)
+ {
throw new BufferUnderflowException();
}
}
public void getBytes(final byte[] b)
{
- try {
+ try
+ {
buffer.readBytes(b);
- } catch (IndexOutOfBoundsException e) {
+ }
+ catch (IndexOutOfBoundsException e)
+ {
throw new BufferUnderflowException();
}
}
public void getBytes(final byte[] b, final int offset, final int length)
{
- try {
+ try
+ {
buffer.readBytes(b, offset, length);
- } catch (IndexOutOfBoundsException e) {
+ }
+ catch (IndexOutOfBoundsException e)
+ {
throw new BufferUnderflowException();
}
}
public int getInt()
{
- try {
+ try
+ {
return buffer.readInt();
- } catch (IndexOutOfBoundsException e) {
+ }
+ catch (IndexOutOfBoundsException e)
+ {
throw new BufferUnderflowException();
}
}
public long getLong()
{
- try {
+ try
+ {
return buffer.readLong();
- } catch (IndexOutOfBoundsException e) {
+ }
+ catch (IndexOutOfBoundsException e)
+ {
throw new BufferUnderflowException();
}
}
@@ -256,18 +274,24 @@
public short getShort()
{
- try {
+ try
+ {
return buffer.readShort();
- } catch (IndexOutOfBoundsException e) {
+ }
+ catch (IndexOutOfBoundsException e)
+ {
throw new BufferUnderflowException();
}
}
public int getUnsignedShort()
{
- try {
+ try
+ {
return buffer.readUnsignedShort();
- } catch (IndexOutOfBoundsException e) {
+ }
+ catch (IndexOutOfBoundsException e)
+ {
throw new BufferUnderflowException();
}
}
@@ -279,7 +303,7 @@
public char getChar()
{
- return (char) getShort();
+ return (char)getShort();
}
public void putBoolean(final boolean b)
@@ -287,7 +311,8 @@
if (b)
{
putByte(TRUE);
- } else
+ }
+ else
{
putByte(FALSE);
}
@@ -305,7 +330,7 @@
buffer.writeInt(nullableString.length());
for (int i = 0; i < nullableString.length(); i++)
{
- buffer.writeShort((short) nullableString.charAt(i));
+ buffer.writeShort((short)nullableString.charAt(i));
}
buffer.readerIndex(buffer.writerIndex());
}
@@ -355,13 +380,13 @@
{
ChannelBuffer encoded = copiedBuffer(str, "UTF-8");
int length = encoded.readableBytes();
- if (length >= 65536) {
- throw new IllegalArgumentException(
- "the specified string is too long (" + length + ")");
+ if (length >= 65536)
+ {
+ throw new IllegalArgumentException("the specified string is too long (" + length + ")");
}
flip();
- buffer.writeShort((short) length);
+ buffer.writeShort((short)length);
buffer.writeBytes(encoded);
buffer.readerIndex(buffer.writerIndex());
}
@@ -415,10 +440,13 @@
public String getUTF() throws Exception
{
ChannelBuffer utf8value;
- try {
+ try
+ {
int length = buffer.readUnsignedShort();
utf8value = buffer.readSlice(length);
- } catch (IndexOutOfBoundsException e) {
+ }
+ catch (IndexOutOfBoundsException e)
+ {
throw new BufferUnderflowException();
}
Modified: trunk/src/main/org/jboss/messaging/integration/transports/netty/ChannelPipelineSupport.java
===================================================================
--- trunk/src/main/org/jboss/messaging/integration/transports/netty/ChannelPipelineSupport.java 2008-12-10 10:35:21 UTC (rev 5498)
+++ trunk/src/main/org/jboss/messaging/integration/transports/netty/ChannelPipelineSupport.java 2008-12-10 10:49:10 UTC (rev 5499)
@@ -53,15 +53,13 @@
// Public --------------------------------------------------------
- public static void addCodecFilter(final ChannelPipeline pipeline,
- final BufferHandler handler)
+ public static void addCodecFilter(final ChannelPipeline pipeline, final BufferHandler handler)
{
assert pipeline != null;
pipeline.addLast("decoder", new MessagingFrameDecoder(handler));
}
- public static void addSSLFilter(
- final ChannelPipeline pipeline, final SSLContext context, final boolean client) throws Exception
+ public static void addSSLFilter(final ChannelPipeline pipeline, final SSLContext context, final boolean client) throws Exception
{
SSLEngine engine = context.createSSLEngine();
engine.setUseClientMode(client);
Modified: trunk/src/main/org/jboss/messaging/integration/transports/netty/HttpAcceptorHandler.java
===================================================================
--- trunk/src/main/org/jboss/messaging/integration/transports/netty/HttpAcceptorHandler.java 2008-12-10 10:35:21 UTC (rev 5498)
+++ trunk/src/main/org/jboss/messaging/integration/transports/netty/HttpAcceptorHandler.java 2008-12-10 10:49:10 UTC (rev 5499)
@@ -21,36 +21,31 @@
*/
package org.jboss.messaging.integration.transports.netty;
-import org.jboss.netty.channel.ChannelPipelineCoverage;
-import org.jboss.netty.channel.SimpleChannelHandler;
+import static org.jboss.netty.channel.Channels.write;
+
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Executor;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.buffer.ChannelBuffers;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelHandlerContext;
+import org.jboss.netty.channel.ChannelPipelineCoverage;
import org.jboss.netty.channel.ChannelStateEvent;
+import org.jboss.netty.channel.DefaultMessageEvent;
import org.jboss.netty.channel.MessageEvent;
-import org.jboss.netty.channel.DefaultMessageEvent;
-import static org.jboss.netty.channel.Channels.write;
-import org.jboss.netty.buffer.ChannelBuffer;
-import org.jboss.netty.buffer.ChannelBuffers;
+import org.jboss.netty.channel.SimpleChannelHandler;
+import org.jboss.netty.handler.codec.http.DefaultHttpResponse;
+import org.jboss.netty.handler.codec.http.HttpHeaders;
+import org.jboss.netty.handler.codec.http.HttpMethod;
import org.jboss.netty.handler.codec.http.HttpRequest;
-import org.jboss.netty.handler.codec.http.HttpMethod;
import org.jboss.netty.handler.codec.http.HttpResponse;
-import org.jboss.netty.handler.codec.http.DefaultHttpResponse;
+import org.jboss.netty.handler.codec.http.HttpResponseStatus;
import org.jboss.netty.handler.codec.http.HttpVersion;
-import org.jboss.netty.handler.codec.http.HttpResponseStatus;
-import org.jboss.netty.handler.codec.http.HttpHeaders;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.Executor;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.locks.ReentrantLock;
-import java.util.concurrent.locks.Condition;
-import java.util.List;
-import java.util.ArrayList;
-import java.util.Iterator;
-
/**
* takes care of making sure that every request has a response and also that any uninitiated responses always wait for a response.
* @author <a href="mailto:andy.taylor at jboss.org">Andy Taylor</a>
@@ -94,25 +89,29 @@
@Override
public void messageReceived(final ChannelHandlerContext ctx, final MessageEvent e) throws Exception
{
- HttpRequest request = (HttpRequest) e.getMessage();
+ HttpRequest request = (HttpRequest)e.getMessage();
HttpMethod method = request.getMethod();
- //if we are a post then we send upstream, otherwise we are just being prompted for a response.
+ // if we are a post then we send upstream, otherwise we are just being prompted for a response.
if (method.equals(HttpMethod.POST))
{
- MessageEvent event = new DefaultMessageEvent(e.getChannel(), e.getFuture(), request.getContent(), e.getRemoteAddress());
+ MessageEvent event = new DefaultMessageEvent(e.getChannel(),
+ e.getFuture(),
+ request.getContent(),
+ e.getRemoteAddress());
ctx.sendUpstream(event);
}
- //add a new response
- responses.put(new ResponseHolder(System.currentTimeMillis() + responseTime, new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK)));
+ // add a new response
+ responses.put(new ResponseHolder(System.currentTimeMillis() + responseTime,
+ new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK)));
}
@Override
public void writeRequested(final ChannelHandlerContext ctx, final MessageEvent e) throws Exception
{
- //we are either a channel buffer, which gets delayed until a response is available, or we are the actual response
+ // we are either a channel buffer, which gets delayed until a response is available, or we are the actual response
if (e.getMessage() instanceof ChannelBuffer)
{
- ChannelBuffer buf = (ChannelBuffer) e.getMessage();
+ ChannelBuffer buf = (ChannelBuffer)e.getMessage();
executor.execute(new ResponseRunner(buf));
}
else
@@ -123,7 +122,7 @@
public void keepAlive(final long time)
{
- //send some responses to catch up thus avoiding any timeout.
+ // send some responses to catch up thus avoiding any timeout.
int lateResponses = 0;
for (ResponseHolder response : responses)
{
@@ -150,6 +149,7 @@
private final ChannelBuffer buffer;
private final boolean bogusResponse;
+
public ResponseRunner(final ChannelBuffer buffer)
{
this.buffer = buffer;
@@ -161,7 +161,7 @@
bogusResponse = true;
buffer = ChannelBuffers.buffer(0);
}
-
+
public void run()
{
ResponseHolder responseHolder = null;
@@ -173,11 +173,11 @@
}
catch (InterruptedException e)
{
- //ignore, we'll just try again
+ // ignore, we'll just try again
}
}
while (responseHolder == null);
- if(!bogusResponse)
+ if (!bogusResponse)
{
piggyBackResponses();
}
@@ -188,15 +188,15 @@
private void piggyBackResponses()
{
- //if we are the last available response then we have to piggy back any remaining responses
- if(responses.isEmpty())
+ // if we are the last available response then we have to piggy back any remaining responses
+ if (responses.isEmpty())
{
do
{
try
{
- ResponseRunner responseRunner = (ResponseRunner) delayedResponses.poll(0, TimeUnit.MILLISECONDS);
- if(responseRunner == null)
+ ResponseRunner responseRunner = (ResponseRunner)delayedResponses.poll(0, TimeUnit.MILLISECONDS);
+ if (responseRunner == null)
{
break;
}
@@ -218,6 +218,7 @@
private class ResponseHolder
{
final HttpResponse response;
+
final long timeReceived;
public ResponseHolder(long timeReceived, HttpResponse response)
@@ -226,5 +227,5 @@
this.response = response;
}
}
-
+
}
Modified: trunk/src/main/org/jboss/messaging/integration/transports/netty/HttpKeepAliveTask.java
===================================================================
--- trunk/src/main/org/jboss/messaging/integration/transports/netty/HttpKeepAliveTask.java 2008-12-10 10:35:21 UTC (rev 5498)
+++ trunk/src/main/org/jboss/messaging/integration/transports/netty/HttpKeepAliveTask.java 2008-12-10 10:49:10 UTC (rev 5499)
@@ -21,10 +21,9 @@
*/
package org.jboss.messaging.integration.transports.netty;
-import java.util.Timer;
-import java.util.TimerTask;
-import java.util.List;
import java.util.ArrayList;
+import java.util.List;
+import java.util.TimerTask;
/**
* A simple Timer Task to allow HttpAcceptorHandlers to be called intermittently.
@@ -34,15 +33,8 @@
{
private final List<HttpAcceptorHandler> handlers = new ArrayList<HttpAcceptorHandler>();
- private boolean cancelled;
-
public synchronized void run()
{
- if(cancelled)
- {
- return;
- }
-
long time = System.currentTimeMillis();
for (HttpAcceptorHandler handler : handlers)
{
@@ -62,7 +54,6 @@
public synchronized boolean cancel()
{
- cancelled = true;
return super.cancel();
}
}
Modified: trunk/src/main/org/jboss/messaging/integration/transports/netty/MessagingChannelHandler.java
===================================================================
--- trunk/src/main/org/jboss/messaging/integration/transports/netty/MessagingChannelHandler.java 2008-12-10 10:35:21 UTC (rev 5498)
+++ trunk/src/main/org/jboss/messaging/integration/transports/netty/MessagingChannelHandler.java 2008-12-10 10:49:10 UTC (rev 5499)
@@ -44,7 +44,9 @@
private static final Logger log = Logger.getLogger(MessagingChannelHandler.class);
private final BufferHandler handler;
+
private final ConnectionLifeCycleListener listener;
+
volatile boolean active;
MessagingChannelHandler(BufferHandler handler, ConnectionLifeCycleListener listener)
@@ -56,7 +58,7 @@
@Override
public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception
{
- ChannelBuffer buffer = (ChannelBuffer) e.getMessage();
+ ChannelBuffer buffer = (ChannelBuffer)e.getMessage();
handler.bufferReceived(e.getChannel().getId(), new ChannelBufferWrapper(buffer));
}
@@ -82,9 +84,7 @@
@Override
public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception
{
- log.error(
- "caught exception " + e.getCause() + " for channel " +
- e.getChannel(), e.getCause());
+ log.error("caught exception " + e.getCause() + " for channel " + e.getChannel(), e.getCause());
synchronized (this)
{
@@ -95,10 +95,13 @@
MessagingException me = new MessagingException(MessagingException.INTERNAL_ERROR, "Netty exception");
me.initCause(e.getCause());
- try {
+ try
+ {
listener.connectionException(e.getChannel().getId(), me);
active = false;
- } catch (Exception ex) {
+ }
+ catch (Exception ex)
+ {
log.error("failed to notify the listener:", ex);
}
}
Modified: trunk/src/main/org/jboss/messaging/integration/transports/netty/MessagingFrameDecoder.java
===================================================================
--- trunk/src/main/org/jboss/messaging/integration/transports/netty/MessagingFrameDecoder.java 2008-12-10 10:35:21 UTC (rev 5498)
+++ trunk/src/main/org/jboss/messaging/integration/transports/netty/MessagingFrameDecoder.java 2008-12-10 10:49:10 UTC (rev 5499)
@@ -44,7 +44,6 @@
{
private static final Logger log = Logger.getLogger(MessagingFrameDecoder.class);
-
private final BufferHandler handler;
public MessagingFrameDecoder(final BufferHandler handler)
@@ -58,7 +57,7 @@
@Override
protected Object decode(ChannelHandlerContext ctx, Channel channel, ChannelBuffer in) throws Exception
{
- //TODO - we can avoid this entirely if we maintain fragmented packets in the handler
+ // TODO - we can avoid this entirely if we maintain fragmented packets in the handler
int start = in.readerIndex();
int length = handler.isReadyToHandle(new ChannelBufferWrapper(in));
Modified: trunk/src/main/org/jboss/messaging/integration/transports/netty/NettyAcceptor.java
===================================================================
--- trunk/src/main/org/jboss/messaging/integration/transports/netty/NettyAcceptor.java 2008-12-10 10:35:21 UTC (rev 5498)
+++ trunk/src/main/org/jboss/messaging/integration/transports/netty/NettyAcceptor.java 2008-12-10 10:49:10 UTC (rev 5499)
@@ -114,22 +114,29 @@
private final HttpKeepAliveTask httpKeepAliveTask;
- public NettyAcceptor(final Map<String, Object> configuration, final BufferHandler handler,
+ public NettyAcceptor(final Map<String, Object> configuration,
+ final BufferHandler handler,
final ConnectionLifeCycleListener listener)
{
this.handler = handler;
this.listener = listener;
- this.sslEnabled =
- ConfigurationHelper.getBooleanProperty(TransportConstants.SSL_ENABLED_PROP_NAME, TransportConstants.DEFAULT_SSL_ENABLED, configuration);
- this.httpEnabled =
- ConfigurationHelper.getBooleanProperty(TransportConstants.HTTP_ENABLED_PROP_NAME, TransportConstants.DEFAULT_HTTP_ENABLED, configuration);
+ this.sslEnabled = ConfigurationHelper.getBooleanProperty(TransportConstants.SSL_ENABLED_PROP_NAME,
+ TransportConstants.DEFAULT_SSL_ENABLED,
+ configuration);
+ this.httpEnabled = ConfigurationHelper.getBooleanProperty(TransportConstants.HTTP_ENABLED_PROP_NAME,
+ TransportConstants.DEFAULT_HTTP_ENABLED,
+ configuration);
- if(httpEnabled)
+ if (httpEnabled)
{
- httpServerScanPeriod = ConfigurationHelper.getLongProperty(TransportConstants.HTTP_SERVER_SCAN_PERIOD_PROP_NAME, TransportConstants.DEFAULT_HTTP_SERVER_SCAN_PERIOD, configuration);
- httpResponseTime = ConfigurationHelper.getLongProperty(TransportConstants.HTTP_RESPONSE_TIME_PROP_NAME, TransportConstants.DEFAULT_HTTP_RESPONSE_TIME, configuration);
+ httpServerScanPeriod = ConfigurationHelper.getLongProperty(TransportConstants.HTTP_SERVER_SCAN_PERIOD_PROP_NAME,
+ TransportConstants.DEFAULT_HTTP_SERVER_SCAN_PERIOD,
+ configuration);
+ httpResponseTime = ConfigurationHelper.getLongProperty(TransportConstants.HTTP_RESPONSE_TIME_PROP_NAME,
+ TransportConstants.DEFAULT_HTTP_RESPONSE_TIME,
+ configuration);
httpKeepAliveTimer = new Timer();
httpKeepAliveTask = new HttpKeepAliveTask();
httpKeepAliveTimer.schedule(httpKeepAliveTask, httpServerScanPeriod, httpServerScanPeriod);
@@ -141,22 +148,29 @@
httpKeepAliveTimer = null;
httpKeepAliveTask = null;
}
- this.useNio =
- ConfigurationHelper.getBooleanProperty(TransportConstants.USE_NIO_PROP_NAME, TransportConstants.DEFAULT_USE_NIO, configuration);
- this.host =
- ConfigurationHelper.getStringProperty(TransportConstants.HOST_PROP_NAME, TransportConstants.DEFAULT_HOST, configuration);
- this.port =
- ConfigurationHelper.getIntProperty(TransportConstants.PORT_PROP_NAME, TransportConstants.DEFAULT_PORT, configuration);
+ this.useNio = ConfigurationHelper.getBooleanProperty(TransportConstants.USE_NIO_PROP_NAME,
+ TransportConstants.DEFAULT_USE_NIO,
+ configuration);
+ this.host = ConfigurationHelper.getStringProperty(TransportConstants.HOST_PROP_NAME,
+ TransportConstants.DEFAULT_HOST,
+ configuration);
+ this.port = ConfigurationHelper.getIntProperty(TransportConstants.PORT_PROP_NAME,
+ TransportConstants.DEFAULT_PORT,
+ configuration);
if (sslEnabled)
{
- this.keyStorePath =
- ConfigurationHelper.getStringProperty(TransportConstants.KEYSTORE_PATH_PROP_NAME, TransportConstants.DEFAULT_KEYSTORE_PATH, configuration);
- this.keyStorePassword =
- ConfigurationHelper.getStringProperty(TransportConstants.KEYSTORE_PASSWORD_PROP_NAME, TransportConstants.DEFAULT_KEYSTORE_PASSWORD, configuration);
- this.trustStorePath =
- ConfigurationHelper.getStringProperty(TransportConstants.TRUSTSTORE_PATH_PROP_NAME, TransportConstants.DEFAULT_TRUSTSTORE_PATH, configuration);
- this.trustStorePassword =
- ConfigurationHelper.getStringProperty(TransportConstants.TRUSTSTORE_PASSWORD_PROP_NAME, TransportConstants.DEFAULT_TRUSTSTORE_PASSWORD, configuration);
+ this.keyStorePath = ConfigurationHelper.getStringProperty(TransportConstants.KEYSTORE_PATH_PROP_NAME,
+ TransportConstants.DEFAULT_KEYSTORE_PATH,
+ configuration);
+ this.keyStorePassword = ConfigurationHelper.getStringProperty(TransportConstants.KEYSTORE_PASSWORD_PROP_NAME,
+ TransportConstants.DEFAULT_KEYSTORE_PASSWORD,
+ configuration);
+ this.trustStorePath = ConfigurationHelper.getStringProperty(TransportConstants.TRUSTSTORE_PATH_PROP_NAME,
+ TransportConstants.DEFAULT_TRUSTSTORE_PATH,
+ configuration);
+ this.trustStorePassword = ConfigurationHelper.getStringProperty(TransportConstants.TRUSTSTORE_PASSWORD_PROP_NAME,
+ TransportConstants.DEFAULT_TRUSTSTORE_PASSWORD,
+ configuration);
}
else
{
@@ -166,12 +180,15 @@
this.trustStorePassword = null;
}
- this.tcpNoDelay =
- ConfigurationHelper.getBooleanProperty(TransportConstants.TCP_NODELAY_PROPNAME, TransportConstants.DEFAULT_TCP_NODELAY, configuration);
- this.tcpSendBufferSize =
- ConfigurationHelper.getIntProperty(TransportConstants.TCP_SENDBUFFER_SIZE_PROPNAME, TransportConstants.DEFAULT_TCP_SENDBUFFER_SIZE, configuration);
- this.tcpReceiveBufferSize =
- ConfigurationHelper.getIntProperty(TransportConstants.TCP_RECEIVEBUFFER_SIZE_PROPNAME, TransportConstants.DEFAULT_TCP_RECEIVEBUFFER_SIZE, configuration);
+ this.tcpNoDelay = ConfigurationHelper.getBooleanProperty(TransportConstants.TCP_NODELAY_PROPNAME,
+ TransportConstants.DEFAULT_TCP_NODELAY,
+ configuration);
+ this.tcpSendBufferSize = ConfigurationHelper.getIntProperty(TransportConstants.TCP_SENDBUFFER_SIZE_PROPNAME,
+ TransportConstants.DEFAULT_TCP_SENDBUFFER_SIZE,
+ configuration);
+ this.tcpReceiveBufferSize = ConfigurationHelper.getIntProperty(TransportConstants.TCP_RECEIVEBUFFER_SIZE_PROPNAME,
+ TransportConstants.DEFAULT_TCP_RECEIVEBUFFER_SIZE,
+ configuration);
}
@@ -179,7 +196,7 @@
{
if (channelFactory != null)
{
- //Already started
+ // Already started
return;
}
bossExecutor = Executors.newCachedThreadPool(new JBMThreadFactory("jbm-netty-acceptor-boss-threads"));
@@ -199,17 +216,13 @@
{
try
{
- context = SSLSupport.createServerContext(
- keyStorePath,
- keyStorePassword,
- trustStorePath,
- trustStorePassword);
+ context = SSLSupport.createServerContext(keyStorePath, keyStorePassword, trustStorePath, trustStorePassword);
}
catch (Exception e)
{
- IllegalStateException ise = new IllegalStateException(
- "Unable to create NettyAcceptor for " +
- host + ":" + port);
+ IllegalStateException ise = new IllegalStateException("Unable to create NettyAcceptor for " + host +
+ ":" +
+ port);
ise.initCause(e);
throw ise;
}
@@ -266,16 +279,16 @@
return;
}
- if(httpKeepAliveTimer != null)
+ if (httpKeepAliveTimer != null)
{
httpKeepAliveTask.cancel();
-
+
httpKeepAliveTimer.cancel();
}
serverChannel.close().awaitUninterruptibly();
bossExecutor.shutdown();
workerExecutor.shutdown();
- for (; ;)
+ for (;;)
{
try
{
Modified: trunk/src/main/org/jboss/messaging/integration/transports/netty/NettyConnection.java
===================================================================
--- trunk/src/main/org/jboss/messaging/integration/transports/netty/NettyConnection.java 2008-12-10 10:35:21 UTC (rev 5498)
+++ trunk/src/main/org/jboss/messaging/integration/transports/netty/NettyConnection.java 2008-12-10 10:49:10 UTC (rev 5499)
@@ -65,7 +65,7 @@
return;
}
- SslHandler sslHandler = (SslHandler) channel.getPipeline().get("ssl");
+ SslHandler sslHandler = (SslHandler)channel.getPipeline().get("ssl");
if (sslHandler != null)
{
try
@@ -76,23 +76,25 @@
{
// ignore
}
- } else {
+ }
+ else
+ {
channel.close();
}
-// This block has been disabled because this method can be called from
-// the Netty I/O thread.
-// TODO Netty should be improved to provide a way to determine
-// if the current code is running in the I/O thread.
-//
-// if (channel.getParent() == null) {
-// // A client channel - wait until everything is cleaned up.
-// // TODO Do not spin - use signal.
-// MessagingChannelHandler handler = (MessagingChannelHandler) channel.getPipeline().get("handler");
-// while (handler.active) {
-// Thread.yield();
-// }
-// }
+ // This block has been disabled because this method can be called from
+ // the Netty I/O thread.
+ // TODO Netty should be improved to provide a way to determine
+ // if the current code is running in the I/O thread.
+ //
+ // if (channel.getParent() == null) {
+ // // A client channel - wait until everything is cleaned up.
+ // // TODO Do not spin - use signal.
+ // MessagingChannelHandler handler = (MessagingChannelHandler) channel.getPipeline().get("handler");
+ // while (handler.active) {
+ // Thread.yield();
+ // }
+ // }
closed = true;
}
Modified: trunk/src/main/org/jboss/messaging/integration/transports/netty/NettyConnector.java
===================================================================
--- trunk/src/main/org/jboss/messaging/integration/transports/netty/NettyConnector.java 2008-12-10 10:35:21 UTC (rev 5498)
+++ trunk/src/main/org/jboss/messaging/integration/transports/netty/NettyConnector.java 2008-12-10 10:49:10 UTC (rev 5499)
@@ -21,6 +21,20 @@
*/
package org.jboss.messaging.integration.transports.netty;
+import static org.jboss.netty.channel.Channels.pipeline;
+import static org.jboss.netty.channel.Channels.write;
+
+import java.net.InetSocketAddress;
+import java.util.Map;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+import javax.net.ssl.SSLContext;
+import javax.net.ssl.SSLException;
+
import org.jboss.messaging.core.logging.Logger;
import org.jboss.messaging.core.remoting.impl.ssl.SSLSupport;
import org.jboss.messaging.core.remoting.spi.BufferHandler;
@@ -38,12 +52,10 @@
import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ChannelPipelineCoverage;
import org.jboss.netty.channel.ChannelPipelineFactory;
-import static org.jboss.netty.channel.Channels.pipeline;
-import static org.jboss.netty.channel.Channels.write;
+import org.jboss.netty.channel.ChannelStateEvent;
import org.jboss.netty.channel.DefaultMessageEvent;
import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.channel.SimpleChannelHandler;
-import org.jboss.netty.channel.ChannelStateEvent;
import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
import org.jboss.netty.channel.socket.oio.OioClientSocketChannelFactory;
import org.jboss.netty.handler.codec.http.DefaultHttpRequest;
@@ -56,16 +68,6 @@
import org.jboss.netty.handler.codec.http.HttpVersion;
import org.jboss.netty.handler.ssl.SslHandler;
-import javax.net.ssl.SSLContext;
-import javax.net.ssl.SSLException;
-import java.net.InetSocketAddress;
-import java.util.Map;
-import java.util.Timer;
-import java.util.TimerTask;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
-
/**
* A NettyConnector
*
@@ -143,15 +145,18 @@
this.sslEnabled = ConfigurationHelper.getBooleanProperty(TransportConstants.SSL_ENABLED_PROP_NAME,
TransportConstants.DEFAULT_SSL_ENABLED,
configuration);
- this.httpEnabled =
- ConfigurationHelper.getBooleanProperty(TransportConstants.HTTP_ENABLED_PROP_NAME, TransportConstants.DEFAULT_HTTP_ENABLED, configuration);
+ this.httpEnabled = ConfigurationHelper.getBooleanProperty(TransportConstants.HTTP_ENABLED_PROP_NAME,
+ TransportConstants.DEFAULT_HTTP_ENABLED,
+ configuration);
- if(httpEnabled)
+ if (httpEnabled)
{
this.httpMaxClientIdleTime = ConfigurationHelper.getLongProperty(TransportConstants.HTTP_CLIENT_IDLE_PROP_NAME,
- TransportConstants.DEFAULT_HTTP_CLIENT_IDLE_TIME, configuration);
+ TransportConstants.DEFAULT_HTTP_CLIENT_IDLE_TIME,
+ configuration);
this.httpClientIdleScanPeriod = ConfigurationHelper.getLongProperty(TransportConstants.HTTP_CLIENT_IDLE_SCAN_PERIOD,
- TransportConstants.DEFAULT_HTTP_CLIENT_SCAN_PERIOD, configuration);
+ TransportConstants.DEFAULT_HTTP_CLIENT_SCAN_PERIOD,
+ configuration);
}
else
{
@@ -374,7 +379,9 @@
class HttpHandler extends SimpleChannelHandler
{
private Channel channel;
+
private long lastSendTime = 0;
+
private boolean waitingGet = false;
private Timer idleClientTimer;
@@ -390,7 +397,6 @@
}
}
-
public void channelClosed(final ChannelHandlerContext ctx, final ChannelStateEvent e) throws Exception
{
if (idleClientTimer != null)
@@ -403,8 +409,11 @@
@Override
public void messageReceived(final ChannelHandlerContext ctx, final MessageEvent e) throws Exception
{
- HttpResponse response = (HttpResponse) e.getMessage();
- MessageEvent event = new DefaultMessageEvent(e.getChannel(), e.getFuture(), response.getContent(), e.getRemoteAddress());
+ HttpResponse response = (HttpResponse)e.getMessage();
+ MessageEvent event = new DefaultMessageEvent(e.getChannel(),
+ e.getFuture(),
+ response.getContent(),
+ e.getRemoteAddress());
waitingGet = false;
ctx.sendUpstream(event);
}
@@ -415,7 +424,7 @@
if (e.getMessage() instanceof ChannelBuffer)
{
HttpRequest httpRequest = new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.POST, "/jbm/");
- ChannelBuffer buf = (ChannelBuffer) e.getMessage();
+ ChannelBuffer buf = (ChannelBuffer)e.getMessage();
httpRequest.setContent(buf);
httpRequest.addHeader(HttpHeaders.CONTENT_LENGTH, String.valueOf(buf.writerIndex()));
write(ctx, e.getChannel(), e.getFuture(), httpRequest, e.getRemoteAddress());
@@ -431,9 +440,10 @@
private class HttpIdleTimerTask extends TimerTask
{
long currentTime = System.currentTimeMillis();
+
public void run()
{
- if(!waitingGet && System.currentTimeMillis() > lastSendTime + httpMaxClientIdleTime)
+ if (!waitingGet && System.currentTimeMillis() > lastSendTime + httpMaxClientIdleTime)
{
HttpRequest httpRequest = new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, "/jbm/");
waitingGet = true;
@@ -443,5 +453,4 @@
}
}
-
}
Modified: trunk/src/main/org/jboss/messaging/integration/transports/netty/TransportConstants.java
===================================================================
--- trunk/src/main/org/jboss/messaging/integration/transports/netty/TransportConstants.java 2008-12-10 10:35:21 UTC (rev 5498)
+++ trunk/src/main/org/jboss/messaging/integration/transports/netty/TransportConstants.java 2008-12-10 10:49:10 UTC (rev 5499)
@@ -30,7 +30,7 @@
public class TransportConstants
{
public static final String SSL_ENABLED_PROP_NAME = "jbm.remoting.netty.sslenabled";
-
+
public static final String HTTP_ENABLED_PROP_NAME = "jbm.remoting.netty.httpenabled";
public static final String HTTP_CLIENT_IDLE_PROP_NAME = "jbm.remoting.netty.httpclientidletime";
@@ -40,47 +40,47 @@
public static final String HTTP_RESPONSE_TIME_PROP_NAME = "jbm.remoting.netty.httpresponsetime";
public static final String HTTP_SERVER_SCAN_PERIOD_PROP_NAME = "jbm.remoting.netty.httpserverscanperiod";
-
+
public static final String USE_NIO_PROP_NAME = "jbm.remoting.netty.usenio";
-
+
public static final String HOST_PROP_NAME = "jbm.remoting.netty.host";
-
+
public static final String PORT_PROP_NAME = "jbm.remoting.netty.port";
-
+
public static final String KEYSTORE_PATH_PROP_NAME = "jbm.remoting.netty.keystorepath";
-
+
public static final String KEYSTORE_PASSWORD_PROP_NAME = "jbm.remoting.netty.keystorepassword";
-
+
public static final String TRUSTSTORE_PATH_PROP_NAME = "jbm.remoting.netty.truststorepath";
-
+
public static final String TRUSTSTORE_PASSWORD_PROP_NAME = "jbm.remoting.netty.truststorepassword";
-
+
public static final String TCP_NODELAY_PROPNAME = "jbm.remoting.netty.tcpnodelay";
-
+
public static final String TCP_SENDBUFFER_SIZE_PROPNAME = "jbm.remoting.netty.tcpsendbuffersize";
-
+
public static final String TCP_RECEIVEBUFFER_SIZE_PROPNAME = "jbm.remoting.netty.tcpreceivebuffersize";
-
+
public static final boolean DEFAULT_SSL_ENABLED = false;
-
+
public static final boolean DEFAULT_USE_NIO = true;
-
+
public static final String DEFAULT_HOST = "localhost";
-
+
public static final int DEFAULT_PORT = 5400;
-
+
public static final String DEFAULT_KEYSTORE_PATH = "messaging.keystore";
-
- public static final String DEFAULT_KEYSTORE_PASSWORD = "secureexample";
-
+
+ public static final String DEFAULT_KEYSTORE_PASSWORD = "secureexample";
+
public static final String DEFAULT_TRUSTSTORE_PATH = "messaging.truststore";
-
+
public static final String DEFAULT_TRUSTSTORE_PASSWORD = "secureexample";
-
+
public static final boolean DEFAULT_TCP_NODELAY = true;
-
+
public static final int DEFAULT_TCP_SENDBUFFER_SIZE = 32768;
-
+
public static final int DEFAULT_TCP_RECEIVEBUFFER_SIZE = 32768;
public static final boolean DEFAULT_HTTP_ENABLED = false;
More information about the jboss-cvs-commits
mailing list