[hornetq-commits] JBoss hornetq SVN: r8815 - in trunk/src/main/org/hornetq/core: remoting and 2 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Wed Jan 20 12:00:28 EST 2010


Author: timfox
Date: 2010-01-20 12:00:28 -0500 (Wed, 20 Jan 2010)
New Revision: 8815

Removed:
   trunk/src/main/org/hornetq/core/exception/
Modified:
   trunk/src/main/org/hornetq/core/remoting/Channel.java
   trunk/src/main/org/hornetq/core/remoting/RemotingConnection.java
   trunk/src/main/org/hornetq/core/remoting/impl/ChannelImpl.java
   trunk/src/main/org/hornetq/core/remoting/impl/RemotingConnectionImpl.java
   trunk/src/main/org/hornetq/core/remoting/server/impl/RemotingServiceImpl.java
Log:
reverted last commit

Modified: trunk/src/main/org/hornetq/core/remoting/Channel.java
===================================================================
--- trunk/src/main/org/hornetq/core/remoting/Channel.java	2010-01-20 16:51:03 UTC (rev 8814)
+++ trunk/src/main/org/hornetq/core/remoting/Channel.java	2010-01-20 17:00:28 UTC (rev 8815)
@@ -17,10 +17,10 @@
 import org.hornetq.api.core.HornetQException;
 
 /**
- * A channel is a way of interleaving data meant for different endpoints over the same {@link org.hornetq.core.remoting.CoreRemotingConnection}.
+ * A channel is a way of interleaving data meant for different endpoints over the same {@link org.hornetq.core.remoting.RemotingConnection}.
  * <p/>
  * Any packet sent will have its channel id set to the specific channel sending so it can be routed to its correct channel
- * when received by the {@link org.hornetq.core.remoting.CoreRemotingConnection}. see {@link org.hornetq.core.remoting.Packet#setChannelID(long)}.
+ * when received by the {@link org.hornetq.core.remoting.RemotingConnection}. see {@link org.hornetq.core.remoting.Packet#setChannelID(long)}.
  * <p/>
  * Each Channel should will forward any packets received to its {@link org.hornetq.core.remoting.ChannelHandler}.
  * <p/>
@@ -81,7 +81,7 @@
     *
     * @param newConnection the new connection
     */
-   void transferConnection(CoreRemotingConnection newConnection);
+   void transferConnection(RemotingConnection newConnection);
 
    /**
     * resends any packets that have not received confirmations yet.
@@ -126,7 +126,7 @@
    /**
     * returns the Remoting Connection being used by the channel
     */
-   CoreRemotingConnection getConnection();
+   RemotingConnection getConnection();
 
    /**
     * sends a confirmation of a packet being received.
@@ -148,7 +148,7 @@
    void flushConfirmations();
 
    /**
-    * Called by {@link org.hornetq.core.remoting.CoreRemotingConnection} when a packet is received.
+    * Called by {@link org.hornetq.core.remoting.RemotingConnection} when a packet is received.
     * <p/>
     * This method should then call its {@link org.hornetq.core.remoting.ChannelHandler} after appropriate processing of
     * the packet

Modified: trunk/src/main/org/hornetq/core/remoting/RemotingConnection.java
===================================================================
--- trunk/src/main/org/hornetq/core/remoting/RemotingConnection.java	2010-01-20 16:51:03 UTC (rev 8814)
+++ trunk/src/main/org/hornetq/core/remoting/RemotingConnection.java	2010-01-20 17:00:28 UTC (rev 8815)
@@ -46,6 +46,40 @@
    String getRemoteAddress();
 
    /**
+    * return the channel with the channel id specified.
+    * <p/>
+    * If it does not exist create it with the confirmation window size.
+    *
+    * @param channelID      the channel id
+    * @param confWindowSize the confirmation window size
+    * @return the channel
+    */
+   Channel getChannel(long channelID, int confWindowSize);
+
+   /**
+    * add the channel with the specified channel id
+    *
+    * @param channelID the channel id
+    * @param channel   the channel
+    */
+   void putChannel(long channelID, Channel channel);
+
+   /**
+    * remove the channel with the specified channel id
+    *
+    * @param channelID the channel id
+    * @return true if removed
+    */
+   boolean removeChannel(long channelID);
+
+   /**
+    * generate a unique (within this connection) channel id
+    *
+    * @return the id
+    */
+   long generateChannelID();
+
+   /**
     * add a failure listener.
     * <p/>
     * The listener will be called in the event of connection failure.
@@ -116,6 +150,20 @@
    void destroy();
 
    /**
+    * resets the id generator used when generating ids
+    *
+    * @param id the first id to set it to
+    */
+   void syncIDGeneratorSequence(long id);
+
+   /**
+    * return the next id that will be chosen.
+    *
+    * @return the id
+    */
+   long getIDGeneratorSequence();
+
+   /**
     * return the underlying Connection.
     *
     * @return the connection
@@ -134,7 +182,36 @@
     *
     * @return true if destroyed, otherwise false
     */
