[hornetq-commits] JBoss hornetq SVN: r10911 - in branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq: core/replication/impl and 3 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Fri Jul 1 12:00:03 EDT 2011


Author: borges
Date: 2011-07-01 12:00:03 -0400 (Fri, 01 Jul 2011)
New Revision: 10911

Modified:
   branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java
   branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/HornetQPacketHandler.java
   branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationManagerImpl.java
   branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/server/HornetQServer.java
   branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java
   branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/spi/core/protocol/RemotingConnection.java
Log:
HORNETQ-720 pass CoreRemotingConnection instead of channels, as it 
allows us to use the failureListener.

Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java	2011-07-01 13:31:52 UTC (rev 10910)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java	2011-07-01 16:00:03 UTC (rev 10911)
@@ -158,14 +158,10 @@
             {
                HaBackupRegistrationMessage msg = (HaBackupRegistrationMessage)packet;
                System.out.println("HA_BACKUP_REGISTRATION: " + msg + " connector=" + msg.getConnector());
-               long channelID = msg.getChannelID();
-               Channel channelX = rc.getChannel(CHANNEL_ID.SESSION.id, -1);
-               Channel replicationChannel = rc.getChannel(CHANNEL_ID.REPLICATION.id, -1);
-               System.out.println("msg channelID: " + channelID);
                System.out.println("HA_BR: " + server.getIdentity() + ", toString=" + server);
                try
                {
-                  server.addHaBackup(channelX, replicationChannel);
+                  server.addHaBackup(rc);
                }
                catch (Exception e)
                {

Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/HornetQPacketHandler.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/HornetQPacketHandler.java	2011-07-01 13:31:52 UTC (rev 10910)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/HornetQPacketHandler.java	2011-07-01 16:00:03 UTC (rev 10911)
@@ -25,12 +25,10 @@
 import org.hornetq.core.protocol.core.CoreRemotingConnection;
 import org.hornetq.core.protocol.core.Packet;
 import org.hornetq.core.protocol.core.ServerSessionPacketHandler;
-import org.hornetq.core.protocol.core.impl.ChannelImpl.CHANNEL_ID;
 import org.hornetq.core.protocol.core.impl.wireformat.CreateQueueMessage;
 import org.hornetq.core.protocol.core.impl.wireformat.CreateReplicationSessionMessage;
 import org.hornetq.core.protocol.core.impl.wireformat.CreateSessionMessage;
 import org.hornetq.core.protocol.core.impl.wireformat.CreateSessionResponseMessage;
-import org.hornetq.core.protocol.core.impl.wireformat.HaBackupRegistrationMessage;
 import org.hornetq.core.protocol.core.impl.wireformat.HornetQExceptionMessage;
 import org.hornetq.core.protocol.core.impl.wireformat.NullResponseMessage;
 import org.hornetq.core.protocol.core.impl.wireformat.ReattachSessionMessage;
@@ -115,25 +113,6 @@
 
             break;
          }
-         case PacketImpl.HA_BACKUP_REGISTRATION:
-         {
-            HaBackupRegistrationMessage msg = (HaBackupRegistrationMessage)packet;
-            System.out.println("HA_BACKUP_REGISTRATION: " + msg + " connector=" + msg.getConnector());
-            long channelID = msg.getChannelID();
-            Channel channelX = connection.getChannel(CHANNEL_ID.SESSION.id, -1);
-            Channel replicationChannel = connection.getChannel(CHANNEL_ID.REPLICATION.id, -1);
-            System.out.println("HA_BR: " + server.getIdentity() + ", toString=" + server);
-            try
-            {
-               server.addHaBackup(channelX, replicationChannel);
-            }
-            catch (Exception e)
-            {
-               // XXX This is not what we want
-               e.printStackTrace();
-               throw new RuntimeException(e);
-            }
-         }
          default:
          {
             HornetQPacketHandler.log.error("Invalid packet " + packet);

Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationManagerImpl.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationManagerImpl.java	2011-07-01 13:31:52 UTC (rev 10910)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationManagerImpl.java	2011-07-01 16:00:03 UTC (rev 10911)
@@ -83,7 +83,7 @@
 
    private SessionFailureListener failureListener;
 
-   private final Channel systemChannel;
+   private CoreRemotingConnection remotingConnection;
 
    // Static --------------------------------------------------------
 
@@ -95,21 +95,16 @@
       this.executorFactory = executorFactory;
 
       CoreRemotingConnection conn = sessionFactory.getConnection();
-      systemChannel = conn.getChannel(CHANNEL_ID.SESSION.id, -1);
       replicatingChannel = conn.getChannel(CHANNEL_ID.REPLICATION.id, -1);
    }
 
-   // Public --------------------------------------------------------
-
    /**
-    * @param systemChannel
-    * @param replicatingChannel
+    * @param remotingConnection
     */
-   public ReplicationManagerImpl(Channel systemChannel, Channel replicatingChannel)
+   public ReplicationManagerImpl(CoreRemotingConnection remotingConnection)
    {
-      super();
-      this.systemChannel = systemChannel;
-      this.replicatingChannel = replicatingChannel;
+      replicatingChannel = remotingConnection.getChannel(CHANNEL_ID.REPLICATION.id, -1);
+      this.remotingConnection = remotingConnection;
    }
 
    public void appendAddRecord(final byte journalID, final long id, final byte recordType, final EncodingSupport record)
@@ -314,20 +309,13 @@
          throw new IllegalStateException("ReplicationManager is already started");
       }
 
