[hornetq-commits] JBoss hornetq SVN: r8841 - in branches/HORNETQ-129_STOMP_protocol: src/main/org/hornetq/core/protocol/core/impl and 10 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Fri Jan 22 08:21:28 EST 2010


Author: jmesnil
Date: 2010-01-22 08:21:27 -0500 (Fri, 22 Jan 2010)
New Revision: 8841

Added:
   branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/transports/netty/HornetQFrameDecoder.java
   branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/spi/core/remoting/BufferDecoder.java
Removed:
   branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/protocol/core/impl/AbstractBufferHandler.java
Modified:
   branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/protocol/aardvark/impl/AardvarkConnection.java
   branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java
   branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/protocol/core/impl/RemotingConnectionImpl.java
   branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/protocol/stomp/StompProtocolManager.java
   branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/remoting/impl/invm/InVMAcceptor.java
   branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/remoting/impl/invm/InVMAcceptorFactory.java
   branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/remoting/server/impl/RemotingServiceImpl.java
   branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/transports/netty/ChannelPipelineSupport.java
   branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/transports/netty/HornetQFrameDecoder2.java
   branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/transports/netty/NettyAcceptor.java
   branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/transports/netty/NettyAcceptorFactory.java
   branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/transports/netty/NettyConnector.java
   branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/spi/core/protocol/ProtocolManager.java
   branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/spi/core/remoting/AcceptorFactory.java
   branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/spi/core/remoting/BufferHandler.java
   branches/HORNETQ-129_STOMP_protocol/tests/src/org/hornetq/tests/integration/cluster/failover/LargeMessageFailoverTest.java
   branches/HORNETQ-129_STOMP_protocol/tests/src/org/hornetq/tests/integration/http/NettyHttpTest.java
   branches/HORNETQ-129_STOMP_protocol/tests/src/org/hornetq/tests/integration/remoting/AardvarkProtocolTest.java
   branches/HORNETQ-129_STOMP_protocol/tests/src/org/hornetq/tests/unit/core/remoting/impl/netty/NettyAcceptorFactoryTest.java
   branches/HORNETQ-129_STOMP_protocol/tests/src/org/hornetq/tests/unit/core/remoting/impl/netty/NettyAcceptorTest.java
   branches/HORNETQ-129_STOMP_protocol/tests/src/org/hornetq/tests/unit/core/remoting/impl/netty/NettyConnectorTest.java
Log:
https://jira.jboss.org/jira/browse/HORNETQ-129: Implement STOMP v1.0

* sync with the trunk: svn merge -r 8832:8839 https://svn.jboss.org/repos/hornetq/trunk

Modified: branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/protocol/aardvark/impl/AardvarkConnection.java
===================================================================
--- branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/protocol/aardvark/impl/AardvarkConnection.java	2010-01-22 12:46:06 UTC (rev 8840)
+++ branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/protocol/aardvark/impl/AardvarkConnection.java	2010-01-22 13:21:27 UTC (rev 8841)
@@ -130,10 +130,4 @@
    {
       manager.handleBuffer(this, buffer);
    }
-
-   public int isReadyToHandle(HornetQBuffer buffer)
-   {
-      return -1;
-   }
-
 }

Deleted: branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/protocol/core/impl/AbstractBufferHandler.java
===================================================================
--- branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/protocol/core/impl/AbstractBufferHandler.java	2010-01-22 12:46:06 UTC (rev 8840)
+++ branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/protocol/core/impl/AbstractBufferHandler.java	2010-01-22 13:21:27 UTC (rev 8841)
@@ -1,46 +0,0 @@
-/*
- * Copyright 2009 Red Hat, Inc.
- * Red Hat licenses this file to you under the Apache License, version
- * 2.0 (the "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *    http://www.apache.org/licenses/LICENSE-2.0
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
- * implied.  See the License for the specific language governing
- * permissions and limitations under the License.
- */
-package org.hornetq.core.protocol.core.impl;
-
-import org.hornetq.api.core.HornetQBuffer;
-import org.hornetq.core.logging.Logger;
-import org.hornetq.spi.core.remoting.BufferHandler;
-import org.hornetq.utils.DataConstants;
-
-/**
- * A AbstractBufferHandler
- * 
- * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
- *
- */
-public abstract class AbstractBufferHandler implements BufferHandler
-{
-   private static final Logger log = Logger.getLogger(AbstractBufferHandler.class);
-
-   public int isReadyToHandle(final HornetQBuffer buffer)
-   {
-      if (buffer.readableBytes() < DataConstants.SIZE_INT)
-      {
-         return -1;
-      }
-
-      int length = buffer.readInt();
-
-      if (buffer.readableBytes() < length)
-      {
-         return -1;
-      }
-
-      return length;
-   }
-}

