[hornetq-commits] JBoss hornetq SVN: r9760 - in trunk: src/main/org/hornetq/core/protocol/core/impl and 9 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Fri Oct 8 07:49:59 EDT 2010


Author: timfox
Date: 2010-10-08 07:49:58 -0400 (Fri, 08 Oct 2010)
New Revision: 9760

Added:
   trunk/src/main/org/hornetq/spi/core/remoting/ReadyListener.java
Modified:
   trunk/src/main/org/hornetq/core/client/impl/FailoverManagerImpl.java
   trunk/src/main/org/hornetq/core/protocol/core/impl/CoreSessionCallback.java
   trunk/src/main/org/hornetq/core/protocol/stomp/StompDecoder.java
   trunk/src/main/org/hornetq/core/protocol/stomp/StompProtocolManager.java
   trunk/src/main/org/hornetq/core/protocol/stomp/StompSession.java
   trunk/src/main/org/hornetq/core/remoting/impl/invm/InVMAcceptor.java
   trunk/src/main/org/hornetq/core/remoting/impl/invm/InVMConnection.java
   trunk/src/main/org/hornetq/core/remoting/impl/invm/InVMConnector.java
   trunk/src/main/org/hornetq/core/remoting/impl/netty/HornetQChannelHandler.java
   trunk/src/main/org/hornetq/core/remoting/impl/netty/NettyAcceptor.java
   trunk/src/main/org/hornetq/core/remoting/impl/netty/NettyConnection.java
   trunk/src/main/org/hornetq/core/remoting/impl/netty/NettyConnector.java
   trunk/src/main/org/hornetq/core/remoting/server/impl/RemotingServiceImpl.java
   trunk/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java
   trunk/src/main/org/hornetq/spi/core/protocol/SessionCallback.java
   trunk/src/main/org/hornetq/spi/core/remoting/Connection.java
   trunk/src/main/org/hornetq/spi/core/remoting/ConnectionLifeCycleListener.java
   trunk/tests/src/org/hornetq/tests/integration/http/NettyHttpTest.java
   trunk/tests/src/org/hornetq/tests/unit/core/remoting/impl/netty/NettyAcceptorFactoryTest.java
   trunk/tests/src/org/hornetq/tests/unit/core/remoting/impl/netty/NettyAcceptorTest.java
   trunk/tests/src/org/hornetq/tests/unit/core/remoting/impl/netty/NettyConnectionTest.java
   trunk/tests/src/org/hornetq/tests/unit/core/remoting/impl/netty/NettyConnectorTest.java
Log:
https://jira.jboss.org/browse/HORNETQ-533

Modified: trunk/src/main/org/hornetq/core/client/impl/FailoverManagerImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/FailoverManagerImpl.java	2010-10-06 13:49:22 UTC (rev 9759)
+++ trunk/src/main/org/hornetq/core/client/impl/FailoverManagerImpl.java	2010-10-08 11:49:58 UTC (rev 9760)
@@ -296,9 +296,15 @@
    {
       handleConnectionFailure(connectionID, me);
    }
+   
+   public void connectionReadyForWrites(final Object connectionID, final boolean ready)
+   {
+   }
 
    // ConnectionManager implementation ------------------------------------------------------------------
 
