Author: borges
Date: 2011-11-18 05:43:18 -0500 (Fri, 18 Nov 2011)
New Revision: 11697
Added:
trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/BackupRegistrationFailedMessage.java
Modified:
trunk/hornetq-commons/src/main/java/org/hornetq/api/core/HornetQException.java
trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java
trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/PacketImpl.java
trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/BackupRegistrationMessage.java
trunk/hornetq-core/src/main/java/org/hornetq/core/server/HornetQServer.java
trunk/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java
Log:
HORNETQ-720 HORNETQ-768 Stop the backup if its replication registration at the live server failed.
Modified: trunk/hornetq-commons/src/main/java/org/hornetq/api/core/HornetQException.java
===================================================================
---
trunk/hornetq-commons/src/main/java/org/hornetq/api/core/HornetQException.java 2011-11-15
18:14:18 UTC (rev 11696)
+++
trunk/hornetq-commons/src/main/java/org/hornetq/api/core/HornetQException.java 2011-11-18
10:43:18 UTC (rev 11697)
@@ -136,11 +136,11 @@
/**
- * A Session Metadata was set in duplication
+ * A Session Metadata was set in duplication
*/
public static final int DUPLICATE_METADATA = 114;
-
+
// Native Error codes ----------------------------------------------
/**
@@ -193,6 +193,11 @@
*/
public static final int NATIVE_ERROR_AIO_FULL = 211;
+ /**
+ * A backup tried to register for replication with a live server that is already replicating.
+ */
+ public static final int ALREADY_REPLICATING = 212;
+
private int code;
public HornetQException()
Modified:
trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java
===================================================================
---
trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java 2011-11-15
18:14:18 UTC (rev 11696)
+++
trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java 2011-11-18
10:43:18 UTC (rev 11697)
@@ -17,8 +17,9 @@
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import org.hornetq.api.core.HornetQBuffer;
+import org.hornetq.api.core.HornetQException;
import org.hornetq.api.core.Interceptor;
import org.hornetq.api.core.Pair;
import org.hornetq.api.core.TransportConfiguration;
@@ -32,6 +33,7 @@
import org.hornetq.core.protocol.core.Packet;
import org.hornetq.core.protocol.core.ServerSessionPacketHandler;
import org.hornetq.core.protocol.core.impl.ChannelImpl.CHANNEL_ID;
+import org.hornetq.core.protocol.core.impl.wireformat.BackupRegistrationFailedMessage;
import org.hornetq.core.protocol.core.impl.wireformat.BackupRegistrationMessage;
import org.hornetq.core.protocol.core.impl.wireformat.ClusterTopologyChangeMessage;
import org.hornetq.core.protocol.core.impl.wireformat.ClusterTopologyChangeMessage_V2;
@@ -208,11 +210,39 @@
} else if (packet.getType() == PacketImpl.BACKUP_REGISTRATION)
{
BackupRegistrationMessage msg = (BackupRegistrationMessage)packet;
- if (server.startReplication(rc, acceptorUsed.getClusterConnection(),
getPair(msg.getConnector(), true)))
- {
- // XXX if it fails, the backup should get to know it
+
+ try
+ {
+ server.startReplication(rc, acceptorUsed.getClusterConnection(),
getPair(msg.getConnector(), true));
+ }
+ catch (HornetQException e)
+ {
+ channel0.send(new BackupRegistrationFailedMessage(e));
}
}
+ else if (packet.getType() == PacketImpl.BACKUP_REGISTRATION_FAILED)
+ {
+ assert server.getConfiguration().isBackup();
+ assert !server.getConfiguration().isSharedStore();
+ log.warn("Replication failed to start because of exception with error
" +
+ ((BackupRegistrationFailedMessage)packet).getCause());
+ // Stop on a dedicated thread, presumably so server.stop() does not run on the thread delivering
+ // this packet; a plain Thread terminates when run() returns, unlike a never-shut-down executor.
+ new Thread(new Runnable()
+ {
+ public void run()
+ {
+ try
+ {
+ server.stop();
+ }
+ catch (Exception e)
+ {
+ log.error("Error while stopping server: " + server,
e);
+ }
+ }
+ }, "hornetq-backup-stopper").start();
+ }
}
private Pair<TransportConfiguration, TransportConfiguration>
getPair(TransportConfiguration conn,
Modified:
trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/PacketImpl.java
===================================================================
---
trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/PacketImpl.java 2011-11-15
18:14:18 UTC (rev 11696)
+++
trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/PacketImpl.java 2011-11-18
10:43:18 UTC (rev 11697)
@@ -147,7 +147,7 @@
public static final byte SESS_PRODUCER_REQUEST_CREDITS = 79;
public static final byte SESS_PRODUCER_CREDITS = 80;
-
+
public static final byte SESS_INDIVIDUAL_ACKNOWLEDGE = 81;
// Replication
@@ -179,15 +179,15 @@
public static final byte REPLICATION_COMPARE_DATA = 102;
public static final byte REPLICATION_SYNC_FILE = 103;
-
+
public static final byte SESS_ADD_METADATA = 104;
-
+
public static final byte SESS_ADD_METADATA2 = 105;
-
+
public static final byte SESS_UNIQUE_ADD_METADATA = 106;
-
-
+
+
// HA
public static final byte CLUSTER_TOPOLOGY = 110;
@@ -195,7 +195,7 @@
public static final byte NODE_ANNOUNCE = 111;
public static final byte SUBSCRIBE_TOPOLOGY = 112;
-
+
// For newer versions
public static final byte SUBSCRIBE_TOPOLOGY_V2 = 113;
@@ -203,9 +203,10 @@
public static final byte CLUSTER_TOPOLOGY_V2 = 114;
public static final byte BACKUP_REGISTRATION = 115;
+ public static final byte BACKUP_REGISTRATION_FAILED = 116;
public static final byte REPLICATION_START_FINISH_SYNC = 120;
-
+
// Static --------------------------------------------------------
public PacketImpl(final byte type)
@@ -288,7 +289,7 @@
{
return true;
}
-
+
public boolean isAsyncExec()
{
return false;
Added:
trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/BackupRegistrationFailedMessage.java
===================================================================
---
trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/BackupRegistrationFailedMessage.java
(rev 0)
+++
trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/BackupRegistrationFailedMessage.java 2011-11-18
10:43:18 UTC (rev 11697)
@@ -0,0 +1,57 @@
+/**
+ *
+ */
+package org.hornetq.core.protocol.core.impl.wireformat;
+
+import org.hornetq.api.core.HornetQBuffer;
+import org.hornetq.api.core.HornetQException;
+import org.hornetq.core.protocol.core.impl.PacketImpl;
+
+/**
+ * Informs the Backup trying to start replicating of an error.
+ * <p>
+ * The only payload is the {@link HornetQException#getCode() error code} of the failure, such as
+ * {@link HornetQException#ALREADY_REPLICATING} or {@link HornetQException#INTERNAL_ERROR}.
+ */
+public final class BackupRegistrationFailedMessage extends PacketImpl
+{
+
+ private int errorCode;
+
+ /**
+ * Creates an empty message; the error code is filled in by {@link #decodeRest(HornetQBuffer)}.
+ */
+ public BackupRegistrationFailedMessage()
+ {
+ super(BACKUP_REGISTRATION_FAILED);
+ }
+
+ /**
+ * @param e the exception that made the backup registration fail
+ */
+ public BackupRegistrationFailedMessage(HornetQException e)
+ {
+ super(BACKUP_REGISTRATION_FAILED);
+ errorCode = e.getCode();
+ }
+
+ /**
+ * @return the {@link HornetQException} error code describing why the registration failed
+ */
+ public int getCause()
+ {
+ return errorCode;
+ }
+
+ @Override
+ public void encodeRest(final HornetQBuffer buffer)
+ {
+ buffer.writeInt(errorCode);
+ }
+
+ @Override
+ public void decodeRest(final HornetQBuffer buffer)
+ {
+ errorCode = buffer.readInt();
+ }
+}
Modified:
trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/BackupRegistrationMessage.java
===================================================================
---
trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/BackupRegistrationMessage.java 2011-11-15
18:14:18 UTC (rev 11696)
+++
trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/BackupRegistrationMessage.java 2011-11-18
10:43:18 UTC (rev 11697)
@@ -13,7 +13,7 @@
* After registration the live server will initiate synchronization of its state with the
new backup
* node.
*/
-public class BackupRegistrationMessage extends PacketImpl
+public final class BackupRegistrationMessage extends PacketImpl
{
private TransportConfiguration connector;
Modified: trunk/hornetq-core/src/main/java/org/hornetq/core/server/HornetQServer.java
===================================================================
--- trunk/hornetq-core/src/main/java/org/hornetq/core/server/HornetQServer.java 2011-11-15
18:14:18 UTC (rev 11696)
+++ trunk/hornetq-core/src/main/java/org/hornetq/core/server/HornetQServer.java 2011-11-18
10:43:18 UTC (rev 11697)
@@ -17,6 +17,7 @@
import java.util.Set;
import java.util.concurrent.ScheduledExecutorService;
+import org.hornetq.api.core.HornetQException;
import org.hornetq.api.core.Pair;
import org.hornetq.api.core.SimpleString;
import org.hornetq.api.core.TransportConfiguration;
@@ -185,8 +186,9 @@
* @param rc
* @param pair
* @param clusterConnection
- * @return {@code true} if replication started successfully, {@code false} otherwise
+ * @throws HornetQException if replication could not be started, e.g. with code
+ * {@link HornetQException#ALREADY_REPLICATING} when this server is already replicating
*/
- boolean startReplication(CoreRemotingConnection rc, ClusterConnection
clusterConnection,
- Pair<TransportConfiguration, TransportConfiguration> pair);
+ void startReplication(CoreRemotingConnection rc, ClusterConnection clusterConnection,
+ Pair<TransportConfiguration, TransportConfiguration> pair)
throws HornetQException;
}
Modified:
trunk/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java
===================================================================
---
trunk/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java 2011-11-15
18:14:18 UTC (rev 11696)
+++
trunk/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java 2011-11-18
10:43:18 UTC (rev 11697)
@@ -2256,12 +2256,12 @@
@Override
- public boolean startReplication(CoreRemotingConnection rc, ClusterConnection
clusterConnection,
- Pair<TransportConfiguration, TransportConfiguration> pair)
+ public void startReplication(CoreRemotingConnection rc, ClusterConnection
clusterConnection,
+ Pair<TransportConfiguration, TransportConfiguration>
pair) throws HornetQException
{
if (replicationManager != null)
{
- return false;
+ throw new HornetQException(HornetQException.ALREADY_REPLICATING);
}
replicationManager = new ReplicationManagerImpl(rc, executorFactory);
@@ -2270,7 +2270,6 @@
replicationManager.start();
storageManager.startReplication(replicationManager, pagingManager,
getNodeID().toString(), clusterConnection,
pair);
- return true;
}
catch (Exception e)
{
@@ -2279,10 +2278,16 @@
- * the backup, or (2) by an IO Error at the storage. If (1), we can swallow the
exception
- * and ignore the replication request. If (2) the live will crash shortly.
+ * the backup, or (2) by an IO Error at the storage. Either way it is propagated as a
+ * HornetQException so the caller can tell the backup. If (2) the live will crash shortly.
*/
- // HORNETQ-720 Need to verify whether swallowing the exception here is
acceptable
log.warn("Exception when trying to start replication", e);
replicationManager = null;
- return false;
+ if (e instanceof HornetQException)
+ {
+ throw (HornetQException)e;
+ }
+ else
+ {
+ throw new HornetQException(HornetQException.INTERNAL_ERROR, "Error
trying to start replication", e);
+ }
}
}