[hornetq-commits] JBoss hornetq SVN: r11697 - in trunk: hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl and 3 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Fri Nov 18 05:43:19 EST 2011


Author: borges
Date: 2011-11-18 05:43:18 -0500 (Fri, 18 Nov 2011)
New Revision: 11697

Added:
   trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/BackupRegistrationFailedMessage.java
Modified:
   trunk/hornetq-commons/src/main/java/org/hornetq/api/core/HornetQException.java
   trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java
   trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/PacketImpl.java
   trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/BackupRegistrationMessage.java
   trunk/hornetq-core/src/main/java/org/hornetq/core/server/HornetQServer.java
   trunk/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java
Log:
HORNETQ-720 HORNETQ-768 Stop backup if replication registration at live failed.

Modified: trunk/hornetq-commons/src/main/java/org/hornetq/api/core/HornetQException.java
===================================================================
--- trunk/hornetq-commons/src/main/java/org/hornetq/api/core/HornetQException.java	2011-11-15 18:14:18 UTC (rev 11696)
+++ trunk/hornetq-commons/src/main/java/org/hornetq/api/core/HornetQException.java	2011-11-18 10:43:18 UTC (rev 11697)
@@ -136,11 +136,11 @@
 
 
    /**
-    * A Session Metadata was set in duplication 
+    * A Session Metadata was set in duplication
     */
    public static final int DUPLICATE_METADATA = 114;
 
-   
+
    // Native Error codes ----------------------------------------------
 
    /**
@@ -193,6 +193,8 @@
     */
    public static final int NATIVE_ERROR_AIO_FULL = 211;
 
+   public static final int ALREADY_REPLICATING = 212;
+
    private int code;
 
    public HornetQException()

Modified: trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java
===================================================================
--- trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java	2011-11-15 18:14:18 UTC (rev 11696)
+++ trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java	2011-11-18 10:43:18 UTC (rev 11697)
@@ -17,8 +17,10 @@
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
 
 import org.hornetq.api.core.HornetQBuffer;
+import org.hornetq.api.core.HornetQException;
 import org.hornetq.api.core.Interceptor;
 import org.hornetq.api.core.Pair;
 import org.hornetq.api.core.TransportConfiguration;
@@ -32,6 +34,7 @@
 import org.hornetq.core.protocol.core.Packet;
 import org.hornetq.core.protocol.core.ServerSessionPacketHandler;
 import org.hornetq.core.protocol.core.impl.ChannelImpl.CHANNEL_ID;
+import org.hornetq.core.protocol.core.impl.wireformat.BackupRegistrationFailedMessage;
 import org.hornetq.core.protocol.core.impl.wireformat.BackupRegistrationMessage;
 import org.hornetq.core.protocol.core.impl.wireformat.ClusterTopologyChangeMessage;
 import org.hornetq.core.protocol.core.impl.wireformat.ClusterTopologyChangeMessage_V2;
