Author: timfox
Date: 2010-01-22 06:06:32 -0500 (Fri, 22 Jan 2010)
New Revision: 8837
Added:
trunk/src/main/org/hornetq/spi/core/remoting/BufferDecoder.java
Removed:
trunk/src/main/org/hornetq/core/protocol/core/impl/AbstractBufferHandler.java
Modified:
trunk/src/main/org/hornetq/core/client/impl/FailoverManagerImpl.java
trunk/src/main/org/hornetq/core/protocol/aardvark/impl/AardvarkConnection.java
trunk/src/main/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java
trunk/src/main/org/hornetq/core/protocol/core/impl/RemotingConnectionImpl.java
trunk/src/main/org/hornetq/core/remoting/impl/invm/InVMAcceptor.java
trunk/src/main/org/hornetq/core/remoting/impl/invm/InVMAcceptorFactory.java
trunk/src/main/org/hornetq/core/remoting/server/impl/RemotingServiceImpl.java
trunk/src/main/org/hornetq/integration/transports/netty/ChannelPipelineSupport.java
trunk/src/main/org/hornetq/integration/transports/netty/HornetQFrameDecoder.java
trunk/src/main/org/hornetq/integration/transports/netty/HornetQFrameDecoder2.java
trunk/src/main/org/hornetq/integration/transports/netty/NettyAcceptor.java
trunk/src/main/org/hornetq/integration/transports/netty/NettyAcceptorFactory.java
trunk/src/main/org/hornetq/integration/transports/netty/NettyConnector.java
trunk/src/main/org/hornetq/spi/core/protocol/ProtocolManager.java
trunk/src/main/org/hornetq/spi/core/remoting/AcceptorFactory.java
trunk/src/main/org/hornetq/spi/core/remoting/BufferHandler.java
trunk/tests/src/org/hornetq/tests/integration/http/NettyHttpTest.java
trunk/tests/src/org/hornetq/tests/unit/core/remoting/impl/netty/NettyAcceptorFactoryTest.java
trunk/tests/src/org/hornetq/tests/unit/core/remoting/impl/netty/NettyAcceptorTest.java
trunk/tests/src/org/hornetq/tests/unit/core/remoting/impl/netty/NettyConnectorTest.java
Log:
removed the buffer decoding abstraction from the handler
Modified: trunk/src/main/org/hornetq/core/client/impl/FailoverManagerImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/FailoverManagerImpl.java 2010-01-22
10:29:26 UTC (rev 8836)
+++ trunk/src/main/org/hornetq/core/client/impl/FailoverManagerImpl.java 2010-01-22
11:06:32 UTC (rev 8837)
@@ -39,7 +39,6 @@
import org.hornetq.core.protocol.core.ChannelHandler;
import org.hornetq.core.protocol.core.CoreRemotingConnection;
import org.hornetq.core.protocol.core.Packet;
-import org.hornetq.core.protocol.core.impl.AbstractBufferHandler;
import org.hornetq.core.protocol.core.impl.PacketImpl;
import org.hornetq.core.protocol.core.impl.RemotingConnectionImpl;
import org.hornetq.core.protocol.core.impl.wireformat.CreateSessionMessage;
@@ -48,6 +47,7 @@
import org.hornetq.core.remoting.FailureListener;
import org.hornetq.core.version.Version;
import org.hornetq.spi.core.protocol.ProtocolType;
+import org.hornetq.spi.core.remoting.BufferHandler;
import org.hornetq.spi.core.remoting.Connection;
import org.hornetq.spi.core.remoting.ConnectionLifeCycleListener;
import org.hornetq.spi.core.remoting.Connector;
@@ -1085,7 +1085,7 @@
}
}
- private class DelegatingBufferHandler extends AbstractBufferHandler
+ private class DelegatingBufferHandler implements BufferHandler
{
public void bufferReceived(final Object connectionID, final HornetQBuffer buffer)
{
Modified: trunk/src/main/org/hornetq/core/protocol/aardvark/impl/AardvarkConnection.java
===================================================================
---
trunk/src/main/org/hornetq/core/protocol/aardvark/impl/AardvarkConnection.java 2010-01-22
10:29:26 UTC (rev 8836)
+++
trunk/src/main/org/hornetq/core/protocol/aardvark/impl/AardvarkConnection.java 2010-01-22
11:06:32 UTC (rev 8837)
@@ -130,10 +130,4 @@
{
manager.handleBuffer(this, buffer);
}
-
- public int isReadyToHandle(HornetQBuffer buffer)
- {
- return -1;
- }
-
}
Deleted: trunk/src/main/org/hornetq/core/protocol/core/impl/AbstractBufferHandler.java
===================================================================
---
trunk/src/main/org/hornetq/core/protocol/core/impl/AbstractBufferHandler.java 2010-01-22
10:29:26 UTC (rev 8836)
+++
trunk/src/main/org/hornetq/core/protocol/core/impl/AbstractBufferHandler.java 2010-01-22
11:06:32 UTC (rev 8837)
@@ -1,46 +0,0 @@
-/*
- * Copyright 2009 Red Hat, Inc.
- * Red Hat licenses this file to you under the Apache License, version
- * 2.0 (the "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
http://www.apache.org/licenses/LICENSE-2.0
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
- * implied. See the License for the specific language governing
- * permissions and limitations under the License.
- */
-package org.hornetq.core.protocol.core.impl;
-
-import org.hornetq.api.core.HornetQBuffer;
-import org.hornetq.core.logging.Logger;
-import org.hornetq.spi.core.remoting.BufferHandler;
-import org.hornetq.utils.DataConstants;
-
-/**
- * A AbstractBufferHandler
- *
- * @author <a href="mailto:tim.fox@jboss.com">Tim Fox</a>
- *
- */
-public abstract class AbstractBufferHandler implements BufferHandler
-{
- private static final Logger log = Logger.getLogger(AbstractBufferHandler.class);
-
- public int isReadyToHandle(final HornetQBuffer buffer)
- {
- if (buffer.readableBytes() < DataConstants.SIZE_INT)
- {
- return -1;
- }
-
- int length = buffer.readInt();
-
- if (buffer.readableBytes() < length)
- {
- return -1;
- }
-
- return length;
- }
-}
Modified: trunk/src/main/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java
===================================================================
--- trunk/src/main/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java 2010-01-22
10:29:26 UTC (rev 8836)
+++ trunk/src/main/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java 2010-01-22
11:06:32 UTC (rev 8837)
@@ -124,6 +124,8 @@
{
}
+ //This is never called using the core protocol, since we override the
HornetQFrameDecoder with our core
+ //optimised version HornetQFrameDecoder2, which nevers calls this
public int isReadyToHandle(HornetQBuffer buffer)
{
return -1;
Modified: trunk/src/main/org/hornetq/core/protocol/core/impl/RemotingConnectionImpl.java
===================================================================
---
trunk/src/main/org/hornetq/core/protocol/core/impl/RemotingConnectionImpl.java 2010-01-22
10:29:26 UTC (rev 8836)
+++
trunk/src/main/org/hornetq/core/protocol/core/impl/RemotingConnectionImpl.java 2010-01-22
11:06:32 UTC (rev 8837)
@@ -31,6 +31,7 @@
import org.hornetq.core.protocol.core.Packet;
import org.hornetq.core.remoting.CloseListener;
import org.hornetq.core.remoting.FailureListener;
+import org.hornetq.spi.core.remoting.BufferHandler;
import org.hornetq.spi.core.remoting.Connection;
import org.hornetq.utils.SimpleIDGenerator;
@@ -39,7 +40,7 @@
* @author <a href="mailto:jmesnil@redhat.com">Jeff Mesnil</a>
* @version <tt>$Revision$</tt> $Id$
*/
-public class RemotingConnectionImpl extends AbstractBufferHandler implements
CoreRemotingConnection
+public class RemotingConnectionImpl implements BufferHandler, CoreRemotingConnection
{
// Constants
//
------------------------------------------------------------------------------------
Modified: trunk/src/main/org/hornetq/core/remoting/impl/invm/InVMAcceptor.java
===================================================================
--- trunk/src/main/org/hornetq/core/remoting/impl/invm/InVMAcceptor.java 2010-01-22
10:29:26 UTC (rev 8836)
+++ trunk/src/main/org/hornetq/core/remoting/impl/invm/InVMAcceptor.java 2010-01-22
11:06:32 UTC (rev 8837)
@@ -46,7 +46,7 @@
private final int id;
private final BufferHandler handler;
-
+
private final ConnectionLifeCycleListener listener;
private final ConcurrentMap<String, Connection> connections = new
ConcurrentHashMap<String, Connection>();
@@ -62,13 +62,13 @@
private final ProtocolType protocol;
public InVMAcceptor(final Map<String, Object> configuration,
- final BufferHandler handler,
+ final BufferHandler handler,
final ConnectionLifeCycleListener listener,
final Executor threadPool,
final ProtocolType protocol)
{
this.handler = handler;
-
+
this.listener = listener;
id = ConfigurationHelper.getIntProperty(TransportConstants.SERVER_ID_PROP_NAME, 0,
configuration);
Modified: trunk/src/main/org/hornetq/core/remoting/impl/invm/InVMAcceptorFactory.java
===================================================================
--- trunk/src/main/org/hornetq/core/remoting/impl/invm/InVMAcceptorFactory.java 2010-01-22
10:29:26 UTC (rev 8836)
+++ trunk/src/main/org/hornetq/core/remoting/impl/invm/InVMAcceptorFactory.java 2010-01-22
11:06:32 UTC (rev 8837)
@@ -20,6 +20,7 @@
import org.hornetq.spi.core.protocol.ProtocolType;
import org.hornetq.spi.core.remoting.Acceptor;
import org.hornetq.spi.core.remoting.AcceptorFactory;
+import org.hornetq.spi.core.remoting.BufferDecoder;
import org.hornetq.spi.core.remoting.BufferHandler;
import org.hornetq.spi.core.remoting.ConnectionLifeCycleListener;
@@ -33,6 +34,7 @@
{
public Acceptor createAcceptor(final Map<String, Object> configuration,
final BufferHandler handler,
+ final BufferDecoder decoder,
final ConnectionLifeCycleListener listener,
final Executor threadPool,
final ScheduledExecutorService scheduledThreadPool,
Modified: trunk/src/main/org/hornetq/core/remoting/server/impl/RemotingServiceImpl.java
===================================================================
---
trunk/src/main/org/hornetq/core/remoting/server/impl/RemotingServiceImpl.java 2010-01-22
10:29:26 UTC (rev 8836)
+++
trunk/src/main/org/hornetq/core/remoting/server/impl/RemotingServiceImpl.java 2010-01-22
11:06:32 UTC (rev 8837)
@@ -70,8 +70,6 @@
private final Map<Object, ConnectionEntry> connections = new
ConcurrentHashMap<Object, ConnectionEntry>();
- //private final BufferHandler bufferHandler = new DelegatingBufferHandler();
-
private final Configuration config;
private final HornetQServer server;
@@ -169,6 +167,7 @@
Acceptor acceptor = factory.createAcceptor(info.getParams(),
new
DelegatingBufferHandler(manager),
+ manager,
this,
threadPool,
scheduledThreadPool,
@@ -405,11 +404,6 @@
conn.connection.bufferReceived(connectionID, buffer);
}
}
-
- public int isReadyToHandle(HornetQBuffer buffer)
- {
- return manager.isReadyToHandle(buffer);
- }
}
private final class FailureCheckAndFlushThread extends Thread
Modified:
trunk/src/main/org/hornetq/integration/transports/netty/ChannelPipelineSupport.java
===================================================================
---
trunk/src/main/org/hornetq/integration/transports/netty/ChannelPipelineSupport.java 2010-01-22
10:29:26 UTC (rev 8836)
+++
trunk/src/main/org/hornetq/integration/transports/netty/ChannelPipelineSupport.java 2010-01-22
11:06:32 UTC (rev 8837)
@@ -17,7 +17,7 @@
import javax.net.ssl.SSLEngine;
import org.hornetq.spi.core.protocol.ProtocolType;
-import org.hornetq.spi.core.remoting.BufferHandler;
+import org.hornetq.spi.core.remoting.BufferDecoder;
import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.handler.ssl.SslHandler;
@@ -45,18 +45,19 @@
// Public --------------------------------------------------------
- public static void addCodecFilter(final ProtocolType protocol, final ChannelPipeline
pipeline, final BufferHandler handler)
+ public static void addCodecFilter(final ProtocolType protocol, final ChannelPipeline
pipeline, final BufferDecoder decoder)
{
assert pipeline != null;
if (protocol == ProtocolType.CORE)
{
+ //Core protocol uses it's own optimised decoder
pipeline.addLast("decoder", new HornetQFrameDecoder2());
}
else
{
//Use the old frame decoder for other protocols
- pipeline.addLast("decoder", new HornetQFrameDecoder(handler));
+ pipeline.addLast("decoder", new HornetQFrameDecoder(decoder));
}
}
Modified:
trunk/src/main/org/hornetq/integration/transports/netty/HornetQFrameDecoder.java
===================================================================
---
trunk/src/main/org/hornetq/integration/transports/netty/HornetQFrameDecoder.java 2010-01-22
10:29:26 UTC (rev 8836)
+++
trunk/src/main/org/hornetq/integration/transports/netty/HornetQFrameDecoder.java 2010-01-22
11:06:32 UTC (rev 8837)
@@ -15,8 +15,7 @@
import org.hornetq.core.buffers.impl.ChannelBufferWrapper;
import org.hornetq.core.logging.Logger;
-import org.hornetq.spi.core.remoting.BufferHandler;
-import org.hornetq.utils.DataConstants;
+import org.hornetq.spi.core.remoting.BufferDecoder;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.buffer.ChannelBuffers;
import org.jboss.netty.channel.Channel;
@@ -36,11 +35,11 @@
{
private static final Logger log = Logger.getLogger(HornetQFrameDecoder.class);
- private final BufferHandler handler;
+ private final BufferDecoder decoder;
- public HornetQFrameDecoder(final BufferHandler handler)
+ public HornetQFrameDecoder(final BufferDecoder decoder)
{
- this.handler = handler;
+ this.decoder = decoder;
}
// FrameDecoder overrides
@@ -48,34 +47,24 @@
@Override
protected Object decode(final ChannelHandlerContext ctx, final Channel channel, final
ChannelBuffer in) throws Exception
- {
- log.info("dewcoding!!");
-
+ {
int start = in.readerIndex();
- int length = handler.isReadyToHandle(new ChannelBufferWrapper(in));
+ int length = decoder.isReadyToHandle(new ChannelBufferWrapper(in));
- log.info("length is " + length);
-
in.readerIndex(start);
- log.info("length is 2 " + length);
-
if (length == -1)
{
return null;
}
- log.info("creating buffer");
-
ChannelBuffer buffer = in.readBytes(length);
ChannelBuffer newBuffer = ChannelBuffers.dynamicBuffer(buffer.writerIndex());
newBuffer.writeBytes(buffer);
- log.info("got the buffer");
-
return newBuffer;
}
}
Modified:
trunk/src/main/org/hornetq/integration/transports/netty/HornetQFrameDecoder2.java
===================================================================
---
trunk/src/main/org/hornetq/integration/transports/netty/HornetQFrameDecoder2.java 2010-01-22
10:29:26 UTC (rev 8836)
+++
trunk/src/main/org/hornetq/integration/transports/netty/HornetQFrameDecoder2.java 2010-01-22
11:06:32 UTC (rev 8837)
@@ -24,7 +24,7 @@
import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
/**
- * A Netty decoder used to decode messages.
+ * A Netty decoder specially optimised to to decode messages on the core protocol only
*
* @author <a href="tlee(a)redhat.com">Trustin Lee</a>
*
Modified: trunk/src/main/org/hornetq/integration/transports/netty/NettyAcceptor.java
===================================================================
--- trunk/src/main/org/hornetq/integration/transports/netty/NettyAcceptor.java 2010-01-22
10:29:26 UTC (rev 8836)
+++ trunk/src/main/org/hornetq/integration/transports/netty/NettyAcceptor.java 2010-01-22
11:06:32 UTC (rev 8837)
@@ -36,6 +36,7 @@
import org.hornetq.core.server.management.NotificationService;
import org.hornetq.spi.core.protocol.ProtocolType;
import org.hornetq.spi.core.remoting.Acceptor;
+import org.hornetq.spi.core.remoting.BufferDecoder;
import org.hornetq.spi.core.remoting.BufferHandler;
import org.hornetq.spi.core.remoting.Connection;
import org.hornetq.spi.core.remoting.ConnectionLifeCycleListener;
@@ -88,6 +89,8 @@
private ServerBootstrap bootstrap;
private final BufferHandler handler;
+
+ private final BufferDecoder decoder;
private final ConnectionLifeCycleListener listener;
@@ -137,12 +140,15 @@
public NettyAcceptor(final Map<String, Object> configuration,
final BufferHandler handler,
+ final BufferDecoder decoder,
final ConnectionLifeCycleListener listener,
final Executor threadPool,
final ScheduledExecutorService scheduledThreadPool,
final ProtocolType protocol)
{
this.handler = handler;
+
+ this.decoder = decoder;
this.listener = listener;
@@ -288,7 +294,7 @@
pipeline.addLast("httphandler", new
HttpAcceptorHandler(httpKeepAliveRunnable, httpResponseTime));
}
- ChannelPipelineSupport.addCodecFilter(protocol, pipeline, handler);
+ ChannelPipelineSupport.addCodecFilter(protocol, pipeline, decoder);
pipeline.addLast("handler", new
HornetQServerChannelHandler(channelGroup, handler, new Listener()));
return pipeline;
}
Modified:
trunk/src/main/org/hornetq/integration/transports/netty/NettyAcceptorFactory.java
===================================================================
---
trunk/src/main/org/hornetq/integration/transports/netty/NettyAcceptorFactory.java 2010-01-22
10:29:26 UTC (rev 8836)
+++
trunk/src/main/org/hornetq/integration/transports/netty/NettyAcceptorFactory.java 2010-01-22
11:06:32 UTC (rev 8837)
@@ -21,6 +21,7 @@
import org.hornetq.spi.core.protocol.ProtocolType;
import org.hornetq.spi.core.remoting.Acceptor;
import org.hornetq.spi.core.remoting.AcceptorFactory;
+import org.hornetq.spi.core.remoting.BufferDecoder;
import org.hornetq.spi.core.remoting.BufferHandler;
import org.hornetq.spi.core.remoting.ConnectionLifeCycleListener;
@@ -33,12 +34,13 @@
{
public Acceptor createAcceptor(final Map<String, Object> configuration,
final BufferHandler handler,
+ final BufferDecoder decoder,
final ConnectionLifeCycleListener listener,
final Executor threadPool,
final ScheduledExecutorService scheduledThreadPool,
final ProtocolType protocol)
{
- return new NettyAcceptor(configuration, handler, listener, threadPool,
scheduledThreadPool,
+ return new NettyAcceptor(configuration, handler, decoder, listener, threadPool,
scheduledThreadPool,
protocol);
}
Modified: trunk/src/main/org/hornetq/integration/transports/netty/NettyConnector.java
===================================================================
--- trunk/src/main/org/hornetq/integration/transports/netty/NettyConnector.java 2010-01-22
10:29:26 UTC (rev 8836)
+++ trunk/src/main/org/hornetq/integration/transports/netty/NettyConnector.java 2010-01-22
11:06:32 UTC (rev 8837)
@@ -311,7 +311,7 @@
pipeline.addLast("httpResponseDecoder", new
HttpResponseDecoder());
pipeline.addLast("httphandler", new HttpHandler());
}
- ChannelPipelineSupport.addCodecFilter(ProtocolType.CORE, pipeline, handler);
+ ChannelPipelineSupport.addCodecFilter(ProtocolType.CORE, pipeline, null);
pipeline.addLast("handler", new
HornetQClientChannelHandler(channelGroup, handler, new Listener()));
return pipeline;
}
Modified: trunk/src/main/org/hornetq/spi/core/protocol/ProtocolManager.java
===================================================================
--- trunk/src/main/org/hornetq/spi/core/protocol/ProtocolManager.java 2010-01-22 10:29:26
UTC (rev 8836)
+++ trunk/src/main/org/hornetq/spi/core/protocol/ProtocolManager.java 2010-01-22 11:06:32
UTC (rev 8837)
@@ -13,6 +13,7 @@
package org.hornetq.spi.core.protocol;
+import org.hornetq.spi.core.remoting.BufferDecoder;
import org.hornetq.spi.core.remoting.BufferHandler;
import org.hornetq.spi.core.remoting.Connection;
@@ -23,7 +24,7 @@
*
*
*/
-public interface ProtocolManager extends BufferHandler
+public interface ProtocolManager extends BufferHandler, BufferDecoder
{
ConnectionEntry createConnectionEntry(Connection connection);
}
Modified: trunk/src/main/org/hornetq/spi/core/remoting/AcceptorFactory.java
===================================================================
--- trunk/src/main/org/hornetq/spi/core/remoting/AcceptorFactory.java 2010-01-22 10:29:26
UTC (rev 8836)
+++ trunk/src/main/org/hornetq/spi/core/remoting/AcceptorFactory.java 2010-01-22 11:06:32
UTC (rev 8837)
@@ -36,6 +36,7 @@
*
* @param configuration the configuration
* @param handler the handler
+ * @param decoder the decoder
* @param listener the listener
* @param threadPool the threadpool
* @param scheduledThreadPool a scheduled thread pool
@@ -43,6 +44,7 @@
*/
Acceptor createAcceptor(final Map<String, Object> configuration,
BufferHandler handler,
+ BufferDecoder decoder,
ConnectionLifeCycleListener listener,
Executor threadPool,
ScheduledExecutorService scheduledThreadPool,
Added: trunk/src/main/org/hornetq/spi/core/remoting/BufferDecoder.java
===================================================================
--- trunk/src/main/org/hornetq/spi/core/remoting/BufferDecoder.java
(rev 0)
+++ trunk/src/main/org/hornetq/spi/core/remoting/BufferDecoder.java 2010-01-22 11:06:32
UTC (rev 8837)
@@ -0,0 +1,36 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.spi.core.remoting;
+
+import org.hornetq.api.core.HornetQBuffer;
+
+/**
+ * A BufferDecoder
+ *
+ * @author tim
+ *
+ *
+ */
+public interface BufferDecoder
+{
+ /**
+ * called by the remoting system prior to {@link
org.hornetq.spi.core.remoting.BufferHandler#bufferReceived(Object,
org.hornetq.api.core.HornetQBuffer)}.
+ * <p/>
+ * The implementation should return true if there is enough data in the buffer to
decode. otherwise false.
+ *
+ * @param buffer the buffer
+ * @return true id the buffer can be decoded..
+ */
+ int isReadyToHandle(HornetQBuffer buffer);
+}
Modified: trunk/src/main/org/hornetq/spi/core/remoting/BufferHandler.java
===================================================================
--- trunk/src/main/org/hornetq/spi/core/remoting/BufferHandler.java 2010-01-22 10:29:26
UTC (rev 8836)
+++ trunk/src/main/org/hornetq/spi/core/remoting/BufferHandler.java 2010-01-22 11:06:32
UTC (rev 8837)
@@ -29,15 +29,5 @@
* @param connectionID the connection the buffer was received on
* @param buffer the buffer to decode
*/
- void bufferReceived(Object connectionID, HornetQBuffer buffer);
-
- /**
- * called by the remoting connection prior to {@link
org.hornetq.spi.core.remoting.BufferHandler#bufferReceived(Object,
org.hornetq.api.core.HornetQBuffer)}.
- * <p/>
- * The implementation should return true if there is enough data in the buffer to
decode. otherwise false.
- *
- * @param buffer the buffer
- * @return true id the buffer can be decoded..
- */
- int isReadyToHandle(HornetQBuffer buffer);
+ void bufferReceived(Object connectionID, HornetQBuffer buffer);
}
Modified: trunk/tests/src/org/hornetq/tests/integration/http/NettyHttpTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/http/NettyHttpTest.java 2010-01-22
10:29:26 UTC (rev 8836)
+++ trunk/tests/src/org/hornetq/tests/integration/http/NettyHttpTest.java 2010-01-22
11:06:32 UTC (rev 8837)
@@ -25,7 +25,6 @@
import org.hornetq.api.core.HornetQBuffer;
import org.hornetq.api.core.HornetQException;
import org.hornetq.core.config.impl.ConfigurationImpl;
-import org.hornetq.core.protocol.core.impl.AbstractBufferHandler;
import org.hornetq.integration.transports.netty.NettyAcceptor;
import org.hornetq.integration.transports.netty.NettyConnector;
import org.hornetq.integration.transports.netty.TransportConstants;
@@ -99,7 +98,7 @@
conf.put(TransportConstants.HTTP_CLIENT_IDLE_SCAN_PERIOD, -1l);
DummyConnectionLifeCycleListener acceptorListener = new
DummyConnectionLifeCycleListener(connCreatedLatch);
SimpleBufferHandler acceptorHandler = new SimpleBufferHandler(acceptorLatch);
- acceptor = new NettyAcceptor(conf, acceptorHandler, acceptorListener, threadPool,
scheduledThreadPool, ProtocolType.CORE);
+ acceptor = new NettyAcceptor(conf, acceptorHandler, null, acceptorListener,
threadPool, scheduledThreadPool, ProtocolType.CORE);
acceptor.start();
SimpleBufferHandler2 connectorHandler = new SimpleBufferHandler2(connectorLatch);
@@ -153,7 +152,7 @@
conf.put(TransportConstants.HTTP_CLIENT_IDLE_SCAN_PERIOD, -1l);
DummyConnectionLifeCycleListener acceptorListener = new
DummyConnectionLifeCycleListener(connCreatedLatch);
SimpleBufferHandler acceptorHandler = new SimpleBufferHandler(acceptorLatch);
- acceptor = new NettyAcceptor(conf, acceptorHandler, acceptorListener, threadPool,
scheduledThreadPool, ProtocolType.CORE);
+ acceptor = new NettyAcceptor(conf, acceptorHandler, null, acceptorListener,
threadPool, scheduledThreadPool, ProtocolType.CORE);
acceptor.start();
SimpleBufferHandler2 connectorHandler = new SimpleBufferHandler2(connectorLatch);
@@ -211,7 +210,7 @@
conf.put(TransportConstants.HTTP_CLIENT_IDLE_SCAN_PERIOD, -1l);
DummyConnectionLifeCycleListener acceptorListener = new
DummyConnectionLifeCycleListener(connCreatedLatch);
SimpleBufferHandler acceptorHandler = new SimpleBufferHandler(acceptorLatch);
- acceptor = new NettyAcceptor(conf, acceptorHandler, acceptorListener, threadPool,
scheduledThreadPool, ProtocolType.CORE);
+ acceptor = new NettyAcceptor(conf, acceptorHandler, null, acceptorListener,
threadPool, scheduledThreadPool, ProtocolType.CORE);
acceptor.start();
SimpleBufferHandler connectorHandler = new SimpleBufferHandler(connectorLatch);
@@ -270,7 +269,7 @@
conf.put(TransportConstants.HTTP_CLIENT_IDLE_SCAN_PERIOD, -1l);
DummyConnectionLifeCycleListener acceptorListener = new
DummyConnectionLifeCycleListener(connCreatedLatch);
SimpleBufferHandler acceptorHandler = new SimpleBufferHandler(acceptorLatch);
- acceptor = new NettyAcceptor(conf, acceptorHandler, acceptorListener, threadPool,
scheduledThreadPool, ProtocolType.CORE);
+ acceptor = new NettyAcceptor(conf, acceptorHandler, null, acceptorListener,
threadPool, scheduledThreadPool, ProtocolType.CORE);
acceptor.start();
SimpleBufferHandler connectorHandler = new SimpleBufferHandler(connectorLatch);
@@ -328,7 +327,7 @@
conf.put(TransportConstants.HTTP_CLIENT_IDLE_PROP_NAME, 500l);
DummyConnectionLifeCycleListener acceptorListener = new
DummyConnectionLifeCycleListener(connCreatedLatch);
SimpleBufferHandler acceptorHandler = new SimpleBufferHandler(acceptorLatch);
- acceptor = new NettyAcceptor(conf, acceptorHandler, acceptorListener, threadPool,
scheduledThreadPool, ProtocolType.CORE);
+ acceptor = new NettyAcceptor(conf, acceptorHandler, null, acceptorListener,
threadPool, scheduledThreadPool, ProtocolType.CORE);
acceptor.start();
SimpleBufferHandler connectorHandler = new SimpleBufferHandler(connectorLatch);
@@ -382,7 +381,7 @@
conf.put(TransportConstants.HTTP_SERVER_SCAN_PERIOD_PROP_NAME, 5000l);
DummyConnectionLifeCycleListener acceptorListener = new
DummyConnectionLifeCycleListener(connCreatedLatch);
SimpleBufferHandler acceptorHandler = new SimpleBufferHandler(acceptorLatch);
- acceptor = new NettyAcceptor(conf, acceptorHandler, acceptorListener, threadPool,
scheduledThreadPool, ProtocolType.CORE);
+ acceptor = new NettyAcceptor(conf, acceptorHandler, null, acceptorListener,
threadPool, scheduledThreadPool, ProtocolType.CORE);
acceptor.start();
BogusResponseHandler connectorHandler = new BogusResponseHandler(connectorLatch);
@@ -429,7 +428,7 @@
conf.put(TransportConstants.HTTP_SERVER_SCAN_PERIOD_PROP_NAME, 5000l);
DummyConnectionLifeCycleListener acceptorListener = new
DummyConnectionLifeCycleListener(connCreatedLatch);
SimpleBufferHandler2 acceptorHandler = new SimpleBufferHandler2(acceptorLatch);
- acceptor = new NettyAcceptor(conf, acceptorHandler, acceptorListener, threadPool,
scheduledThreadPool, ProtocolType.CORE);
+ acceptor = new NettyAcceptor(conf, acceptorHandler, null, acceptorListener,
threadPool, scheduledThreadPool, ProtocolType.CORE);
acceptor.start();
BogusResponseHandler connectorHandler = new BogusResponseHandler(connectorLatch);
@@ -462,7 +461,7 @@
}
}
- class SimpleBufferHandler extends AbstractBufferHandler
+ class SimpleBufferHandler implements BufferHandler
{
int messagesReceieved = 0;
@@ -484,7 +483,7 @@
}
}
- class SimpleBufferHandler2 extends AbstractBufferHandler
+ class SimpleBufferHandler2 implements BufferHandler
{
int messagesReceieved = 0;
@@ -525,11 +524,6 @@
this.latch = latch;
}
- public int isReadyToHandle(final HornetQBuffer buffer)
- {
- return 0;
- }
-
public void bufferReceived(final Object connectionID, final HornetQBuffer buffer)
{
int i = buffer.readInt();
Modified:
trunk/tests/src/org/hornetq/tests/unit/core/remoting/impl/netty/NettyAcceptorFactoryTest.java
===================================================================
---
trunk/tests/src/org/hornetq/tests/unit/core/remoting/impl/netty/NettyAcceptorFactoryTest.java 2010-01-22
10:29:26 UTC (rev 8836)
+++
trunk/tests/src/org/hornetq/tests/unit/core/remoting/impl/netty/NettyAcceptorFactoryTest.java 2010-01-22
11:06:32 UTC (rev 8837)
@@ -21,7 +21,6 @@
import org.hornetq.api.core.HornetQBuffer;
import org.hornetq.api.core.HornetQException;
import org.hornetq.core.config.impl.ConfigurationImpl;
-import org.hornetq.core.protocol.core.impl.AbstractBufferHandler;
import org.hornetq.integration.transports.netty.NettyAcceptor;
import org.hornetq.integration.transports.netty.NettyAcceptorFactory;
import org.hornetq.spi.core.protocol.ProtocolType;
@@ -45,7 +44,7 @@
NettyAcceptorFactory factory = new NettyAcceptorFactory();
Map<String, Object> params = new HashMap<String, Object>();
- BufferHandler handler = new AbstractBufferHandler()
+ BufferHandler handler = new BufferHandler()
{
public void bufferReceived(final Object connectionID, final HornetQBuffer
buffer)
@@ -71,6 +70,7 @@
Acceptor acceptor = factory.createAcceptor(params,
handler,
+ null,
listener,
Executors.newCachedThreadPool(),
Executors.newScheduledThreadPool(ConfigurationImpl.DEFAULT_SCHEDULED_THREAD_POOL_MAX_SIZE),
Modified:
trunk/tests/src/org/hornetq/tests/unit/core/remoting/impl/netty/NettyAcceptorTest.java
===================================================================
---
trunk/tests/src/org/hornetq/tests/unit/core/remoting/impl/netty/NettyAcceptorTest.java 2010-01-22
10:29:26 UTC (rev 8836)
+++
trunk/tests/src/org/hornetq/tests/unit/core/remoting/impl/netty/NettyAcceptorTest.java 2010-01-22
11:06:32 UTC (rev 8837)
@@ -22,7 +22,6 @@
import org.hornetq.api.core.HornetQBuffer;
import org.hornetq.api.core.HornetQException;
import org.hornetq.core.config.impl.ConfigurationImpl;
-import org.hornetq.core.protocol.core.impl.AbstractBufferHandler;
import org.hornetq.integration.transports.netty.NettyAcceptor;
import org.hornetq.integration.transports.netty.TransportConstants;
import org.hornetq.spi.core.protocol.ProtocolType;
@@ -58,7 +57,7 @@
public void testStartStop() throws Exception
{
- BufferHandler handler = new AbstractBufferHandler()
+ BufferHandler handler = new BufferHandler()
{
public void bufferReceived(final Object connectionID, final HornetQBuffer
buffer)
@@ -84,6 +83,7 @@
};
NettyAcceptor acceptor = new NettyAcceptor(params,
handler,
+ null,
listener,
Executors.newCachedThreadPool(),
Executors.newScheduledThreadPool(ConfigurationImpl.DEFAULT_SCHEDULED_THREAD_POOL_MAX_SIZE),
Modified:
trunk/tests/src/org/hornetq/tests/unit/core/remoting/impl/netty/NettyConnectorTest.java
===================================================================
---
trunk/tests/src/org/hornetq/tests/unit/core/remoting/impl/netty/NettyConnectorTest.java 2010-01-22
10:29:26 UTC (rev 8836)
+++
trunk/tests/src/org/hornetq/tests/unit/core/remoting/impl/netty/NettyConnectorTest.java 2010-01-22
11:06:32 UTC (rev 8837)
@@ -21,7 +21,6 @@
import org.hornetq.api.core.HornetQBuffer;
import org.hornetq.api.core.HornetQException;
-import org.hornetq.core.protocol.core.impl.AbstractBufferHandler;
import org.hornetq.integration.transports.netty.NettyConnector;
import org.hornetq.spi.core.protocol.ProtocolType;
import org.hornetq.spi.core.remoting.BufferHandler;
@@ -50,7 +49,7 @@
public void testStartStop() throws Exception
{
- BufferHandler handler = new AbstractBufferHandler()
+ BufferHandler handler = new BufferHandler()
{
public void bufferReceived(final Object connectionID, final HornetQBuffer
buffer)
{
@@ -73,7 +72,7 @@
};
NettyConnector connector = new NettyConnector(params,
- handler,
+ handler,
listener,
Executors.newCachedThreadPool(),
Executors.newCachedThreadPool(),
@@ -87,7 +86,7 @@
public void testNullParams() throws Exception
{
- BufferHandler handler = new AbstractBufferHandler()
+ BufferHandler handler = new BufferHandler()
{
public void bufferReceived(final Object connectionID, final HornetQBuffer
buffer)
{