Author: borges
Date: 2011-11-02 12:18:22 -0400 (Wed, 02 Nov 2011)
New Revision: 11637
Modified:
trunk/hornetq-core/src/main/java/org/hornetq/core/persistence/StorageManager.java
trunk/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
trunk/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java
trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java
trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/PacketDecoder.java
trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/PacketImpl.java
trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/ReplicationStartSyncMessage.java
trunk/hornetq-core/src/main/java/org/hornetq/core/replication/ReplicationManager.java
trunk/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java
trunk/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationManagerImpl.java
trunk/hornetq-core/src/main/java/org/hornetq/core/server/HornetQServer.java
trunk/hornetq-core/src/main/java/org/hornetq/core/server/NodeManager.java
trunk/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java
trunk/hornetq-core/src/main/java/org/hornetq/core/server/impl/QuorumManager.java
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/util/BackupSyncDelay.java
Log:
HORNETQ-720 Send live's nodeID to the backup at the end of synchronization.
Modified:
trunk/hornetq-core/src/main/java/org/hornetq/core/persistence/StorageManager.java
===================================================================
---
trunk/hornetq-core/src/main/java/org/hornetq/core/persistence/StorageManager.java 2011-11-02
16:17:48 UTC (rev 11636)
+++
trunk/hornetq-core/src/main/java/org/hornetq/core/persistence/StorageManager.java 2011-11-02
16:18:22 UTC (rev 11637)
@@ -22,6 +22,7 @@
import org.hornetq.api.core.Pair;
import org.hornetq.api.core.SimpleString;
+import org.hornetq.api.core.TransportConfiguration;
import org.hornetq.core.journal.IOAsyncTask;
import org.hornetq.core.journal.Journal;
import org.hornetq.core.journal.JournalLoadInformation;
@@ -42,6 +43,7 @@
import org.hornetq.core.server.RouteContextList;
import org.hornetq.core.server.RoutingContext;
import org.hornetq.core.server.ServerMessage;
+import org.hornetq.core.server.cluster.ClusterConnection;
import org.hornetq.core.server.group.impl.GroupBinding;
import org.hornetq.core.transaction.ResourceManager;
import org.hornetq.core.transaction.Transaction;
@@ -70,9 +72,9 @@
/** Set the context back to the thread */
void setContext(OperationContext context);
-
+
/**
- *
+ *
* @param ioCriticalError is the server being stopped due to an IO critical error
*/
void stop(boolean ioCriticalError) throws Exception;
@@ -109,7 +111,7 @@
/** Confirms that a large message was finished */
void confirmPendingLargeMessageTX(Transaction transaction, long messageID, long
recordID) throws Exception;
-
+
/** Confirms that a large message was finished */
void confirmPendingLargeMessage(long recordID) throws Exception;
@@ -157,7 +159,7 @@
* @param message This is a temporary message that holds the parsed properties.
* The remoting layer can't create a ServerMessage directly, then this will
be replaced.
* @return
- * @throws Exception
+ * @throws Exception
*/
LargeServerMessage createLargeMessage(long id, MessageInternal message) throws
Exception;
@@ -249,9 +251,14 @@
/**
* @param replicationManager
* @param pagingManager
+ * @param nodeID
+ * @param clusterConnection
+ * @param pair
* @throws Exception
*/
- void startReplication(ReplicationManager replicationManager, PagingManager
pagingManager) throws Exception;
+ void startReplication(ReplicationManager replicationManager, PagingManager
pagingManager, String nodeID,
+ ClusterConnection clusterConnection, Pair<TransportConfiguration,
TransportConfiguration> pair)
+ throws Exception;
/**
* Adds message to page if we are paging.
Modified:
trunk/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
---
trunk/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2011-11-02
16:17:48 UTC (rev 11636)
+++
trunk/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2011-11-02
16:18:22 UTC (rev 11637)
@@ -42,6 +42,7 @@
import org.hornetq.api.core.Message;
import org.hornetq.api.core.Pair;
import org.hornetq.api.core.SimpleString;
+import org.hornetq.api.core.TransportConfiguration;
import org.hornetq.core.config.Configuration;
import org.hornetq.core.config.impl.ConfigurationImpl;
import org.hornetq.core.filter.Filter;
@@ -90,6 +91,7 @@
import org.hornetq.core.server.RouteContextList;
import org.hornetq.core.server.RoutingContext;
import org.hornetq.core.server.ServerMessage;
+import org.hornetq.core.server.cluster.ClusterConnection;
import org.hornetq.core.server.group.impl.GroupBinding;
import org.hornetq.core.server.impl.ServerMessageImpl;
import org.hornetq.core.transaction.ResourceManager;
@@ -219,9 +221,11 @@
private boolean journalLoaded = false;
// Persisted core configuration
- private final Map<SimpleString, PersistedRoles> mapPersistedRoles = new
ConcurrentHashMap<SimpleString, PersistedRoles>();
+ private final Map<SimpleString, PersistedRoles> mapPersistedRoles =
+ new ConcurrentHashMap<SimpleString, PersistedRoles>();
- private final Map<SimpleString, PersistedAddressSetting>
mapPersistedAddressSettings = new ConcurrentHashMap<SimpleString,
PersistedAddressSetting>();
+ private final Map<SimpleString, PersistedAddressSetting>
mapPersistedAddressSettings =
+ new ConcurrentHashMap<SimpleString, PersistedAddressSetting>();
public JournalStorageManager(final Configuration config, final ExecutorFactory
executorFactory,
final IOCriticalErrorListener criticalErrorListener)
@@ -231,24 +235,24 @@
public JournalStorageManager(final Configuration config, final ExecutorFactory
executorFactory,
final ReplicationManager replicator, final
IOCriticalErrorListener criticalErrorListener)
- {
- this.executorFactory = executorFactory;
+ {
+ this.executorFactory = executorFactory;
- executor = executorFactory.getExecutor();
+ executor = executorFactory.getExecutor();
- this.replicator = replicator;
+ this.replicator = replicator;
- if (config.getJournalType() != JournalType.NIO && config.getJournalType()
!= JournalType.ASYNCIO)
- {
- throw new IllegalArgumentException("Only NIO and AsyncIO are
supported journals");
- }
+ if (config.getJournalType() != JournalType.NIO && config.getJournalType()
!= JournalType.ASYNCIO)
+ {
+ throw new IllegalArgumentException("Only NIO and AsyncIO are supported
journals");
+ }
- bindingsDir = config.getBindingsDirectory();
+ bindingsDir = config.getBindingsDirectory();
- if (bindingsDir == null)
- {
- throw new NullPointerException("bindings-dir is null");
- }
+ if (bindingsDir == null)
+ {
+ throw new NullPointerException("bindings-dir is null");
+ }
createBindingsDir = config.isCreateBindingsDir();
@@ -306,11 +310,11 @@
config.getJournalBufferTimeout_NIO(),
config.isLogJournalWriteRate(),
criticalErrorListener);
- }
- else
- {
- throw new IllegalArgumentException("Unsupported journal type "
+ config.getJournalType());
- }
+ }
+ else
+ {
+ throw new IllegalArgumentException("Unsupported journal type " +
config.getJournalType());
+ }
idGenerator = new BatchingIDGenerator(0,
JournalStorageManager.CHECKPOINT_BATCH_SIZE, bindingsJournal);
Journal localMessage = new JournalImpl(config.getJournalFileSize(),
@@ -323,22 +327,20 @@
config.getJournalType() ==
JournalType.ASYNCIO ? config.getJournalMaxIO_AIO()
: config.getJournalMaxIO_NIO());
- if (replicator != null)
- {
- messageJournal = new ReplicatedJournal((byte)1, localMessage,
replicator);
- }
- else
- {
- messageJournal = localMessage;
- }
+ if (replicator != null)
+ {
+ messageJournal = new ReplicatedJournal((byte)1, localMessage, replicator);
+ }
+ else
+ {
+ messageJournal = localMessage;
+ }
- largeMessagesDirectory = config.getLargeMessagesDirectory();
+ largeMessagesDirectory = config.getLargeMessagesDirectory();
+ largeMessagesFactory = new NIOSequentialFileFactory(largeMessagesDirectory, false,
criticalErrorListener);
+ perfBlastPages = config.getJournalPerfBlastPages();
+ }
- largeMessagesFactory = new NIOSequentialFileFactory(largeMessagesDirectory,
false, criticalErrorListener);
-
- perfBlastPages = config.getJournalPerfBlastPages();
- }
-
public void clearContext()
{
OperationContextImpl.clearContext();
@@ -355,7 +357,9 @@
* @throws HornetQException
*/
@Override
- public void startReplication(ReplicationManager replicationManager, PagingManager
pagingManager) throws Exception
+ public void startReplication(ReplicationManager replicationManager, PagingManager
pagingManager, String nodeID,
+ ClusterConnection clusterConnection, Pair<TransportConfiguration,
TransportConfiguration> pair)
+ throws Exception
{
if (!started)
{
@@ -418,7 +422,8 @@
storageManagerLock.writeLock().lock();
try
{
- replicator.sendSynchronizationDone();
+ replicator.sendSynchronizationDone(nodeID);
+ clusterConnection.nodeAnnounced(System.currentTimeMillis(), nodeID, pair,
true);
// XXX HORNETQ-720 SEND a compare journals message?
}
finally
Modified:
trunk/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java
===================================================================
---
trunk/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java 2011-11-02
16:17:48 UTC (rev 11636)
+++
trunk/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java 2011-11-02
16:18:22 UTC (rev 11637)
@@ -24,6 +24,7 @@
import org.hornetq.api.core.Pair;
import org.hornetq.api.core.SimpleString;
+import org.hornetq.api.core.TransportConfiguration;
import org.hornetq.core.journal.IOAsyncTask;
import org.hornetq.core.journal.Journal;
import org.hornetq.core.journal.JournalLoadInformation;
@@ -47,6 +48,7 @@
import org.hornetq.core.server.RouteContextList;
import org.hornetq.core.server.RoutingContext;
import org.hornetq.core.server.ServerMessage;
+import org.hornetq.core.server.cluster.ClusterConnection;
import org.hornetq.core.server.group.impl.GroupBinding;
import org.hornetq.core.transaction.ResourceManager;
import org.hornetq.core.transaction.Transaction;
@@ -580,16 +582,10 @@
{
}
- /* (non-Javadoc)
- * @see org.hornetq.core.persistence.StorageManager#confirmPendingLargeMessage(long)
- */
public void confirmPendingLargeMessage(long recordID) throws Exception
{
}
- /* (non-Javadoc)
- * @see org.hornetq.core.persistence.StorageManager#stop(boolean)
- */
public void stop(boolean ioCriticalError) throws Exception
{
}
@@ -607,7 +603,8 @@
}
@Override
- public void startReplication(ReplicationManager replicationManager, PagingManager
pagingManager) throws Exception
+ public void startReplication(ReplicationManager replicationManager, PagingManager
pagingManager, String nodeID,
+ ClusterConnection clusterConnection, Pair<TransportConfiguration,
TransportConfiguration> pair) throws Exception
{
// no-op
}
@@ -621,5 +618,4 @@
{
return false;
}
-
}
Modified:
trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java
===================================================================
---
trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java 2011-11-02
16:17:48 UTC (rev 11636)
+++
trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java 2011-11-02
16:18:22 UTC (rev 11637)
@@ -208,14 +208,9 @@
} else if (packet.getType() == PacketImpl.BACKUP_REGISTRATION)
{
BackupRegistrationMessage msg = (BackupRegistrationMessage)packet;
- if (server.startReplication(rc))
+ if (server.startReplication(rc, acceptorUsed.getClusterConnection(),
getPair(msg.getConnector(), true)))
{
- /*
- * HORNETQ-720 Instantiate a new server locator to call
notifyNodeUp(...)? Or send
- * a CLUSTER_TOPOLOGY(_2?) message?
- */
-
acceptorUsed.getClusterConnection().nodeAnnounced(System.currentTimeMillis(),
msg.getNodeID(),
-
getPair(msg.getConnector(), true), true);
+ // XXX if it fails, the backup should get to know it
}
}
}
Modified:
trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/PacketDecoder.java
===================================================================
---
trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/PacketDecoder.java 2011-11-02
16:17:48 UTC (rev 11636)
+++
trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/PacketDecoder.java 2011-11-02
16:18:22 UTC (rev 11637)
@@ -547,7 +547,7 @@
packet = new BackupRegistrationMessage();
break;
}
- case PacketImpl.REPLICATION_START_STOP_SYNC:
+ case PacketImpl.REPLICATION_START_FINISH_SYNC:
{
packet = new ReplicationStartSyncMessage();
break;
Modified:
trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/PacketImpl.java
===================================================================
---
trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/PacketImpl.java 2011-11-02
16:17:48 UTC (rev 11636)
+++
trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/PacketImpl.java 2011-11-02
16:18:22 UTC (rev 11637)
@@ -204,7 +204,7 @@
public static final byte BACKUP_REGISTRATION = 115;
- public static final byte REPLICATION_START_STOP_SYNC = 120;
+ public static final byte REPLICATION_START_FINISH_SYNC = 120;
// Static --------------------------------------------------------
Modified:
trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/ReplicationStartSyncMessage.java
===================================================================
---
trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/ReplicationStartSyncMessage.java 2011-11-02
16:17:48 UTC (rev 11636)
+++
trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/ReplicationStartSyncMessage.java 2011-11-02
16:18:22 UTC (rev 11637)
@@ -16,20 +16,24 @@
private long[] ids;
private JournalContent journalType;
private boolean synchronizationIsFinished;
+ private String nodeID;
public ReplicationStartSyncMessage()
{
- super(REPLICATION_START_STOP_SYNC);
+ super(REPLICATION_START_FINISH_SYNC);
}
+ public ReplicationStartSyncMessage(String nodeID)
+ {
+ this();
+ synchronizationIsFinished = true;
+ this.nodeID = nodeID;
+ }
+
public ReplicationStartSyncMessage(JournalFile[] datafiles, JournalContent
contentType)
{
this();
- if (datafiles == null && contentType == null)
- {
- synchronizationIsFinished = true;
- return;
- }
+ synchronizationIsFinished = false;
ids = new long[datafiles.length];
for (int i = 0; i < datafiles.length; i++)
{
@@ -43,7 +47,10 @@
{
buffer.writeBoolean(synchronizationIsFinished);
if (synchronizationIsFinished)
+ {
+ buffer.writeString(nodeID);
return;
+ }
buffer.writeByte(journalType.typeByte);
buffer.writeInt(ids.length);
for (long id : ids)
@@ -57,7 +64,10 @@
{
synchronizationIsFinished = buffer.readBoolean();
if (synchronizationIsFinished)
+ {
+ nodeID = buffer.readString();
return;
+ }
journalType = JournalContent.getType(buffer.readByte());
int length = buffer.readInt();
ids = new long[length];
@@ -85,4 +95,9 @@
{
return ids;
}
+
+ public String getNodeID()
+ {
+ return nodeID;
+ }
}
Modified:
trunk/hornetq-core/src/main/java/org/hornetq/core/replication/ReplicationManager.java
===================================================================
---
trunk/hornetq-core/src/main/java/org/hornetq/core/replication/ReplicationManager.java 2011-11-02
16:17:48 UTC (rev 11636)
+++
trunk/hornetq-core/src/main/java/org/hornetq/core/replication/ReplicationManager.java 2011-11-02
16:18:22 UTC (rev 11637)
@@ -98,9 +98,11 @@
/**
* Informs backup that data synchronization is done.
* <p>
- * So if 'live' fails, the (up-to-date) backup now may take over its duties.
+ * So if 'live' fails, the (up-to-date) backup now may take over its duties.
To do so, it must
+ * know which is the live's {@code nodeID}.
+ * @param nodeID
*/
- void sendSynchronizationDone();
+ void sendSynchronizationDone(String nodeID);
/**
* Sends the whole content of the file to be duplicated.
Modified:
trunk/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java
===================================================================
---
trunk/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java 2011-11-02
16:17:48 UTC (rev 11636)
+++
trunk/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java 2011-11-02
16:18:22 UTC (rev 11637)
@@ -88,11 +88,6 @@
private final IOCriticalErrorListener criticalErrorListener;
- private static void trace(final String msg)
- {
- ReplicationEndpointImpl.log.trace(msg);
- }
-
private final HornetQServerImpl server;
private Channel channel;
@@ -152,10 +147,6 @@
journals[id] = journal;
}
- /*
- * (non-Javadoc)
- * @see
org.hornetq.core.remoting.ChannelHandler#handlePacket(org.hornetq.core.remoting.Packet)
- */
public void handlePacket(final Packet packet)
{
PacketImpl response = new ReplicationResponseMessage();
@@ -212,7 +203,7 @@
handleCompareDataMessage((ReplicationCompareDataMessage)packet);
response = new NullResponseMessage();
}
- else if (type == PacketImpl.REPLICATION_START_STOP_SYNC)
+ else if (type == PacketImpl.REPLICATION_START_FINISH_SYNC)
{
handleStartReplicationSynchronization((ReplicationStartSyncMessage)packet);
}
@@ -425,7 +416,7 @@
// Private -------------------------------------------------------
- private void finishSynchronization() throws Exception
+ private void finishSynchronization(String nodeID) throws Exception
{
for (JournalContent jc : EnumSet.allOf(JournalContent.class))
{
@@ -486,7 +477,7 @@
}
}
journalsHolder = null;
- server.setRemoteBackupUpToDate();
+ server.setRemoteBackupUpToDate(nodeID);
log.info("Backup server " + server + " is synchronized with
live-server.");
return;
}
@@ -565,7 +556,7 @@
if (packet.isSynchronizationFinished())
{
- finishSynchronization();
+ finishSynchronization(packet.getNodeID());
return;
}
Modified:
trunk/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationManagerImpl.java
===================================================================
---
trunk/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationManagerImpl.java 2011-11-02
16:17:48 UTC (rev 11636)
+++
trunk/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationManagerImpl.java 2011-11-02
16:18:22 UTC (rev 11637)
@@ -480,7 +480,6 @@
private static class NullEncoding implements EncodingSupport
{
-
static NullEncoding instance = new NullEncoding();
public void decode(final HornetQBuffer buffer)
@@ -495,7 +494,6 @@
{
return 0;
}
-
}
@Override
@@ -528,8 +526,8 @@
@Override
public void syncPages(SequentialFile file, long id, SimpleString queueName) throws
Exception
{
- if (enabled)
- sendLargeFile(null, queueName, id, file, Long.MAX_VALUE);
+ if (enabled)
+ sendLargeFile(null, queueName, id, file, Long.MAX_VALUE);
}
/**
@@ -541,10 +539,7 @@
* @param maxBytesToSend maximum number of bytes to read and send from the file
* @throws Exception
*/
- private void sendLargeFile(JournalContent content,
- SimpleString pageStore,
- final long id,
- SequentialFile file,
+ private void sendLargeFile(JournalContent content, SimpleString pageStore, final long
id, SequentialFile file,
long maxBytesToSend) throws Exception
{
if (!enabled)
@@ -564,23 +559,23 @@
int toSend = bytesRead;
if (bytesRead > 0)
{
- if (bytesRead >= maxBytesToSend)
- {
- toSend = (int)maxBytesToSend;
- maxBytesToSend = 0;
+ if (bytesRead >= maxBytesToSend)
+ {
+ toSend = (int)maxBytesToSend;
+ maxBytesToSend = 0;
+ }
+ else
+ {
+ maxBytesToSend = maxBytesToSend - bytesRead;
+ }
+ buffer.limit(toSend);
}
- else
- {
- maxBytesToSend = maxBytesToSend - bytesRead;
- }
- buffer.limit(toSend);
- }
- buffer.rewind();
+ buffer.rewind();
- // sending -1 or 0 bytes will close the file at the backup
- sendReplicatePacket(new ReplicationSyncFileMessage(content, pageStore, id,
bytesRead, buffer));
- if (bytesRead == -1 || bytesRead == 0 || maxBytesToSend == 0)
- break;
+ // sending -1 or 0 bytes will close the file at the backup
+ sendReplicatePacket(new ReplicationSyncFileMessage(content, pageStore, id,
bytesRead, buffer));
+ if (bytesRead == -1 || bytesRead == 0 || maxBytesToSend == 0)
+ break;
}
}
finally
@@ -597,9 +592,9 @@
}
@Override
- public void sendSynchronizationDone()
+ public void sendSynchronizationDone(String nodeID)
{
if (enabled)
- sendReplicatePacket(new ReplicationStartSyncMessage(null, null));
+ sendReplicatePacket(new ReplicationStartSyncMessage(nodeID));
}
}
Modified: trunk/hornetq-core/src/main/java/org/hornetq/core/server/HornetQServer.java
===================================================================
--- trunk/hornetq-core/src/main/java/org/hornetq/core/server/HornetQServer.java 2011-11-02
16:17:48 UTC (rev 11636)
+++ trunk/hornetq-core/src/main/java/org/hornetq/core/server/HornetQServer.java 2011-11-02
16:18:22 UTC (rev 11637)
@@ -20,7 +20,9 @@
import javax.management.MBeanServer;
+import org.hornetq.api.core.Pair;
import org.hornetq.api.core.SimpleString;
+import org.hornetq.api.core.TransportConfiguration;
import org.hornetq.core.config.BridgeConfiguration;
import org.hornetq.core.config.Configuration;
import org.hornetq.core.config.DivertConfiguration;
@@ -33,6 +35,7 @@
import org.hornetq.core.replication.ReplicationEndpoint;
import org.hornetq.core.replication.ReplicationManager;
import org.hornetq.core.security.Role;
+import org.hornetq.core.server.cluster.ClusterConnection;
import org.hornetq.core.server.cluster.ClusterManager;
import org.hornetq.core.server.group.GroupingHandler;
import org.hornetq.core.server.impl.ConnectorsService;
@@ -57,7 +60,7 @@
public interface HornetQServer extends HornetQComponent
{
- /** This method was created mainly for testing but it may be used in scenarios where
+ /** This method was created mainly for testing but it may be used in scenarios where
* you need to have more than one Server inside the same VM.
* This identity will be exposed on logs what may help you to debug issues on the log
traces and debugs.*/
void setIdentity(String identity);
@@ -65,7 +68,7 @@
String getIdentity();
String describe();
-
+
Configuration getConfiguration();
RemotingService getRemotingService();
@@ -113,7 +116,7 @@
Set<ServerSession> getSessions();
boolean isStarted();
-
+
boolean isStopped();
HierarchicalRepository<Set<Role>> getSecurityRepository();
@@ -182,14 +185,17 @@
void destroyBridge(String name) throws Exception;
ServerSession getSessionByID(String sessionID);
-
+
void threadDump(String reason);
void stop(boolean failoverOnServerShutdown) throws Exception;
/**
* @param rc
+ * @param pair
+ * @param clusterConnection
* @return {@code true} if replication started successfully, {@code false} otherwise
*/
- boolean startReplication(CoreRemotingConnection rc);
+ boolean startReplication(CoreRemotingConnection rc, ClusterConnection
clusterConnection,
+ Pair<TransportConfiguration, TransportConfiguration> pair);
}
Modified: trunk/hornetq-core/src/main/java/org/hornetq/core/server/NodeManager.java
===================================================================
--- trunk/hornetq-core/src/main/java/org/hornetq/core/server/NodeManager.java 2011-11-02
16:17:48 UTC (rev 11636)
+++ trunk/hornetq-core/src/main/java/org/hornetq/core/server/NodeManager.java 2011-11-02
16:18:22 UTC (rev 11637)
@@ -69,6 +69,11 @@
return uuid;
}
+ public void setNodeID(String nodeID)
+ {
+ this.nodeID = new SimpleString(nodeID);
+ }
+
public abstract boolean isAwaitingFailback() throws Exception;
public abstract boolean isBackupLive() throws Exception;
Modified:
trunk/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java
===================================================================
---
trunk/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java 2011-11-02
16:17:48 UTC (rev 11636)
+++
trunk/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java 2011-11-02
16:18:22 UTC (rev 11637)
@@ -109,6 +109,7 @@
import org.hornetq.core.server.Queue;
import org.hornetq.core.server.QueueFactory;
import org.hornetq.core.server.ServerSession;
+import org.hornetq.core.server.cluster.ClusterConnection;
import org.hornetq.core.server.cluster.ClusterManager;
import org.hornetq.core.server.cluster.Transformer;
import org.hornetq.core.server.cluster.impl.ClusterManagerImpl;
@@ -473,7 +474,7 @@
stop(failoverOnServerShutdown, false);
}
- protected void stop(boolean failoverOnServerShutdown, boolean criticalIOError) throws
Exception
+ private void stop(boolean failoverOnServerShutdown, boolean criticalIOError) throws
Exception
{
synchronized (this)
{
@@ -584,7 +585,7 @@
for (Runnable task : tasks)
{
- HornetQServerImpl.log.debug(this + "::Waiting for " + task);
+ HornetQServerImpl.log.info(this + "::Waiting for " + task);
}
if (memoryManager != null)
@@ -592,9 +593,9 @@
memoryManager.stop();
}
- threadPool.shutdown();
+ threadPool.shutdown();
- scheduledPool.shutdown();
+ scheduledPool.shutdown();
try
{
@@ -1222,7 +1223,7 @@
// null);
// }
- protected PagingManager createPagingManager()
+ private PagingManager createPagingManager()
{
return new PagingManagerImpl(new
PagingStoreFactoryNIO(configuration.getPagingDirectory(),
@@ -1238,7 +1239,7 @@
/**
* This method is protected as it may be used as a hook for creating a custom storage
manager (on tests for instance)
*/
- protected StorageManager createStorageManager()
+ private StorageManager createStorageManager()
{
if (configuration.isPersistenceEnabled())
{
@@ -2086,7 +2087,7 @@
final TransportConfiguration config =
configuration.getConnectorConfigurations().get(liveConnectorName);
serverLocator =
(ServerLocatorInternal)HornetQClient.createServerLocatorWithHA(config);
- final QuorumManager quorumManager = new QuorumManager(serverLocator,
nodeManager.getNodeId().toString());
+ final QuorumManager quorumManager = new QuorumManager(serverLocator);
serverLocator.setReconnectAttempts(-1);
@@ -2127,10 +2128,11 @@
while (true)
{
nodeManager.awaitLiveNode();
- if (!started || quorumManager.isNodeDown())
- {
- break;
- }
+ break;
+// if (!started || quorumManager.isNodeDown())
+// {
+// break;
+// }
}
serverLocator.close();
@@ -2278,13 +2280,20 @@
@Override
- public boolean startReplication(CoreRemotingConnection rc)
+ public boolean startReplication(CoreRemotingConnection rc, ClusterConnection
clusterConnection,
+ Pair<TransportConfiguration, TransportConfiguration> pair)
{
+ if (replicationManager != null)
+ {
+ return false;
+ }
+
replicationManager = new ReplicationManagerImpl(rc, executorFactory);
try
{
replicationManager.start();
- storageManager.startReplication(replicationManager, pagingManager);
+ storageManager.startReplication(replicationManager, pagingManager,
getNodeID().toString(), clusterConnection,
+ pair);
return true;
}
catch (Exception e)
@@ -2313,8 +2322,9 @@
return backupUpToDate;
}
- public void setRemoteBackupUpToDate()
+ public void setRemoteBackupUpToDate(String nodeID)
{
+ nodeManager.setNodeID(nodeID);
backupUpToDate = true;
}
}
Modified:
trunk/hornetq-core/src/main/java/org/hornetq/core/server/impl/QuorumManager.java
===================================================================
---
trunk/hornetq-core/src/main/java/org/hornetq/core/server/impl/QuorumManager.java 2011-11-02
16:17:48 UTC (rev 11636)
+++
trunk/hornetq-core/src/main/java/org/hornetq/core/server/impl/QuorumManager.java 2011-11-02
16:18:22 UTC (rev 11637)
@@ -32,16 +32,14 @@
// volatile boolean started;
private final ServerLocator locator;
- private final String targetServerName;
+ private final String targetServerName = "";
private final Map<String, Pair<TransportConfiguration,
TransportConfiguration>> nodes =
new ConcurrentHashMap<String, Pair<TransportConfiguration,
TransportConfiguration>>();
private static final long DISCOVERY_TIMEOUT = 3;
- public QuorumManager(ServerLocator serverLocator, String nodeID)
+ public QuorumManager(ServerLocator serverLocator)
{
this.locator = serverLocator;
- this.targetServerName = nodeID;
-
locator.addClusterTopologyListener(this);
}
Modified:
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/util/BackupSyncDelay.java
===================================================================
---
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/util/BackupSyncDelay.java 2011-11-02
16:17:48 UTC (rev 11636)
+++
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/util/BackupSyncDelay.java 2011-11-02
16:18:22 UTC (rev 11637)
@@ -140,7 +140,7 @@
deliver();
}
- if (packet.getType() == PacketImpl.REPLICATION_START_STOP_SYNC &&
mustHold)
+ if (packet.getType() == PacketImpl.REPLICATION_START_FINISH_SYNC &&
mustHold)
{
ReplicationStartSyncMessage syncMsg = (ReplicationStartSyncMessage)packet;
if (syncMsg.isSynchronizationFinished() && !deliver)