+   
+
    public ClientSession createSession(final String username,
                                       final String password,
                                       final boolean xa,

Modified: trunk/src/main/org/hornetq/core/protocol/core/impl/CoreSessionCallback.java
===================================================================
--- trunk/src/main/org/hornetq/core/protocol/core/impl/CoreSessionCallback.java	2010-10-06 13:49:22 UTC (rev 9759)
+++ trunk/src/main/org/hornetq/core/protocol/core/impl/CoreSessionCallback.java	2010-10-08 11:49:58 UTC (rev 9760)
@@ -23,6 +23,7 @@
 import org.hornetq.core.server.ServerMessage;
 import org.hornetq.spi.core.protocol.ProtocolManager;
 import org.hornetq.spi.core.protocol.SessionCallback;
+import org.hornetq.spi.core.remoting.ReadyListener;
 
 /**
  * A CoreSessionCallback
@@ -40,7 +41,7 @@
    private ProtocolManager protocolManager;
 
    private String name;
-
+   
    public CoreSessionCallback(String name, ProtocolManager protocolManager, Channel channel)
    {
       this.name = name;
@@ -90,4 +91,15 @@
    {
       protocolManager.removeHandler(name);
    }
+   
+   public void addReadyListener(final ReadyListener listener)
+   {
+      channel.getConnection().getTransportConnection().addReadyListener(listener);      
+   }
+
+   public void removeReadyListener(final ReadyListener listener)
+   {
+      channel.getConnection().getTransportConnection().removeReadyListener(listener);
+   }
+
 }
\ No newline at end of file

Modified: trunk/src/main/org/hornetq/core/protocol/stomp/StompDecoder.java
===================================================================
--- trunk/src/main/org/hornetq/core/protocol/stomp/StompDecoder.java	2010-10-06 13:49:22 UTC (rev 9759)
+++ trunk/src/main/org/hornetq/core/protocol/stomp/StompDecoder.java	2010-10-08 11:49:58 UTC (rev 9760)
@@ -139,6 +139,10 @@
     */
    public synchronized StompFrame decode(final HornetQBuffer buffer) throws Exception
    {
+      //log.info("got buff " + buffer.readableBytes());
+      
+      long start = System.nanoTime();
+      
       int readable = buffer.readableBytes();
 
       if (data + readable >= workingBuffer.length)
@@ -301,6 +305,8 @@
             throwInvalid();
          }
       }
+      
+      long commandTime = System.nanoTime() - start;
 
       if (readingHeaders)
       {
@@ -406,6 +412,8 @@
             }
          }
       }
+      
+      long headersTime = System.nanoTime() - start - commandTime;
 
       // Now the body
 
@@ -447,6 +455,8 @@
             }
          }
       }
+      
+      
 
       if (content != null)
       {
@@ -464,6 +474,12 @@
          StompFrame ret = new StompFrame(command, headers, content);
 
          init();
+         
+        // log.info("decoded");
+         
+         long bodyTime = System.nanoTime() - start - headersTime - commandTime;
+         
+        // log.info("command: "+ commandTime + " headers: " + headersTime + " body: " + bodyTime);
 
          return ret;
       }

Modified: trunk/src/main/org/hornetq/core/protocol/stomp/StompProtocolManager.java
===================================================================
--- trunk/src/main/org/hornetq/core/protocol/stomp/StompProtocolManager.java	2010-10-06 13:49:22 UTC (rev 9759)
+++ trunk/src/main/org/hornetq/core/protocol/stomp/StompProtocolManager.java	2010-10-08 11:49:58 UTC (rev 9760)
@@ -142,11 +142,14 @@
 
    public void handleBuffer(final RemotingConnection connection, final HornetQBuffer buffer)
    {
+      long start = System.nanoTime();
       StompConnection conn = (StompConnection)connection;
       
       conn.setDataReceived();
       
       StompDecoder decoder = conn.getDecoder();
+      
+     // log.info("in handle");
 
       do
       {
@@ -165,7 +168,7 @@
          
          if (request == null)
          {
-            return;
+            break;
          }
 
          try
@@ -253,6 +256,10 @@
             server.getStorageManager().clearContext();
          }
       } while (decoder.hasBytes());
+      
+      long end = System.nanoTime();
+      
+     // log.info("handle took " + (end-start));
    }
 
    // Public --------------------------------------------------------