-//      replicatingConnection = sessionFactory.getConnection();
-//
-//      if (replicatingConnection == null)
-//      {
-//         ReplicationManagerImpl.log.warn("Backup server MUST be started before live server. Initialisation will not proceed.");
-//         throw new HornetQException(HornetQException.ILLEGAL_STATE,
-//                                    "Backup server MUST be started before live server. Initialisation will not proceed.");
-//      }
-
       replicatingChannel.setHandler(responseHandler);
 
       CreateReplicationSessionMessage replicationStartPackage =
                new CreateReplicationSessionMessage(replicatingChannel.getID());
 
+      Channel systemChannel = remotingConnection.getChannel(CHANNEL_ID.SESSION.id, -1);
+
       systemChannel.sendBlocking(replicationStartPackage);
 
       failureListener = new SessionFailureListener()
@@ -358,10 +346,9 @@
          {
          }
       };
-      // sessionFactory.addFailureListener(failureListener);
+      remotingConnection.addFailureListener(failureListener);
 
       started = true;
-
       enabled = true;
    }
 
@@ -397,15 +384,14 @@
          replicatingChannel.close();
       }
 
+      remotingConnection.removeFailureListener(failureListener);
 //      sessionFactory.causeExit();
-//      sessionFactory.removeFailureListener(failureListener);
 //      if (replicatingConnection != null)
 //      {
 //         replicatingConnection.destroy();
 //      }
 //
-//      replicatingConnection = null;
-
+      remotingConnection = null;
       started = false;
    }
 

Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/server/HornetQServer.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/server/HornetQServer.java	2011-07-01 13:31:52 UTC (rev 10910)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/server/HornetQServer.java	2011-07-01 16:00:03 UTC (rev 10911)
@@ -28,6 +28,7 @@
 import org.hornetq.core.persistence.StorageManager;
 import org.hornetq.core.postoffice.PostOffice;
 import org.hornetq.core.protocol.core.Channel;
+import org.hornetq.core.protocol.core.CoreRemotingConnection;
 import org.hornetq.core.remoting.server.RemotingService;
 import org.hornetq.core.replication.ReplicationEndpoint;
 import org.hornetq.core.replication.ReplicationManager;
@@ -176,5 +177,5 @@
 
    void stop(boolean failoverOnServerShutdown) throws Exception;
 
-   void addHaBackup(Channel channelX, Channel replicationChannel) throws Exception;
+   void addHaBackup(CoreRemotingConnection rc) throws Exception;
 }

Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java	2011-07-01 13:31:52 UTC (rev 10910)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java	2011-07-01 16:00:03 UTC (rev 10911)
@@ -81,6 +81,7 @@
 import org.hornetq.core.postoffice.impl.LocalQueueBinding;
 import org.hornetq.core.postoffice.impl.PostOfficeImpl;
 import org.hornetq.core.protocol.core.Channel;