Modified: branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java
===================================================================
--- branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java	2010-01-22 12:46:06 UTC (rev 8840)
+++ branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java	2010-01-22 13:21:27 UTC (rev 8841)
@@ -125,6 +125,8 @@
    {
    }
 
+   //This is never called using the core protocol, since we override the HornetQFrameDecoder with our core
+   //optimised version HornetQFrameDecoder2, which never calls this
    public int isReadyToHandle(HornetQBuffer buffer)
    {
       return -1;

Modified: branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/protocol/core/impl/RemotingConnectionImpl.java
===================================================================
--- branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/protocol/core/impl/RemotingConnectionImpl.java	2010-01-22 12:46:06 UTC (rev 8840)
+++ branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/protocol/core/impl/RemotingConnectionImpl.java	2010-01-22 13:21:27 UTC (rev 8841)
@@ -31,6 +31,7 @@
 import org.hornetq.core.protocol.core.Packet;
 import org.hornetq.core.remoting.CloseListener;
 import org.hornetq.core.remoting.FailureListener;
+import org.hornetq.spi.core.remoting.BufferHandler;
 import org.hornetq.spi.core.remoting.Connection;
 import org.hornetq.utils.SimpleIDGenerator;
 
@@ -39,7 +40,7 @@
  * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>
  * @version <tt>$Revision$</tt> $Id$
  */
-public class RemotingConnectionImpl extends AbstractBufferHandler implements CoreRemotingConnection
+public class RemotingConnectionImpl implements BufferHandler, CoreRemotingConnection
 {
    // Constants
    // ------------------------------------------------------------------------------------

Modified: branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/protocol/stomp/StompProtocolManager.java
===================================================================
--- branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/protocol/stomp/StompProtocolManager.java	2010-01-22 12:46:06 UTC (rev 8840)
+++ branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/protocol/stomp/StompProtocolManager.java	2010-01-22 13:21:27 UTC (rev 8841)
@@ -292,5 +292,4 @@
    {
       return -1;
    }
-
 }

Modified: branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/remoting/impl/invm/InVMAcceptor.java
===================================================================
--- branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/remoting/impl/invm/InVMAcceptor.java	2010-01-22 12:46:06 UTC (rev 8840)
+++ branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/remoting/impl/invm/InVMAcceptor.java	2010-01-22 13:21:27 UTC (rev 8841)
@@ -46,7 +46,7 @@
    private final int id;
 
    private final BufferHandler handler;
-
+   
    private final ConnectionLifeCycleListener listener;
 
    private final ConcurrentMap<String, Connection> connections = new ConcurrentHashMap<String, Connection>();
@@ -62,12 +62,12 @@
    private final ProtocolType protocol;
 
    public InVMAcceptor(final Map<String, Object> configuration,
-                       final BufferHandler handler,
+                       final BufferHandler handler,                       
                        final ConnectionLifeCycleListener listener,
                        final Executor threadPool)
    {
       this.handler = handler;
-
+      
       this.listener = listener;
 
       id = ConfigurationHelper.getIntProperty(TransportConstants.SERVER_ID_PROP_NAME, 0, configuration);

Modified: branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/remoting/impl/invm/InVMAcceptorFactory.java
===================================================================
--- branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/remoting/impl/invm/InVMAcceptorFactory.java	2010-01-22 12:46:06 UTC (rev 8840)
+++ branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/remoting/impl/invm/InVMAcceptorFactory.java	2010-01-22 13:21:27 UTC (rev 8841)
@@ -19,6 +19,7 @@
 
 import org.hornetq.spi.core.remoting.Acceptor;
 import org.hornetq.spi.core.remoting.AcceptorFactory;
+import org.hornetq.spi.core.remoting.BufferDecoder;
 import org.hornetq.spi.core.remoting.BufferHandler;
 import org.hornetq.spi.core.remoting.ConnectionLifeCycleListener;
 
@@ -32,6 +33,7 @@
 {
    public Acceptor createAcceptor(final Map<String, Object> configuration,
                                   final BufferHandler handler,
+                                  final BufferDecoder decoder,
                                   final ConnectionLifeCycleListener listener,
                                   final Executor threadPool,
                                   final ScheduledExecutorService scheduledThreadPool)

Modified: branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/remoting/server/impl/RemotingServiceImpl.java
===================================================================
--- branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/remoting/server/impl/RemotingServiceImpl.java	2010-01-22 12:46:06 UTC (rev 8840)
+++ branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/remoting/server/impl/RemotingServiceImpl.java	2010-01-22 13:21:27 UTC (rev 8841)
@@ -22,8 +22,6 @@
 import java.util.concurrent.Executor;
 import java.util.concurrent.ScheduledExecutorService;
 
-import com.sun.corba.se.spi.activation.ServerHolder;
-
 import org.hornetq.api.core.HornetQBuffer;
 import org.hornetq.api.core.HornetQException;
 import org.hornetq.api.core.Interceptor;
@@ -32,9 +30,11 @@
 import org.hornetq.core.logging.Logger;
 import org.hornetq.core.protocol.aardvark.impl.AardvarkProtocolManagerFactory;
 import org.hornetq.core.protocol.core.impl.CoreProtocolManagerFactory;
+import org.hornetq.core.protocol.stomp.StompProtocolManagerFactory;
 import org.hornetq.core.remoting.server.RemotingService;
 import org.hornetq.core.server.HornetQServer;
 import org.hornetq.core.server.management.ManagementService;
+import org.hornetq.integration.transports.netty.TransportConstants;
 import org.hornetq.spi.core.protocol.ConnectionEntry;
 import org.hornetq.spi.core.protocol.ProtocolManager;
 import org.hornetq.spi.core.protocol.ProtocolType;
@@ -72,8 +72,6 @@
 
    private final Map<Object, ConnectionEntry> connections = new ConcurrentHashMap<Object, ConnectionEntry>();
 
-   //private final BufferHandler bufferHandler = new DelegatingBufferHandler();
-
    private final Configuration config;
 
    private final HornetQServer server;
@@ -125,6 +123,7 @@
       this.scheduledThreadPool = scheduledThreadPool;
       
       this.protocolMap.put(ProtocolType.CORE, new CoreProtocolManagerFactory().createProtocolManager(server, interceptors));
+      this.protocolMap.put(ProtocolType.STOMP, new StompProtocolManagerFactory().createProtocolManager(server, interceptors));
       this.protocolMap.put(ProtocolType.AARDVARK, new AardvarkProtocolManagerFactory().createProtocolManager(server, interceptors));
    }
 
@@ -163,14 +162,15 @@
                }
             }
             