Modified: trunk/src/main/org/hornetq/core/protocol/stomp/StompSession.java
===================================================================
--- trunk/src/main/org/hornetq/core/protocol/stomp/StompSession.java	2010-10-06 13:49:22 UTC (rev 9759)
+++ trunk/src/main/org/hornetq/core/protocol/stomp/StompSession.java	2010-10-08 11:49:58 UTC (rev 9760)
@@ -30,6 +30,7 @@
 import org.hornetq.core.server.ServerSession;
 import org.hornetq.spi.core.protocol.RemotingConnection;
 import org.hornetq.spi.core.protocol.SessionCallback;
+import org.hornetq.spi.core.remoting.ReadyListener;
 import org.hornetq.utils.DataConstants;
 import org.hornetq.utils.UUIDGenerator;
 
@@ -156,7 +157,17 @@
    public void closed()
    {
    }
+   
+   public void addReadyListener(final ReadyListener listener)
+   {
+      connection.getTransportConnection().addReadyListener(listener);      
+   }
 
+   public void removeReadyListener(final ReadyListener listener)
+   {
+      connection.getTransportConnection().removeReadyListener(listener);
+   }
+
    public void acknowledge(String messageID) throws Exception
    {
       long id = Long.parseLong(messageID);

Modified: trunk/src/main/org/hornetq/core/remoting/impl/invm/InVMAcceptor.java
===================================================================
--- trunk/src/main/org/hornetq/core/remoting/impl/invm/InVMAcceptor.java	2010-10-06 13:49:22 UTC (rev 9759)
+++ trunk/src/main/org/hornetq/core/remoting/impl/invm/InVMAcceptor.java	2010-10-08 11:49:58 UTC (rev 9760)
@@ -250,6 +250,9 @@
          listener.connectionException(connectionID, me);
       }
 
+      public void connectionReadyForWrites(Object connectionID, boolean ready)
+      {
+      }
    }
 
 }

Modified: trunk/src/main/org/hornetq/core/remoting/impl/invm/InVMConnection.java
===================================================================
--- trunk/src/main/org/hornetq/core/remoting/impl/invm/InVMConnection.java	2010-10-06 13:49:22 UTC (rev 9759)
+++ trunk/src/main/org/hornetq/core/remoting/impl/invm/InVMConnection.java	2010-10-08 11:49:58 UTC (rev 9760)
@@ -22,6 +22,7 @@
 import org.hornetq.spi.core.remoting.BufferHandler;
 import org.hornetq.spi.core.remoting.Connection;
 import org.hornetq.spi.core.remoting.ConnectionLifeCycleListener;
+import org.hornetq.spi.core.remoting.ReadyListener;
 import org.hornetq.utils.UUIDGenerator;
 
 /**
@@ -32,6 +33,7 @@
  */
 public class InVMConnection implements Connection
 {
+
    private static final Logger log = Logger.getLogger(InVMConnection.class);
 
    private final BufferHandler handler;
@@ -159,5 +161,12 @@
    {
       return -1;
    }
+   
+   public void addReadyListener(ReadyListener listener)
+   {
+   }
 
+   public void removeReadyListener(ReadyListener listener)
+   {
+   }
 }

Modified: trunk/src/main/org/hornetq/core/remoting/impl/invm/InVMConnector.java
===================================================================
--- trunk/src/main/org/hornetq/core/remoting/impl/invm/InVMConnector.java	2010-10-06 13:49:22 UTC (rev 9759)
+++ trunk/src/main/org/hornetq/core/remoting/impl/invm/InVMConnector.java	2010-10-08 11:49:58 UTC (rev 9760)
@@ -215,6 +215,12 @@
          });
       }
 
+      public void connectionReadyForWrites(Object connectionID, boolean ready)
+      {
+      }
+      
+      
+
    }
 
 }

