[hornetq-commits] JBoss hornetq SVN: r8837 - in trunk: src/main/org/hornetq/core/protocol/aardvark/impl and 8 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Fri Jan 22 06:06:33 EST 2010


Author: timfox
Date: 2010-01-22 06:06:32 -0500 (Fri, 22 Jan 2010)
New Revision: 8837

Added:
   trunk/src/main/org/hornetq/spi/core/remoting/BufferDecoder.java
Removed:
   trunk/src/main/org/hornetq/core/protocol/core/impl/AbstractBufferHandler.java
Modified:
   trunk/src/main/org/hornetq/core/client/impl/FailoverManagerImpl.java
   trunk/src/main/org/hornetq/core/protocol/aardvark/impl/AardvarkConnection.java
   trunk/src/main/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java
   trunk/src/main/org/hornetq/core/protocol/core/impl/RemotingConnectionImpl.java
   trunk/src/main/org/hornetq/core/remoting/impl/invm/InVMAcceptor.java
   trunk/src/main/org/hornetq/core/remoting/impl/invm/InVMAcceptorFactory.java
   trunk/src/main/org/hornetq/core/remoting/server/impl/RemotingServiceImpl.java
   trunk/src/main/org/hornetq/integration/transports/netty/ChannelPipelineSupport.java
   trunk/src/main/org/hornetq/integration/transports/netty/HornetQFrameDecoder.java
   trunk/src/main/org/hornetq/integration/transports/netty/HornetQFrameDecoder2.java
   trunk/src/main/org/hornetq/integration/transports/netty/NettyAcceptor.java
   trunk/src/main/org/hornetq/integration/transports/netty/NettyAcceptorFactory.java
   trunk/src/main/org/hornetq/integration/transports/netty/NettyConnector.java
   trunk/src/main/org/hornetq/spi/core/protocol/ProtocolManager.java
   trunk/src/main/org/hornetq/spi/core/remoting/AcceptorFactory.java
   trunk/src/main/org/hornetq/spi/core/remoting/BufferHandler.java
   trunk/tests/src/org/hornetq/tests/integration/http/NettyHttpTest.java
   trunk/tests/src/org/hornetq/tests/unit/core/remoting/impl/netty/NettyAcceptorFactoryTest.java
   trunk/tests/src/org/hornetq/tests/unit/core/remoting/impl/netty/NettyAcceptorTest.java
   trunk/tests/src/org/hornetq/tests/unit/core/remoting/impl/netty/NettyConnectorTest.java
Log:
removed the buffer decoding abstraction from the handler

Modified: trunk/src/main/org/hornetq/core/client/impl/FailoverManagerImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/FailoverManagerImpl.java	2010-01-22 10:29:26 UTC (rev 8836)
+++ trunk/src/main/org/hornetq/core/client/impl/FailoverManagerImpl.java	2010-01-22 11:06:32 UTC (rev 8837)
@@ -39,7 +39,6 @@
 import org.hornetq.core.protocol.core.ChannelHandler;
 import org.hornetq.core.protocol.core.CoreRemotingConnection;
 import org.hornetq.core.protocol.core.Packet;
-import org.hornetq.core.protocol.core.impl.AbstractBufferHandler;
 import org.hornetq.core.protocol.core.impl.PacketImpl;
 import org.hornetq.core.protocol.core.impl.RemotingConnectionImpl;
 import org.hornetq.core.protocol.core.impl.wireformat.CreateSessionMessage;
@@ -48,6 +47,7 @@
 import org.hornetq.core.remoting.FailureListener;
 import org.hornetq.core.version.Version;
 import org.hornetq.spi.core.protocol.ProtocolType;
+import org.hornetq.spi.core.remoting.BufferHandler;
 import org.hornetq.spi.core.remoting.Connection;
 import org.hornetq.spi.core.remoting.ConnectionLifeCycleListener;
 import org.hornetq.spi.core.remoting.Connector;
@@ -1085,7 +1085,7 @@
       }
    }
 