-   boolean isDestroyed();    
-   
-   void disconnect();
+   boolean isDestroyed();
+
+   /**
+    * return the current timeout for blocking calls
+    *
+    * @return the timeout in milliseconds
+    */
+   long getBlockingCallTimeout();
+
+   /**
+    * return the transfer lock used when transferring connections.
+    *
+    * @return the lock
+    */
+   Object getTransferLock();
+
+   /**
+    * returns true if any data has been received since the last time this method was called.
+    *
+    * @return true if data has been received.
+    */
+   boolean checkDataReceived();
+
+   /**
+    * remove all channels from the remoting connection
+    */
+   void removeAllChannels();
+
+   /**
+    * flush all outstanding confirmations onto the connection.
+    */
+   void flushConfirmations();
 }

Modified: trunk/src/main/org/hornetq/core/remoting/impl/ChannelImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/remoting/impl/ChannelImpl.java	2010-01-20 16:51:03 UTC (rev 8814)
+++ trunk/src/main/org/hornetq/core/remoting/impl/ChannelImpl.java	2010-01-20 17:00:28 UTC (rev 8815)
@@ -25,7 +25,6 @@
 import org.hornetq.core.remoting.Channel;
 import org.hornetq.core.remoting.ChannelHandler;
 import org.hornetq.core.remoting.CommandConfirmationHandler;
-import org.hornetq.core.remoting.CoreRemotingConnection;
 import org.hornetq.core.remoting.Packet;
 import org.hornetq.core.remoting.RemotingConnection;
 import org.hornetq.core.remoting.impl.wireformat.HornetQExceptionMessage;
@@ -53,7 +52,7 @@
 
    private volatile int lastConfirmedCommandID = -1;
 
-   private volatile CoreRemotingConnection connection;
+   private volatile RemotingConnection connection;
 
    private volatile boolean closed;
 
@@ -77,7 +76,7 @@
       
    private volatile boolean transferring;
  
-   public ChannelImpl(final CoreRemotingConnection connection, final long id, final int confWindowSize)
+   public ChannelImpl(final RemotingConnection connection, final long id, final int confWindowSize)
    {
       this.connection = connection;
 
@@ -317,7 +316,7 @@
       closed = true;
    }
    
-   public void transferConnection(final CoreRemotingConnection newConnection)
+   public void transferConnection(final RemotingConnection newConnection)
    {
       // Needs to synchronize on the connection to make sure no packets from
       // the old connection get processed after transfer has occurred
@@ -327,7 +326,7 @@
 
          // And switch it
 
-         final CoreRemotingConnection rnewConnection = (CoreRemotingConnection)newConnection;
+         final RemotingConnectionImpl rnewConnection = (RemotingConnectionImpl)newConnection;
 
          rnewConnection.putChannel(id, this);
 
@@ -370,7 +369,7 @@
       lock.unlock();
    }
 
-   public CoreRemotingConnection getConnection()
+   public RemotingConnection getConnection()
    {
       return connection;
    }

Modified: trunk/src/main/org/hornetq/core/remoting/impl/RemotingConnectionImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/remoting/impl/RemotingConnectionImpl.java	2010-01-20 16:51:03 UTC (rev 8814)
+++ trunk/src/main/org/hornetq/core/remoting/impl/RemotingConnectionImpl.java	2010-01-20 17:00:28 UTC (rev 8815)
@@ -26,9 +26,9 @@
 import org.hornetq.core.logging.Logger;
 import org.hornetq.core.remoting.Channel;
 import org.hornetq.core.remoting.CloseListener;