-            //TODO - allow protocol type to be configured from Configuration for each acceptor
+            String protocolString = ConfigurationHelper.getStringProperty(TransportConstants.PROTOCOL_PROP_NAME, TransportConstants.DEFAULT_PROTOCOL, info.getParams());
 
-            ProtocolType protocol = hackProtocol;
+            ProtocolType protocol = ProtocolType.valueOf(protocolString.toUpperCase());
             
             ProtocolManager manager = protocolMap.get(protocol);
             
             Acceptor acceptor = factory.createAcceptor(info.getParams(),
                                                        new DelegatingBufferHandler(manager),
+                                                       manager,
                                                        this,
                                                        threadPool,
                                                        scheduledThreadPool);
@@ -202,9 +202,6 @@
 
       started = true;
    }
-   
-   //FIXME - temp hack so we can choose AARDVARK as protocol
-   public static ProtocolType hackProtocol = ProtocolType.CORE;
 
    public synchronized void freeze()
    {
@@ -406,11 +403,6 @@
             conn.connection.bufferReceived(connectionID, buffer);
          }
       }
-
-      public int isReadyToHandle(HornetQBuffer buffer)
-      {
-         return manager.isReadyToHandle(buffer);
-      }
    }
 
    private final class FailureCheckAndFlushThread extends Thread

Modified: branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/transports/netty/ChannelPipelineSupport.java
===================================================================
--- branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/transports/netty/ChannelPipelineSupport.java	2010-01-22 12:46:06 UTC (rev 8840)
+++ branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/transports/netty/ChannelPipelineSupport.java	2010-01-22 13:21:27 UTC (rev 8841)
@@ -18,7 +18,7 @@
 
 import org.hornetq.core.protocol.stomp.StompFrameDelimiter;
 import org.hornetq.spi.core.protocol.ProtocolType;
-import org.hornetq.spi.core.remoting.BufferHandler;
+import org.hornetq.spi.core.remoting.BufferDecoder;
 import org.jboss.netty.channel.ChannelPipeline;
 import org.jboss.netty.handler.ssl.SslHandler;
 
@@ -46,24 +46,22 @@
 
    // Public --------------------------------------------------------
 
-   public static void addCodecFilter(final ProtocolType protocol, final ChannelPipeline pipeline, final BufferHandler handler)
+   public static void addCodecFilter(final ProtocolType protocol, final ChannelPipeline pipeline, final BufferDecoder decoder)
    {
       assert pipeline != null;
       
       if (protocol == ProtocolType.CORE)
       {
+         //Core protocol uses its own optimised decoder
          pipeline.addLast("decoder", new HornetQFrameDecoder2());
       }
       else if (protocol == ProtocolType.STOMP)
       {
-         pipeline.addLast("delimiter", new StompFrameDelimiter());
+         pipeline.addLast("decoder", new StompFrameDelimiter());
       }
       else
       {
-         
-         // FIXME
-         //Use the old frame decoder for other protocols
-         //pipeline.addLast("decoder", new HornetQFrameDecoder(handler));
+         pipeline.addLast("decoder", new HornetQFrameDecoder(decoder));
       }
    }
 