Modified: trunk/src/main/org/hornetq/core/remoting/impl/netty/HornetQChannelHandler.java
===================================================================
--- trunk/src/main/org/hornetq/core/remoting/impl/netty/HornetQChannelHandler.java	2010-10-06 13:49:22 UTC (rev 9759)
+++ trunk/src/main/org/hornetq/core/remoting/impl/netty/HornetQChannelHandler.java	2010-10-08 11:49:58 UTC (rev 9760)
@@ -58,8 +58,14 @@
       group.add(e.getChannel());
       ctx.sendUpstream(e);
    }
-   
+         
    @Override
+   public void channelInterestChanged(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception
+   {
+      listener.connectionReadyForWrites(e.getChannel().getId(), e.getChannel().isWritable());      
+   }
+
+   @Override
    public void messageReceived(final ChannelHandlerContext ctx, final MessageEvent e) throws Exception
    {
       ChannelBuffer buffer = (ChannelBuffer)e.getMessage();

Modified: trunk/src/main/org/hornetq/core/remoting/impl/netty/NettyAcceptor.java
===================================================================
--- trunk/src/main/org/hornetq/core/remoting/impl/netty/NettyAcceptor.java	2010-10-06 13:49:22 UTC (rev 9759)
+++ trunk/src/main/org/hornetq/core/remoting/impl/netty/NettyAcceptor.java	2010-10-08 11:49:58 UTC (rev 9760)
@@ -137,7 +137,7 @@
 
    private final HttpKeepAliveRunnable httpKeepAliveRunnable;
 
-   private final ConcurrentMap<Object, Connection> connections = new ConcurrentHashMap<Object, Connection>();
+   private final ConcurrentMap<Object, NettyConnection> connections = new ConcurrentHashMap<Object, NettyConnection>();
 
    private final Executor threadPool;
 
@@ -654,7 +654,7 @@
    {
       public void connectionCreated(final Connection connection, final ProtocolType protocol)
       {
-         if (connections.putIfAbsent(connection.getID(), connection) != null)
+         if (connections.putIfAbsent(connection.getID(), (NettyConnection)connection) != null)
          {
             throw new IllegalArgumentException("Connection already exists with id " + connection.getID());
          }
@@ -683,6 +683,16 @@
          }.start();
 
       }
+
+      public void connectionReadyForWrites(final Object connectionID, boolean ready)
+      {
+         NettyConnection conn = connections.get(connectionID);
+         
+         if (conn != null)
+         {
+            conn.fireReady(ready);
+         }         
+      }            
    }
 
    private class BatchFlusher implements Runnable

Modified: trunk/src/main/org/hornetq/core/remoting/impl/netty/NettyConnection.java
===================================================================
--- trunk/src/main/org/hornetq/core/remoting/impl/netty/NettyConnection.java	2010-10-06 13:49:22 UTC (rev 9759)
+++ trunk/src/main/org/hornetq/core/remoting/impl/netty/NettyConnection.java	2010-10-08 11:49:58 UTC (rev 9760)
@@ -13,6 +13,7 @@
 
 package org.hornetq.core.remoting.impl.netty;
 
+import java.util.Set;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.hornetq.api.core.HornetQBuffer;
@@ -22,6 +23,8 @@
 import org.hornetq.spi.core.protocol.ProtocolType;
 import org.hornetq.spi.core.remoting.Connection;
 import org.hornetq.spi.core.remoting.ConnectionLifeCycleListener;
+import org.hornetq.spi.core.remoting.ReadyListener;
+import org.hornetq.utils.ConcurrentHashSet;
 import org.jboss.netty.buffer.ChannelBuffers;
 import org.jboss.netty.channel.Channel;
 import org.jboss.netty.channel.ChannelFuture;
@@ -57,6 +60,8 @@
    private volatile HornetQBuffer batchBuffer;
 
    private final AtomicBoolean writeLock = new AtomicBoolean(false);
+   
+   private Set<ReadyListener> readyListeners = new ConcurrentHashSet<ReadyListener>();
 
    // Static --------------------------------------------------------
 
@@ -241,6 +246,24 @@
    {
       return directDeliver;
    }
+   
+   public void addReadyListener(final ReadyListener listener)
+   {
+      readyListeners.add(listener);
+   }
+   
+   public void removeReadyListener(final ReadyListener listener)
+   {
+      readyListeners.remove(listener);
+   }
+   
+   public void fireReady(final boolean ready)
+   {
+      for (ReadyListener listener: readyListeners)
+      {
+         listener.readyForWriting(ready);
+      }
+   }
 
    // Public --------------------------------------------------------
 

Modified: trunk/src/main/org/hornetq/core/remoting/impl/netty/NettyConnector.java
===================================================================
--- trunk/src/main/org/hornetq/core/remoting/impl/netty/NettyConnector.java	2010-10-06 13:49:22 UTC (rev 9759)
+++ trunk/src/main/org/hornetq/core/remoting/impl/netty/NettyConnector.java	2010-10-08 11:49:58 UTC (rev 9760)
@@ -693,6 +693,12 @@
             }
          });
       }