+import org.hornetq.core.protocol.core.CoreRemotingConnection;
 import org.hornetq.core.protocol.core.impl.ChannelImpl.CHANNEL_ID;
 import org.hornetq.core.protocol.core.impl.wireformat.HaBackupRegistrationMessage;
 import org.hornetq.core.remoting.server.RemotingService;
@@ -547,7 +548,7 @@
             }
             log.info("announce backup to live-server (id=" + liveConnectorName + ")");
             liveServerSessionFactory.getConnection()
-                                    .getChannel(CHANNEL_ID.SESSION.id, -1)
+                                    .getChannel(CHANNEL_ID.PING.id, -1)
                                     .send(new HaBackupRegistrationMessage(getNodeID().toString(), config));
             log.info("backup registered");
 
@@ -1945,19 +1946,15 @@
    }
 
    @Override
-   public void addHaBackup(Channel systemChannel, Channel replicatingChannel) throws Exception
+   public void addHaBackup(CoreRemotingConnection rc) throws Exception
    {
       if (!(storageManager instanceof JournalStorageManager))
          return;
       JournalStorageManager journalStorageManager = (JournalStorageManager)storageManager;
 
       System.out.println(HornetQServerImpl.class.getName() + " " + this.getIdentity() +
-               ": create a ReplicationManagerImpl. Using ChannelID=" + systemChannel);
-      // XXX not sure this is the right call to use
-//      final ServerLocatorInternal serverLocator =
-//               (ServerLocatorInternal)HornetQClient.createServerLocatorWithHA(connector);
-//      ClientSessionFactoryInternal sessionFactory = (ClientSessionFactoryInternal)serverLocator.createSessionFactory();
-      replicationManager = new ReplicationManagerImpl(systemChannel, replicatingChannel);
+               ": create a ReplicationManagerImpl");
+      replicationManager = new ReplicationManagerImpl(rc);
       System.out.println("rep.start()");
       replicationManager.start();
 

Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/spi/core/protocol/RemotingConnection.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/spi/core/protocol/RemotingConnection.java	2011-07-01 13:31:52 UTC (rev 10910)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/spi/core/protocol/RemotingConnection.java	2011-07-01 16:00:03 UTC (rev 10911)
@@ -43,17 +43,15 @@
    long getCreationTime();
 
    /**
-    * returns a string representation of the remote address of this connection
-    *
+    * Returns a string representation of the remote address of this connection.
     * @return the remote address
     */
    String getRemoteAddress();
 
    /**
-    * add a failure listener.
+    * Adds a failure listener.
     * <p/>
     * The listener will be called in the event of connection failure.
-    *
     * @param listener the listener
     */
    void addFailureListener(FailureListener listener);
@@ -82,27 +80,27 @@
     * @return true if removed
     */
    boolean removeCloseListener(CloseListener listener);
-   
+
    List<CloseListener> removeCloseListeners();
-   
+
    void setCloseListeners(List<CloseListener> listeners);
-   
-   
+
+
    /**
     * return all the failure listeners
     *
     * @return the listeners
     */
    List<FailureListener> getFailureListeners();
-   
+
    List<FailureListener> removeFailureListeners();
 
 
    /**
-    * set the failure listeners.
+    * Sets the failure listeners.
     * <p/>
-    * These will be called in the event of the connection being closed. Any previosuly added listeners will be removed.
-    *
+    * These will be called in the event of the connection being closed. Any previously added
+    * listeners will be removed.
     * @param listeners the listeners to add.
     */
    void setFailureListeners(List<FailureListener> listeners);
@@ -146,20 +144,20 @@
     *
     * @return true if destroyed, otherwise false
     */
-   boolean isDestroyed();    
-   
+   boolean isDestroyed();
+
    /**
     * Disconnect the connection, closing all channels
     */
    void disconnect();
-   
+
    /**
     * returns true if any data has been received since the last time this method was called.
     *
     * @return true if data has been received.
     */
    boolean checkDataReceived();
-   
+
    /**
     * flush all outstanding data from the connection.
     */



More information about the hornetq-commits mailing list