Added: branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/transports/netty/HornetQFrameDecoder.java
===================================================================
--- branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/transports/netty/HornetQFrameDecoder.java	                        (rev 0)
+++ branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/transports/netty/HornetQFrameDecoder.java	2010-01-22 13:21:27 UTC (rev 8841)
@@ -0,0 +1,70 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.integration.transports.netty;
+
+import org.hornetq.core.buffers.impl.ChannelBufferWrapper;
+import org.hornetq.core.logging.Logger;
+import org.hornetq.spi.core.remoting.BufferDecoder;
+import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.buffer.ChannelBuffers;
+import org.jboss.netty.channel.Channel;
+import org.jboss.netty.channel.ChannelHandlerContext;
+import org.jboss.netty.handler.codec.frame.FrameDecoder;
+
+/**
+ * A Netty FrameDecoder used to decode messages.
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ * @author <a href="ataylor at redhat.com">Andy Taylor</a>
+ * @author <a href="tlee at redhat.com">Trustin Lee</a>
+ *
+ * @version $Revision: 8837 $, $Date: 2010-01-22 12:06:32 +0100 (Ven, 22 jan 2010) $
+ */
+public class HornetQFrameDecoder extends FrameDecoder
+{
+   private static final Logger log = Logger.getLogger(HornetQFrameDecoder.class);
+
+   private final BufferDecoder decoder;
+
+   public HornetQFrameDecoder(final BufferDecoder decoder)
+   {
+      this.decoder = decoder;
+   }
+
+   // FrameDecoder overrides
+   // -------------------------------------------------------------------------------------
+
+   @Override
+   protected Object decode(final ChannelHandlerContext ctx, final Channel channel, final ChannelBuffer in) throws Exception
+   {    
+      int start = in.readerIndex();
+
+      int length = decoder.isReadyToHandle(new ChannelBufferWrapper(in));
+      
+      in.readerIndex(start);
+      
+      if (length == -1)
+      {
+         return null;
+      }
+      
+      ChannelBuffer buffer = in.readBytes(length);
+
+      ChannelBuffer newBuffer = ChannelBuffers.dynamicBuffer(buffer.writerIndex());
+
+      newBuffer.writeBytes(buffer);
+      
+      return newBuffer;
+   }
+}

Modified: branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/transports/netty/HornetQFrameDecoder2.java
===================================================================
--- branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/transports/netty/HornetQFrameDecoder2.java	2010-01-22 12:46:06 UTC (rev 8840)
+++ branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/transports/netty/HornetQFrameDecoder2.java	2010-01-22 13:21:27 UTC (rev 8841)
@@ -24,7 +24,7 @@
 import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
 
 /**
- * A Netty decoder used to decode messages.
+ * A Netty decoder specially optimised to decode messages on the core protocol only
  *
  * @author <a href="tlee at redhat.com">Trustin Lee</a>
  *

Modified: branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/transports/netty/NettyAcceptor.java
===================================================================
--- branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/transports/netty/NettyAcceptor.java	2010-01-22 12:46:06 UTC (rev 8840)
+++ branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/transports/netty/NettyAcceptor.java	2010-01-22 13:21:27 UTC (rev 8841)
@@ -37,6 +37,7 @@
 import org.hornetq.core.server.management.NotificationService;
 import org.hornetq.spi.core.protocol.ProtocolType;
 import org.hornetq.spi.core.remoting.Acceptor;
+import org.hornetq.spi.core.remoting.BufferDecoder;
 import org.hornetq.spi.core.remoting.BufferHandler;
 import org.hornetq.spi.core.remoting.Connection;
 import org.hornetq.spi.core.remoting.ConnectionLifeCycleListener;
@@ -86,6 +87,8 @@
    private ServerBootstrap bootstrap;
 
    private final BufferHandler handler;
+   
+   private final BufferDecoder decoder;
 
    private final ConnectionLifeCycleListener listener;
 
@@ -135,11 +138,14 @@
 
    public NettyAcceptor(final Map<String, Object> configuration,
                         final BufferHandler handler,
+                        final BufferDecoder decoder,
                         final ConnectionLifeCycleListener listener,
                         final Executor threadPool,
                         final ScheduledExecutorService scheduledThreadPool)
    {
       this.handler = handler;
+      
+      this.decoder = decoder;
 
       this.listener = listener;
       
@@ -287,21 +293,9 @@
                pipeline.addLast("httpResponseEncoder", new HttpResponseEncoder());
                pipeline.addLast("httphandler", new HttpAcceptorHandler(httpKeepAliveRunnable, httpResponseTime));
             }
-            /*
-            if (protocol == ProtocolType.STOMP)
-            {
-               pipeline.addLast("delimiter", new StompFrameDelimiter());
-               pipeline.addLast("codec", new StompFrameDecoder());
-               pipeline.addLast("handler", new StompChannelHandler(serverHolder,
-                                                                   channelGroup,
-                                                                   NettyAcceptor.this,
-                                                                   new Listener()));
-            }
-            else
-            {*/
-               ChannelPipelineSupport.addCodecFilter(protocol, pipeline, handler);
-               pipeline.addLast("handler", new HornetQServerChannelHandler(channelGroup, handler, new Listener()));
-//            }
+
+            ChannelPipelineSupport.addCodecFilter(protocol, pipeline, decoder);
+            pipeline.addLast("handler", new HornetQServerChannelHandler(channelGroup, handler, new Listener()));
             return pipeline;
          }
       };

