[jboss-cvs] JBoss Messaging SVN: r5499 - trunk/src/main/org/jboss/messaging/integration/transports/netty.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Wed Dec 10 05:49:10 EST 2008


Author: timfox
Date: 2008-12-10 05:49:10 -0500 (Wed, 10 Dec 2008)
New Revision: 5499

Modified:
   trunk/src/main/org/jboss/messaging/integration/transports/netty/ChannelBufferWrapper.java
   trunk/src/main/org/jboss/messaging/integration/transports/netty/ChannelPipelineSupport.java
   trunk/src/main/org/jboss/messaging/integration/transports/netty/HttpAcceptorHandler.java
   trunk/src/main/org/jboss/messaging/integration/transports/netty/HttpKeepAliveTask.java
   trunk/src/main/org/jboss/messaging/integration/transports/netty/MessagingChannelHandler.java
   trunk/src/main/org/jboss/messaging/integration/transports/netty/MessagingFrameDecoder.java
   trunk/src/main/org/jboss/messaging/integration/transports/netty/NettyAcceptor.java
   trunk/src/main/org/jboss/messaging/integration/transports/netty/NettyConnection.java
   trunk/src/main/org/jboss/messaging/integration/transports/netty/NettyConnector.java
   trunk/src/main/org/jboss/messaging/integration/transports/netty/TransportConstants.java
Log:
Some cosmetics and tweaks


