Author: borges
Date: 2011-09-08 11:06:11 -0400 (Thu, 08 Sep 2011)
New Revision: 11304
Added:
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/BackupRegistrationMessage.java
Removed:
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/HaBackupRegistrationMessage.java
Modified:
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/client/impl/ServerLocatorImpl.java
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/client/impl/ServerLocatorInternal.java
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/PacketDecoder.java
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/PacketImpl.java
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationManagerImpl.java
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java
branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/Journal.java
branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/JournalImpl.java
branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/util/BackupSyncDelay.java
Log:
HORNETQ-720 Clean up of code related to the case (see "HORNETQ-720 XXX" problem
markers)
Modified:
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/client/impl/ServerLocatorImpl.java
===================================================================
---
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/client/impl/ServerLocatorImpl.java 2011-09-07
16:10:18 UTC (rev 11303)
+++
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/client/impl/ServerLocatorImpl.java 2011-09-08
15:06:11 UTC (rev 11304)
@@ -76,7 +76,7 @@
private final StaticConnector staticConnector = new StaticConnector();
- private Topology topology = new Topology();
+ private final Topology topology = new Topology();
private Pair<TransportConfiguration, TransportConfiguration>[] topologyArray;
@@ -1270,21 +1270,6 @@
}
}
- public synchronized void factoryClosed(final ClientSessionFactory factory)
- {
- factories.remove(factory);
-
- if (factories.isEmpty())
- {
- // Go back to using the broadcast or static list
-
- receivedTopology = false;
-
- topology = null;
-
- }
- }
-
public Topology getTopology()
{
return topology;
Modified:
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/client/impl/ServerLocatorInternal.java
===================================================================
---
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/client/impl/ServerLocatorInternal.java 2011-09-07
16:10:18 UTC (rev 11303)
+++
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/client/impl/ServerLocatorInternal.java 2011-09-08
15:06:11 UTC (rev 11304)
@@ -31,8 +31,6 @@
{
void start(Executor executor) throws Exception;
- void factoryClosed(final ClientSessionFactory factory);
-
void setNodeID(String nodeID);
String getNodeID();
Modified:
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
---
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2011-09-07
16:10:18 UTC (rev 11303)
+++
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2011-09-08
15:06:11 UTC (rev 11304)
@@ -347,7 +347,6 @@
}
/**
- * XXX FIXME HORNETQ-720 Method ignores the synchronization of Paging.
* @param replicationManager
* @param pagingManager
* @throws HornetQException
@@ -703,8 +702,6 @@
try
{
// Note that we don't sync, the add reference that comes immediately after will
sync if appropriate
-
- // XXX HORNETQ-720
if (message.isLargeMessage())
{
messageJournal.appendAddRecord(message.getMessageID(),
Modified:
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java
===================================================================
---
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java 2011-09-07
16:10:18 UTC (rev 11303)
+++
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java 2011-09-08
15:06:11 UTC (rev 11304)
@@ -31,7 +31,7 @@
import org.hornetq.core.protocol.core.ServerSessionPacketHandler;
import org.hornetq.core.protocol.core.impl.ChannelImpl.CHANNEL_ID;
import org.hornetq.core.protocol.core.impl.wireformat.ClusterTopologyChangeMessage;
-import org.hornetq.core.protocol.core.impl.wireformat.HaBackupRegistrationMessage;
+import org.hornetq.core.protocol.core.impl.wireformat.BackupRegistrationMessage;
import org.hornetq.core.protocol.core.impl.wireformat.NodeAnnounceMessage;
import org.hornetq.core.protocol.core.impl.wireformat.Ping;
import
org.hornetq.core.protocol.core.impl.wireformat.SubscribeClusterTopologyUpdatesMessage;
@@ -146,9 +146,9 @@
server.getClusterManager().notifyNodeUp(msg.getNodeID(),
getPair(msg.getConnector(), backup), backup,
true);
}
- else if (packet.getType() == PacketImpl.HA_BACKUP_REGISTRATION)
+ else if (packet.getType() == PacketImpl.BACKUP_REGISTRATION)
{
- HaBackupRegistrationMessage msg = (HaBackupRegistrationMessage)packet;
+ BackupRegistrationMessage msg = (BackupRegistrationMessage)packet;
try
{
server.addHaBackup(rc);
Modified:
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/PacketDecoder.java
===================================================================
---
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/PacketDecoder.java 2011-09-07
16:10:18 UTC (rev 11303)
+++
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/PacketDecoder.java 2011-09-08
15:06:11 UTC (rev 11304)
@@ -90,7 +90,7 @@
import org.hornetq.core.protocol.core.impl.wireformat.CreateSessionMessage;
import org.hornetq.core.protocol.core.impl.wireformat.CreateSessionResponseMessage;
import org.hornetq.core.protocol.core.impl.wireformat.DisconnectMessage;
-import org.hornetq.core.protocol.core.impl.wireformat.HaBackupRegistrationMessage;
+import org.hornetq.core.protocol.core.impl.wireformat.BackupRegistrationMessage;
import org.hornetq.core.protocol.core.impl.wireformat.HornetQExceptionMessage;
import org.hornetq.core.protocol.core.impl.wireformat.NodeAnnounceMessage;
import org.hornetq.core.protocol.core.impl.wireformat.NullResponseMessage;
@@ -521,9 +521,9 @@
packet = new SessionAddMetaDataMessageV2();
break;
}
- case PacketImpl.HA_BACKUP_REGISTRATION:
+ case PacketImpl.BACKUP_REGISTRATION:
{
- packet = new HaBackupRegistrationMessage();
+ packet = new BackupRegistrationMessage();
break;
}
case PacketImpl.REPLICATION_START_STOP_SYNC:
Modified:
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/PacketImpl.java
===================================================================
---
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/PacketImpl.java 2011-09-07
16:10:18 UTC (rev 11303)
+++
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/PacketImpl.java 2011-09-08
15:06:11 UTC (rev 11304)
@@ -192,8 +192,7 @@
public static final byte SUBSCRIBE_TOPOLOGY = 112;
- /** XXX HORNETQ-720 "HA" is not really used anywhere else. Better name? */
- public static final byte HA_BACKUP_REGISTRATION = 113;
+ public static final byte BACKUP_REGISTRATION = 113;
public static final byte REPLICATION_START_STOP_SYNC = 120;
Copied:
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/BackupRegistrationMessage.java
(from rev 11303,
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/HaBackupRegistrationMessage.java)
===================================================================
---
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/BackupRegistrationMessage.java
(rev 0)
+++
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/BackupRegistrationMessage.java 2011-09-08
15:06:11 UTC (rev 11304)
@@ -0,0 +1,60 @@
+/**
+ *
+ */
+package org.hornetq.core.protocol.core.impl.wireformat;
+
+import org.hornetq.api.core.HornetQBuffer;
+import org.hornetq.api.core.TransportConfiguration;
+import org.hornetq.core.protocol.core.impl.PacketImpl;
+
+/**
+ * Registers a backup node with its live server.
+ * <p>
+ * After registration the live server will initiate synchronization of its state with the
new backup
+ * node.
+ */
+public class BackupRegistrationMessage extends PacketImpl
+{
+
+ private TransportConfiguration connector;
+
+ private String nodeID;
+
+ public BackupRegistrationMessage(String nodeId, TransportConfiguration tc)
+ {
+ this();
+ connector = tc;
+ nodeID = nodeId;
+ }
+
+ public BackupRegistrationMessage()
+ {
+ super(BACKUP_REGISTRATION);
+ }
+
+ public String getNodeID()
+ {
+ return nodeID;
+ }
+
+ public TransportConfiguration getConnector()
+ {
+ return connector;
+ }
+
+ @Override
+ public void encodeRest(final HornetQBuffer buffer)
+ {
+ buffer.writeString(nodeID);
+ connector.encode(buffer);
+ }
+
+ @Override
+ public void decodeRest(final HornetQBuffer buffer)
+ {
+ nodeID = buffer.readString();
+ connector = new TransportConfiguration();
+ connector.decode(buffer);
+ }
+
+}
Deleted:
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/HaBackupRegistrationMessage.java
===================================================================
---
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/HaBackupRegistrationMessage.java 2011-09-07
16:10:18 UTC (rev 11303)
+++
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/HaBackupRegistrationMessage.java 2011-09-08
15:06:11 UTC (rev 11304)
@@ -1,60 +0,0 @@
-/**
- *
- */
-package org.hornetq.core.protocol.core.impl.wireformat;
-
-import org.hornetq.api.core.HornetQBuffer;
-import org.hornetq.api.core.TransportConfiguration;
-import org.hornetq.core.protocol.core.impl.PacketImpl;
-
-/**
- * Registers a backup node with its live server.
- * <p>
- * After registration the live server will initiate synchronization of its state with the
new backup
- * node.
- */
-public class HaBackupRegistrationMessage extends PacketImpl
-{
-
- private TransportConfiguration connector;
-
- private String nodeID;
-
- public HaBackupRegistrationMessage(String nodeId, TransportConfiguration tc)
- {
- this();
- connector = tc;
- nodeID = nodeId;
- }
-
- public HaBackupRegistrationMessage()
- {
- super(HA_BACKUP_REGISTRATION);
- }
-
- public String getNodeID()
- {
- return nodeID;
- }
-
- public TransportConfiguration getConnector()
- {
- return connector;
- }
-
- @Override
- public void encodeRest(final HornetQBuffer buffer)
- {
- buffer.writeString(nodeID);
- connector.encode(buffer);
- }
-
- @Override
- public void decodeRest(final HornetQBuffer buffer)
- {
- nodeID = buffer.readString();
- connector = new TransportConfiguration();
- connector.decode(buffer);
- }
-
-}
Modified:
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java
===================================================================
---
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java 2011-09-07
16:10:18 UTC (rev 11303)
+++
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java 2011-09-08
15:06:11 UTC (rev 11304)
@@ -523,26 +523,14 @@
return;
}
- final Journal journalIf = journalsHolder.get(packet.getJournalContentType());
+ final Journal journal = journalsHolder.get(packet.getJournalContentType());
- JournalImpl journal = assertJournalImpl(journalIf);
Map<Long, JournalFile> mapToFill =
filesReservedForSync.get(packet.getJournalContentType());
- JournalFile current = journal.createFilesForRemoteSync(packet.getFileIds(),
mapToFill);
+ JournalFile current = journal.createFilesForBackupSync(packet.getFileIds(),
mapToFill);
registerJournal(packet.getJournalContentType().typeByte,
new FileWrapperJournal(current, storage.hasCallbackSupport()));
}
- // XXX HORNETQ-720 really need to do away with this once the method calls get stable.
- private static JournalImpl assertJournalImpl(final Journal journalIf) throws
HornetQException
- {
- if (!(journalIf instanceof JournalImpl))
- {
- throw new HornetQException(HornetQException.INTERNAL_ERROR,
- "Journals of backup server are expected to be
JournalImpl");
- }
- return (JournalImpl)journalIf;
- }
-
private void handleLargeMessageEnd(final ReplicationLargeMessageEndMessage packet)
{
LargeServerMessage message = lookupLargeMessage(packet.getMessageId(), true);
Modified:
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationManagerImpl.java
===================================================================
---
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationManagerImpl.java 2011-09-07
16:10:18 UTC (rev 11303)
+++
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationManagerImpl.java 2011-09-08
15:06:11 UTC (rev 11304)
@@ -577,7 +577,6 @@
@Override
public void sendSynchronizationDone()
{
- ReplicationStartSyncMessage msg = new ReplicationStartSyncMessage(null, null);
- sendReplicatePacket(msg);
+ sendReplicatePacket(new ReplicationStartSyncMessage(null, null));
}
}
Modified:
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java
===================================================================
---
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java 2011-09-07
16:10:18 UTC (rev 11303)
+++
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java 2011-09-08
15:06:11 UTC (rev 11304)
@@ -45,7 +45,7 @@
import org.hornetq.core.postoffice.Binding;
import org.hornetq.core.postoffice.PostOffice;
import org.hornetq.core.protocol.core.Channel;
-import org.hornetq.core.protocol.core.impl.wireformat.HaBackupRegistrationMessage;
+import org.hornetq.core.protocol.core.impl.wireformat.BackupRegistrationMessage;
import org.hornetq.core.protocol.core.impl.wireformat.NodeAnnounceMessage;
import org.hornetq.core.server.HornetQServer;
import org.hornetq.core.server.Queue;
@@ -480,7 +480,7 @@
"'. backup cannot be announced.");
return;
}
- liveChannel.send(new HaBackupRegistrationMessage(nodeUUID.toString(),
connector));
+ liveChannel.send(new BackupRegistrationMessage(nodeUUID.toString(),
connector));
}
else
{
Modified:
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java
===================================================================
---
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java 2011-09-07
16:10:18 UTC (rev 11303)
+++
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java 2011-09-08
15:06:11 UTC (rev 11304)
@@ -570,7 +570,6 @@
Channel pingChannel =
liveConnection.getChannel(CHANNEL_ID.PING.id, -1);
Channel replicationChannel =
liveConnection.getChannel(CHANNEL_ID.REPLICATION.id, -1);
- replicationChannel.setHandler(replicationEndpoint);
connectToReplicationEndpoint(replicationChannel);
replicationEndpoint.start();
@@ -1064,10 +1063,6 @@
return session;
}
- /**
- * XXX FIXME to be made private, and method removed from Server interface once
HORNETQ-720 is
- * finished.
- */
private synchronized ReplicationEndpoint connectToReplicationEndpoint(final Channel
channel) throws Exception
{
if (!configuration.isBackup())
@@ -1076,8 +1071,7 @@
getIdentity());
}
- if (replicationEndpoint == null)
- System.err.println("endpoint is null!");
+ channel.setHandler(replicationEndpoint);
if (replicationEndpoint.getChannel() != null)
{
Modified:
branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/Journal.java
===================================================================
---
branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/Journal.java 2011-09-07
16:10:18 UTC (rev 11303)
+++
branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/Journal.java 2011-09-08
15:06:11 UTC (rev 11304)
@@ -14,7 +14,9 @@
package org.hornetq.core.journal;
import java.util.List;
+import java.util.Map;
+import org.hornetq.core.journal.impl.JournalFile;
import org.hornetq.core.server.HornetQComponent;
/**
@@ -139,4 +141,19 @@
void runDirectJournalBlast() throws Exception;
+ /**
+ * Reserves journal file IDs, creates the necessary files for synchronization, and
places
+ * references to these (reserved for sync) files in the map.
+ * <p>
+ * During the synchronization between a live server and backup, we reserve in the
backup the
+ * journal file IDs used in the live server. This call also makes sure the files are
created
+ * empty without any kind of headers added.
+ * @param fileIds ids to reserve for synchronization
+ * @param mapToFill map to be filled with id and journal file pairs for
<b>synchronization</b>.
+ * @return a new {@link JournalFile} to be used for regular
<b>replication</b> during
+ * synchronization
+ * @throws Exception
+ */
+ JournalFile createFilesForBackupSync(long[] fileIds, Map<Long, JournalFile>
mapToFill) throws Exception;
+
}
Modified:
branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/JournalImpl.java
===================================================================
---
branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/JournalImpl.java 2011-09-07
16:10:18 UTC (rev 11303)
+++
branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/JournalImpl.java 2011-09-08
15:06:11 UTC (rev 11304)
@@ -3106,7 +3106,8 @@
* @return
* @throws Exception
*/
- public JournalFile createFilesForRemoteSync(long[] fileIds, Map<Long,
JournalFile> map) throws Exception
+ @Override
+ public JournalFile createFilesForBackupSync(long[] fileIds, Map<Long,
JournalFile> map) throws Exception
{
writeLock();
try
Modified:
branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/util/BackupSyncDelay.java
===================================================================
---
branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/util/BackupSyncDelay.java 2011-09-07
16:10:18 UTC (rev 11303)
+++
branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/util/BackupSyncDelay.java 2011-09-08
15:06:11 UTC (rev 11304)
@@ -58,7 +58,7 @@
@Override
public boolean intercept(Packet packet, RemotingConnection connection) throws
HornetQException
{
- if (packet.getType() == PacketImpl.HA_BACKUP_REGISTRATION)
+ if (packet.getType() == PacketImpl.BACKUP_REGISTRATION)
{
try
{