Modified: branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/transports/netty/NettyAcceptorFactory.java
===================================================================
--- branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/transports/netty/NettyAcceptorFactory.java	2010-01-22 12:46:06 UTC (rev 8840)
+++ branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/transports/netty/NettyAcceptorFactory.java	2010-01-22 13:21:27 UTC (rev 8841)
@@ -20,6 +20,7 @@
 
 import org.hornetq.spi.core.remoting.Acceptor;
 import org.hornetq.spi.core.remoting.AcceptorFactory;
+import org.hornetq.spi.core.remoting.BufferDecoder;
 import org.hornetq.spi.core.remoting.BufferHandler;
 import org.hornetq.spi.core.remoting.ConnectionLifeCycleListener;
 
@@ -32,11 +33,12 @@
 {
    public Acceptor createAcceptor(final Map<String, Object> configuration,
                                   final BufferHandler handler,
+                                  final BufferDecoder decoder,
                                   final ConnectionLifeCycleListener listener,
                                   final Executor threadPool,
                                   final ScheduledExecutorService scheduledThreadPool)
    {
-      return new NettyAcceptor(configuration, handler, listener, threadPool, scheduledThreadPool);
+      return new NettyAcceptor(configuration, handler, decoder, listener, threadPool, scheduledThreadPool);
    }
 
    public Set<String> getAllowableProperties()

Modified: branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/transports/netty/NettyConnector.java
===================================================================
--- branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/transports/netty/NettyConnector.java	2010-01-22 12:46:06 UTC (rev 8840)
+++ branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/transports/netty/NettyConnector.java	2010-01-22 13:21:27 UTC (rev 8841)
@@ -315,7 +315,7 @@
                pipeline.addLast("httpResponseDecoder", new HttpResponseDecoder());
                pipeline.addLast("httphandler", new HttpHandler());
             }
-            ChannelPipelineSupport.addCodecFilter(ProtocolType.CORE, pipeline, handler);
+            ChannelPipelineSupport.addCodecFilter(ProtocolType.CORE, pipeline, null);
             pipeline.addLast("handler", new HornetQClientChannelHandler(channelGroup, handler, new Listener()));
             return pipeline;
          }

Modified: branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/spi/core/protocol/ProtocolManager.java
===================================================================
--- branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/spi/core/protocol/ProtocolManager.java	2010-01-22 12:46:06 UTC (rev 8840)
+++ branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/spi/core/protocol/ProtocolManager.java	2010-01-22 13:21:27 UTC (rev 8841)
@@ -14,6 +14,7 @@
 package org.hornetq.spi.core.protocol;
 
 import org.hornetq.api.core.HornetQBuffer;
+import org.hornetq.spi.core.remoting.BufferDecoder;
 import org.hornetq.spi.core.remoting.Connection;
 
 /**
@@ -23,7 +24,7 @@
  *
  *
  */
