Author: borges
Date: 2011-09-08 12:13:30 -0400 (Thu, 08 Sep 2011)
New Revision: 11308
Modified:
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/StorageManager.java
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/server/HornetQServer.java
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java
Log:
HORNETQ-720 (not final) Assume that a problem during the start of replication was on the
backup side, as an actual IO_ERROR would also cause all sorts of trouble on the live
itself
Modified:
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/StorageManager.java
===================================================================
---
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/StorageManager.java 2011-09-08
16:12:41 UTC (rev 11307)
+++
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/StorageManager.java 2011-09-08
16:13:30 UTC (rev 11308)
@@ -34,6 +34,7 @@
import org.hornetq.core.persistence.config.PersistedRoles;
import org.hornetq.core.postoffice.Binding;
import org.hornetq.core.postoffice.PostOffice;
+import org.hornetq.core.replication.ReplicationManager;
import org.hornetq.core.server.HornetQComponent;
import org.hornetq.core.server.LargeServerMessage;
import org.hornetq.core.server.MessageReference;
@@ -239,4 +240,11 @@
*/
Journal getMessageJournal();
+ /**
+ * @param replicationManager
+ * @param pagingManager
+ * @throws Exception
+ */
+ void startReplication(ReplicationManager replicationManager, PagingManager
pagingManager) throws Exception;
+
}
Modified:
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
---
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2011-09-08
16:12:41 UTC (rev 11307)
+++
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2011-09-08
16:13:30 UTC (rev 11308)
@@ -351,6 +351,7 @@
* @param pagingManager
* @throws HornetQException
*/
+ @Override
public void startReplication(ReplicationManager replicationManager, PagingManager
pagingManager) throws Exception
{
if (!started)
Modified:
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java
===================================================================
---
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java 2011-09-08
16:12:41 UTC (rev 11307)
+++
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java 2011-09-08
16:13:30 UTC (rev 11308)
@@ -588,4 +588,10 @@
return null;
}
+ @Override
+ public void startReplication(ReplicationManager replicationManager, PagingManager
pagingManager) throws Exception
+ {
+ // no-op
+ }
+
}
Modified:
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java
===================================================================
---
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java 2011-09-08
16:12:41 UTC (rev 11307)
+++
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java 2011-09-08
16:13:30 UTC (rev 11308)
@@ -30,8 +30,8 @@
import org.hornetq.core.protocol.core.Packet;
import org.hornetq.core.protocol.core.ServerSessionPacketHandler;
import org.hornetq.core.protocol.core.impl.ChannelImpl.CHANNEL_ID;
-import org.hornetq.core.protocol.core.impl.wireformat.ClusterTopologyChangeMessage;
import org.hornetq.core.protocol.core.impl.wireformat.BackupRegistrationMessage;
+import org.hornetq.core.protocol.core.impl.wireformat.ClusterTopologyChangeMessage;
import org.hornetq.core.protocol.core.impl.wireformat.NodeAnnounceMessage;
import org.hornetq.core.protocol.core.impl.wireformat.Ping;
import
org.hornetq.core.protocol.core.impl.wireformat.SubscribeClusterTopologyUpdatesMessage;
@@ -149,17 +149,11 @@
else if (packet.getType() == PacketImpl.BACKUP_REGISTRATION)
{
BackupRegistrationMessage msg = (BackupRegistrationMessage)packet;
- try
+ if (server.startReplication(rc))
{
- server.addHaBackup(rc);
+ server.getClusterManager().notifyNodeUp(msg.getNodeID(),
getPair(msg.getConnector(), true), true,
+ true);
}
- catch (Exception e)
- {
- // XXX HORNETQ-720 This is not what we want
- e.printStackTrace();
- throw new RuntimeException(e);
- }
- server.getClusterManager().notifyNodeUp(msg.getNodeID(),
getPair(msg.getConnector(), true), true, true);
}
}
Modified:
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/server/HornetQServer.java
===================================================================
---
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/server/HornetQServer.java 2011-09-08
16:12:41 UTC (rev 11307)
+++
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/server/HornetQServer.java 2011-09-08
16:13:30 UTC (rev 11308)
@@ -172,5 +172,9 @@
void stop(boolean failoverOnServerShutdown) throws Exception;
- void addHaBackup(CoreRemotingConnection rc) throws Exception;
+ /**
+ * @param rc
+ * @return {@code true} if replication started successfully, {@code false} otherwise
+ */
+ boolean startReplication(CoreRemotingConnection rc);
}
Modified:
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java
===================================================================
---
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java 2011-09-08
16:12:41 UTC (rev 11307)
+++
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java 2011-09-08
16:13:30 UTC (rev 11308)
@@ -1992,18 +1992,27 @@
}
@Override
- public void addHaBackup(CoreRemotingConnection rc) throws Exception
+ public boolean startReplication(CoreRemotingConnection rc)
{
- if (!(storageManager instanceof JournalStorageManager))
+ replicationManager = new ReplicationManagerImpl(rc, executorFactory);
+ try
{
- throw new HornetQException(HornetQException.INTERNAL_ERROR, "unknown
implementation of JournalStorageManager!");
+ replicationManager.start();
+ storageManager.startReplication(replicationManager, pagingManager);
+ return true;
}
-
- JournalStorageManager journalStorageManager =
(JournalStorageManager)storageManager;
- replicationManager = new ReplicationManagerImpl(rc, executorFactory);
- replicationManager.start();
-
- journalStorageManager.startReplication(replicationManager, pagingManager);
+ catch (Exception e)
+ {
+ /*
+ * The reasoning here is that the exception was either caused by (1) the
(interaction with)
+ * the backup, or (2) by an IO Error at the storage. If (1), we can swallow the
exception
+ * and ignore the replication request. If (2) the live will crash shortly.
+ */
+ // HORNETQ-720 Need to verify whether swallowing the exception here is
acceptable
+ log.warn("Exception when trying to start replication", e);
+ replicationManager = null;
+ return false;
+ }
}
/**