-   private class DelegatingBufferHandler extends AbstractBufferHandler
+   private class DelegatingBufferHandler implements BufferHandler
    {
       public void bufferReceived(final Object connectionID, final HornetQBuffer buffer)
       {

Modified: trunk/src/main/org/hornetq/core/protocol/aardvark/impl/AardvarkConnection.java
===================================================================
--- trunk/src/main/org/hornetq/core/protocol/aardvark/impl/AardvarkConnection.java	2010-01-22 10:29:26 UTC (rev 8836)
+++ trunk/src/main/org/hornetq/core/protocol/aardvark/impl/AardvarkConnection.java	2010-01-22 11:06:32 UTC (rev 8837)
@@ -130,10 +130,4 @@
    {
       manager.handleBuffer(this, buffer);
    }
-
-   public int isReadyToHandle(HornetQBuffer buffer)
-   {
-      return -1;
-   }
-
 }

Deleted: trunk/src/main/org/hornetq/core/protocol/core/impl/AbstractBufferHandler.java
===================================================================
--- trunk/src/main/org/hornetq/core/protocol/core/impl/AbstractBufferHandler.java	2010-01-22 10:29:26 UTC (rev 8836)
+++ trunk/src/main/org/hornetq/core/protocol/core/impl/AbstractBufferHandler.java	2010-01-22 11:06:32 UTC (rev 8837)
@@ -1,46 +0,0 @@
-/*
- * Copyright 2009 Red Hat, Inc.
- * Red Hat licenses this file to you under the Apache License, version
- * 2.0 (the "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *    http://www.apache.org/licenses/LICENSE-2.0
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
- * implied.  See the License for the specific language governing
- * permissions and limitations under the License.
- */
-package org.hornetq.core.protocol.core.impl;
-
-import org.hornetq.api.core.HornetQBuffer;
-import org.hornetq.core.logging.Logger;
-import org.hornetq.spi.core.remoting.BufferHandler;
-import org.hornetq.utils.DataConstants;
-
-/**
- * A AbstractBufferHandler
- * 
- * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
- *
- */
-public abstract class AbstractBufferHandler implements BufferHandler
-{
-   private static final Logger log = Logger.getLogger(AbstractBufferHandler.class);
-
-   public int isReadyToHandle(final HornetQBuffer buffer)
-   {
-      if (buffer.readableBytes() < DataConstants.SIZE_INT)
-      {
-         return -1;
-      }
-
-      int length = buffer.readInt();
-
-      if (buffer.readableBytes() < length)
-      {
-         return -1;
-      }
-
-      return length;
-   }
-}

Modified: trunk/src/main/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java
===================================================================
--- trunk/src/main/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java	2010-01-22 10:29:26 UTC (rev 8836)
+++ trunk/src/main/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java	2010-01-22 11:06:32 UTC (rev 8837)
@@ -124,6 +124,8 @@
    {
    }
 
+   //This is never called using the core protocol, since we override the HornetQFrameDecoder with our core
+   //optimised version HornetQFrameDecoder2, which never calls this
    public int isReadyToHandle(HornetQBuffer buffer)
    {
       return -1;

Modified: trunk/src/main/org/hornetq/core/protocol/core/impl/RemotingConnectionImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/protocol/core/impl/RemotingConnectionImpl.java	2010-01-22 10:29:26 UTC (rev 8836)
+++ trunk/src/main/org/hornetq/core/protocol/core/impl/RemotingConnectionImpl.java	2010-01-22 11:06:32 UTC (rev 8837)
@@ -31,6 +31,7 @@
 import org.hornetq.core.protocol.core.Packet;
 import org.hornetq.core.remoting.CloseListener;
 import org.hornetq.core.remoting.FailureListener;
+import org.hornetq.spi.core.remoting.BufferHandler;
 import org.hornetq.spi.core.remoting.Connection;
 import org.hornetq.utils.SimpleIDGenerator;
 
@@ -39,7 +40,7 @@
  * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>
  * @version <tt>$Revision$</tt> $Id$
  */
-public class RemotingConnectionImpl extends AbstractBufferHandler implements CoreRemotingConnection
+public class RemotingConnectionImpl implements BufferHandler, CoreRemotingConnection
 {
    // Constants
    // ------------------------------------------------------------------------------------

Modified: trunk/src/main/org/hornetq/core/remoting/impl/invm/InVMAcceptor.java
===================================================================
--- trunk/src/main/org/hornetq/core/remoting/impl/invm/InVMAcceptor.java	2010-01-22 10:29:26 UTC (rev 8836)
+++ trunk/src/main/org/hornetq/core/remoting/impl/invm/InVMAcceptor.java	2010-01-22 11:06:32 UTC (rev 8837)
@@ -46,7 +46,7 @@
    private final int id;
 
    private final BufferHandler handler;
-
+   
    private final ConnectionLifeCycleListener listener;
 
    private final ConcurrentMap<String, Connection> connections = new ConcurrentHashMap<String, Connection>();
@@ -62,13 +62,13 @@
    private final ProtocolType protocol;
 
    public InVMAcceptor(final Map<String, Object> configuration,
-                       final BufferHandler handler,
+                       final BufferHandler handler,                       
                        final ConnectionLifeCycleListener listener,
                        final Executor threadPool,
                        final ProtocolType protocol)
    {
       this.handler = handler;
-
+      
       this.listener = listener;
 
       id = ConfigurationHelper.getIntProperty(TransportConstants.SERVER_ID_PROP_NAME, 0, configuration);

Modified: trunk/src/main/org/hornetq/core/remoting/impl/invm/InVMAcceptorFactory.java
===================================================================
--- trunk/src/main/org/hornetq/core/remoting/impl/invm/InVMAcceptorFactory.java	2010-01-22 10:29:26 UTC (rev 8836)
+++ trunk/src/main/org/hornetq/core/remoting/impl/invm/InVMAcceptorFactory.java	2010-01-22 11:06:32 UTC (rev 8837)
@@ -20,6 +20,7 @@
 import org.hornetq.spi.core.protocol.ProtocolType;
 import org.hornetq.spi.core.remoting.Acceptor;
 import org.hornetq.spi.core.remoting.AcceptorFactory;
+import org.hornetq.spi.core.remoting.BufferDecoder;
 import org.hornetq.spi.core.remoting.BufferHandler;
 import org.hornetq.spi.core.remoting.ConnectionLifeCycleListener;
 
@@ -33,6 +34,7 @@
 {
    public Acceptor createAcceptor(final Map<String, Object> configuration,
                                   final BufferHandler handler,
+                                  final BufferDecoder decoder,
                                   final ConnectionLifeCycleListener listener,
                                   final Executor threadPool,
                                   final ScheduledExecutorService scheduledThreadPool,

Modified: trunk/src/main/org/hornetq/core/remoting/server/impl/RemotingServiceImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/remoting/server/impl/RemotingServiceImpl.java	2010-01-22 10:29:26 UTC (rev 8836)
+++ trunk/src/main/org/hornetq/core/remoting/server/impl/RemotingServiceImpl.java	2010-01-22 11:06:32 UTC (rev 8837)
@@ -70,8 +70,6 @@
 
    private final Map<Object, ConnectionEntry> connections = new ConcurrentHashMap<Object, ConnectionEntry>();
 
-   //private final BufferHandler bufferHandler = new DelegatingBufferHandler();
-
    private final Configuration config;
 
    private final HornetQServer server;
@@ -169,6 +167,7 @@
             
             Acceptor acceptor = factory.createAcceptor(info.getParams(),
                                                        new DelegatingBufferHandler(manager),
+                                                       manager,
                                                        this,
                                                        threadPool,
                                                        scheduledThreadPool,
@@ -405,11 +404,6 @@
             conn.connection.bufferReceived(connectionID, buffer);
          }
       }
-
-      public int isReadyToHandle(HornetQBuffer buffer)
-      {
-         return manager.isReadyToHandle(buffer);
-      }
    }
 
    private final class FailureCheckAndFlushThread extends Thread

Modified: trunk/src/main/org/hornetq/integration/transports/netty/ChannelPipelineSupport.java
===================================================================
--- trunk/src/main/org/hornetq/integration/transports/netty/ChannelPipelineSupport.java	2010-01-22 10:29:26 UTC (rev 8836)
+++ trunk/src/main/org/hornetq/integration/transports/netty/ChannelPipelineSupport.java	2010-01-22 11:06:32 UTC (rev 8837)
@@ -17,7 +17,7 @@
 import javax.net.ssl.SSLEngine;
 
 import org.hornetq.spi.core.protocol.ProtocolType;
-import org.hornetq.spi.core.remoting.BufferHandler;
+import org.hornetq.spi.core.remoting.BufferDecoder;
 import org.jboss.netty.channel.ChannelPipeline;
 import org.jboss.netty.handler.ssl.SslHandler;
 
@@ -45,18 +45,19 @@
 
    // Public --------------------------------------------------------
 
-   public static void addCodecFilter(final ProtocolType protocol, final ChannelPipeline pipeline, final BufferHandler handler)
+   public static void addCodecFilter(final ProtocolType protocol, final ChannelPipeline pipeline, final BufferDecoder decoder)
    {
       assert pipeline != null;
       
       if (protocol == ProtocolType.CORE)
       {
+         //Core protocol uses its own optimised decoder
          pipeline.addLast("decoder", new HornetQFrameDecoder2());
       }
       else
       {
          //Use the old frame decoder for other protocols
-         pipeline.addLast("decoder", new HornetQFrameDecoder(handler));
+         pipeline.addLast("decoder", new HornetQFrameDecoder(decoder));
       }
    }
 

Modified: trunk/src/main/org/hornetq/integration/transports/netty/HornetQFrameDecoder.java
===================================================================
--- trunk/src/main/org/hornetq/integration/transports/netty/HornetQFrameDecoder.java	2010-01-22 10:29:26 UTC (rev 8836)
+++ trunk/src/main/org/hornetq/integration/transports/netty/HornetQFrameDecoder.java	2010-01-22 11:06:32 UTC (rev 8837)
@@ -15,8 +15,7 @@
 
 import org.hornetq.core.buffers.impl.ChannelBufferWrapper;
 import org.hornetq.core.logging.Logger;
-import org.hornetq.spi.core.remoting.BufferHandler;
-import org.hornetq.utils.DataConstants;
+import org.hornetq.spi.core.remoting.BufferDecoder;
 import org.jboss.netty.buffer.ChannelBuffer;
 import org.jboss.netty.buffer.ChannelBuffers;
 import org.jboss.netty.channel.Channel;
@@ -36,11 +35,11 @@
 {
    private static final Logger log = Logger.getLogger(HornetQFrameDecoder.class);
 
-   private final BufferHandler handler;
+   private final BufferDecoder decoder;
 
-   public HornetQFrameDecoder(final BufferHandler handler)
+   public HornetQFrameDecoder(final BufferDecoder decoder)
    {
-      this.handler = handler;
+      this.decoder = decoder;
    }
 
    // FrameDecoder overrides
@@ -48,34 +47,24 @@
 
    @Override
    protected Object decode(final ChannelHandlerContext ctx, final Channel channel, final ChannelBuffer in) throws Exception
-   {
-      log.info("dewcoding!!");
-           
+   {    
       int start = in.readerIndex();
 
-      int length = handler.isReadyToHandle(new ChannelBufferWrapper(in));
+      int length = decoder.isReadyToHandle(new ChannelBufferWrapper(in));
       
-      log.info("length is " + length);
-
       in.readerIndex(start);
       
-      log.info("length is 2 " + length);
-
       if (length == -1)
       {
          return null;
       }
       
-      log.info("creating buffer");
-
       ChannelBuffer buffer = in.readBytes(length);
 
       ChannelBuffer newBuffer = ChannelBuffers.dynamicBuffer(buffer.writerIndex());
 
       newBuffer.writeBytes(buffer);
       
-      log.info("got the buffer");
-
       return newBuffer;
    }
 }

Modified: trunk/src/main/org/hornetq/integration/transports/netty/HornetQFrameDecoder2.java
===================================================================
--- trunk/src/main/org/hornetq/integration/transports/netty/HornetQFrameDecoder2.java	2010-01-22 10:29:26 UTC (rev 8836)
+++ trunk/src/main/org/hornetq/integration/transports/netty/HornetQFrameDecoder2.java	2010-01-22 11:06:32 UTC (rev 8837)
@@ -24,7 +24,7 @@
 import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
 
 /**
- * A Netty decoder used to decode messages.
+ * A Netty decoder specially optimised to decode messages on the core protocol only
  *
  * @author <a href="tlee at redhat.com">Trustin Lee</a>
  *

Modified: trunk/src/main/org/hornetq/integration/transports/netty/NettyAcceptor.java
===================================================================
--- trunk/src/main/org/hornetq/integration/transports/netty/NettyAcceptor.java	2010-01-22 10:29:26 UTC (rev 8836)
+++ trunk/src/main/org/hornetq/integration/transports/netty/NettyAcceptor.java	2010-01-22 11:06:32 UTC (rev 8837)
@@ -36,6 +36,7 @@
 import org.hornetq.core.server.management.NotificationService;
 import org.hornetq.spi.core.protocol.ProtocolType;
 import org.hornetq.spi.core.remoting.Acceptor;
+import org.hornetq.spi.core.remoting.BufferDecoder;
 import org.hornetq.spi.core.remoting.BufferHandler;
 import org.hornetq.spi.core.remoting.Connection;
 import org.hornetq.spi.core.remoting.ConnectionLifeCycleListener;
@@ -88,6 +89,8 @@
    private ServerBootstrap bootstrap;
 
    private final BufferHandler handler;
+   
+   private final BufferDecoder decoder;
 
    private final ConnectionLifeCycleListener listener;
 
@@ -137,12 +140,15 @@
 
    public NettyAcceptor(final Map<String, Object> configuration,
                         final BufferHandler handler,
+                        final BufferDecoder decoder,
                         final ConnectionLifeCycleListener listener,
                         final Executor threadPool,
                         final ScheduledExecutorService scheduledThreadPool,
                         final ProtocolType protocol)
    {
       this.handler = handler;
+      
+      this.decoder = decoder;
 
       this.listener = listener;
       
@@ -288,7 +294,7 @@
                pipeline.addLast("httphandler", new HttpAcceptorHandler(httpKeepAliveRunnable, httpResponseTime));
             }
 
-            ChannelPipelineSupport.addCodecFilter(protocol, pipeline, handler);
+            ChannelPipelineSupport.addCodecFilter(protocol, pipeline, decoder);
             pipeline.addLast("handler", new HornetQServerChannelHandler(channelGroup, handler, new Listener()));
             return pipeline;
          }

Modified: trunk/src/main/org/hornetq/integration/transports/netty/NettyAcceptorFactory.java
===================================================================
--- trunk/src/main/org/hornetq/integration/transports/netty/NettyAcceptorFactory.java	2010-01-22 10:29:26 UTC (rev 8836)
+++ trunk/src/main/org/hornetq/integration/transports/netty/NettyAcceptorFactory.java	2010-01-22 11:06:32 UTC (rev 8837)
@@ -21,6 +21,7 @@
 import org.hornetq.spi.core.protocol.ProtocolType;
 import org.hornetq.spi.core.remoting.Acceptor;
 import org.hornetq.spi.core.remoting.AcceptorFactory;
+import org.hornetq.spi.core.remoting.BufferDecoder;
 import org.hornetq.spi.core.remoting.BufferHandler;
 import org.hornetq.spi.core.remoting.ConnectionLifeCycleListener;
 
@@ -33,12 +34,13 @@
 {
    public Acceptor createAcceptor(final Map<String, Object> configuration,
                                   final BufferHandler handler,
+                                  final BufferDecoder decoder,
                                   final ConnectionLifeCycleListener listener,
                                   final Executor threadPool,
                                   final ScheduledExecutorService scheduledThreadPool,
                                   final ProtocolType protocol)
    {
-      return new NettyAcceptor(configuration, handler, listener, threadPool, scheduledThreadPool,
+      return new NettyAcceptor(configuration, handler, decoder, listener, threadPool, scheduledThreadPool,
                                protocol);
    }
 

Modified: trunk/src/main/org/hornetq/integration/transports/netty/NettyConnector.java
===================================================================
--- trunk/src/main/org/hornetq/integration/transports/netty/NettyConnector.java	2010-01-22 10:29:26 UTC (rev 8836)
+++ trunk/src/main/org/hornetq/integration/transports/netty/NettyConnector.java	2010-01-22 11:06:32 UTC (rev 8837)
@@ -311,7 +311,7 @@
                pipeline.addLast("httpResponseDecoder", new HttpResponseDecoder());
                pipeline.addLast("httphandler", new HttpHandler());
             }
-            ChannelPipelineSupport.addCodecFilter(ProtocolType.CORE, pipeline, handler);
+            ChannelPipelineSupport.addCodecFilter(ProtocolType.CORE, pipeline, null);
             pipeline.addLast("handler", new HornetQClientChannelHandler(channelGroup, handler, new Listener()));
             return pipeline;
          }

Modified: trunk/src/main/org/hornetq/spi/core/protocol/ProtocolManager.java
===================================================================
--- trunk/src/main/org/hornetq/spi/core/protocol/ProtocolManager.java	2010-01-22 10:29:26 UTC (rev 8836)
+++ trunk/src/main/org/hornetq/spi/core/protocol/ProtocolManager.java	2010-01-22 11:06:32 UTC (rev 8837)
@@ -13,6 +13,7 @@
 
 package org.hornetq.spi.core.protocol;
 
+import org.hornetq.spi.core.remoting.BufferDecoder;
 import org.hornetq.spi.core.remoting.BufferHandler;
 import org.hornetq.spi.core.remoting.Connection;
 
@@ -23,7 +24,7 @@
  *
  *
  */
-public interface ProtocolManager extends BufferHandler
+public interface ProtocolManager extends BufferHandler, BufferDecoder
 {
    ConnectionEntry createConnectionEntry(Connection connection);
 }

Modified: trunk/src/main/org/hornetq/spi/core/remoting/AcceptorFactory.java
===================================================================
--- trunk/src/main/org/hornetq/spi/core/remoting/AcceptorFactory.java	2010-01-22 10:29:26 UTC (rev 8836)
+++ trunk/src/main/org/hornetq/spi/core/remoting/AcceptorFactory.java	2010-01-22 11:06:32 UTC (rev 8837)
@@ -36,6 +36,7 @@
     *
     * @param configuration       the configuration
     * @param handler             the handler
+    * @param decoder             the decoder
     * @param listener            the listener
     * @param threadPool          the threadpool
     * @param scheduledThreadPool a scheduled thread pool
@@ -43,6 +44,7 @@
     */
    Acceptor createAcceptor(final Map<String, Object> configuration,
                            BufferHandler handler,
+                           BufferDecoder decoder,
                            ConnectionLifeCycleListener listener,
                            Executor threadPool,
                            ScheduledExecutorService scheduledThreadPool,

Added: trunk/src/main/org/hornetq/spi/core/remoting/BufferDecoder.java
===================================================================
--- trunk/src/main/org/hornetq/spi/core/remoting/BufferDecoder.java	                        (rev 0)
+++ trunk/src/main/org/hornetq/spi/core/remoting/BufferDecoder.java	2010-01-22 11:06:32 UTC (rev 8837)
@@ -0,0 +1,36 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.spi.core.remoting;
+
+import org.hornetq.api.core.HornetQBuffer;
+
+/**
+ * A BufferDecoder
+ *
+ * @author tim
+ *
+ *
+ */
+public interface BufferDecoder
+{
+   /**
+    * called by the remoting system prior to {@link org.hornetq.spi.core.remoting.BufferHandler#bufferReceived(Object, org.hornetq.api.core.HornetQBuffer)}.
+    * <p/>
+    * The implementation should return the length in bytes of the data to decode if there is enough data in the buffer, otherwise -1.
+    *
+    * @param buffer the buffer
+    * @return the number of bytes that can be decoded, or -1 if the buffer cannot be decoded yet
+    */
+   int isReadyToHandle(HornetQBuffer buffer);
+}

Modified: trunk/src/main/org/hornetq/spi/core/remoting/BufferHandler.java
===================================================================
--- trunk/src/main/org/hornetq/spi/core/remoting/BufferHandler.java	2010-01-22 10:29:26 UTC (rev 8836)
+++ trunk/src/main/org/hornetq/spi/core/remoting/BufferHandler.java	2010-01-22 11:06:32 UTC (rev 8837)
@@ -29,15 +29,5 @@
     * @param connectionID the connection the buffer was received on
     * @param buffer       the buffer to decode
     */
-   void bufferReceived(Object connectionID, HornetQBuffer buffer);
-
-   /**
-    * called by the remoting connection prior to {@link org.hornetq.spi.core.remoting.BufferHandler#bufferReceived(Object, org.hornetq.api.core.HornetQBuffer)}.
-    * <p/>
-    * The implementation should return true if there is enough data in the buffer to decode. otherwise false.
-    *
-    * @param buffer the buffer
-    * @return true id the buffer can be decoded..
-    */
-   int isReadyToHandle(HornetQBuffer buffer);
+   void bufferReceived(Object connectionID, HornetQBuffer buffer);   
 }

Modified: trunk/tests/src/org/hornetq/tests/integration/http/NettyHttpTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/http/NettyHttpTest.java	2010-01-22 10:29:26 UTC (rev 8836)
+++ trunk/tests/src/org/hornetq/tests/integration/http/NettyHttpTest.java	2010-01-22 11:06:32 UTC (rev 8837)
@@ -25,7 +25,6 @@
 import org.hornetq.api.core.HornetQBuffer;
 import org.hornetq.api.core.HornetQException;
 import org.hornetq.core.config.impl.ConfigurationImpl;
-import org.hornetq.core.protocol.core.impl.AbstractBufferHandler;
 import org.hornetq.integration.transports.netty.NettyAcceptor;
 import org.hornetq.integration.transports.netty.NettyConnector;
 import org.hornetq.integration.transports.netty.TransportConstants;
@@ -99,7 +98,7 @@
       conf.put(TransportConstants.HTTP_CLIENT_IDLE_SCAN_PERIOD, -1l);
       DummyConnectionLifeCycleListener acceptorListener = new DummyConnectionLifeCycleListener(connCreatedLatch);
       SimpleBufferHandler acceptorHandler = new SimpleBufferHandler(acceptorLatch);
-      acceptor = new NettyAcceptor(conf, acceptorHandler, acceptorListener, threadPool, scheduledThreadPool, ProtocolType.CORE);
+      acceptor = new NettyAcceptor(conf, acceptorHandler, null, acceptorListener, threadPool, scheduledThreadPool, ProtocolType.CORE);
       acceptor.start();
 
       SimpleBufferHandler2 connectorHandler = new SimpleBufferHandler2(connectorLatch);
@@ -153,7 +152,7 @@
       conf.put(TransportConstants.HTTP_CLIENT_IDLE_SCAN_PERIOD, -1l);
       DummyConnectionLifeCycleListener acceptorListener = new DummyConnectionLifeCycleListener(connCreatedLatch);
       SimpleBufferHandler acceptorHandler = new SimpleBufferHandler(acceptorLatch);
-      acceptor = new NettyAcceptor(conf, acceptorHandler, acceptorListener, threadPool, scheduledThreadPool, ProtocolType.CORE);
+      acceptor = new NettyAcceptor(conf, acceptorHandler, null, acceptorListener, threadPool, scheduledThreadPool, ProtocolType.CORE);
       acceptor.start();
 
       SimpleBufferHandler2 connectorHandler = new SimpleBufferHandler2(connectorLatch);
@@ -211,7 +210,7 @@
       conf.put(TransportConstants.HTTP_CLIENT_IDLE_SCAN_PERIOD, -1l);
       DummyConnectionLifeCycleListener acceptorListener = new DummyConnectionLifeCycleListener(connCreatedLatch);
       SimpleBufferHandler acceptorHandler = new SimpleBufferHandler(acceptorLatch);
-      acceptor = new NettyAcceptor(conf, acceptorHandler, acceptorListener, threadPool, scheduledThreadPool, ProtocolType.CORE);
+      acceptor = new NettyAcceptor(conf, acceptorHandler, null, acceptorListener, threadPool, scheduledThreadPool, ProtocolType.CORE);
       acceptor.start();
 
       SimpleBufferHandler connectorHandler = new SimpleBufferHandler(connectorLatch);
@@ -270,7 +269,7 @@
       conf.put(TransportConstants.HTTP_CLIENT_IDLE_SCAN_PERIOD, -1l);
       DummyConnectionLifeCycleListener acceptorListener = new DummyConnectionLifeCycleListener(connCreatedLatch);
       SimpleBufferHandler acceptorHandler = new SimpleBufferHandler(acceptorLatch);
-      acceptor = new NettyAcceptor(conf, acceptorHandler, acceptorListener, threadPool, scheduledThreadPool, ProtocolType.CORE);
+      acceptor = new NettyAcceptor(conf, acceptorHandler, null, acceptorListener, threadPool, scheduledThreadPool, ProtocolType.CORE);
       acceptor.start();
 
       SimpleBufferHandler connectorHandler = new SimpleBufferHandler(connectorLatch);
@@ -328,7 +327,7 @@
       conf.put(TransportConstants.HTTP_CLIENT_IDLE_PROP_NAME, 500l);
       DummyConnectionLifeCycleListener acceptorListener = new DummyConnectionLifeCycleListener(connCreatedLatch);
       SimpleBufferHandler acceptorHandler = new SimpleBufferHandler(acceptorLatch);
-      acceptor = new NettyAcceptor(conf, acceptorHandler, acceptorListener, threadPool, scheduledThreadPool, ProtocolType.CORE);
+      acceptor = new NettyAcceptor(conf, acceptorHandler, null, acceptorListener, threadPool, scheduledThreadPool, ProtocolType.CORE);
       acceptor.start();
 
       SimpleBufferHandler connectorHandler = new SimpleBufferHandler(connectorLatch);
@@ -382,7 +381,7 @@
       conf.put(TransportConstants.HTTP_SERVER_SCAN_PERIOD_PROP_NAME, 5000l);
       DummyConnectionLifeCycleListener acceptorListener = new DummyConnectionLifeCycleListener(connCreatedLatch);
       SimpleBufferHandler acceptorHandler = new SimpleBufferHandler(acceptorLatch);
-      acceptor = new NettyAcceptor(conf, acceptorHandler, acceptorListener, threadPool, scheduledThreadPool, ProtocolType.CORE);
+      acceptor = new NettyAcceptor(conf, acceptorHandler, null, acceptorListener, threadPool, scheduledThreadPool, ProtocolType.CORE);
       acceptor.start();
 
       BogusResponseHandler connectorHandler = new BogusResponseHandler(connectorLatch);
@@ -429,7 +428,7 @@
       conf.put(TransportConstants.HTTP_SERVER_SCAN_PERIOD_PROP_NAME, 5000l);
       DummyConnectionLifeCycleListener acceptorListener = new DummyConnectionLifeCycleListener(connCreatedLatch);
       SimpleBufferHandler2 acceptorHandler = new SimpleBufferHandler2(acceptorLatch);
-      acceptor = new NettyAcceptor(conf, acceptorHandler, acceptorListener, threadPool, scheduledThreadPool, ProtocolType.CORE);
+      acceptor = new NettyAcceptor(conf, acceptorHandler, null, acceptorListener, threadPool, scheduledThreadPool, ProtocolType.CORE);
       acceptor.start();
 
       BogusResponseHandler connectorHandler = new BogusResponseHandler(connectorLatch);
@@ -462,7 +461,7 @@
       }
    }
 
-   class SimpleBufferHandler extends AbstractBufferHandler
+   class SimpleBufferHandler implements BufferHandler
    {
       int messagesReceieved = 0;
 
@@ -484,7 +483,7 @@
       }
    }
 
-   class SimpleBufferHandler2 extends AbstractBufferHandler
+   class SimpleBufferHandler2 implements BufferHandler
    {
       int messagesReceieved = 0;
 
@@ -525,11 +524,6 @@
          this.latch = latch;
       }
 
-      public int isReadyToHandle(final HornetQBuffer buffer)
-      {
-         return 0;
-      }
-
       public void bufferReceived(final Object connectionID, final HornetQBuffer buffer)
       {
          int i = buffer.readInt();

Modified: trunk/tests/src/org/hornetq/tests/unit/core/remoting/impl/netty/NettyAcceptorFactoryTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/unit/core/remoting/impl/netty/NettyAcceptorFactoryTest.java	2010-01-22 10:29:26 UTC (rev 8836)
+++ trunk/tests/src/org/hornetq/tests/unit/core/remoting/impl/netty/NettyAcceptorFactoryTest.java	2010-01-22 11:06:32 UTC (rev 8837)
@@ -21,7 +21,6 @@
 import org.hornetq.api.core.HornetQBuffer;
 import org.hornetq.api.core.HornetQException;
 import org.hornetq.core.config.impl.ConfigurationImpl;
-import org.hornetq.core.protocol.core.impl.AbstractBufferHandler;
 import org.hornetq.integration.transports.netty.NettyAcceptor;
 import org.hornetq.integration.transports.netty.NettyAcceptorFactory;
 import org.hornetq.spi.core.protocol.ProtocolType;
@@ -45,7 +44,7 @@
       NettyAcceptorFactory factory = new NettyAcceptorFactory();
 
       Map<String, Object> params = new HashMap<String, Object>();
-      BufferHandler handler = new AbstractBufferHandler()
+      BufferHandler handler = new BufferHandler()
       {
 
          public void bufferReceived(final Object connectionID, final HornetQBuffer buffer)
@@ -71,6 +70,7 @@
 
       Acceptor acceptor = factory.createAcceptor(params,
                                                  handler,
+                                                 null,
                                                  listener,
                                                  Executors.newCachedThreadPool(),
                                                  Executors.newScheduledThreadPool(ConfigurationImpl.DEFAULT_SCHEDULED_THREAD_POOL_MAX_SIZE),

Modified: trunk/tests/src/org/hornetq/tests/unit/core/remoting/impl/netty/NettyAcceptorTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/unit/core/remoting/impl/netty/NettyAcceptorTest.java	2010-01-22 10:29:26 UTC (rev 8836)
+++ trunk/tests/src/org/hornetq/tests/unit/core/remoting/impl/netty/NettyAcceptorTest.java	2010-01-22 11:06:32 UTC (rev 8837)
@@ -22,7 +22,6 @@
 import org.hornetq.api.core.HornetQBuffer;
 import org.hornetq.api.core.HornetQException;
 import org.hornetq.core.config.impl.ConfigurationImpl;
-import org.hornetq.core.protocol.core.impl.AbstractBufferHandler;
 import org.hornetq.integration.transports.netty.NettyAcceptor;
 import org.hornetq.integration.transports.netty.TransportConstants;
 import org.hornetq.spi.core.protocol.ProtocolType;
@@ -58,7 +57,7 @@
 
    public void testStartStop() throws Exception
    {
-      BufferHandler handler = new AbstractBufferHandler()
+      BufferHandler handler = new BufferHandler()
       {
 
          public void bufferReceived(final Object connectionID, final HornetQBuffer buffer)
@@ -84,6 +83,7 @@
       };
       NettyAcceptor acceptor = new NettyAcceptor(params,
                                                  handler,
+                                                 null,
                                                  listener,
                                                  Executors.newCachedThreadPool(),
                                                  Executors.newScheduledThreadPool(ConfigurationImpl.DEFAULT_SCHEDULED_THREAD_POOL_MAX_SIZE),

Modified: trunk/tests/src/org/hornetq/tests/unit/core/remoting/impl/netty/NettyConnectorTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/unit/core/remoting/impl/netty/NettyConnectorTest.java	2010-01-22 10:29:26 UTC (rev 8836)
+++ trunk/tests/src/org/hornetq/tests/unit/core/remoting/impl/netty/NettyConnectorTest.java	2010-01-22 11:06:32 UTC (rev 8837)
@@ -21,7 +21,6 @@
 
 import org.hornetq.api.core.HornetQBuffer;
 import org.hornetq.api.core.HornetQException;
-import org.hornetq.core.protocol.core.impl.AbstractBufferHandler;
 import org.hornetq.integration.transports.netty.NettyConnector;
 import org.hornetq.spi.core.protocol.ProtocolType;
 import org.hornetq.spi.core.remoting.BufferHandler;
@@ -50,7 +49,7 @@
 
    public void testStartStop() throws Exception
    {
-      BufferHandler handler = new AbstractBufferHandler()
+      BufferHandler handler = new BufferHandler()
       {
          public void bufferReceived(final Object connectionID, final HornetQBuffer buffer)
          {
@@ -73,7 +72,7 @@
       };
 
       NettyConnector connector = new NettyConnector(params,
-                                                    handler,
+                                                    handler,                                                    
                                                     listener,
                                                     Executors.newCachedThreadPool(),
                                                     Executors.newCachedThreadPool(),
@@ -87,7 +86,7 @@
 
    public void testNullParams() throws Exception
    {
-      BufferHandler handler = new AbstractBufferHandler()
+      BufferHandler handler = new BufferHandler()
       {
          public void bufferReceived(final Object connectionID, final HornetQBuffer buffer)
          {



More information about the hornetq-commits mailing list