-public interface ProtocolManager
+public interface ProtocolManager extends BufferDecoder
 {
    ConnectionEntry createConnectionEntry(Connection connection);
    

Modified: branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/spi/core/remoting/AcceptorFactory.java
===================================================================
--- branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/spi/core/remoting/AcceptorFactory.java	2010-01-22 12:46:06 UTC (rev 8840)
+++ branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/spi/core/remoting/AcceptorFactory.java	2010-01-22 13:21:27 UTC (rev 8841)
@@ -34,6 +34,7 @@
     *
     * @param configuration       the configuration
     * @param handler             the handler
+    * @param decoder             the decoder
     * @param listener            the listener
     * @param threadPool          the threadpool
     * @param scheduledThreadPool a scheduled thread pool
@@ -41,6 +42,7 @@
     */
    Acceptor createAcceptor(final Map<String, Object> configuration,
                            BufferHandler handler,
+                           BufferDecoder decoder,
                            ConnectionLifeCycleListener listener,
                            Executor threadPool,
                            ScheduledExecutorService scheduledThreadPool);

Copied: branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/spi/core/remoting/BufferDecoder.java (from rev 8839, trunk/src/main/org/hornetq/spi/core/remoting/BufferDecoder.java)
===================================================================
--- branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/spi/core/remoting/BufferDecoder.java	                        (rev 0)
+++ branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/spi/core/remoting/BufferDecoder.java	2010-01-22 13:21:27 UTC (rev 8841)
@@ -0,0 +1,36 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.spi.core.remoting;
+
+import org.hornetq.api.core.HornetQBuffer;
+
+/**
+ * A BufferDecoder
+ *
+ * @author tim
+ *
+ *
+ */
+public interface BufferDecoder
+{
+   /**
+    * called by the remoting system prior to {@link org.hornetq.spi.core.remoting.BufferHandler#bufferReceived(Object, org.hornetq.api.core.HornetQBuffer)}.
+    * <p/>
+    * The implementation should return the length of the data to decode if there is enough data in the buffer, otherwise -1.
+    *
+    * @param buffer the buffer
+    * @return the number of bytes that can be decoded, or -1 if the buffer cannot be decoded yet.
+    */
+   int isReadyToHandle(HornetQBuffer buffer);
+}

Modified: branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/spi/core/remoting/BufferHandler.java
===================================================================
--- branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/spi/core/remoting/BufferHandler.java	2010-01-22 12:46:06 UTC (rev 8840)
+++ branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/spi/core/remoting/BufferHandler.java	2010-01-22 13:21:27 UTC (rev 8841)
@@ -28,5 +28,5 @@
     *
     * @param buffer       the buffer to decode
     */
-   void bufferReceived(Object connectionID, HornetQBuffer buffer);
+   void bufferReceived(Object connectionID, HornetQBuffer buffer);   
 }

Modified: branches/HORNETQ-129_STOMP_protocol/tests/src/org/hornetq/tests/integration/cluster/failover/LargeMessageFailoverTest.java
===================================================================
--- branches/HORNETQ-129_STOMP_protocol/tests/src/org/hornetq/tests/integration/cluster/failover/LargeMessageFailoverTest.java	2010-01-22 12:46:06 UTC (rev 8840)
+++ branches/HORNETQ-129_STOMP_protocol/tests/src/org/hornetq/tests/integration/cluster/failover/LargeMessageFailoverTest.java	2010-01-22 13:21:27 UTC (rev 8841)
@@ -56,10 +56,10 @@
       suite.addTest(new LargeMessageFailoverTest("testCreateNewFactoryAfterFailover"));
 
       // Those tests are temporarily disabled for LargeMessage
-      // suite.addTest(new LargeMessageFailoverTest("testFailoverMultipleSessionsWithConsumers"));
-      // suite.addTest(new LargeMessageFailoverTest("testFailWithBrowser"));
-      // suite.addTest(new LargeMessageFailoverTest("testFailThenReceiveMoreMessagesAfterFailover"));
-      // suite.addTest(new LargeMessageFailoverTest("testFailThenReceiveMoreMessagesAfterFailover2"));
+      suite.addTest(new LargeMessageFailoverTest("testFailoverMultipleSessionsWithConsumers"));
+      suite.addTest(new LargeMessageFailoverTest("testFailWithBrowser"));
+      suite.addTest(new LargeMessageFailoverTest("testFailThenReceiveMoreMessagesAfterFailover"));
+      suite.addTest(new LargeMessageFailoverTest("testFailThenReceiveMoreMessagesAfterFailover2"));
 
       suite.addTest(new LargeMessageFailoverTest("testForceBlockingReturn"));
       suite.addTest(new LargeMessageFailoverTest("testCommitOccurredUnblockedAndResendNoDuplicates"));

Modified: branches/HORNETQ-129_STOMP_protocol/tests/src/org/hornetq/tests/integration/http/NettyHttpTest.java
===================================================================
--- branches/HORNETQ-129_STOMP_protocol/tests/src/org/hornetq/tests/integration/http/NettyHttpTest.java	2010-01-22 12:46:06 UTC (rev 8840)
+++ branches/HORNETQ-129_STOMP_protocol/tests/src/org/hornetq/tests/integration/http/NettyHttpTest.java	2010-01-22 13:21:27 UTC (rev 8841)
@@ -98,7 +98,7 @@
       conf.put(TransportConstants.HTTP_CLIENT_IDLE_SCAN_PERIOD, -1l);
       DummyConnectionLifeCycleListener acceptorListener = new DummyConnectionLifeCycleListener(connCreatedLatch);
       SimpleBufferHandler acceptorHandler = new SimpleBufferHandler(acceptorLatch);
