Author: jmesnil
Date: 2010-01-22 08:21:27 -0500 (Fri, 22 Jan 2010)
New Revision: 8841
Added:
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/transports/netty/HornetQFrameDecoder.java
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/spi/core/remoting/BufferDecoder.java
Removed:
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/protocol/core/impl/AbstractBufferHandler.java
Modified:
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/protocol/aardvark/impl/AardvarkConnection.java
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/protocol/core/impl/RemotingConnectionImpl.java
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/protocol/stomp/StompProtocolManager.java
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/remoting/impl/invm/InVMAcceptor.java
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/remoting/impl/invm/InVMAcceptorFactory.java
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/remoting/server/impl/RemotingServiceImpl.java
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/transports/netty/ChannelPipelineSupport.java
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/transports/netty/HornetQFrameDecoder2.java
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/transports/netty/NettyAcceptor.java
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/transports/netty/NettyAcceptorFactory.java
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/transports/netty/NettyConnector.java
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/spi/core/protocol/ProtocolManager.java
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/spi/core/remoting/AcceptorFactory.java
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/spi/core/remoting/BufferHandler.java
branches/HORNETQ-129_STOMP_protocol/tests/src/org/hornetq/tests/integration/cluster/failover/LargeMessageFailoverTest.java
branches/HORNETQ-129_STOMP_protocol/tests/src/org/hornetq/tests/integration/http/NettyHttpTest.java
branches/HORNETQ-129_STOMP_protocol/tests/src/org/hornetq/tests/integration/remoting/AardvarkProtocolTest.java
branches/HORNETQ-129_STOMP_protocol/tests/src/org/hornetq/tests/unit/core/remoting/impl/netty/NettyAcceptorFactoryTest.java
branches/HORNETQ-129_STOMP_protocol/tests/src/org/hornetq/tests/unit/core/remoting/impl/netty/NettyAcceptorTest.java
branches/HORNETQ-129_STOMP_protocol/tests/src/org/hornetq/tests/unit/core/remoting/impl/netty/NettyConnectorTest.java
Log:
https://jira.jboss.org/jira/browse/HORNETQ-129: Implement STOMP v1.0
* sync with the trunk: svn merge -r 8832:8839
https://svn.jboss.org/repos/hornetq/trunk
Modified:
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/protocol/aardvark/impl/AardvarkConnection.java
===================================================================
---
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/protocol/aardvark/impl/AardvarkConnection.java 2010-01-22
12:46:06 UTC (rev 8840)
+++
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/protocol/aardvark/impl/AardvarkConnection.java 2010-01-22
13:21:27 UTC (rev 8841)
@@ -130,10 +130,4 @@
{
manager.handleBuffer(this, buffer);
}
-
- public int isReadyToHandle(HornetQBuffer buffer)
- {
- return -1;
- }
-
}
Deleted:
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/protocol/core/impl/AbstractBufferHandler.java
===================================================================
---
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/protocol/core/impl/AbstractBufferHandler.java 2010-01-22
12:46:06 UTC (rev 8840)
+++
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/protocol/core/impl/AbstractBufferHandler.java 2010-01-22
13:21:27 UTC (rev 8841)
@@ -1,46 +0,0 @@
-/*
- * Copyright 2009 Red Hat, Inc.
- * Red Hat licenses this file to you under the Apache License, version
- * 2.0 (the "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
http://www.apache.org/licenses/LICENSE-2.0
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
- * implied. See the License for the specific language governing
- * permissions and limitations under the License.
- */
-package org.hornetq.core.protocol.core.impl;
-
-import org.hornetq.api.core.HornetQBuffer;
-import org.hornetq.core.logging.Logger;
-import org.hornetq.spi.core.remoting.BufferHandler;
-import org.hornetq.utils.DataConstants;
-
-/**
- * A AbstractBufferHandler
- *
- * @author <a href="mailto:tim.fox@jboss.com">Tim Fox</a>
- *
- */
-public abstract class AbstractBufferHandler implements BufferHandler
-{
- private static final Logger log = Logger.getLogger(AbstractBufferHandler.class);
-
- public int isReadyToHandle(final HornetQBuffer buffer)
- {
- if (buffer.readableBytes() < DataConstants.SIZE_INT)
- {
- return -1;
- }
-
- int length = buffer.readInt();
-
- if (buffer.readableBytes() < length)
- {
- return -1;
- }
-
- return length;
- }
-}
Modified:
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java
===================================================================
---
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java 2010-01-22
12:46:06 UTC (rev 8840)
+++
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java 2010-01-22
13:21:27 UTC (rev 8841)
@@ -125,6 +125,8 @@
{
}
+ //This is never called using the core protocol, since we override the
HornetQFrameDecoder with our core
+ //optimised version HornetQFrameDecoder2, which never calls this
public int isReadyToHandle(HornetQBuffer buffer)
{
return -1;
Modified:
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/protocol/core/impl/RemotingConnectionImpl.java
===================================================================
---
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/protocol/core/impl/RemotingConnectionImpl.java 2010-01-22
12:46:06 UTC (rev 8840)
+++
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/protocol/core/impl/RemotingConnectionImpl.java 2010-01-22
13:21:27 UTC (rev 8841)
@@ -31,6 +31,7 @@
import org.hornetq.core.protocol.core.Packet;
import org.hornetq.core.remoting.CloseListener;
import org.hornetq.core.remoting.FailureListener;
+import org.hornetq.spi.core.remoting.BufferHandler;
import org.hornetq.spi.core.remoting.Connection;
import org.hornetq.utils.SimpleIDGenerator;
@@ -39,7 +40,7 @@
* @author <a href="mailto:jmesnil@redhat.com">Jeff Mesnil</a>
* @version <tt>$Revision$</tt> $Id$
*/
-public class RemotingConnectionImpl extends AbstractBufferHandler implements
CoreRemotingConnection
+public class RemotingConnectionImpl implements BufferHandler, CoreRemotingConnection
{
// Constants
//
------------------------------------------------------------------------------------
Modified:
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/protocol/stomp/StompProtocolManager.java
===================================================================
---
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/protocol/stomp/StompProtocolManager.java 2010-01-22
12:46:06 UTC (rev 8840)
+++
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/protocol/stomp/StompProtocolManager.java 2010-01-22
13:21:27 UTC (rev 8841)
@@ -292,5 +292,4 @@
{
return -1;
}
-
}
Modified:
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/remoting/impl/invm/InVMAcceptor.java
===================================================================
---
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/remoting/impl/invm/InVMAcceptor.java 2010-01-22
12:46:06 UTC (rev 8840)
+++
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/remoting/impl/invm/InVMAcceptor.java 2010-01-22
13:21:27 UTC (rev 8841)
@@ -46,7 +46,7 @@
private final int id;
private final BufferHandler handler;
-
+
private final ConnectionLifeCycleListener listener;
private final ConcurrentMap<String, Connection> connections = new
ConcurrentHashMap<String, Connection>();
@@ -62,12 +62,12 @@
private final ProtocolType protocol;
public InVMAcceptor(final Map<String, Object> configuration,
- final BufferHandler handler,
+ final BufferHandler handler,
final ConnectionLifeCycleListener listener,
final Executor threadPool)
{
this.handler = handler;
-
+
this.listener = listener;
id = ConfigurationHelper.getIntProperty(TransportConstants.SERVER_ID_PROP_NAME, 0,
configuration);
Modified:
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/remoting/impl/invm/InVMAcceptorFactory.java
===================================================================
---
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/remoting/impl/invm/InVMAcceptorFactory.java 2010-01-22
12:46:06 UTC (rev 8840)
+++
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/remoting/impl/invm/InVMAcceptorFactory.java 2010-01-22
13:21:27 UTC (rev 8841)
@@ -19,6 +19,7 @@
import org.hornetq.spi.core.remoting.Acceptor;
import org.hornetq.spi.core.remoting.AcceptorFactory;
+import org.hornetq.spi.core.remoting.BufferDecoder;
import org.hornetq.spi.core.remoting.BufferHandler;
import org.hornetq.spi.core.remoting.ConnectionLifeCycleListener;
@@ -32,6 +33,7 @@
{
public Acceptor createAcceptor(final Map<String, Object> configuration,
final BufferHandler handler,
+ final BufferDecoder decoder,
final ConnectionLifeCycleListener listener,
final Executor threadPool,
final ScheduledExecutorService scheduledThreadPool)
Modified:
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/remoting/server/impl/RemotingServiceImpl.java
===================================================================
---
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/remoting/server/impl/RemotingServiceImpl.java 2010-01-22
12:46:06 UTC (rev 8840)
+++
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/remoting/server/impl/RemotingServiceImpl.java 2010-01-22
13:21:27 UTC (rev 8841)
@@ -22,8 +22,6 @@
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
-import com.sun.corba.se.spi.activation.ServerHolder;
-
import org.hornetq.api.core.HornetQBuffer;
import org.hornetq.api.core.HornetQException;
import org.hornetq.api.core.Interceptor;
@@ -32,9 +30,11 @@
import org.hornetq.core.logging.Logger;
import org.hornetq.core.protocol.aardvark.impl.AardvarkProtocolManagerFactory;
import org.hornetq.core.protocol.core.impl.CoreProtocolManagerFactory;
+import org.hornetq.core.protocol.stomp.StompProtocolManagerFactory;
import org.hornetq.core.remoting.server.RemotingService;
import org.hornetq.core.server.HornetQServer;
import org.hornetq.core.server.management.ManagementService;
+import org.hornetq.integration.transports.netty.TransportConstants;
import org.hornetq.spi.core.protocol.ConnectionEntry;
import org.hornetq.spi.core.protocol.ProtocolManager;
import org.hornetq.spi.core.protocol.ProtocolType;
@@ -72,8 +72,6 @@
private final Map<Object, ConnectionEntry> connections = new
ConcurrentHashMap<Object, ConnectionEntry>();
- //private final BufferHandler bufferHandler = new DelegatingBufferHandler();
-
private final Configuration config;
private final HornetQServer server;
@@ -125,6 +123,7 @@
this.scheduledThreadPool = scheduledThreadPool;
this.protocolMap.put(ProtocolType.CORE, new
CoreProtocolManagerFactory().createProtocolManager(server, interceptors));
+ this.protocolMap.put(ProtocolType.STOMP, new
StompProtocolManagerFactory().createProtocolManager(server, interceptors));
this.protocolMap.put(ProtocolType.AARDVARK, new
AardvarkProtocolManagerFactory().createProtocolManager(server, interceptors));
}
@@ -163,14 +162,15 @@
}
}
- //TODO - allow protocol type to be configured from Configuration for each
acceptor
+ String protocolString =
ConfigurationHelper.getStringProperty(TransportConstants.PROTOCOL_PROP_NAME,
TransportConstants.DEFAULT_PROTOCOL, info.getParams());
- ProtocolType protocol = hackProtocol;
+ ProtocolType protocol = ProtocolType.valueOf(protocolString.toUpperCase());
ProtocolManager manager = protocolMap.get(protocol);
Acceptor acceptor = factory.createAcceptor(info.getParams(),
new
DelegatingBufferHandler(manager),
+ manager,
this,
threadPool,
scheduledThreadPool);
@@ -202,9 +202,6 @@
started = true;
}
-
- //FIXME - temp hack so we can choose AARDVARK as protocol
- public static ProtocolType hackProtocol = ProtocolType.CORE;
public synchronized void freeze()
{
@@ -406,11 +403,6 @@
conn.connection.bufferReceived(connectionID, buffer);
}
}
-
- public int isReadyToHandle(HornetQBuffer buffer)
- {
- return manager.isReadyToHandle(buffer);
- }
}
private final class FailureCheckAndFlushThread extends Thread
Modified:
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/transports/netty/ChannelPipelineSupport.java
===================================================================
---
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/transports/netty/ChannelPipelineSupport.java 2010-01-22
12:46:06 UTC (rev 8840)
+++
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/transports/netty/ChannelPipelineSupport.java 2010-01-22
13:21:27 UTC (rev 8841)
@@ -18,7 +18,7 @@
import org.hornetq.core.protocol.stomp.StompFrameDelimiter;
import org.hornetq.spi.core.protocol.ProtocolType;
-import org.hornetq.spi.core.remoting.BufferHandler;
+import org.hornetq.spi.core.remoting.BufferDecoder;
import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.handler.ssl.SslHandler;
@@ -46,24 +46,22 @@
// Public --------------------------------------------------------
- public static void addCodecFilter(final ProtocolType protocol, final ChannelPipeline
pipeline, final BufferHandler handler)
+ public static void addCodecFilter(final ProtocolType protocol, final ChannelPipeline
pipeline, final BufferDecoder decoder)
{
assert pipeline != null;
if (protocol == ProtocolType.CORE)
{
+ //Core protocol uses its own optimised decoder
pipeline.addLast("decoder", new HornetQFrameDecoder2());
}
else if (protocol == ProtocolType.STOMP)
{
- pipeline.addLast("delimiter", new StompFrameDelimiter());
+ pipeline.addLast("decoder", new StompFrameDelimiter());
}
else
{
-
- // FIXME
- //Use the old frame decoder for other protocols
- //pipeline.addLast("decoder", new HornetQFrameDecoder(handler));
+ pipeline.addLast("decoder", new HornetQFrameDecoder(decoder));
}
}
Added:
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/transports/netty/HornetQFrameDecoder.java
===================================================================
---
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/transports/netty/HornetQFrameDecoder.java
(rev 0)
+++
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/transports/netty/HornetQFrameDecoder.java 2010-01-22
13:21:27 UTC (rev 8841)
@@ -0,0 +1,70 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.integration.transports.netty;
+
+import org.hornetq.core.buffers.impl.ChannelBufferWrapper;
+import org.hornetq.core.logging.Logger;
+import org.hornetq.spi.core.remoting.BufferDecoder;
+import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.buffer.ChannelBuffers;
+import org.jboss.netty.channel.Channel;
+import org.jboss.netty.channel.ChannelHandlerContext;
+import org.jboss.netty.handler.codec.frame.FrameDecoder;
+
+/**
+ * A Netty FrameDecoder used to decode messages.
+ *
+ * @author <a href="mailto:tim.fox@jboss.com">Tim Fox</a>
+ * @author <a href="ataylor(a)redhat.com">Andy Taylor</a>
+ * @author <a href="tlee(a)redhat.com">Trustin Lee</a>
+ *
+ * @version $Revision: 8837 $, $Date: 2010-01-22 12:06:32 +0100 (Ven, 22 jan 2010) $
+ */
+public class HornetQFrameDecoder extends FrameDecoder
+{
+ private static final Logger log = Logger.getLogger(HornetQFrameDecoder.class);
+
+ private final BufferDecoder decoder;
+
+ public HornetQFrameDecoder(final BufferDecoder decoder)
+ {
+ this.decoder = decoder;
+ }
+
+ // FrameDecoder overrides
+ //
-------------------------------------------------------------------------------------
+
+ @Override
+ protected Object decode(final ChannelHandlerContext ctx, final Channel channel, final
ChannelBuffer in) throws Exception
+ {
+ int start = in.readerIndex();
+
+ int length = decoder.isReadyToHandle(new ChannelBufferWrapper(in));
+
+ in.readerIndex(start);
+
+ if (length == -1)
+ {
+ return null;
+ }
+
+ ChannelBuffer buffer = in.readBytes(length);
+
+ ChannelBuffer newBuffer = ChannelBuffers.dynamicBuffer(buffer.writerIndex());
+
+ newBuffer.writeBytes(buffer);
+
+ return newBuffer;
+ }
+}
Modified:
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/transports/netty/HornetQFrameDecoder2.java
===================================================================
---
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/transports/netty/HornetQFrameDecoder2.java 2010-01-22
12:46:06 UTC (rev 8840)
+++
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/transports/netty/HornetQFrameDecoder2.java 2010-01-22
13:21:27 UTC (rev 8841)
@@ -24,7 +24,7 @@
import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
/**
- * A Netty decoder used to decode messages.
+ * A Netty decoder specially optimised to decode messages on the core protocol only
*
* @author <a href="tlee(a)redhat.com">Trustin Lee</a>
*
Modified:
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/transports/netty/NettyAcceptor.java
===================================================================
---
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/transports/netty/NettyAcceptor.java 2010-01-22
12:46:06 UTC (rev 8840)
+++
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/transports/netty/NettyAcceptor.java 2010-01-22
13:21:27 UTC (rev 8841)
@@ -37,6 +37,7 @@
import org.hornetq.core.server.management.NotificationService;
import org.hornetq.spi.core.protocol.ProtocolType;
import org.hornetq.spi.core.remoting.Acceptor;
+import org.hornetq.spi.core.remoting.BufferDecoder;
import org.hornetq.spi.core.remoting.BufferHandler;
import org.hornetq.spi.core.remoting.Connection;
import org.hornetq.spi.core.remoting.ConnectionLifeCycleListener;
@@ -86,6 +87,8 @@
private ServerBootstrap bootstrap;
private final BufferHandler handler;
+
+ private final BufferDecoder decoder;
private final ConnectionLifeCycleListener listener;
@@ -135,11 +138,14 @@
public NettyAcceptor(final Map<String, Object> configuration,
final BufferHandler handler,
+ final BufferDecoder decoder,
final ConnectionLifeCycleListener listener,
final Executor threadPool,
final ScheduledExecutorService scheduledThreadPool)
{
this.handler = handler;
+
+ this.decoder = decoder;
this.listener = listener;
@@ -287,21 +293,9 @@
pipeline.addLast("httpResponseEncoder", new
HttpResponseEncoder());
pipeline.addLast("httphandler", new
HttpAcceptorHandler(httpKeepAliveRunnable, httpResponseTime));
}
- /*
- if (protocol == ProtocolType.STOMP)
- {
- pipeline.addLast("delimiter", new StompFrameDelimiter());
- pipeline.addLast("codec", new StompFrameDecoder());
- pipeline.addLast("handler", new
StompChannelHandler(serverHolder,
- channelGroup,
- NettyAcceptor.this,
- new Listener()));
- }
- else
- {*/
- ChannelPipelineSupport.addCodecFilter(protocol, pipeline, handler);
- pipeline.addLast("handler", new
HornetQServerChannelHandler(channelGroup, handler, new Listener()));
-// }
+
+ ChannelPipelineSupport.addCodecFilter(protocol, pipeline, decoder);
+ pipeline.addLast("handler", new
HornetQServerChannelHandler(channelGroup, handler, new Listener()));
return pipeline;
}
};
Modified:
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/transports/netty/NettyAcceptorFactory.java
===================================================================
---
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/transports/netty/NettyAcceptorFactory.java 2010-01-22
12:46:06 UTC (rev 8840)
+++
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/transports/netty/NettyAcceptorFactory.java 2010-01-22
13:21:27 UTC (rev 8841)
@@ -20,6 +20,7 @@
import org.hornetq.spi.core.remoting.Acceptor;
import org.hornetq.spi.core.remoting.AcceptorFactory;
+import org.hornetq.spi.core.remoting.BufferDecoder;
import org.hornetq.spi.core.remoting.BufferHandler;
import org.hornetq.spi.core.remoting.ConnectionLifeCycleListener;
@@ -32,11 +33,12 @@
{
public Acceptor createAcceptor(final Map<String, Object> configuration,
final BufferHandler handler,
+ final BufferDecoder decoder,
final ConnectionLifeCycleListener listener,
final Executor threadPool,
final ScheduledExecutorService scheduledThreadPool)
{
- return new NettyAcceptor(configuration, handler, listener, threadPool,
scheduledThreadPool);
+ return new NettyAcceptor(configuration, handler, decoder, listener, threadPool,
scheduledThreadPool);
}
public Set<String> getAllowableProperties()
Modified:
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/transports/netty/NettyConnector.java
===================================================================
---
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/transports/netty/NettyConnector.java 2010-01-22
12:46:06 UTC (rev 8840)
+++
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/transports/netty/NettyConnector.java 2010-01-22
13:21:27 UTC (rev 8841)
@@ -315,7 +315,7 @@
pipeline.addLast("httpResponseDecoder", new
HttpResponseDecoder());
pipeline.addLast("httphandler", new HttpHandler());
}
- ChannelPipelineSupport.addCodecFilter(ProtocolType.CORE, pipeline, handler);
+ ChannelPipelineSupport.addCodecFilter(ProtocolType.CORE, pipeline, null);
pipeline.addLast("handler", new
HornetQClientChannelHandler(channelGroup, handler, new Listener()));
return pipeline;
}
Modified:
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/spi/core/protocol/ProtocolManager.java
===================================================================
---
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/spi/core/protocol/ProtocolManager.java 2010-01-22
12:46:06 UTC (rev 8840)
+++
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/spi/core/protocol/ProtocolManager.java 2010-01-22
13:21:27 UTC (rev 8841)
@@ -14,6 +14,7 @@
package org.hornetq.spi.core.protocol;
import org.hornetq.api.core.HornetQBuffer;
+import org.hornetq.spi.core.remoting.BufferDecoder;
import org.hornetq.spi.core.remoting.Connection;
/**
@@ -23,7 +24,7 @@
*
*
*/
-public interface ProtocolManager
+public interface ProtocolManager extends BufferDecoder
{
ConnectionEntry createConnectionEntry(Connection connection);
Modified:
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/spi/core/remoting/AcceptorFactory.java
===================================================================
---
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/spi/core/remoting/AcceptorFactory.java 2010-01-22
12:46:06 UTC (rev 8840)
+++
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/spi/core/remoting/AcceptorFactory.java 2010-01-22
13:21:27 UTC (rev 8841)
@@ -34,6 +34,7 @@
*
* @param configuration the configuration
* @param handler the handler
+ * @param decoder the decoder
* @param listener the listener
* @param threadPool the threadpool
* @param scheduledThreadPool a scheduled thread pool
@@ -41,6 +42,7 @@
*/
Acceptor createAcceptor(final Map<String, Object> configuration,
BufferHandler handler,
+ BufferDecoder decoder,
ConnectionLifeCycleListener listener,
Executor threadPool,
ScheduledExecutorService scheduledThreadPool);
Copied:
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/spi/core/remoting/BufferDecoder.java
(from rev 8839, trunk/src/main/org/hornetq/spi/core/remoting/BufferDecoder.java)
===================================================================
---
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/spi/core/remoting/BufferDecoder.java
(rev 0)
+++
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/spi/core/remoting/BufferDecoder.java 2010-01-22
13:21:27 UTC (rev 8841)
@@ -0,0 +1,36 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.spi.core.remoting;
+
+import org.hornetq.api.core.HornetQBuffer;
+
+/**
+ * A BufferDecoder
+ *
+ * @author tim
+ *
+ *
+ */
+public interface BufferDecoder
+{
+ /**
+ * called by the remoting system prior to {@link
org.hornetq.spi.core.remoting.BufferHandler#bufferReceived(Object,
org.hornetq.api.core.HornetQBuffer)}.
+ * <p/>
+ * The implementation should return the number of bytes that can be decoded if there is
enough data in the buffer, otherwise -1.
+ *
+ * @param buffer the buffer
+ * @return the number of bytes that can be decoded, or -1 if the buffer does not yet hold enough data
+ */
+ int isReadyToHandle(HornetQBuffer buffer);
+}
Modified:
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/spi/core/remoting/BufferHandler.java
===================================================================
---
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/spi/core/remoting/BufferHandler.java 2010-01-22
12:46:06 UTC (rev 8840)
+++
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/spi/core/remoting/BufferHandler.java 2010-01-22
13:21:27 UTC (rev 8841)
@@ -28,5 +28,5 @@
*
* @param buffer the buffer to decode
*/
- void bufferReceived(Object connectionID, HornetQBuffer buffer);
+ void bufferReceived(Object connectionID, HornetQBuffer buffer);
}
Modified:
branches/HORNETQ-129_STOMP_protocol/tests/src/org/hornetq/tests/integration/cluster/failover/LargeMessageFailoverTest.java
===================================================================
---
branches/HORNETQ-129_STOMP_protocol/tests/src/org/hornetq/tests/integration/cluster/failover/LargeMessageFailoverTest.java 2010-01-22
12:46:06 UTC (rev 8840)
+++
branches/HORNETQ-129_STOMP_protocol/tests/src/org/hornetq/tests/integration/cluster/failover/LargeMessageFailoverTest.java 2010-01-22
13:21:27 UTC (rev 8841)
@@ -56,10 +56,10 @@
suite.addTest(new
LargeMessageFailoverTest("testCreateNewFactoryAfterFailover"));
// Those tests are temporarily disabled for LargeMessage
- // suite.addTest(new
LargeMessageFailoverTest("testFailoverMultipleSessionsWithConsumers"));
- // suite.addTest(new LargeMessageFailoverTest("testFailWithBrowser"));
- // suite.addTest(new
LargeMessageFailoverTest("testFailThenReceiveMoreMessagesAfterFailover"));
- // suite.addTest(new
LargeMessageFailoverTest("testFailThenReceiveMoreMessagesAfterFailover2"));
+ suite.addTest(new
LargeMessageFailoverTest("testFailoverMultipleSessionsWithConsumers"));
+ suite.addTest(new LargeMessageFailoverTest("testFailWithBrowser"));
+ suite.addTest(new
LargeMessageFailoverTest("testFailThenReceiveMoreMessagesAfterFailover"));
+ suite.addTest(new
LargeMessageFailoverTest("testFailThenReceiveMoreMessagesAfterFailover2"));
suite.addTest(new LargeMessageFailoverTest("testForceBlockingReturn"));
suite.addTest(new
LargeMessageFailoverTest("testCommitOccurredUnblockedAndResendNoDuplicates"));
Modified:
branches/HORNETQ-129_STOMP_protocol/tests/src/org/hornetq/tests/integration/http/NettyHttpTest.java
===================================================================
---
branches/HORNETQ-129_STOMP_protocol/tests/src/org/hornetq/tests/integration/http/NettyHttpTest.java 2010-01-22
12:46:06 UTC (rev 8840)
+++
branches/HORNETQ-129_STOMP_protocol/tests/src/org/hornetq/tests/integration/http/NettyHttpTest.java 2010-01-22
13:21:27 UTC (rev 8841)
@@ -98,7 +98,7 @@
conf.put(TransportConstants.HTTP_CLIENT_IDLE_SCAN_PERIOD, -1l);
DummyConnectionLifeCycleListener acceptorListener = new
DummyConnectionLifeCycleListener(connCreatedLatch);
SimpleBufferHandler acceptorHandler = new SimpleBufferHandler(acceptorLatch);
- acceptor = new NettyAcceptor(conf, acceptorHandler, acceptorListener, threadPool,
scheduledThreadPool);
+ acceptor = new NettyAcceptor(conf, acceptorHandler, null, acceptorListener,
threadPool, scheduledThreadPool);
acceptor.start();
SimpleBufferHandler2 connectorHandler = new SimpleBufferHandler2(connectorLatch);
@@ -152,7 +152,7 @@
conf.put(TransportConstants.HTTP_CLIENT_IDLE_SCAN_PERIOD, -1l);
DummyConnectionLifeCycleListener acceptorListener = new
DummyConnectionLifeCycleListener(connCreatedLatch);
SimpleBufferHandler acceptorHandler = new SimpleBufferHandler(acceptorLatch);
- acceptor = new NettyAcceptor(conf, acceptorHandler, acceptorListener, threadPool,
scheduledThreadPool);
+ acceptor = new NettyAcceptor(conf, acceptorHandler, null, acceptorListener,
threadPool, scheduledThreadPool);
acceptor.start();
SimpleBufferHandler2 connectorHandler = new SimpleBufferHandler2(connectorLatch);
@@ -210,7 +210,7 @@
conf.put(TransportConstants.HTTP_CLIENT_IDLE_SCAN_PERIOD, -1l);
DummyConnectionLifeCycleListener acceptorListener = new
DummyConnectionLifeCycleListener(connCreatedLatch);
SimpleBufferHandler acceptorHandler = new SimpleBufferHandler(acceptorLatch);
- acceptor = new NettyAcceptor(conf, acceptorHandler, acceptorListener, threadPool,
scheduledThreadPool);
+ acceptor = new NettyAcceptor(conf, acceptorHandler, null, acceptorListener,
threadPool, scheduledThreadPool);
acceptor.start();
SimpleBufferHandler connectorHandler = new SimpleBufferHandler(connectorLatch);
@@ -269,7 +269,7 @@
conf.put(TransportConstants.HTTP_CLIENT_IDLE_SCAN_PERIOD, -1l);
DummyConnectionLifeCycleListener acceptorListener = new
DummyConnectionLifeCycleListener(connCreatedLatch);
SimpleBufferHandler acceptorHandler = new SimpleBufferHandler(acceptorLatch);
- acceptor = new NettyAcceptor(conf, acceptorHandler, acceptorListener, threadPool,
scheduledThreadPool);
+ acceptor = new NettyAcceptor(conf, acceptorHandler, null, acceptorListener,
threadPool, scheduledThreadPool);
acceptor.start();
SimpleBufferHandler connectorHandler = new SimpleBufferHandler(connectorLatch);
@@ -327,7 +327,7 @@
conf.put(TransportConstants.HTTP_CLIENT_IDLE_PROP_NAME, 500l);
DummyConnectionLifeCycleListener acceptorListener = new
DummyConnectionLifeCycleListener(connCreatedLatch);
SimpleBufferHandler acceptorHandler = new SimpleBufferHandler(acceptorLatch);
- acceptor = new NettyAcceptor(conf, acceptorHandler, acceptorListener, threadPool,
scheduledThreadPool);
+ acceptor = new NettyAcceptor(conf, acceptorHandler, null, acceptorListener,
threadPool, scheduledThreadPool);
acceptor.start();
SimpleBufferHandler connectorHandler = new SimpleBufferHandler(connectorLatch);
@@ -381,7 +381,7 @@
conf.put(TransportConstants.HTTP_SERVER_SCAN_PERIOD_PROP_NAME, 5000l);
DummyConnectionLifeCycleListener acceptorListener = new
DummyConnectionLifeCycleListener(connCreatedLatch);
SimpleBufferHandler acceptorHandler = new SimpleBufferHandler(acceptorLatch);
- acceptor = new NettyAcceptor(conf, acceptorHandler, acceptorListener, threadPool,
scheduledThreadPool);
+ acceptor = new NettyAcceptor(conf, acceptorHandler, null, acceptorListener,
threadPool, scheduledThreadPool);
acceptor.start();
BogusResponseHandler connectorHandler = new BogusResponseHandler(connectorLatch);
@@ -428,7 +428,7 @@
conf.put(TransportConstants.HTTP_SERVER_SCAN_PERIOD_PROP_NAME, 5000l);
DummyConnectionLifeCycleListener acceptorListener = new
DummyConnectionLifeCycleListener(connCreatedLatch);
SimpleBufferHandler2 acceptorHandler = new SimpleBufferHandler2(acceptorLatch);
- acceptor = new NettyAcceptor(conf, acceptorHandler, acceptorListener, threadPool,
scheduledThreadPool);
+ acceptor = new NettyAcceptor(conf, acceptorHandler, null, acceptorListener,
threadPool, scheduledThreadPool);
acceptor.start();
BogusResponseHandler connectorHandler = new BogusResponseHandler(connectorLatch);
@@ -524,11 +524,6 @@
this.latch = latch;
}
- public int isReadyToHandle(final HornetQBuffer buffer)
- {
- return 0;
- }
-
public void bufferReceived(final Object connectionID, final HornetQBuffer buffer)
{
int i = buffer.readInt();
Modified:
branches/HORNETQ-129_STOMP_protocol/tests/src/org/hornetq/tests/integration/remoting/AardvarkProtocolTest.java
===================================================================
---
branches/HORNETQ-129_STOMP_protocol/tests/src/org/hornetq/tests/integration/remoting/AardvarkProtocolTest.java 2010-01-22
12:46:06 UTC (rev 8840)
+++
branches/HORNETQ-129_STOMP_protocol/tests/src/org/hornetq/tests/integration/remoting/AardvarkProtocolTest.java 2010-01-22
13:21:27 UTC (rev 8841)
@@ -45,8 +45,6 @@
public void testAardvark() throws Exception
{
- RemotingServiceImpl.hackProtocol = ProtocolType.AARDVARK;
-
Configuration config = new ConfigurationImpl();
config.setSecurityEnabled(false);
@@ -56,6 +54,7 @@
params.put(TransportConstants.PORT_PROP_NAME, 9876);
params.put(TransportConstants.HOST_PROP_NAME, "127.0.0.1");
+ params.put(TransportConstants.PROTOCOL_PROP_NAME,
ProtocolType.AARDVARK.toString());
TransportConfiguration tc = new
TransportConfiguration(NettyAcceptorFactory.class.getCanonicalName(),
params);
@@ -95,7 +94,5 @@
socket.close();
server.stop();
-
- RemotingServiceImpl.hackProtocol = ProtocolType.CORE;
}
}
Modified:
branches/HORNETQ-129_STOMP_protocol/tests/src/org/hornetq/tests/unit/core/remoting/impl/netty/NettyAcceptorFactoryTest.java
===================================================================
---
branches/HORNETQ-129_STOMP_protocol/tests/src/org/hornetq/tests/unit/core/remoting/impl/netty/NettyAcceptorFactoryTest.java 2010-01-22
12:46:06 UTC (rev 8840)
+++
branches/HORNETQ-129_STOMP_protocol/tests/src/org/hornetq/tests/unit/core/remoting/impl/netty/NettyAcceptorFactoryTest.java 2010-01-22
13:21:27 UTC (rev 8841)
@@ -69,6 +69,7 @@
};
Acceptor acceptor = factory.createAcceptor(params,
handler,
+ null,
listener,
Executors.newCachedThreadPool(),
Executors.newScheduledThreadPool(ConfigurationImpl.DEFAULT_SCHEDULED_THREAD_POOL_MAX_SIZE));
Modified:
branches/HORNETQ-129_STOMP_protocol/tests/src/org/hornetq/tests/unit/core/remoting/impl/netty/NettyAcceptorTest.java
===================================================================
---
branches/HORNETQ-129_STOMP_protocol/tests/src/org/hornetq/tests/unit/core/remoting/impl/netty/NettyAcceptorTest.java 2010-01-22
12:46:06 UTC (rev 8840)
+++
branches/HORNETQ-129_STOMP_protocol/tests/src/org/hornetq/tests/unit/core/remoting/impl/netty/NettyAcceptorTest.java 2010-01-22
13:21:27 UTC (rev 8841)
@@ -84,6 +84,7 @@
NettyAcceptor acceptor = new NettyAcceptor(params,
handler,
+ null,
listener,
Executors.newCachedThreadPool(),
Executors.newScheduledThreadPool(ConfigurationImpl.DEFAULT_SCHEDULED_THREAD_POOL_MAX_SIZE));
Modified:
branches/HORNETQ-129_STOMP_protocol/tests/src/org/hornetq/tests/unit/core/remoting/impl/netty/NettyConnectorTest.java
===================================================================
---
branches/HORNETQ-129_STOMP_protocol/tests/src/org/hornetq/tests/unit/core/remoting/impl/netty/NettyConnectorTest.java 2010-01-22
12:46:06 UTC (rev 8840)
+++
branches/HORNETQ-129_STOMP_protocol/tests/src/org/hornetq/tests/unit/core/remoting/impl/netty/NettyConnectorTest.java 2010-01-22
13:21:27 UTC (rev 8841)
@@ -72,7 +72,7 @@
};
NettyConnector connector = new NettyConnector(params,
- handler,
+ handler,
listener,
Executors.newCachedThreadPool(),
Executors.newCachedThreadPool(),