+
+      public void connectionReadyForWrites(Object connectionID, boolean ready)
+      {
+      }
+      
+      
    }
    
    private class BatchFlusher implements Runnable

Modified: trunk/src/main/org/hornetq/core/remoting/server/impl/RemotingServiceImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/remoting/server/impl/RemotingServiceImpl.java	2010-10-06 13:49:22 UTC (rev 9759)
+++ trunk/src/main/org/hornetq/core/remoting/server/impl/RemotingServiceImpl.java	2010-10-08 11:49:58 UTC (rev 9760)
@@ -423,6 +423,10 @@
 
       // Connections should only fail when TTL is exceeded
    }
+   
+   public void connectionReadyForWrites(final Object connectionID, final boolean ready)
+   {
+   }
 
    public void addInterceptor(final Interceptor interceptor)
    {

Modified: trunk/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java	2010-10-06 13:49:22 UTC (rev 9759)
+++ trunk/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java	2010-10-08 11:49:58 UTC (rev 9760)
@@ -17,6 +17,7 @@
 import java.util.LinkedList;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.Executor;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.hornetq.api.core.HornetQBuffer;
@@ -42,6 +43,7 @@
 import org.hornetq.core.transaction.Transaction;
 import org.hornetq.core.transaction.impl.TransactionImpl;
 import org.hornetq.spi.core.protocol.SessionCallback;
+import org.hornetq.spi.core.remoting.ReadyListener;
 import org.hornetq.utils.Future;
 import org.hornetq.utils.TypedProperties;
 
@@ -54,7 +56,7 @@
  * 
  * @version <tt>$Revision: 3783 $</tt> $Id: ServerConsumerImpl.java 3783 2008-02-25 12:15:14Z timfox $
  */
-public class ServerConsumerImpl implements ServerConsumer
+public class ServerConsumerImpl implements ServerConsumer, ReadyListener
 {
    // Constants ------------------------------------------------------------------------------------
 
@@ -117,6 +119,12 @@
    private final Binding binding;
 
    private boolean transferring = false;
+   
+   /* As well as consumer credit based flow control, we also tap into TCP flow control (assuming transport is using TCP)
+    * This is useful in the case where consumer-window-size = -1, but we don't want to OOM by sending messages ad infinitum to the Netty
+    * write queue when the TCP buffer is full, e.g. the client is slow or has died.    
+    */
+   private AtomicBoolean writeReady = new AtomicBoolean(true);
 
    // Constructors ---------------------------------------------------------------------------------
 
@@ -159,6 +167,8 @@
       minLargeMessageSize = session.getMinLargeMessageSize();
 
       this.strictUpdateDeliveryCount = strictUpdateDeliveryCount;
+      
+      this.callback.addReadyListener(this);
 
       if (browseOnly)
       {
@@ -177,7 +187,7 @@
    {
       return id;
    }
-
+   
    public HandleStatus handle(final MessageReference ref) throws Exception
    {
       if (availableCredits != null && availableCredits.get() <= 0)
@@ -185,6 +195,12 @@
          return HandleStatus.BUSY;
       }
       
+// TODO - https://jira.jboss.org/browse/HORNETQ-533      
+//      if (!writeReady.get())
+//      {
+//         return HandleStatus.BUSY;
+//      }
+      
       synchronized (lock)
       {
          // If the consumer is stopped then we don't accept the message, it
@@ -264,6 +280,8 @@
 
    public void close(final boolean failed) throws Exception
    {
+      callback.removeReadyListener(this);
+      
       setStarted(false);
 
       if (largeMessageDeliverer != null)
@@ -584,9 +602,21 @@
 
       return ref;
    }
+      
+   public void readyForWriting(final boolean ready)
+   {
+      if (ready)
+      {
+         writeReady.set(true);
+         
+         promptDelivery();
+      }
+      else
+      {
+         writeReady.set(false);
+      }
+   }
 
-   // Public ---------------------------------------------------------------------------------------
-
    /** To be used on tests only */
    public AtomicInteger getAvailableCredits()
    {

Modified: trunk/src/main/org/hornetq/spi/core/protocol/SessionCallback.java
===================================================================
--- trunk/src/main/org/hornetq/spi/core/protocol/SessionCallback.java	2010-10-06 13:49:22 UTC (rev 9759)
+++ trunk/src/main/org/hornetq/spi/core/protocol/SessionCallback.java	2010-10-08 11:49:58 UTC (rev 9760)
@@ -15,6 +15,7 @@
 
 import org.hornetq.api.core.SimpleString;
 import org.hornetq.core.server.ServerMessage;
+import org.hornetq.spi.core.remoting.ReadyListener;
 
 /**
  * A SessionCallback
@@ -34,4 +35,8 @@
    int sendLargeMessageContinuation(long consumerID, byte[] body, boolean continues, boolean requiresResponse);
    
    void closed();
+   
+   void addReadyListener(ReadyListener listener);
+   
+   void removeReadyListener(ReadyListener listener);
 }

Modified: trunk/src/main/org/hornetq/spi/core/remoting/Connection.java
===================================================================
--- trunk/src/main/org/hornetq/spi/core/remoting/Connection.java	2010-10-06 13:49:22 UTC (rev 9759)
+++ trunk/src/main/org/hornetq/spi/core/remoting/Connection.java	2010-10-08 11:49:58 UTC (rev 9760)
@@ -70,4 +70,8 @@
     * Called periodically to flush any data in the batch buffer
     */
    void checkFlushBatchBuffer();
+   
+   void addReadyListener(ReadyListener listener);
+   
+   void removeReadyListener(ReadyListener listener);
 }
\ No newline at end of file

Modified: trunk/src/main/org/hornetq/spi/core/remoting/ConnectionLifeCycleListener.java
===================================================================
--- trunk/src/main/org/hornetq/spi/core/remoting/ConnectionLifeCycleListener.java	2010-10-06 13:49:22 UTC (rev 9759)
+++ trunk/src/main/org/hornetq/spi/core/remoting/ConnectionLifeCycleListener.java	2010-10-08 11:49:58 UTC (rev 9760)
@@ -44,4 +44,6 @@
     * @param me           the exception.
     */
    void connectionException(Object connectionID, HornetQException me);
+   
+   void connectionReadyForWrites(Object connectionID, boolean ready);
 }

Added: trunk/src/main/org/hornetq/spi/core/remoting/ReadyListener.java
===================================================================
--- trunk/src/main/org/hornetq/spi/core/remoting/ReadyListener.java	                        (rev 0)
+++ trunk/src/main/org/hornetq/spi/core/remoting/ReadyListener.java	2010-10-08 11:49:58 UTC (rev 9760)
@@ -0,0 +1,27 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.spi.core.remoting;
+
+/**
+ * A ReadyListener
+ *
+ * @author Tim Fox
+ *
+ *
+ */
+public interface ReadyListener
+{
+   void readyForWriting(boolean ready);
+
+}

Modified: trunk/tests/src/org/hornetq/tests/integration/http/NettyHttpTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/http/NettyHttpTest.java	2010-10-06 13:49:22 UTC (rev 9759)
+++ trunk/tests/src/org/hornetq/tests/integration/http/NettyHttpTest.java	2010-10-08 11:49:58 UTC (rev 9760)
@@ -561,5 +561,11 @@
       {
          me.printStackTrace();
       }
+
+      public void connectionReadyForWrites(Object connectionID, boolean ready)
+      {
+      }
+      
+      
    }
 }

Modified: trunk/tests/src/org/hornetq/tests/unit/core/remoting/impl/netty/NettyAcceptorFactoryTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/unit/core/remoting/impl/netty/NettyAcceptorFactoryTest.java	2010-10-06 13:49:22 UTC (rev 9759)
+++ trunk/tests/src/org/hornetq/tests/unit/core/remoting/impl/netty/NettyAcceptorFactoryTest.java	2010-10-08 11:49:58 UTC (rev 9760)
@@ -66,6 +66,12 @@
          public void connectionCreated(final Connection connection, final ProtocolType protocol)
          {
          }
+
+         public void connectionReadyForWrites(Object connectionID, boolean ready)
+         {
+         }
+         
+         
       };
 
       Acceptor acceptor = factory.createAcceptor(params,

Modified: trunk/tests/src/org/hornetq/tests/unit/core/remoting/impl/netty/NettyAcceptorTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/unit/core/remoting/impl/netty/NettyAcceptorTest.java	2010-10-06 13:49:22 UTC (rev 9759)
+++ trunk/tests/src/org/hornetq/tests/unit/core/remoting/impl/netty/NettyAcceptorTest.java	2010-10-08 11:49:58 UTC (rev 9760)
@@ -80,6 +80,10 @@
          public void connectionCreated(final Connection connection, final ProtocolType protocol)
          {
          }
+         
+         public void connectionReadyForWrites(Object connectionID, boolean ready)
+         {
+         }
       };
       NettyAcceptor acceptor = new NettyAcceptor(params,
                                                  handler,

Modified: trunk/tests/src/org/hornetq/tests/unit/core/remoting/impl/netty/NettyConnectionTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/unit/core/remoting/impl/netty/NettyConnectionTest.java	2010-10-06 13:49:22 UTC (rev 9759)
+++ trunk/tests/src/org/hornetq/tests/unit/core/remoting/impl/netty/NettyConnectionTest.java	2010-10-08 11:49:58 UTC (rev 9760)
@@ -234,6 +234,10 @@
       {
 
       }
+      
+      public void connectionReadyForWrites(Object connectionID, boolean ready)
+      {
+      }
 
    }
 }

Modified: trunk/tests/src/org/hornetq/tests/unit/core/remoting/impl/netty/NettyConnectorTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/unit/core/remoting/impl/netty/NettyConnectorTest.java	2010-10-06 13:49:22 UTC (rev 9759)
+++ trunk/tests/src/org/hornetq/tests/unit/core/remoting/impl/netty/NettyConnectorTest.java	2010-10-08 11:49:58 UTC (rev 9760)
@@ -69,6 +69,9 @@
          public void connectionCreated(final Connection connection, final ProtocolType protocol)
          {
          }
+         public void connectionReadyForWrites(Object connectionID, boolean ready)
+         {
+         }
       };
 
       NettyConnector connector = new NettyConnector(params,
@@ -106,6 +109,10 @@
          public void connectionCreated(final Connection connection, final ProtocolType protocol)
          {
          }
+         
+         public void connectionReadyForWrites(Object connectionID, boolean ready)
+         {
+         }
       };
 
       try



More information about the hornetq-commits mailing list