-      acceptor = new NettyAcceptor(conf, acceptorHandler, acceptorListener, threadPool, scheduledThreadPool);
+      acceptor = new NettyAcceptor(conf, acceptorHandler, null, acceptorListener, threadPool, scheduledThreadPool);
       acceptor.start();
 
       SimpleBufferHandler2 connectorHandler = new SimpleBufferHandler2(connectorLatch);
@@ -152,7 +152,7 @@
       conf.put(TransportConstants.HTTP_CLIENT_IDLE_SCAN_PERIOD, -1l);
       DummyConnectionLifeCycleListener acceptorListener = new DummyConnectionLifeCycleListener(connCreatedLatch);
       SimpleBufferHandler acceptorHandler = new SimpleBufferHandler(acceptorLatch);
-      acceptor = new NettyAcceptor(conf, acceptorHandler, acceptorListener, threadPool, scheduledThreadPool);
+      acceptor = new NettyAcceptor(conf, acceptorHandler, null, acceptorListener, threadPool, scheduledThreadPool);
       acceptor.start();
 
       SimpleBufferHandler2 connectorHandler = new SimpleBufferHandler2(connectorLatch);
@@ -210,7 +210,7 @@
       conf.put(TransportConstants.HTTP_CLIENT_IDLE_SCAN_PERIOD, -1l);
       DummyConnectionLifeCycleListener acceptorListener = new DummyConnectionLifeCycleListener(connCreatedLatch);
       SimpleBufferHandler acceptorHandler = new SimpleBufferHandler(acceptorLatch);
-      acceptor = new NettyAcceptor(conf, acceptorHandler, acceptorListener, threadPool, scheduledThreadPool);
+      acceptor = new NettyAcceptor(conf, acceptorHandler, null, acceptorListener, threadPool, scheduledThreadPool);
       acceptor.start();
 
       SimpleBufferHandler connectorHandler = new SimpleBufferHandler(connectorLatch);
@@ -269,7 +269,7 @@
       conf.put(TransportConstants.HTTP_CLIENT_IDLE_SCAN_PERIOD, -1l);
       DummyConnectionLifeCycleListener acceptorListener = new DummyConnectionLifeCycleListener(connCreatedLatch);
       SimpleBufferHandler acceptorHandler = new SimpleBufferHandler(acceptorLatch);
-      acceptor = new NettyAcceptor(conf, acceptorHandler, acceptorListener, threadPool, scheduledThreadPool);
+      acceptor = new NettyAcceptor(conf, acceptorHandler, null, acceptorListener, threadPool, scheduledThreadPool);
       acceptor.start();
 
       SimpleBufferHandler connectorHandler = new SimpleBufferHandler(connectorLatch);
@@ -327,7 +327,7 @@
       conf.put(TransportConstants.HTTP_CLIENT_IDLE_PROP_NAME, 500l);
       DummyConnectionLifeCycleListener acceptorListener = new DummyConnectionLifeCycleListener(connCreatedLatch);
       SimpleBufferHandler acceptorHandler = new SimpleBufferHandler(acceptorLatch);
-      acceptor = new NettyAcceptor(conf, acceptorHandler, acceptorListener, threadPool, scheduledThreadPool);
+      acceptor = new NettyAcceptor(conf, acceptorHandler, null, acceptorListener, threadPool, scheduledThreadPool);
       acceptor.start();
 
       SimpleBufferHandler connectorHandler = new SimpleBufferHandler(connectorLatch);
@@ -381,7 +381,7 @@
       conf.put(TransportConstants.HTTP_SERVER_SCAN_PERIOD_PROP_NAME, 5000l);
       DummyConnectionLifeCycleListener acceptorListener = new DummyConnectionLifeCycleListener(connCreatedLatch);
       SimpleBufferHandler acceptorHandler = new SimpleBufferHandler(acceptorLatch);
-      acceptor = new NettyAcceptor(conf, acceptorHandler, acceptorListener, threadPool, scheduledThreadPool);
+      acceptor = new NettyAcceptor(conf, acceptorHandler, null, acceptorListener, threadPool, scheduledThreadPool);
       acceptor.start();
 
       BogusResponseHandler connectorHandler = new BogusResponseHandler(connectorLatch);
@@ -428,7 +428,7 @@
       conf.put(TransportConstants.HTTP_SERVER_SCAN_PERIOD_PROP_NAME, 5000l);
       DummyConnectionLifeCycleListener acceptorListener = new DummyConnectionLifeCycleListener(connCreatedLatch);
       SimpleBufferHandler2 acceptorHandler = new SimpleBufferHandler2(acceptorLatch);
-      acceptor = new NettyAcceptor(conf, acceptorHandler, acceptorListener, threadPool, scheduledThreadPool);
+      acceptor = new NettyAcceptor(conf, acceptorHandler, null, acceptorListener, threadPool, scheduledThreadPool);
       acceptor.start();
 
       BogusResponseHandler connectorHandler = new BogusResponseHandler(connectorLatch);
