Author: timfox
Date: 2010-10-08 07:49:58 -0400 (Fri, 08 Oct 2010)
New Revision: 9760
Added:
trunk/src/main/org/hornetq/spi/core/remoting/ReadyListener.java
Modified:
trunk/src/main/org/hornetq/core/client/impl/FailoverManagerImpl.java
trunk/src/main/org/hornetq/core/protocol/core/impl/CoreSessionCallback.java
trunk/src/main/org/hornetq/core/protocol/stomp/StompDecoder.java
trunk/src/main/org/hornetq/core/protocol/stomp/StompProtocolManager.java
trunk/src/main/org/hornetq/core/protocol/stomp/StompSession.java
trunk/src/main/org/hornetq/core/remoting/impl/invm/InVMAcceptor.java
trunk/src/main/org/hornetq/core/remoting/impl/invm/InVMConnection.java
trunk/src/main/org/hornetq/core/remoting/impl/invm/InVMConnector.java
trunk/src/main/org/hornetq/core/remoting/impl/netty/HornetQChannelHandler.java
trunk/src/main/org/hornetq/core/remoting/impl/netty/NettyAcceptor.java
trunk/src/main/org/hornetq/core/remoting/impl/netty/NettyConnection.java
trunk/src/main/org/hornetq/core/remoting/impl/netty/NettyConnector.java
trunk/src/main/org/hornetq/core/remoting/server/impl/RemotingServiceImpl.java
trunk/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java
trunk/src/main/org/hornetq/spi/core/protocol/SessionCallback.java
trunk/src/main/org/hornetq/spi/core/remoting/Connection.java
trunk/src/main/org/hornetq/spi/core/remoting/ConnectionLifeCycleListener.java
trunk/tests/src/org/hornetq/tests/integration/http/NettyHttpTest.java
trunk/tests/src/org/hornetq/tests/unit/core/remoting/impl/netty/NettyAcceptorFactoryTest.java
trunk/tests/src/org/hornetq/tests/unit/core/remoting/impl/netty/NettyAcceptorTest.java
trunk/tests/src/org/hornetq/tests/unit/core/remoting/impl/netty/NettyConnectionTest.java
trunk/tests/src/org/hornetq/tests/unit/core/remoting/impl/netty/NettyConnectorTest.java
Log:
https://jira.jboss.org/browse/HORNETQ-533
Modified: trunk/src/main/org/hornetq/core/client/impl/FailoverManagerImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/FailoverManagerImpl.java 2010-10-06
13:49:22 UTC (rev 9759)
+++ trunk/src/main/org/hornetq/core/client/impl/FailoverManagerImpl.java 2010-10-08
11:49:58 UTC (rev 9760)
@@ -296,9 +296,15 @@
{
handleConnectionFailure(connectionID, me);
}
+
+ public void connectionReadyForWrites(final Object connectionID, final boolean ready)
+ {
+ }
// ConnectionManager implementation
------------------------------------------------------------------
+
+
public ClientSession createSession(final String username,
final String password,
final boolean xa,
Modified: trunk/src/main/org/hornetq/core/protocol/core/impl/CoreSessionCallback.java
===================================================================
--- trunk/src/main/org/hornetq/core/protocol/core/impl/CoreSessionCallback.java 2010-10-06
13:49:22 UTC (rev 9759)
+++ trunk/src/main/org/hornetq/core/protocol/core/impl/CoreSessionCallback.java 2010-10-08
11:49:58 UTC (rev 9760)
@@ -23,6 +23,7 @@
import org.hornetq.core.server.ServerMessage;
import org.hornetq.spi.core.protocol.ProtocolManager;
import org.hornetq.spi.core.protocol.SessionCallback;
+import org.hornetq.spi.core.remoting.ReadyListener;
/**
* A CoreSessionCallback
@@ -40,7 +41,7 @@
private ProtocolManager protocolManager;
private String name;
-
+
public CoreSessionCallback(String name, ProtocolManager protocolManager, Channel
channel)
{
this.name = name;
@@ -90,4 +91,15 @@
{
protocolManager.removeHandler(name);
}
+
+ public void addReadyListener(final ReadyListener listener)
+ {
+ channel.getConnection().getTransportConnection().addReadyListener(listener);
+ }
+
+ public void removeReadyListener(final ReadyListener listener)
+ {
+ channel.getConnection().getTransportConnection().removeReadyListener(listener);
+ }
+
}
\ No newline at end of file
Modified: trunk/src/main/org/hornetq/core/protocol/stomp/StompDecoder.java
===================================================================
--- trunk/src/main/org/hornetq/core/protocol/stomp/StompDecoder.java 2010-10-06 13:49:22
UTC (rev 9759)
+++ trunk/src/main/org/hornetq/core/protocol/stomp/StompDecoder.java 2010-10-08 11:49:58
UTC (rev 9760)
@@ -139,6 +139,10 @@
*/
public synchronized StompFrame decode(final HornetQBuffer buffer) throws Exception
{
+ //log.info("got buff " + buffer.readableBytes());
+
+ long start = System.nanoTime();
+
int readable = buffer.readableBytes();
if (data + readable >= workingBuffer.length)
@@ -301,6 +305,8 @@
throwInvalid();
}
}
+
+ long commandTime = System.nanoTime() - start;
if (readingHeaders)
{
@@ -406,6 +412,8 @@
}
}
}
+
+ long headersTime = System.nanoTime() - start - commandTime;
// Now the body
@@ -447,6 +455,8 @@
}
}
}
+
+
if (content != null)
{
@@ -464,6 +474,12 @@
StompFrame ret = new StompFrame(command, headers, content);
init();
+
+ // log.info("decoded");
+
+ long bodyTime = System.nanoTime() - start - headersTime - commandTime;
+
+ // log.info("command: "+ commandTime + " headers: " +
headersTime + " body: " + bodyTime);
return ret;
}
Modified: trunk/src/main/org/hornetq/core/protocol/stomp/StompProtocolManager.java
===================================================================
--- trunk/src/main/org/hornetq/core/protocol/stomp/StompProtocolManager.java 2010-10-06
13:49:22 UTC (rev 9759)
+++ trunk/src/main/org/hornetq/core/protocol/stomp/StompProtocolManager.java 2010-10-08
11:49:58 UTC (rev 9760)
@@ -142,11 +142,14 @@
public void handleBuffer(final RemotingConnection connection, final HornetQBuffer
buffer)
{
+ long start = System.nanoTime();
StompConnection conn = (StompConnection)connection;
conn.setDataReceived();
StompDecoder decoder = conn.getDecoder();
+
+ // log.info("in handle");
do
{
@@ -165,7 +168,7 @@
if (request == null)
{
- return;
+ break;
}
try
@@ -253,6 +256,10 @@
server.getStorageManager().clearContext();
}
} while (decoder.hasBytes());
+
+ long end = System.nanoTime();
+
+ // log.info("handle took " + (end-start));
}
// Public --------------------------------------------------------
Modified: trunk/src/main/org/hornetq/core/protocol/stomp/StompSession.java
===================================================================
--- trunk/src/main/org/hornetq/core/protocol/stomp/StompSession.java 2010-10-06 13:49:22
UTC (rev 9759)
+++ trunk/src/main/org/hornetq/core/protocol/stomp/StompSession.java 2010-10-08 11:49:58
UTC (rev 9760)
@@ -30,6 +30,7 @@
import org.hornetq.core.server.ServerSession;
import org.hornetq.spi.core.protocol.RemotingConnection;
import org.hornetq.spi.core.protocol.SessionCallback;
+import org.hornetq.spi.core.remoting.ReadyListener;
import org.hornetq.utils.DataConstants;
import org.hornetq.utils.UUIDGenerator;
@@ -156,7 +157,17 @@
public void closed()
{
}
+
+ public void addReadyListener(final ReadyListener listener)
+ {
+ connection.getTransportConnection().addReadyListener(listener);
+ }
+ public void removeReadyListener(final ReadyListener listener)
+ {
+ connection.getTransportConnection().removeReadyListener(listener);
+ }
+
public void acknowledge(String messageID) throws Exception
{
long id = Long.parseLong(messageID);
Modified: trunk/src/main/org/hornetq/core/remoting/impl/invm/InVMAcceptor.java
===================================================================
--- trunk/src/main/org/hornetq/core/remoting/impl/invm/InVMAcceptor.java 2010-10-06
13:49:22 UTC (rev 9759)
+++ trunk/src/main/org/hornetq/core/remoting/impl/invm/InVMAcceptor.java 2010-10-08
11:49:58 UTC (rev 9760)
@@ -250,6 +250,9 @@
listener.connectionException(connectionID, me);
}
+ public void connectionReadyForWrites(Object connectionID, boolean ready)
+ {
+ }
}
}
Modified: trunk/src/main/org/hornetq/core/remoting/impl/invm/InVMConnection.java
===================================================================
--- trunk/src/main/org/hornetq/core/remoting/impl/invm/InVMConnection.java 2010-10-06
13:49:22 UTC (rev 9759)
+++ trunk/src/main/org/hornetq/core/remoting/impl/invm/InVMConnection.java 2010-10-08
11:49:58 UTC (rev 9760)
@@ -22,6 +22,7 @@
import org.hornetq.spi.core.remoting.BufferHandler;
import org.hornetq.spi.core.remoting.Connection;
import org.hornetq.spi.core.remoting.ConnectionLifeCycleListener;
+import org.hornetq.spi.core.remoting.ReadyListener;
import org.hornetq.utils.UUIDGenerator;
/**
@@ -32,6 +33,7 @@
*/
public class InVMConnection implements Connection
{
+
private static final Logger log = Logger.getLogger(InVMConnection.class);
private final BufferHandler handler;
@@ -159,5 +161,12 @@
{
return -1;
}
+
+ public void addReadyListener(ReadyListener listener)
+ {
+ }
+ public void removeReadyListener(ReadyListener listener)
+ {
+ }
}
Modified: trunk/src/main/org/hornetq/core/remoting/impl/invm/InVMConnector.java
===================================================================
--- trunk/src/main/org/hornetq/core/remoting/impl/invm/InVMConnector.java 2010-10-06
13:49:22 UTC (rev 9759)
+++ trunk/src/main/org/hornetq/core/remoting/impl/invm/InVMConnector.java 2010-10-08
11:49:58 UTC (rev 9760)
@@ -215,6 +215,12 @@
});
}
+ public void connectionReadyForWrites(Object connectionID, boolean ready)
+ {
+ }
+
+
+
}
}
Modified: trunk/src/main/org/hornetq/core/remoting/impl/netty/HornetQChannelHandler.java
===================================================================
---
trunk/src/main/org/hornetq/core/remoting/impl/netty/HornetQChannelHandler.java 2010-10-06
13:49:22 UTC (rev 9759)
+++
trunk/src/main/org/hornetq/core/remoting/impl/netty/HornetQChannelHandler.java 2010-10-08
11:49:58 UTC (rev 9760)
@@ -58,8 +58,14 @@
group.add(e.getChannel());
ctx.sendUpstream(e);
}
-
+
@Override
+ public void channelInterestChanged(ChannelHandlerContext ctx, ChannelStateEvent e)
throws Exception
+ {
+ listener.connectionReadyForWrites(e.getChannel().getId(),
e.getChannel().isWritable());
+ }
+
+ @Override
public void messageReceived(final ChannelHandlerContext ctx, final MessageEvent e)
throws Exception
{
ChannelBuffer buffer = (ChannelBuffer)e.getMessage();
Modified: trunk/src/main/org/hornetq/core/remoting/impl/netty/NettyAcceptor.java
===================================================================
--- trunk/src/main/org/hornetq/core/remoting/impl/netty/NettyAcceptor.java 2010-10-06
13:49:22 UTC (rev 9759)
+++ trunk/src/main/org/hornetq/core/remoting/impl/netty/NettyAcceptor.java 2010-10-08
11:49:58 UTC (rev 9760)
@@ -137,7 +137,7 @@
private final HttpKeepAliveRunnable httpKeepAliveRunnable;
- private final ConcurrentMap<Object, Connection> connections = new
ConcurrentHashMap<Object, Connection>();
+ private final ConcurrentMap<Object, NettyConnection> connections = new
ConcurrentHashMap<Object, NettyConnection>();
private final Executor threadPool;
@@ -654,7 +654,7 @@
{
public void connectionCreated(final Connection connection, final ProtocolType
protocol)
{
- if (connections.putIfAbsent(connection.getID(), connection) != null)
+ if (connections.putIfAbsent(connection.getID(), (NettyConnection)connection) !=
null)
{
throw new IllegalArgumentException("Connection already exists with id
" + connection.getID());
}
@@ -683,6 +683,16 @@
}.start();
}
+
+ public void connectionReadyForWrites(final Object connectionID, boolean ready)
+ {
+ NettyConnection conn = connections.get(connectionID);
+
+ if (conn != null)
+ {
+ conn.fireReady(ready);
+ }
+ }
}
private class BatchFlusher implements Runnable
Modified: trunk/src/main/org/hornetq/core/remoting/impl/netty/NettyConnection.java
===================================================================
--- trunk/src/main/org/hornetq/core/remoting/impl/netty/NettyConnection.java 2010-10-06
13:49:22 UTC (rev 9759)
+++ trunk/src/main/org/hornetq/core/remoting/impl/netty/NettyConnection.java 2010-10-08
11:49:58 UTC (rev 9760)
@@ -13,6 +13,7 @@
package org.hornetq.core.remoting.impl.netty;
+import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import org.hornetq.api.core.HornetQBuffer;
@@ -22,6 +23,8 @@
import org.hornetq.spi.core.protocol.ProtocolType;
import org.hornetq.spi.core.remoting.Connection;
import org.hornetq.spi.core.remoting.ConnectionLifeCycleListener;
+import org.hornetq.spi.core.remoting.ReadyListener;
+import org.hornetq.utils.ConcurrentHashSet;
import org.jboss.netty.buffer.ChannelBuffers;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelFuture;
@@ -57,6 +60,8 @@
private volatile HornetQBuffer batchBuffer;
private final AtomicBoolean writeLock = new AtomicBoolean(false);
+
+ private Set<ReadyListener> readyListeners = new
ConcurrentHashSet<ReadyListener>();
// Static --------------------------------------------------------
@@ -241,6 +246,24 @@
{
return directDeliver;
}
+
+ public void addReadyListener(final ReadyListener listener)
+ {
+ readyListeners.add(listener);
+ }
+
+ public void removeReadyListener(final ReadyListener listener)
+ {
+ readyListeners.remove(listener);
+ }
+
+ public void fireReady(final boolean ready)
+ {
+ for (ReadyListener listener: readyListeners)
+ {
+ listener.readyForWriting(ready);
+ }
+ }
// Public --------------------------------------------------------
Modified: trunk/src/main/org/hornetq/core/remoting/impl/netty/NettyConnector.java
===================================================================
--- trunk/src/main/org/hornetq/core/remoting/impl/netty/NettyConnector.java 2010-10-06
13:49:22 UTC (rev 9759)
+++ trunk/src/main/org/hornetq/core/remoting/impl/netty/NettyConnector.java 2010-10-08
11:49:58 UTC (rev 9760)
@@ -693,6 +693,12 @@
}
});
}
+
+ public void connectionReadyForWrites(Object connectionID, boolean ready)
+ {
+ }
+
+
}
private class BatchFlusher implements Runnable
Modified: trunk/src/main/org/hornetq/core/remoting/server/impl/RemotingServiceImpl.java
===================================================================
---
trunk/src/main/org/hornetq/core/remoting/server/impl/RemotingServiceImpl.java 2010-10-06
13:49:22 UTC (rev 9759)
+++
trunk/src/main/org/hornetq/core/remoting/server/impl/RemotingServiceImpl.java 2010-10-08
11:49:58 UTC (rev 9760)
@@ -423,6 +423,10 @@
// Connections should only fail when TTL is exceeded
}
+
+ public void connectionReadyForWrites(final Object connectionID, final boolean ready)
+ {
+ }
public void addInterceptor(final Interceptor interceptor)
{
Modified: trunk/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java 2010-10-06
13:49:22 UTC (rev 9759)
+++ trunk/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java 2010-10-08
11:49:58 UTC (rev 9760)
@@ -17,6 +17,7 @@
import java.util.LinkedList;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executor;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.hornetq.api.core.HornetQBuffer;
@@ -42,6 +43,7 @@
import org.hornetq.core.transaction.Transaction;
import org.hornetq.core.transaction.impl.TransactionImpl;
import org.hornetq.spi.core.protocol.SessionCallback;
+import org.hornetq.spi.core.remoting.ReadyListener;
import org.hornetq.utils.Future;
import org.hornetq.utils.TypedProperties;
@@ -54,7 +56,7 @@
*
* @version <tt>$Revision: 3783 $</tt> $Id: ServerConsumerImpl.java 3783
2008-02-25 12:15:14Z timfox $
*/
-public class ServerConsumerImpl implements ServerConsumer
+public class ServerConsumerImpl implements ServerConsumer, ReadyListener
{
// Constants
------------------------------------------------------------------------------------
@@ -117,6 +119,12 @@
private final Binding binding;
private boolean transferring = false;
+
+ /* As well as consumer credit based flow control, we also tap into TCP flow control
(assuming transport is using TCP)
+ * This is useful in the case where consumer-window-size = -1, but we don't want
to OOM by sending messages ad infinitum to the Netty
+ * write queue when the TCP buffer is full, e.g. the client is slow or has died.
+ */
+ private AtomicBoolean writeReady = new AtomicBoolean(true);
// Constructors
---------------------------------------------------------------------------------
@@ -159,6 +167,8 @@
minLargeMessageSize = session.getMinLargeMessageSize();
this.strictUpdateDeliveryCount = strictUpdateDeliveryCount;
+
+ this.callback.addReadyListener(this);
if (browseOnly)
{
@@ -177,7 +187,7 @@
{
return id;
}
-
+
public HandleStatus handle(final MessageReference ref) throws Exception
{
if (availableCredits != null && availableCredits.get() <= 0)
@@ -185,6 +195,12 @@
return HandleStatus.BUSY;
}
+// TODO -
https://jira.jboss.org/browse/HORNETQ-533
+// if (!writeReady.get())
+// {
+// return HandleStatus.BUSY;
+// }
+
synchronized (lock)
{
// If the consumer is stopped then we don't accept the message, it
@@ -264,6 +280,8 @@
public void close(final boolean failed) throws Exception
{
+ callback.removeReadyListener(this);
+
setStarted(false);
if (largeMessageDeliverer != null)
@@ -584,9 +602,21 @@
return ref;
}
+
+ public void readyForWriting(final boolean ready)
+ {
+ if (ready)
+ {
+ writeReady.set(true);
+
+ promptDelivery();
+ }
+ else
+ {
+ writeReady.set(false);
+ }
+ }
- // Public
---------------------------------------------------------------------------------------
-
/** To be used on tests only */
public AtomicInteger getAvailableCredits()
{
Modified: trunk/src/main/org/hornetq/spi/core/protocol/SessionCallback.java
===================================================================
--- trunk/src/main/org/hornetq/spi/core/protocol/SessionCallback.java 2010-10-06 13:49:22
UTC (rev 9759)
+++ trunk/src/main/org/hornetq/spi/core/protocol/SessionCallback.java 2010-10-08 11:49:58
UTC (rev 9760)
@@ -15,6 +15,7 @@
import org.hornetq.api.core.SimpleString;
import org.hornetq.core.server.ServerMessage;
+import org.hornetq.spi.core.remoting.ReadyListener;
/**
* A SessionCallback
@@ -34,4 +35,8 @@
int sendLargeMessageContinuation(long consumerID, byte[] body, boolean continues,
boolean requiresResponse);
void closed();
+
+ void addReadyListener(ReadyListener listener);
+
+ void removeReadyListener(ReadyListener listener);
}
Modified: trunk/src/main/org/hornetq/spi/core/remoting/Connection.java
===================================================================
--- trunk/src/main/org/hornetq/spi/core/remoting/Connection.java 2010-10-06 13:49:22 UTC
(rev 9759)
+++ trunk/src/main/org/hornetq/spi/core/remoting/Connection.java 2010-10-08 11:49:58 UTC
(rev 9760)
@@ -70,4 +70,8 @@
* Called periodically to flush any data in the batch buffer
*/
void checkFlushBatchBuffer();
+
+ void addReadyListener(ReadyListener listener);
+
+ void removeReadyListener(ReadyListener listener);
}
\ No newline at end of file
Modified: trunk/src/main/org/hornetq/spi/core/remoting/ConnectionLifeCycleListener.java
===================================================================
---
trunk/src/main/org/hornetq/spi/core/remoting/ConnectionLifeCycleListener.java 2010-10-06
13:49:22 UTC (rev 9759)
+++
trunk/src/main/org/hornetq/spi/core/remoting/ConnectionLifeCycleListener.java 2010-10-08
11:49:58 UTC (rev 9760)
@@ -44,4 +44,6 @@
* @param me the exception.
*/
void connectionException(Object connectionID, HornetQException me);
+
+ void connectionReadyForWrites(Object connectionID, boolean ready);
}
Added: trunk/src/main/org/hornetq/spi/core/remoting/ReadyListener.java
===================================================================
--- trunk/src/main/org/hornetq/spi/core/remoting/ReadyListener.java
(rev 0)
+++ trunk/src/main/org/hornetq/spi/core/remoting/ReadyListener.java 2010-10-08 11:49:58
UTC (rev 9760)
@@ -0,0 +1,27 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.spi.core.remoting;
+
+/**
+ * A ReadyListener
+ *
+ * @author Tim Fox
+ *
+ *
+ */
+public interface ReadyListener
+{
+ void readyForWriting(boolean ready);
+
+}
Modified: trunk/tests/src/org/hornetq/tests/integration/http/NettyHttpTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/http/NettyHttpTest.java 2010-10-06
13:49:22 UTC (rev 9759)
+++ trunk/tests/src/org/hornetq/tests/integration/http/NettyHttpTest.java 2010-10-08
11:49:58 UTC (rev 9760)
@@ -561,5 +561,11 @@
{
me.printStackTrace();
}
+
+ public void connectionReadyForWrites(Object connectionID, boolean ready)
+ {
+ }
+
+
}
}
Modified:
trunk/tests/src/org/hornetq/tests/unit/core/remoting/impl/netty/NettyAcceptorFactoryTest.java
===================================================================
---
trunk/tests/src/org/hornetq/tests/unit/core/remoting/impl/netty/NettyAcceptorFactoryTest.java 2010-10-06
13:49:22 UTC (rev 9759)
+++
trunk/tests/src/org/hornetq/tests/unit/core/remoting/impl/netty/NettyAcceptorFactoryTest.java 2010-10-08
11:49:58 UTC (rev 9760)
@@ -66,6 +66,12 @@
public void connectionCreated(final Connection connection, final ProtocolType
protocol)
{
}
+
+ public void connectionReadyForWrites(Object connectionID, boolean ready)
+ {
+ }
+
+
};
Acceptor acceptor = factory.createAcceptor(params,
Modified:
trunk/tests/src/org/hornetq/tests/unit/core/remoting/impl/netty/NettyAcceptorTest.java
===================================================================
---
trunk/tests/src/org/hornetq/tests/unit/core/remoting/impl/netty/NettyAcceptorTest.java 2010-10-06
13:49:22 UTC (rev 9759)
+++
trunk/tests/src/org/hornetq/tests/unit/core/remoting/impl/netty/NettyAcceptorTest.java 2010-10-08
11:49:58 UTC (rev 9760)
@@ -80,6 +80,10 @@
public void connectionCreated(final Connection connection, final ProtocolType
protocol)
{
}
+
+ public void connectionReadyForWrites(Object connectionID, boolean ready)
+ {
+ }
};
NettyAcceptor acceptor = new NettyAcceptor(params,
handler,
Modified:
trunk/tests/src/org/hornetq/tests/unit/core/remoting/impl/netty/NettyConnectionTest.java
===================================================================
---
trunk/tests/src/org/hornetq/tests/unit/core/remoting/impl/netty/NettyConnectionTest.java 2010-10-06
13:49:22 UTC (rev 9759)
+++
trunk/tests/src/org/hornetq/tests/unit/core/remoting/impl/netty/NettyConnectionTest.java 2010-10-08
11:49:58 UTC (rev 9760)
@@ -234,6 +234,10 @@
{
}
+
+ public void connectionReadyForWrites(Object connectionID, boolean ready)
+ {
+ }
}
}
Modified:
trunk/tests/src/org/hornetq/tests/unit/core/remoting/impl/netty/NettyConnectorTest.java
===================================================================
---
trunk/tests/src/org/hornetq/tests/unit/core/remoting/impl/netty/NettyConnectorTest.java 2010-10-06
13:49:22 UTC (rev 9759)
+++
trunk/tests/src/org/hornetq/tests/unit/core/remoting/impl/netty/NettyConnectorTest.java 2010-10-08
11:49:58 UTC (rev 9760)
@@ -69,6 +69,9 @@
public void connectionCreated(final Connection connection, final ProtocolType
protocol)
{
}
+ public void connectionReadyForWrites(Object connectionID, boolean ready)
+ {
+ }
};
NettyConnector connector = new NettyConnector(params,
@@ -106,6 +109,10 @@
public void connectionCreated(final Connection connection, final ProtocolType
protocol)
{
}
+
+ public void connectionReadyForWrites(Object connectionID, boolean ready)
+ {
+ }
};
try