Modified: trunk/src/main/org/jboss/messaging/integration/transports/netty/ChannelBufferWrapper.java
===================================================================
--- trunk/src/main/org/jboss/messaging/integration/transports/netty/ChannelBufferWrapper.java	2008-12-10 10:35:21 UTC (rev 5498)
+++ trunk/src/main/org/jboss/messaging/integration/transports/netty/ChannelBufferWrapper.java	2008-12-10 10:49:10 UTC (rev 5499)
@@ -192,59 +192,77 @@
 
    public void putChar(final char chr)
    {
-      putShort((short) chr);
+      putShort((short)chr);
    }
 
    public byte getByte()
    {
-      try {
+      try
+      {
          return buffer.readByte();
-      } catch (IndexOutOfBoundsException e) {
+      }
+      catch (IndexOutOfBoundsException e)
+      {
          throw new BufferUnderflowException();
       }
    }
 
    public short getUnsignedByte()
    {
-      try {
+      try
+      {
          return buffer.readUnsignedByte();
-      } catch (IndexOutOfBoundsException e) {
+      }
+      catch (IndexOutOfBoundsException e)
+      {
          throw new BufferUnderflowException();
       }
    }
 
    public void getBytes(final byte[] b)
    {
-      try {
+      try
+      {
          buffer.readBytes(b);
-      } catch (IndexOutOfBoundsException e) {
+      }
+      catch (IndexOutOfBoundsException e)
+      {
          throw new BufferUnderflowException();
       }
    }
 
    public void getBytes(final byte[] b, final int offset, final int length)
    {
-      try {
+      try
+      {
          buffer.readBytes(b, offset, length);
-      } catch (IndexOutOfBoundsException e) {
+      }
+      catch (IndexOutOfBoundsException e)
+      {
          throw new BufferUnderflowException();
       }
    }
 
    public int getInt()
    {
-      try {
+      try
+      {
          return buffer.readInt();
-      } catch (IndexOutOfBoundsException e) {
+      }
+      catch (IndexOutOfBoundsException e)
+      {
          throw new BufferUnderflowException();
       }
    }
 
    public long getLong()
    {
-      try {
+      try
+      {
          return buffer.readLong();
-      } catch (IndexOutOfBoundsException e) {
+      }
+      catch (IndexOutOfBoundsException e)
+      {
          throw new BufferUnderflowException();
       }
    }
@@ -256,18 +274,24 @@
 
    public short getShort()
    {
-      try {
+      try
+      {
          return buffer.readShort();
-      } catch (IndexOutOfBoundsException e) {
+      }
+      catch (IndexOutOfBoundsException e)
+      {
          throw new BufferUnderflowException();
       }
    }
 
    public int getUnsignedShort()
    {
-      try {
+      try
+      {
          return buffer.readUnsignedShort();
-      } catch (IndexOutOfBoundsException e) {
+      }
+      catch (IndexOutOfBoundsException e)
+      {
          throw new BufferUnderflowException();
       }
    }
@@ -279,7 +303,7 @@
 
    public char getChar()
    {
-      return (char) getShort();
+      return (char)getShort();
    }
 
    public void putBoolean(final boolean b)
@@ -287,7 +311,8 @@
       if (b)
       {
          putByte(TRUE);
-      } else
+      }
+      else
       {
          putByte(FALSE);
       }
@@ -305,7 +330,7 @@
       buffer.writeInt(nullableString.length());
       for (int i = 0; i < nullableString.length(); i++)
       {
-         buffer.writeShort((short) nullableString.charAt(i));
+         buffer.writeShort((short)nullableString.charAt(i));
       }
       buffer.readerIndex(buffer.writerIndex());
    }
@@ -355,13 +380,13 @@
    {
       ChannelBuffer encoded = copiedBuffer(str, "UTF-8");
       int length = encoded.readableBytes();
-      if (length >= 65536) {
-         throw new IllegalArgumentException(
-               "the specified string is too long (" + length + ")");
+      if (length >= 65536)
+      {
+         throw new IllegalArgumentException("the specified string is too long (" + length + ")");
       }
 
       flip();
-      buffer.writeShort((short) length);
+      buffer.writeShort((short)length);
       buffer.writeBytes(encoded);
       buffer.readerIndex(buffer.writerIndex());
    }
@@ -415,10 +440,13 @@
    public String getUTF() throws Exception
    {
       ChannelBuffer utf8value;
-      try {
+      try
+      {
          int length = buffer.readUnsignedShort();
          utf8value = buffer.readSlice(length);
-      } catch (IndexOutOfBoundsException e) {
+      }
+      catch (IndexOutOfBoundsException e)
+      {
          throw new BufferUnderflowException();
       }
 

Modified: trunk/src/main/org/jboss/messaging/integration/transports/netty/ChannelPipelineSupport.java
===================================================================
--- trunk/src/main/org/jboss/messaging/integration/transports/netty/ChannelPipelineSupport.java	2008-12-10 10:35:21 UTC (rev 5498)
+++ trunk/src/main/org/jboss/messaging/integration/transports/netty/ChannelPipelineSupport.java	2008-12-10 10:49:10 UTC (rev 5499)
@@ -53,15 +53,13 @@
 
    // Public --------------------------------------------------------
 
-   public static void addCodecFilter(final ChannelPipeline pipeline,
-                                     final BufferHandler handler)
+   public static void addCodecFilter(final ChannelPipeline pipeline, final BufferHandler handler)
    {
       assert pipeline != null;
       pipeline.addLast("decoder", new MessagingFrameDecoder(handler));
    }
 
-   public static void addSSLFilter(
-         final ChannelPipeline pipeline, final SSLContext context, final boolean client) throws Exception
+   public static void addSSLFilter(final ChannelPipeline pipeline, final SSLContext context, final boolean client) throws Exception
    {
       SSLEngine engine = context.createSSLEngine();
       engine.setUseClientMode(client);

Modified: trunk/src/main/org/jboss/messaging/integration/transports/netty/HttpAcceptorHandler.java
===================================================================
--- trunk/src/main/org/jboss/messaging/integration/transports/netty/HttpAcceptorHandler.java	2008-12-10 10:35:21 UTC (rev 5498)
+++ trunk/src/main/org/jboss/messaging/integration/transports/netty/HttpAcceptorHandler.java	2008-12-10 10:49:10 UTC (rev 5499)
@@ -21,36 +21,31 @@
  */
 package org.jboss.messaging.integration.transports.netty;
 
-import org.jboss.netty.channel.ChannelPipelineCoverage;
-import org.jboss.netty.channel.SimpleChannelHandler;
+import static org.jboss.netty.channel.Channels.write;
+
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Executor;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.buffer.ChannelBuffers;
 import org.jboss.netty.channel.Channel;
 import org.jboss.netty.channel.ChannelHandlerContext;
+import org.jboss.netty.channel.ChannelPipelineCoverage;
 import org.jboss.netty.channel.ChannelStateEvent;
+import org.jboss.netty.channel.DefaultMessageEvent;
 import org.jboss.netty.channel.MessageEvent;
-import org.jboss.netty.channel.DefaultMessageEvent;
-import static org.jboss.netty.channel.Channels.write;
-import org.jboss.netty.buffer.ChannelBuffer;
-import org.jboss.netty.buffer.ChannelBuffers;
+import org.jboss.netty.channel.SimpleChannelHandler;
+import org.jboss.netty.handler.codec.http.DefaultHttpResponse;
+import org.jboss.netty.handler.codec.http.HttpHeaders;
+import org.jboss.netty.handler.codec.http.HttpMethod;
 import org.jboss.netty.handler.codec.http.HttpRequest;
-import org.jboss.netty.handler.codec.http.HttpMethod;
 import org.jboss.netty.handler.codec.http.HttpResponse;
-import org.jboss.netty.handler.codec.http.DefaultHttpResponse;
+import org.jboss.netty.handler.codec.http.HttpResponseStatus;
 import org.jboss.netty.handler.codec.http.HttpVersion;
-import org.jboss.netty.handler.codec.http.HttpResponseStatus;
-import org.jboss.netty.handler.codec.http.HttpHeaders;
 
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.Executor;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.locks.ReentrantLock;
-import java.util.concurrent.locks.Condition;
-import java.util.List;
-import java.util.ArrayList;
-import java.util.Iterator;
-
 /**
  * takes care of making sure that every request has a response and also that any uninitiated responses always wait for a response.
  * @author <a href="mailto:andy.taylor at jboss.org">Andy Taylor</a>
@@ -94,25 +89,29 @@
    @Override
    public void messageReceived(final ChannelHandlerContext ctx, final MessageEvent e) throws Exception
    {
-      HttpRequest request = (HttpRequest) e.getMessage();
+      HttpRequest request = (HttpRequest)e.getMessage();
       HttpMethod method = request.getMethod();
-      //if we are a post then we send upstream, otherwise we are just being prompted for a response.
+      // if we are a post then we send upstream, otherwise we are just being prompted for a response.
       if (method.equals(HttpMethod.POST))
       {
-         MessageEvent event = new DefaultMessageEvent(e.getChannel(), e.getFuture(), request.getContent(), e.getRemoteAddress());
+         MessageEvent event = new DefaultMessageEvent(e.getChannel(),
+                                                      e.getFuture(),
+                                                      request.getContent(),
+                                                      e.getRemoteAddress());
          ctx.sendUpstream(event);
       }
-      //add a new response
-      responses.put(new ResponseHolder(System.currentTimeMillis() + responseTime, new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK)));
+      // add a new response
+      responses.put(new ResponseHolder(System.currentTimeMillis() + responseTime,
+                                       new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK)));
    }
 
    @Override
    public void writeRequested(final ChannelHandlerContext ctx, final MessageEvent e) throws Exception
    {
-      //we are either a channel buffer, which gets delayed until a response is available, or we are the actual response
+      // we are either a channel buffer, which gets delayed until a response is available, or we are the actual response
       if (e.getMessage() instanceof ChannelBuffer)
       {
-         ChannelBuffer buf = (ChannelBuffer) e.getMessage();
+         ChannelBuffer buf = (ChannelBuffer)e.getMessage();
          executor.execute(new ResponseRunner(buf));
       }
       else
@@ -123,7 +122,7 @@
 
    public void keepAlive(final long time)
    {
-      //send some responses to catch up thus avoiding any timeout.
+      // send some responses to catch up thus avoiding any timeout.
       int lateResponses = 0;
       for (ResponseHolder response : responses)
       {
@@ -150,6 +149,7 @@
       private final ChannelBuffer buffer;
 
       private final boolean bogusResponse;
+
       public ResponseRunner(final ChannelBuffer buffer)
       {
          this.buffer = buffer;
@@ -161,7 +161,7 @@
          bogusResponse = true;
          buffer = ChannelBuffers.buffer(0);
       }
-      
+
       public void run()
       {
          ResponseHolder responseHolder = null;
@@ -173,11 +173,11 @@
             }
             catch (InterruptedException e)
             {
-               //ignore, we'll just try again
+               // ignore, we'll just try again
             }
          }
          while (responseHolder == null);
-         if(!bogusResponse)
+         if (!bogusResponse)
          {
             piggyBackResponses();
          }
@@ -188,15 +188,15 @@
 
       private void piggyBackResponses()
       {
-         //if we are the last available response then we have to piggy back any remaining responses
-         if(responses.isEmpty())
+         // if we are the last available response then we have to piggy back any remaining responses
+         if (responses.isEmpty())
          {
             do
             {
                try
                {
-                  ResponseRunner responseRunner = (ResponseRunner) delayedResponses.poll(0, TimeUnit.MILLISECONDS);
-                  if(responseRunner == null)
+                  ResponseRunner responseRunner = (ResponseRunner)delayedResponses.poll(0, TimeUnit.MILLISECONDS);
+                  if (responseRunner == null)
                   {
                      break;
                   }
@@ -218,6 +218,7 @@
    private class ResponseHolder
    {
       final HttpResponse response;
+
       final long timeReceived;
 
       public ResponseHolder(long timeReceived, HttpResponse response)
@@ -226,5 +227,5 @@
          this.response = response;
       }
    }
-   
+
 }

Modified: trunk/src/main/org/jboss/messaging/integration/transports/netty/HttpKeepAliveTask.java
===================================================================
--- trunk/src/main/org/jboss/messaging/integration/transports/netty/HttpKeepAliveTask.java	2008-12-10 10:35:21 UTC (rev 5498)
+++ trunk/src/main/org/jboss/messaging/integration/transports/netty/HttpKeepAliveTask.java	2008-12-10 10:49:10 UTC (rev 5499)
@@ -21,10 +21,9 @@
  */
 package org.jboss.messaging.integration.transports.netty;
 
-import java.util.Timer;
-import java.util.TimerTask;
-import java.util.List;
 import java.util.ArrayList;
+import java.util.List;
+import java.util.TimerTask;
 
 /**
  * A simple Timer Task to allow HttpAcceptorHandlers to be called intermittently.
@@ -34,15 +33,8 @@
 {
    private final List<HttpAcceptorHandler> handlers = new ArrayList<HttpAcceptorHandler>();
 
-   private boolean cancelled;
-
    public synchronized void run()
    {
-      if(cancelled)
-      {
-         return;
-      }
-      
       long time = System.currentTimeMillis();
       for (HttpAcceptorHandler handler : handlers)
       {
@@ -62,7 +54,6 @@
 
    public synchronized boolean cancel()
    {
-      cancelled = true;
       return super.cancel();
    }
 }

Modified: trunk/src/main/org/jboss/messaging/integration/transports/netty/MessagingChannelHandler.java
===================================================================
--- trunk/src/main/org/jboss/messaging/integration/transports/netty/MessagingChannelHandler.java	2008-12-10 10:35:21 UTC (rev 5498)
+++ trunk/src/main/org/jboss/messaging/integration/transports/netty/MessagingChannelHandler.java	2008-12-10 10:49:10 UTC (rev 5499)
@@ -44,7 +44,9 @@
    private static final Logger log = Logger.getLogger(MessagingChannelHandler.class);
 
    private final BufferHandler handler;
+
    private final ConnectionLifeCycleListener listener;
+
    volatile boolean active;
 
    MessagingChannelHandler(BufferHandler handler, ConnectionLifeCycleListener listener)
@@ -56,7 +58,7 @@
    @Override
    public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception
    {
-      ChannelBuffer buffer = (ChannelBuffer) e.getMessage();
+      ChannelBuffer buffer = (ChannelBuffer)e.getMessage();
       handler.bufferReceived(e.getChannel().getId(), new ChannelBufferWrapper(buffer));
    }
 
@@ -82,9 +84,7 @@
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception
    {
-      log.error(
-            "caught exception " + e.getCause() + " for channel " +
-            e.getChannel(), e.getCause());
+      log.error("caught exception " + e.getCause() + " for channel " + e.getChannel(), e.getCause());
 
       synchronized (this)
       {
@@ -95,10 +95,13 @@
 
          MessagingException me = new MessagingException(MessagingException.INTERNAL_ERROR, "Netty exception");
          me.initCause(e.getCause());
-         try {
+         try
+         {
             listener.connectionException(e.getChannel().getId(), me);
             active = false;
-         } catch (Exception ex) {
+         }
+         catch (Exception ex)
+         {
             log.error("failed to notify the listener:", ex);
          }
       }

Modified: trunk/src/main/org/jboss/messaging/integration/transports/netty/MessagingFrameDecoder.java
===================================================================
--- trunk/src/main/org/jboss/messaging/integration/transports/netty/MessagingFrameDecoder.java	2008-12-10 10:35:21 UTC (rev 5498)
+++ trunk/src/main/org/jboss/messaging/integration/transports/netty/MessagingFrameDecoder.java	2008-12-10 10:49:10 UTC (rev 5499)
@@ -44,7 +44,6 @@
 {
    private static final Logger log = Logger.getLogger(MessagingFrameDecoder.class);
 
-   
    private final BufferHandler handler;
 
    public MessagingFrameDecoder(final BufferHandler handler)
@@ -58,7 +57,7 @@
    @Override
    protected Object decode(ChannelHandlerContext ctx, Channel channel, ChannelBuffer in) throws Exception
    {
-      //TODO - we can avoid this entirely if we maintain fragmented packets in the handler
+      // TODO - we can avoid this entirely if we maintain fragmented packets in the handler
       int start = in.readerIndex();
 
       int length = handler.isReadyToHandle(new ChannelBufferWrapper(in));

Modified: trunk/src/main/org/jboss/messaging/integration/transports/netty/NettyAcceptor.java
===================================================================
--- trunk/src/main/org/jboss/messaging/integration/transports/netty/NettyAcceptor.java	2008-12-10 10:35:21 UTC (rev 5498)
+++ trunk/src/main/org/jboss/messaging/integration/transports/netty/NettyAcceptor.java	2008-12-10 10:49:10 UTC (rev 5499)
@@ -114,22 +114,29 @@
 
    private final HttpKeepAliveTask httpKeepAliveTask;
 
-   public NettyAcceptor(final Map<String, Object> configuration, final BufferHandler handler,
+   public NettyAcceptor(final Map<String, Object> configuration,
+                        final BufferHandler handler,
                         final ConnectionLifeCycleListener listener)
    {
       this.handler = handler;
 
       this.listener = listener;
 
-      this.sslEnabled =
-            ConfigurationHelper.getBooleanProperty(TransportConstants.SSL_ENABLED_PROP_NAME, TransportConstants.DEFAULT_SSL_ENABLED, configuration);
-      this.httpEnabled =
-            ConfigurationHelper.getBooleanProperty(TransportConstants.HTTP_ENABLED_PROP_NAME, TransportConstants.DEFAULT_HTTP_ENABLED, configuration);
+      this.sslEnabled = ConfigurationHelper.getBooleanProperty(TransportConstants.SSL_ENABLED_PROP_NAME,
+                                                               TransportConstants.DEFAULT_SSL_ENABLED,
+                                                               configuration);
+      this.httpEnabled = ConfigurationHelper.getBooleanProperty(TransportConstants.HTTP_ENABLED_PROP_NAME,
+                                                                TransportConstants.DEFAULT_HTTP_ENABLED,
+                                                                configuration);
 
-      if(httpEnabled)
+      if (httpEnabled)
       {
-         httpServerScanPeriod = ConfigurationHelper.getLongProperty(TransportConstants.HTTP_SERVER_SCAN_PERIOD_PROP_NAME, TransportConstants.DEFAULT_HTTP_SERVER_SCAN_PERIOD, configuration);
-         httpResponseTime = ConfigurationHelper.getLongProperty(TransportConstants.HTTP_RESPONSE_TIME_PROP_NAME, TransportConstants.DEFAULT_HTTP_RESPONSE_TIME, configuration);
+         httpServerScanPeriod = ConfigurationHelper.getLongProperty(TransportConstants.HTTP_SERVER_SCAN_PERIOD_PROP_NAME,
+                                                                    TransportConstants.DEFAULT_HTTP_SERVER_SCAN_PERIOD,
+                                                                    configuration);
+         httpResponseTime = ConfigurationHelper.getLongProperty(TransportConstants.HTTP_RESPONSE_TIME_PROP_NAME,
+                                                                TransportConstants.DEFAULT_HTTP_RESPONSE_TIME,
+                                                                configuration);
          httpKeepAliveTimer = new Timer();
          httpKeepAliveTask = new HttpKeepAliveTask();
          httpKeepAliveTimer.schedule(httpKeepAliveTask, httpServerScanPeriod, httpServerScanPeriod);
@@ -141,22 +148,29 @@
          httpKeepAliveTimer = null;
          httpKeepAliveTask = null;
       }
-      this.useNio =
-            ConfigurationHelper.getBooleanProperty(TransportConstants.USE_NIO_PROP_NAME, TransportConstants.DEFAULT_USE_NIO, configuration);
-      this.host =
-            ConfigurationHelper.getStringProperty(TransportConstants.HOST_PROP_NAME, TransportConstants.DEFAULT_HOST, configuration);
-      this.port =
-            ConfigurationHelper.getIntProperty(TransportConstants.PORT_PROP_NAME, TransportConstants.DEFAULT_PORT, configuration);
+      this.useNio = ConfigurationHelper.getBooleanProperty(TransportConstants.USE_NIO_PROP_NAME,
+                                                           TransportConstants.DEFAULT_USE_NIO,
+                                                           configuration);
+      this.host = ConfigurationHelper.getStringProperty(TransportConstants.HOST_PROP_NAME,
+                                                        TransportConstants.DEFAULT_HOST,
+                                                        configuration);
+      this.port = ConfigurationHelper.getIntProperty(TransportConstants.PORT_PROP_NAME,
+                                                     TransportConstants.DEFAULT_PORT,
+                                                     configuration);
       if (sslEnabled)
       {
-         this.keyStorePath =
-               ConfigurationHelper.getStringProperty(TransportConstants.KEYSTORE_PATH_PROP_NAME, TransportConstants.DEFAULT_KEYSTORE_PATH, configuration);
-         this.keyStorePassword =
-               ConfigurationHelper.getStringProperty(TransportConstants.KEYSTORE_PASSWORD_PROP_NAME, TransportConstants.DEFAULT_KEYSTORE_PASSWORD, configuration);
-         this.trustStorePath =
-               ConfigurationHelper.getStringProperty(TransportConstants.TRUSTSTORE_PATH_PROP_NAME, TransportConstants.DEFAULT_TRUSTSTORE_PATH, configuration);
-         this.trustStorePassword =
-               ConfigurationHelper.getStringProperty(TransportConstants.TRUSTSTORE_PASSWORD_PROP_NAME, TransportConstants.DEFAULT_TRUSTSTORE_PASSWORD, configuration);
+         this.keyStorePath = ConfigurationHelper.getStringProperty(TransportConstants.KEYSTORE_PATH_PROP_NAME,
+                                                                   TransportConstants.DEFAULT_KEYSTORE_PATH,
+                                                                   configuration);
+         this.keyStorePassword = ConfigurationHelper.getStringProperty(TransportConstants.KEYSTORE_PASSWORD_PROP_NAME,
+                                                                       TransportConstants.DEFAULT_KEYSTORE_PASSWORD,
+                                                                       configuration);
+         this.trustStorePath = ConfigurationHelper.getStringProperty(TransportConstants.TRUSTSTORE_PATH_PROP_NAME,
+                                                                     TransportConstants.DEFAULT_TRUSTSTORE_PATH,
+                                                                     configuration);
+         this.trustStorePassword = ConfigurationHelper.getStringProperty(TransportConstants.TRUSTSTORE_PASSWORD_PROP_NAME,
+                                                                         TransportConstants.DEFAULT_TRUSTSTORE_PASSWORD,
+                                                                         configuration);
       }
       else
       {
@@ -166,12 +180,15 @@
          this.trustStorePassword = null;
       }
 
-      this.tcpNoDelay =
-            ConfigurationHelper.getBooleanProperty(TransportConstants.TCP_NODELAY_PROPNAME, TransportConstants.DEFAULT_TCP_NODELAY, configuration);
-      this.tcpSendBufferSize =
-            ConfigurationHelper.getIntProperty(TransportConstants.TCP_SENDBUFFER_SIZE_PROPNAME, TransportConstants.DEFAULT_TCP_SENDBUFFER_SIZE, configuration);
-      this.tcpReceiveBufferSize =
-            ConfigurationHelper.getIntProperty(TransportConstants.TCP_RECEIVEBUFFER_SIZE_PROPNAME, TransportConstants.DEFAULT_TCP_RECEIVEBUFFER_SIZE, configuration);
+      this.tcpNoDelay = ConfigurationHelper.getBooleanProperty(TransportConstants.TCP_NODELAY_PROPNAME,
+                                                               TransportConstants.DEFAULT_TCP_NODELAY,
+                                                               configuration);
+      this.tcpSendBufferSize = ConfigurationHelper.getIntProperty(TransportConstants.TCP_SENDBUFFER_SIZE_PROPNAME,
+                                                                  TransportConstants.DEFAULT_TCP_SENDBUFFER_SIZE,
+                                                                  configuration);
+      this.tcpReceiveBufferSize = ConfigurationHelper.getIntProperty(TransportConstants.TCP_RECEIVEBUFFER_SIZE_PROPNAME,
+                                                                     TransportConstants.DEFAULT_TCP_RECEIVEBUFFER_SIZE,
+                                                                     configuration);
 
    }
 
@@ -179,7 +196,7 @@
    {
       if (channelFactory != null)
       {
-         //Already started
+         // Already started
          return;
       }
       bossExecutor = Executors.newCachedThreadPool(new JBMThreadFactory("jbm-netty-acceptor-boss-threads"));
@@ -199,17 +216,13 @@
       {
          try
          {
-            context = SSLSupport.createServerContext(
-                  keyStorePath,
-                  keyStorePassword,
-                  trustStorePath,
-                  trustStorePassword);
+            context = SSLSupport.createServerContext(keyStorePath, keyStorePassword, trustStorePath, trustStorePassword);
          }
          catch (Exception e)
          {
-            IllegalStateException ise = new IllegalStateException(
-                  "Unable to create NettyAcceptor for " +
-                  host + ":" + port);
+            IllegalStateException ise = new IllegalStateException("Unable to create NettyAcceptor for " + host +
+                                                                  ":" +
+                                                                  port);
             ise.initCause(e);
             throw ise;
          }
@@ -266,16 +279,16 @@
          return;
       }
 
-      if(httpKeepAliveTimer != null)
+      if (httpKeepAliveTimer != null)
       {
          httpKeepAliveTask.cancel();
-         
+
          httpKeepAliveTimer.cancel();
       }
       serverChannel.close().awaitUninterruptibly();
       bossExecutor.shutdown();
       workerExecutor.shutdown();
-      for (; ;)
+      for (;;)
       {
          try
          {

Modified: trunk/src/main/org/jboss/messaging/integration/transports/netty/NettyConnection.java
===================================================================
--- trunk/src/main/org/jboss/messaging/integration/transports/netty/NettyConnection.java	2008-12-10 10:35:21 UTC (rev 5498)
+++ trunk/src/main/org/jboss/messaging/integration/transports/netty/NettyConnection.java	2008-12-10 10:49:10 UTC (rev 5499)
@@ -65,7 +65,7 @@
          return;
       }
 
-      SslHandler sslHandler = (SslHandler) channel.getPipeline().get("ssl");
+      SslHandler sslHandler = (SslHandler)channel.getPipeline().get("ssl");
       if (sslHandler != null)
       {
          try
@@ -76,23 +76,25 @@
          {
             // ignore
          }
-      } else {
+      }
+      else
+      {
          channel.close();
       }
 
-//      This block has been disabled because this method can be called from
-//      the Netty I/O thread.
-//      TODO Netty should be improved to provide a way to determine
-//           if the current code is running in the I/O thread.
-//
-//      if (channel.getParent() == null) {
-//         // A client channel - wait until everything is cleaned up.
-//         // TODO Do not spin - use signal.
-//         MessagingChannelHandler handler = (MessagingChannelHandler) channel.getPipeline().get("handler");
-//         while (handler.active) {
-//            Thread.yield();
-//         }
-//      }
+      // This block has been disabled because this method can be called from
+      // the Netty I/O thread.
+      // TODO Netty should be improved to provide a way to determine
+      // if the current code is running in the I/O thread.
+      //
+      // if (channel.getParent() == null) {
+      // // A client channel - wait until everything is cleaned up.
+      // // TODO Do not spin - use signal.
+      // MessagingChannelHandler handler = (MessagingChannelHandler) channel.getPipeline().get("handler");
+      // while (handler.active) {
+      // Thread.yield();
+      // }
+      // }
 
       closed = true;
    }

Modified: trunk/src/main/org/jboss/messaging/integration/transports/netty/NettyConnector.java
===================================================================
--- trunk/src/main/org/jboss/messaging/integration/transports/netty/NettyConnector.java	2008-12-10 10:35:21 UTC (rev 5498)
+++ trunk/src/main/org/jboss/messaging/integration/transports/netty/NettyConnector.java	2008-12-10 10:49:10 UTC (rev 5499)
@@ -21,6 +21,20 @@
  */
 package org.jboss.messaging.integration.transports.netty;
 
+import static org.jboss.netty.channel.Channels.pipeline;
+import static org.jboss.netty.channel.Channels.write;
+
+import java.net.InetSocketAddress;
+import java.util.Map;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+import javax.net.ssl.SSLContext;
+import javax.net.ssl.SSLException;
+
 import org.jboss.messaging.core.logging.Logger;
 import org.jboss.messaging.core.remoting.impl.ssl.SSLSupport;
 import org.jboss.messaging.core.remoting.spi.BufferHandler;
@@ -38,12 +52,10 @@
 import org.jboss.netty.channel.ChannelPipeline;
 import org.jboss.netty.channel.ChannelPipelineCoverage;
 import org.jboss.netty.channel.ChannelPipelineFactory;
-import static org.jboss.netty.channel.Channels.pipeline;
-import static org.jboss.netty.channel.Channels.write;
+import org.jboss.netty.channel.ChannelStateEvent;
 import org.jboss.netty.channel.DefaultMessageEvent;
 import org.jboss.netty.channel.MessageEvent;
 import org.jboss.netty.channel.SimpleChannelHandler;
-import org.jboss.netty.channel.ChannelStateEvent;
 import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
 import org.jboss.netty.channel.socket.oio.OioClientSocketChannelFactory;
 import org.jboss.netty.handler.codec.http.DefaultHttpRequest;
@@ -56,16 +68,6 @@
 import org.jboss.netty.handler.codec.http.HttpVersion;
 import org.jboss.netty.handler.ssl.SslHandler;
 
-import javax.net.ssl.SSLContext;
-import javax.net.ssl.SSLException;
-import java.net.InetSocketAddress;
-import java.util.Map;
-import java.util.Timer;
-import java.util.TimerTask;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
-
 /**
  * A NettyConnector
  *
@@ -143,15 +145,18 @@
       this.sslEnabled = ConfigurationHelper.getBooleanProperty(TransportConstants.SSL_ENABLED_PROP_NAME,
                                                                TransportConstants.DEFAULT_SSL_ENABLED,
                                                                configuration);
-      this.httpEnabled =
-            ConfigurationHelper.getBooleanProperty(TransportConstants.HTTP_ENABLED_PROP_NAME, TransportConstants.DEFAULT_HTTP_ENABLED, configuration);
+      this.httpEnabled = ConfigurationHelper.getBooleanProperty(TransportConstants.HTTP_ENABLED_PROP_NAME,
+                                                                TransportConstants.DEFAULT_HTTP_ENABLED,
+                                                                configuration);
 
-      if(httpEnabled)
+      if (httpEnabled)
       {
          this.httpMaxClientIdleTime = ConfigurationHelper.getLongProperty(TransportConstants.HTTP_CLIENT_IDLE_PROP_NAME,
-                                                                      TransportConstants.DEFAULT_HTTP_CLIENT_IDLE_TIME, configuration);
+                                                                          TransportConstants.DEFAULT_HTTP_CLIENT_IDLE_TIME,
+                                                                          configuration);
          this.httpClientIdleScanPeriod = ConfigurationHelper.getLongProperty(TransportConstants.HTTP_CLIENT_IDLE_SCAN_PERIOD,
-                                                                      TransportConstants.DEFAULT_HTTP_CLIENT_SCAN_PERIOD, configuration);
+                                                                             TransportConstants.DEFAULT_HTTP_CLIENT_SCAN_PERIOD,
+                                                                             configuration);
       }
       else
       {
@@ -374,7 +379,9 @@
    class HttpHandler extends SimpleChannelHandler
    {
       private Channel channel;
+
       private long lastSendTime = 0;
+
       private boolean waitingGet = false;
 
       private Timer idleClientTimer;
@@ -390,7 +397,6 @@
          }
       }
 
-
       public void channelClosed(final ChannelHandlerContext ctx, final ChannelStateEvent e) throws Exception
       {
          if (idleClientTimer != null)
@@ -403,8 +409,11 @@
       @Override
       public void messageReceived(final ChannelHandlerContext ctx, final MessageEvent e) throws Exception
       {
-         HttpResponse response = (HttpResponse) e.getMessage();
-         MessageEvent event = new DefaultMessageEvent(e.getChannel(), e.getFuture(), response.getContent(), e.getRemoteAddress());
+         HttpResponse response = (HttpResponse)e.getMessage();
+         MessageEvent event = new DefaultMessageEvent(e.getChannel(),
+                                                      e.getFuture(),
+                                                      response.getContent(),
+                                                      e.getRemoteAddress());
          waitingGet = false;
          ctx.sendUpstream(event);
       }
@@ -415,7 +424,7 @@
          if (e.getMessage() instanceof ChannelBuffer)
          {
             HttpRequest httpRequest = new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.POST, "/jbm/");
-            ChannelBuffer buf = (ChannelBuffer) e.getMessage();
+            ChannelBuffer buf = (ChannelBuffer)e.getMessage();
             httpRequest.setContent(buf);
             httpRequest.addHeader(HttpHeaders.CONTENT_LENGTH, String.valueOf(buf.writerIndex()));
             write(ctx, e.getChannel(), e.getFuture(), httpRequest, e.getRemoteAddress());
@@ -431,9 +440,10 @@
       private class HttpIdleTimerTask extends TimerTask
       {
          long currentTime = System.currentTimeMillis();
+
          public void run()
          {
-            if(!waitingGet && System.currentTimeMillis() > lastSendTime + httpMaxClientIdleTime)
+            if (!waitingGet && System.currentTimeMillis() > lastSendTime + httpMaxClientIdleTime)
             {
                HttpRequest httpRequest = new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, "/jbm/");
                waitingGet = true;
@@ -443,5 +453,4 @@
       }
    }
 
-
 }

Modified: trunk/src/main/org/jboss/messaging/integration/transports/netty/TransportConstants.java
===================================================================
--- trunk/src/main/org/jboss/messaging/integration/transports/netty/TransportConstants.java	2008-12-10 10:35:21 UTC (rev 5498)
+++ trunk/src/main/org/jboss/messaging/integration/transports/netty/TransportConstants.java	2008-12-10 10:49:10 UTC (rev 5499)
@@ -30,7 +30,7 @@
 public class TransportConstants
 {
    public static final String SSL_ENABLED_PROP_NAME = "jbm.remoting.netty.sslenabled";
-   
+
    public static final String HTTP_ENABLED_PROP_NAME = "jbm.remoting.netty.httpenabled";
 
    public static final String HTTP_CLIENT_IDLE_PROP_NAME = "jbm.remoting.netty.httpclientidletime";
@@ -40,47 +40,47 @@
    public static final String HTTP_RESPONSE_TIME_PROP_NAME = "jbm.remoting.netty.httpresponsetime";
 
    public static final String HTTP_SERVER_SCAN_PERIOD_PROP_NAME = "jbm.remoting.netty.httpserverscanperiod";
-   
+
    public static final String USE_NIO_PROP_NAME = "jbm.remoting.netty.usenio";
-   
+
    public static final String HOST_PROP_NAME = "jbm.remoting.netty.host";
-   
+
    public static final String PORT_PROP_NAME = "jbm.remoting.netty.port";
-   
+
    public static final String KEYSTORE_PATH_PROP_NAME = "jbm.remoting.netty.keystorepath";
-   
+
    public static final String KEYSTORE_PASSWORD_PROP_NAME = "jbm.remoting.netty.keystorepassword";
-   
+
    public static final String TRUSTSTORE_PATH_PROP_NAME = "jbm.remoting.netty.truststorepath";
-   
+
    public static final String TRUSTSTORE_PASSWORD_PROP_NAME = "jbm.remoting.netty.truststorepassword";
-   
+
    public static final String TCP_NODELAY_PROPNAME = "jbm.remoting.netty.tcpnodelay";
-   
+
    public static final String TCP_SENDBUFFER_SIZE_PROPNAME = "jbm.remoting.netty.tcpsendbuffersize";
-   
+
    public static final String TCP_RECEIVEBUFFER_SIZE_PROPNAME = "jbm.remoting.netty.tcpreceivebuffersize";
-   
+
    public static final boolean DEFAULT_SSL_ENABLED = false;
-   
+
    public static final boolean DEFAULT_USE_NIO = true;
-   
+
    public static final String DEFAULT_HOST = "localhost";
-   
+
    public static final int DEFAULT_PORT = 5400;
-   
+
    public static final String DEFAULT_KEYSTORE_PATH = "messaging.keystore";
- 
-   public static final String DEFAULT_KEYSTORE_PASSWORD = "secureexample";    
- 
+
+   public static final String DEFAULT_KEYSTORE_PASSWORD = "secureexample";
+
    public static final String DEFAULT_TRUSTSTORE_PATH = "messaging.truststore";
- 
+
    public static final String DEFAULT_TRUSTSTORE_PASSWORD = "secureexample";
-   
+
    public static final boolean DEFAULT_TCP_NODELAY = true;
-   
+
    public static final int DEFAULT_TCP_SENDBUFFER_SIZE = 32768;
-   
+
    public static final int DEFAULT_TCP_RECEIVEBUFFER_SIZE = 32768;
 
    public static final boolean DEFAULT_HTTP_ENABLED = false;




More information about the jboss-cvs-commits mailing list