@@ -524,11 +524,6 @@
          this.latch = latch;
       }
 
-      public int isReadyToHandle(final HornetQBuffer buffer)
-      {
-         return 0;
-      }
-
       public void bufferReceived(final Object connectionID, final HornetQBuffer buffer)
       {
          int i = buffer.readInt();

Modified: branches/HORNETQ-129_STOMP_protocol/tests/src/org/hornetq/tests/integration/remoting/AardvarkProtocolTest.java
===================================================================
--- branches/HORNETQ-129_STOMP_protocol/tests/src/org/hornetq/tests/integration/remoting/AardvarkProtocolTest.java	2010-01-22 12:46:06 UTC (rev 8840)
+++ branches/HORNETQ-129_STOMP_protocol/tests/src/org/hornetq/tests/integration/remoting/AardvarkProtocolTest.java	2010-01-22 13:21:27 UTC (rev 8841)
@@ -45,8 +45,6 @@
 
    public void testAardvark() throws Exception
    {
-      RemotingServiceImpl.hackProtocol = ProtocolType.AARDVARK;
-      
       Configuration config = new ConfigurationImpl();
       
       config.setSecurityEnabled(false);
@@ -56,6 +54,7 @@
       
       params.put(TransportConstants.PORT_PROP_NAME, 9876);
       params.put(TransportConstants.HOST_PROP_NAME, "127.0.0.1");
+      params.put(TransportConstants.PROTOCOL_PROP_NAME, ProtocolType.AARDVARK.toString());
       
       TransportConfiguration tc = new TransportConfiguration(NettyAcceptorFactory.class.getCanonicalName(),
                                                              params);
@@ -95,7 +94,5 @@
       socket.close();
       
       server.stop();
-      
-      RemotingServiceImpl.hackProtocol = ProtocolType.CORE;
    }
 }

Modified: branches/HORNETQ-129_STOMP_protocol/tests/src/org/hornetq/tests/unit/core/remoting/impl/netty/NettyAcceptorFactoryTest.java
===================================================================
--- branches/HORNETQ-129_STOMP_protocol/tests/src/org/hornetq/tests/unit/core/remoting/impl/netty/NettyAcceptorFactoryTest.java	2010-01-22 12:46:06 UTC (rev 8840)
+++ branches/HORNETQ-129_STOMP_protocol/tests/src/org/hornetq/tests/unit/core/remoting/impl/netty/NettyAcceptorFactoryTest.java	2010-01-22 13:21:27 UTC (rev 8841)
@@ -69,6 +69,7 @@
       };
       Acceptor acceptor = factory.createAcceptor(params,
                                                  handler,
+                                                 null,
                                                  listener,
                                                  Executors.newCachedThreadPool(),
                                                  Executors.newScheduledThreadPool(ConfigurationImpl.DEFAULT_SCHEDULED_THREAD_POOL_MAX_SIZE));

Modified: branches/HORNETQ-129_STOMP_protocol/tests/src/org/hornetq/tests/unit/core/remoting/impl/netty/NettyAcceptorTest.java
===================================================================
--- branches/HORNETQ-129_STOMP_protocol/tests/src/org/hornetq/tests/unit/core/remoting/impl/netty/NettyAcceptorTest.java	2010-01-22 12:46:06 UTC (rev 8840)
+++ branches/HORNETQ-129_STOMP_protocol/tests/src/org/hornetq/tests/unit/core/remoting/impl/netty/NettyAcceptorTest.java	2010-01-22 13:21:27 UTC (rev 8841)
@@ -84,6 +84,7 @@
 
       NettyAcceptor acceptor = new NettyAcceptor(params,
                                                  handler,
+                                                 null,
                                                  listener,
                                                  Executors.newCachedThreadPool(),
                                                  Executors.newScheduledThreadPool(ConfigurationImpl.DEFAULT_SCHEDULED_THREAD_POOL_MAX_SIZE));

Modified: branches/HORNETQ-129_STOMP_protocol/tests/src/org/hornetq/tests/unit/core/remoting/impl/netty/NettyConnectorTest.java
===================================================================
--- branches/HORNETQ-129_STOMP_protocol/tests/src/org/hornetq/tests/unit/core/remoting/impl/netty/NettyConnectorTest.java	2010-01-22 12:46:06 UTC (rev 8840)
+++ branches/HORNETQ-129_STOMP_protocol/tests/src/org/hornetq/tests/unit/core/remoting/impl/netty/NettyConnectorTest.java	2010-01-22 13:21:27 UTC (rev 8841)
@@ -72,7 +72,7 @@
       };
 
       NettyConnector connector = new NettyConnector(params,
-                                                    handler,
+                                                    handler,                                                    
                                                     listener,
                                                     Executors.newCachedThreadPool(),
                                                     Executors.newCachedThreadPool(),



More information about the hornetq-commits mailing list