-import org.hornetq.core.remoting.CoreRemotingConnection;
 import org.hornetq.core.remoting.FailureListener;
 import org.hornetq.core.remoting.Packet;
+import org.hornetq.core.remoting.RemotingConnection;
 import org.hornetq.core.remoting.impl.wireformat.PacketImpl;
 import org.hornetq.spi.core.remoting.Connection;
 import org.hornetq.utils.SimpleIDGenerator;
@@ -38,7 +38,7 @@
  * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>
  * @version <tt>$Revision$</tt> $Id$
  */
-public class RemotingConnectionImpl extends AbstractBufferHandler implements CoreRemotingConnection
+public class RemotingConnectionImpl extends AbstractBufferHandler implements RemotingConnection
 {
    // Constants
    // ------------------------------------------------------------------------------------
@@ -84,8 +84,6 @@
    private volatile boolean dataReceived;
 
    private final Executor executor;
-   
-   private volatile boolean executing;
 
    // Constructors
    // ---------------------------------------------------------------------------------
@@ -277,22 +275,7 @@
 
       callClosingListeners();
    }
-   
-   public void disconnect()
-   {
-      Channel channel0 = getChannel(0, -1);
 
-      // And we remove all channels from the connection, this ensures no more packets will be processed after this
-      // method is
-      // complete
-
-      removeAllChannels();
-
-      // Now we are 100% sure that no more packets will be processed we can send the disconnect
-
-      channel0.sendAndFlush(new PacketImpl(PacketImpl.DISCONNECT));
-   }
-
    public long generateChannelID()
    {
       return idGenerator.generateID();
@@ -342,6 +325,16 @@
       return res;
    }
 
+   public void removeAllChannels()
+   {
+      // We get the transfer lock first - this ensures no packets are being processed AND
+      // it's guaranteed no more packets will be processed once this method is complete
+      synchronized (transferLock)
+      {
+         channels.clear();
+      }
+   }
+
    public void flushConfirmations()
    {
       synchronized (transferLock)
@@ -356,6 +349,8 @@
    // Buffer Handler implementation
    // ----------------------------------------------------
 
+   private volatile boolean executing;
+
    public void bufferReceived(final Object connectionID, final HornetQBuffer buffer)
    {
       final Packet packet = decoder.decode(buffer);
@@ -439,15 +434,6 @@
    // Private
    // --------------------------------------------------------------------------------------
 
-   private void removeAllChannels()
-   {
-      // We get the transfer lock first - this ensures no packets are being processed AND
-      // it's guaranteed no more packets will be processed once this method is complete
-      synchronized (transferLock)
-      {
-         channels.clear();
-      }
-   }  
    private void callFailureListeners(final HornetQException me)
    {
       final List<FailureListener> listenersClone = new ArrayList<FailureListener>(failureListeners);

Modified: trunk/src/main/org/hornetq/core/remoting/server/impl/RemotingServiceImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/remoting/server/impl/RemotingServiceImpl.java	2010-01-20 16:51:03 UTC (rev 8814)
+++ trunk/src/main/org/hornetq/core/remoting/server/impl/RemotingServiceImpl.java	2010-01-20 17:00:28 UTC (rev 8815)
@@ -170,7 +170,6 @@
             if (managementService != null)
             {
                acceptor.setNotificationService(managementService);
-               
                managementService.registerAcceptor(acceptor, info);
             }
          }
@@ -236,7 +235,17 @@
       {
          RemotingConnection conn = entry.connection;
 
-         conn.disconnect();
+         Channel channel0 = conn.getChannel(0, -1);
+
+         // And we remove all channels from the connection, this ensures no more packets will be processed after this
+         // method is
+         // complete
+
+         conn.removeAllChannels();
+
+         // Now we are 100% sure that no more packets will be processed we can send the disconnect
+
+         channel0.sendAndFlush(new PacketImpl(PacketImpl.DISCONNECT));
       }
 
       for (Acceptor acceptor : acceptors)



More information about the hornetq-commits mailing list