@@ -208,11 +211,34 @@
             } else if (packet.getType() == PacketImpl.BACKUP_REGISTRATION)
             {
                BackupRegistrationMessage msg = (BackupRegistrationMessage)packet;
-               if (server.startReplication(rc, acceptorUsed.getClusterConnection(), getPair(msg.getConnector(), true)))
-               {
-                  // XXX if it fails, the backup should get to know it
+
+               try {
+                  server.startReplication(rc, acceptorUsed.getClusterConnection(), getPair(msg.getConnector(), true));
+               } catch (HornetQException e){
+                 channel0.send(new BackupRegistrationFailedMessage(e));
                }
             }
+            else if (packet.getType() == PacketImpl.BACKUP_REGISTRATION_FAILED)
+            {
+               assert server.getConfiguration().isBackup();
+               assert !server.getConfiguration().isSharedStore();
+               log.warn("Replication failed to start because of exception with error " +
+                        ((BackupRegistrationFailedMessage)packet).getCause());
+               Executors.newSingleThreadExecutor().execute(new Runnable()
+               {
+                  public void run()
+                  {
+                     try
+                     {
+                        server.stop();
+                     }
+                     catch (Exception e)
+                     {
+                        log.error("Error while stopping server: " + server, e);
+                     }
+                  }
+               });
+            }
          }
 
           private Pair<TransportConfiguration, TransportConfiguration> getPair(TransportConfiguration conn,

Modified: trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/PacketImpl.java
===================================================================
--- trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/PacketImpl.java	2011-11-15 18:14:18 UTC (rev 11696)
+++ trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/PacketImpl.java	2011-11-18 10:43:18 UTC (rev 11697)
@@ -147,7 +147,7 @@
    public static final byte SESS_PRODUCER_REQUEST_CREDITS = 79;
 
    public static final byte SESS_PRODUCER_CREDITS = 80;
-   
+
    public static final byte SESS_INDIVIDUAL_ACKNOWLEDGE = 81;
 
    // Replication
@@ -179,15 +179,15 @@
    public static final byte REPLICATION_COMPARE_DATA = 102;
 
    public static final byte REPLICATION_SYNC_FILE = 103;
-   
+
    public static final byte SESS_ADD_METADATA = 104;
-   
+
    public static final byte SESS_ADD_METADATA2 = 105;
-   
+
    public static final byte SESS_UNIQUE_ADD_METADATA = 106;
-   
-   
 
+
+
    // HA
 
    public static final byte CLUSTER_TOPOLOGY = 110;
@@ -195,7 +195,7 @@
    public static final byte NODE_ANNOUNCE = 111;
 
    public static final byte SUBSCRIBE_TOPOLOGY = 112;
-   
+
    // For newer versions
 
    public static final byte SUBSCRIBE_TOPOLOGY_V2 = 113;
@@ -203,9 +203,10 @@
    public static final byte CLUSTER_TOPOLOGY_V2 = 114;
 
    public static final byte BACKUP_REGISTRATION = 115;
+   public static final byte BACKUP_REGISTRATION_FAILED = 116;
 
    public static final byte REPLICATION_START_FINISH_SYNC = 120;
- 
+
    // Static --------------------------------------------------------
 
    public PacketImpl(final byte type)
@@ -288,7 +289,7 @@
    {
       return true;
    }
-   
+
    public boolean isAsyncExec()
    {
       return false;

Added: trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/BackupRegistrationFailedMessage.java
===================================================================
--- trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/BackupRegistrationFailedMessage.java	                        (rev 0)
+++ trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/BackupRegistrationFailedMessage.java	2011-11-18 10:43:18 UTC (rev 11697)
@@ -0,0 +1,40 @@
+/**
+ *
+ */
+package org.hornetq.core.protocol.core.impl.wireformat;
+
+import org.hornetq.api.core.HornetQBuffer;
+import org.hornetq.api.core.HornetQException;
+import org.hornetq.core.protocol.core.impl.PacketImpl;
+
+/**
+ * Informs the Backup trying to start replicating of an error.
+ */
+public final class BackupRegistrationFailedMessage extends PacketImpl
+{
+
+   int errorCode;
+
+   public BackupRegistrationFailedMessage(HornetQException e)
+   {
+      super(BACKUP_REGISTRATION_FAILED);
+      errorCode = e.getCode();
+   }
+
+   public int getCause()
+   {
+      return errorCode;
+   }
+
+   @Override
+   public void encodeRest(final HornetQBuffer buffer)
+   {
+      buffer.writeInt(errorCode);
+   }
+
+   @Override
+   public void decodeRest(final HornetQBuffer buffer)
+   {
+      errorCode = buffer.readInt();
+   }
+}

Modified: trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/BackupRegistrationMessage.java
===================================================================
--- trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/BackupRegistrationMessage.java	2011-11-15 18:14:18 UTC (rev 11696)
+++ trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/BackupRegistrationMessage.java	2011-11-18 10:43:18 UTC (rev 11697)
@@ -13,7 +13,7 @@
  * After registration the live server will initiate synchronization of its state with the new backup
  * node.
  */
-public class BackupRegistrationMessage extends PacketImpl
+public final class BackupRegistrationMessage extends PacketImpl
 {
 
    private TransportConfiguration connector;

Modified: trunk/hornetq-core/src/main/java/org/hornetq/core/server/HornetQServer.java
===================================================================
--- trunk/hornetq-core/src/main/java/org/hornetq/core/server/HornetQServer.java	2011-11-15 18:14:18 UTC (rev 11696)
+++ trunk/hornetq-core/src/main/java/org/hornetq/core/server/HornetQServer.java	2011-11-18 10:43:18 UTC (rev 11697)
@@ -17,6 +17,7 @@
 import java.util.Set;
 import java.util.concurrent.ScheduledExecutorService;
 
+import org.hornetq.api.core.HornetQException;
 import org.hornetq.api.core.Pair;
 import org.hornetq.api.core.SimpleString;
 import org.hornetq.api.core.TransportConfiguration;
@@ -185,8 +186,8 @@
     * @param rc
     * @param pair
     * @param clusterConnection
-    * @return {@code true} if replication started successfully, {@code false} otherwise
+    * @throws HornetQException
     */
-   boolean startReplication(CoreRemotingConnection rc, ClusterConnection clusterConnection,
-      Pair<TransportConfiguration, TransportConfiguration> pair);
+   void startReplication(CoreRemotingConnection rc, ClusterConnection clusterConnection,
+                         Pair<TransportConfiguration, TransportConfiguration> pair) throws HornetQException;
 }

Modified: trunk/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java
===================================================================
--- trunk/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java	2011-11-15 18:14:18 UTC (rev 11696)
+++ trunk/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java	2011-11-18 10:43:18 UTC (rev 11697)
@@ -2256,12 +2256,12 @@
 
 
    @Override
-   public boolean startReplication(CoreRemotingConnection rc, ClusterConnection clusterConnection,
-      Pair<TransportConfiguration, TransportConfiguration> pair)
+   public void startReplication(CoreRemotingConnection rc, ClusterConnection clusterConnection,
+                             Pair<TransportConfiguration, TransportConfiguration> pair) throws HornetQException
    {
       if (replicationManager != null)
       {
-         return false;
+         throw new HornetQException(HornetQException.ALREADY_REPLICATING);
       }
 
       replicationManager = new ReplicationManagerImpl(rc, executorFactory);
@@ -2270,7 +2270,6 @@
          replicationManager.start();
          storageManager.startReplication(replicationManager, pagingManager, getNodeID().toString(), clusterConnection,
                                          pair);
-         return true;
       }
       catch (Exception e)
       {
@@ -2279,10 +2278,16 @@
           * the backup, or (2) by an IO Error at the storage. If (1), we can swallow the exception
           * and ignore the replication request. If (2) the live will crash shortly.
           */
-         // HORNETQ-720 Need to verify whether swallowing the exception here is acceptable
          log.warn("Exception when trying to start replication", e);
          replicationManager = null;
-         return false;
+         if (e instanceof HornetQException)
+         {
+            throw (HornetQException)e;
+         }
+         else
+         {
+            throw new HornetQException(HornetQException.INTERNAL_ERROR, "Error trying to start replication", e);
+         }
       }
    }
 



More information about the hornetq-commits mailing list