JBoss hornetq SVN: r11056 - in branches/HORNETQ-720_Replication: hornetq-journal/src/main/java/org/hornetq/core/journal and 3 other directories.
by do-not-reply@jboss.org
Author: borges
Date: 2011-07-27 13:28:44 -0400 (Wed, 27 Jul 2011)
New Revision: 11056
Modified:
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicatedJournal.java
branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/Journal.java
branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/SequentialFile.java
branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/JournalImpl.java
branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/JournalTransaction.java
branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/BackupJournalSyncTest.java
branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/replication/ReplicationTest.java
Log:
Remove unnecessary "throws Exception" declaration.
Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicatedJournal.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicatedJournal.java 2011-07-27 17:27:23 UTC (rev 11055)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicatedJournal.java 2011-07-27 17:28:44 UTC (rev 11056)
@@ -493,7 +493,7 @@
* @throws Exception
* @see org.hornetq.core.journal.Journal#perfBlast(int)
*/
- public void perfBlast(final int pages) throws Exception
+ public void perfBlast(final int pages)
{
localJournal.perfBlast(pages);
}
Modified: branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/Journal.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/Journal.java 2011-07-27 17:27:23 UTC (rev 11055)
+++ branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/Journal.java 2011-07-27 17:28:44 UTC (rev 11056)
@@ -125,10 +125,10 @@
int getAlignment() throws Exception;
int getNumberOfRecords();
-
+
int getUserVersion();
- void perfBlast(int pages) throws Exception;
+ void perfBlast(int pages);
void runDirectJournalBlast() throws Exception;
Modified: branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/SequentialFile.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/SequentialFile.java 2011-07-27 17:27:23 UTC (rev 11055)
+++ branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/SequentialFile.java 2011-07-27 17:28:44 UTC (rev 11056)
@@ -19,12 +19,12 @@
import org.hornetq.core.journal.impl.TimedBuffer;
/**
- *
+ *
* A SequentialFile
- *
+ *
* @author <a href="mailto:tim.fox@jboss.com">Tim Fox</a>
* @author <a href="mailto:clebert.suconic@jboss.com">Clebert Suconic</a>
- *
+ *
*/
public interface SequentialFile
{
@@ -69,7 +69,7 @@
/** Write directly to the file without using any buffer */
void writeDirect(ByteBuffer bytes, boolean sync) throws Exception;
-
+
/** Write directly to the file.
* This is used by compacting and other places where we write a big buffer in a single shot.
* writeInternal should always block until the entire write is sync on disk */
@@ -94,7 +94,7 @@
void renameTo(String newFileName) throws Exception;
SequentialFile copy();
-
+
void copyTo(SequentialFile newFileName) throws Exception;
void setTimedBuffer(TimedBuffer buffer);
Modified: branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/JournalImpl.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/JournalImpl.java 2011-07-27 17:27:23 UTC (rev 11055)
+++ branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/JournalImpl.java 2011-07-27 17:28:44 UTC (rev 11056)
@@ -2395,7 +2395,7 @@
/** Method for use on testcases.
* It will call waitComplete on every transaction, so any assertions on the file system will be correct after this */
- public void debugWait() throws Exception
+ public void debugWait() throws InterruptedException
{
fileFactory.flush();
@@ -2501,7 +2501,7 @@
}
}
- public void perfBlast(final int pages) throws Exception
+ public void perfBlast(final int pages)
{
new PerfBlast(pages).start();
}
Modified: branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/JournalTransaction.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/JournalTransaction.java 2011-07-27 17:27:23 UTC (rev 11055)
+++ branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/JournalTransaction.java 2011-07-27 17:28:44 UTC (rev 11056)
@@ -149,7 +149,7 @@
}
/**
- *
+ *
*/
public void clear()
{
@@ -246,7 +246,7 @@
neg.add(new JournalUpdate(file, id, 0));
}
- /**
+ /**
* The caller of this method needs to guarantee appendLock.lock at the journal. (unless this is being called from load what is a single thread process).
* */
public void commit(final JournalFile file)
@@ -316,7 +316,7 @@
}
}
- public void waitCallbacks() throws Exception
+ public void waitCallbacks() throws InterruptedException
{
if (callbackList != null)
{
@@ -336,7 +336,7 @@
}
}
- /**
+ /**
* The caller of this method needs to guarantee appendLock.lock before calling this method if being used outside of the lock context.
* or else potFilesMap could be affected
* */
@@ -367,7 +367,7 @@
}
}
- /**
+ /**
* The caller of this method needs to guarantee appendLock.lock before calling this method if being used outside of the lock context.
* or else potFilesMap could be affected
* */
Modified: branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/BackupJournalSyncTest.java
===================================================================
--- branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/BackupJournalSyncTest.java 2011-07-27 17:27:23 UTC (rev 11055)
+++ branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/BackupJournalSyncTest.java 2011-07-27 17:28:44 UTC (rev 11056)
@@ -51,7 +51,6 @@
assertFalse("backup is not started!", backupServer.isStarted());
// BLOCK ON journals
- // SYNC, (UNSET reclaim, lock, use next file, get file list, sync) iterate
backupServer.start();
waitForBackup(sessionFactory, 5);
Modified: branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/replication/ReplicationTest.java
===================================================================
--- branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/replication/ReplicationTest.java 2011-07-27 17:27:23 UTC (rev 11055)
+++ branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/replication/ReplicationTest.java 2011-07-27 17:28:44 UTC (rev 11056)
@@ -726,7 +726,7 @@
return new JournalLoadInformation();
}
- public void perfBlast(final int pages) throws Exception
+ public void perfBlast(final int pages)
{
}
12 years, 9 months
JBoss hornetq SVN: r11055 - in branches/HORNETQ-720_Replication: hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat and 5 other directories.
by do-not-reply@jboss.org
Author: borges
Date: 2011-07-27 13:27:23 -0400 (Wed, 27 Jul 2011)
New Revision: 11055
Added:
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/ReplicationJournalFile.java
Modified:
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/ReplicationManager.java
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationManagerImpl.java
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java
branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/Journal.java
branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/TestableJournal.java
branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/AbstractSequentialFile.java
branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/JournalFilesRepository.java
branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/JournalImpl.java
Log:
HORNETQ-720 Some replication synchronization code (turned off as it does not work).
Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2011-07-27 17:25:49 UTC (rev 11054)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2011-07-27 17:27:23 UTC (rev 11055)
@@ -14,6 +14,7 @@
package org.hornetq.core.persistence.impl.journal;
import java.io.File;
+import java.io.IOException;
import java.io.PrintStream;
import java.nio.ByteBuffer;
import java.security.AccessController;
@@ -157,6 +158,11 @@
private ReplicationManager replicator;
+ public enum JournalContent
+ {
+ MESSAGES, BINDINGS;
+ }
+
private Journal messageJournal;
private Journal bindingsJournal;
@@ -321,17 +327,76 @@
return replicator != null;
}
- public void setReplicator(ReplicationManager replicationManager)
+ /**
+ * XXX FIXME Method ignores the synchronization of LargeMessages and Paging.
+ * <p>
+ * XXX A second version improvement would be to allow new operations to be sent to the backup,
+ * while we synchronize the existing logs.
+ * @param replicationManager
+ * @throws HornetQException
+ */
+ public void setReplicator(ReplicationManager replicationManager) throws Exception
{
assert replicationManager != null;
replicator = replicationManager;
- Journal localMessageJournal = messageJournal;
- Journal localBindingsJournal = bindingsJournal;
+
+ if (!(messageJournal instanceof JournalImpl) || !(bindingsJournal instanceof JournalImpl))
+ {
+ throw new HornetQException(HornetQException.INTERNAL_ERROR,
+ "journals here are not JournalImpl. You can't set a replicator!");
+ }
+ JournalImpl localMessageJournal = (JournalImpl)messageJournal;
+ JournalImpl localBindingsJournal = (JournalImpl)bindingsJournal;
+ if (false)
+ {
+ localMessageJournal.writeLock();
+ localBindingsJournal.writeLock();
+
+ JournalFile[] messageFiles = prepateJournalForCopy(localMessageJournal);
+ JournalFile[] bindingsFiles = prepateJournalForCopy(localBindingsJournal);
+ localMessageJournal.writeUnlock();
+ localBindingsJournal.writeUnlock();
+
+ sendJournalFile(messageFiles, JournalContent.MESSAGES);
+ sendJournalFile(bindingsFiles, JournalContent.BINDINGS);
+ }
+ // XXX NEED to take a global lock on the StorageManager.
bindingsJournal = new ReplicatedJournal(((byte)0), localBindingsJournal, replicator);
messageJournal = new ReplicatedJournal((byte)1, localMessageJournal, replicator);
- // XXX HORNETQ-720 obviously missing here is the synchronization step.
}
+ /**
+ * Send an entire journal file to a replicating server (a backup server that is).
+ * @param jf
+ * @param replicator2
+ * @throws IOException
+ * @throws HornetQException
+ */
+ private void sendJournalFile(JournalFile[] journalFiles, JournalContent type) throws IOException, HornetQException
+ {
+ for (JournalFile jf : journalFiles)
+ {
+ replicator.sendJournalFile(jf, type);
+ jf.setCanReclaim(true);
+ }
+ }
+
+ private JournalFile[] prepateJournalForCopy(JournalImpl journal) throws Exception
+ {
+ journal.setAutoReclaim(false);
+ /*
+ * need to check whether it is safe to proceed if compacting is running (specially at the end
+ * of it)
+ */
+ journal.forceMoveNextFile();
+ JournalFile[] datafiles = journal.getDataFiles();
+ for (JournalFile jf : datafiles)
+ {
+ jf.setCanReclaim(false);
+ }
+ return datafiles;
+ }
+
public void waitOnOperations() throws Exception
{
if (!started)
Added: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/ReplicationJournalFile.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/ReplicationJournalFile.java (rev 0)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/ReplicationJournalFile.java 2011-07-27 17:27:23 UTC (rev 11055)
@@ -0,0 +1,29 @@
+package org.hornetq.core.protocol.core.impl.wireformat;
+
+import org.hornetq.core.persistence.impl.journal.JournalStorageManager.JournalContent;
+import org.hornetq.core.protocol.core.impl.PacketImpl;
+
+/**
+ * Used to copy JournalFile data over to the backup during synchronization.
+ */
+public final class ReplicationJournalFile extends PacketImpl
+{
+
+ private byte[] data;
+ private int dataSize;
+ private JournalContent journalType;
+
+ public ReplicationJournalFile()
+ {
+ super(REPLICATION_SYNC);
+ }
+
+ public ReplicationJournalFile(int size, byte[] data, JournalContent content)
+ {
+ this();
+ this.dataSize = size;
+ this.data = data;
+ this.journalType = content;
+ }
+
+}
Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/ReplicationManager.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/ReplicationManager.java 2011-07-27 17:25:49 UTC (rev 11054)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/ReplicationManager.java 2011-07-27 17:27:23 UTC (rev 11055)
@@ -13,20 +13,23 @@
package org.hornetq.core.replication;
+import java.io.IOException;
import java.util.Set;
import org.hornetq.api.core.HornetQException;
import org.hornetq.api.core.SimpleString;
import org.hornetq.core.journal.EncodingSupport;
import org.hornetq.core.journal.JournalLoadInformation;
+import org.hornetq.core.journal.impl.JournalFile;
import org.hornetq.core.paging.PagedMessage;
import org.hornetq.core.persistence.OperationContext;
import org.hornetq.core.persistence.impl.journal.JournalStorageManager;
+import org.hornetq.core.persistence.impl.journal.JournalStorageManager.JournalContent;
import org.hornetq.core.server.HornetQComponent;
/**
* Used by the {@link JournalStorageManager} to update the replicated journal.
- *
+ *
* @author <mailto:clebert.suconic@jboss.org">Clebert Suconic</a>
*/
public interface ReplicationManager extends HornetQComponent
@@ -84,4 +87,10 @@
*/
void compareJournals(JournalLoadInformation[] journalInfo) throws HornetQException;
+ /**
+ * Sends the whole content of the file to be duplicated.
+ * @throws HornetQException
+ */
+ void sendJournalFile(JournalFile jf, JournalContent type) throws IOException, HornetQException;
+
}
Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationManagerImpl.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationManagerImpl.java 2011-07-27 17:25:49 UTC (rev 11054)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationManagerImpl.java 2011-07-27 17:27:23 UTC (rev 11055)
@@ -13,6 +13,8 @@
package org.hornetq.core.replication.impl;
+import java.io.FileInputStream;
+import java.io.IOException;
import java.util.LinkedHashSet;
import java.util.Queue;
import java.util.Set;
@@ -24,9 +26,11 @@
import org.hornetq.api.core.client.SessionFailureListener;
import org.hornetq.core.journal.EncodingSupport;
import org.hornetq.core.journal.JournalLoadInformation;
+import org.hornetq.core.journal.impl.JournalFile;
import org.hornetq.core.logging.Logger;
import org.hornetq.core.paging.PagedMessage;
import org.hornetq.core.persistence.OperationContext;
+import org.hornetq.core.persistence.impl.journal.JournalStorageManager.JournalContent;
import org.hornetq.core.persistence.impl.journal.OperationContextImpl;
import org.hornetq.core.protocol.core.Channel;
import org.hornetq.core.protocol.core.ChannelHandler;
@@ -40,6 +44,7 @@
import org.hornetq.core.protocol.core.impl.wireformat.ReplicationCompareDataMessage;
import org.hornetq.core.protocol.core.impl.wireformat.ReplicationDeleteMessage;
import org.hornetq.core.protocol.core.impl.wireformat.ReplicationDeleteTXMessage;
+import org.hornetq.core.protocol.core.impl.wireformat.ReplicationJournalFile;
import org.hornetq.core.protocol.core.impl.wireformat.ReplicationLargeMessageBeingMessage;
import org.hornetq.core.protocol.core.impl.wireformat.ReplicationLargeMessageWriteMessage;
import org.hornetq.core.protocol.core.impl.wireformat.ReplicationLargemessageEndMessage;
@@ -463,8 +468,10 @@
private class ResponseHandler implements ChannelHandler
{
- /* (non-Javadoc)
- * @see org.hornetq.core.remoting.ChannelHandler#handlePacket(org.hornetq.core.remoting.Packet)
+ /*
+ * (non-Javadoc)
+ * @see
+ * org.hornetq.core.remoting.ChannelHandler#handlePacket(org.hornetq.core.remoting.Packet)
*/
public void handlePacket(final Packet packet)
{
@@ -496,4 +503,19 @@
}
+ @Override
+ public void sendJournalFile(JournalFile jf, JournalContent content) throws IOException, HornetQException
+ {
+ FileInputStream file = new FileInputStream(jf.getFile().getFileName());
+ byte[] data = new byte[1 << 17]; // about 130 kB
+ while (true)
+ {
+ int bytesRead = file.read(data);
+ if (bytesRead == -1)
+ break;
+ replicatingChannel.sendBlocking(new ReplicationJournalFile(bytesRead, data, content));
+ }
+ throw new UnsupportedOperationException();
+ }
+
}
Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java 2011-07-27 17:25:49 UTC (rev 11054)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java 2011-07-27 17:27:23 UTC (rev 11055)
@@ -2003,10 +2003,12 @@
public void addHaBackup(CoreRemotingConnection rc) throws Exception
{
if (!(storageManager instanceof JournalStorageManager))
- return;
+ {
+ throw new HornetQException(HornetQException.INTERNAL_ERROR, "unknown implementation of JournalStorageManager!");
+ }
+
JournalStorageManager journalStorageManager = (JournalStorageManager)storageManager;
-
replicationManager = new ReplicationManagerImpl(rc, executorFactory);
replicationManager.start();
Modified: branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/Journal.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/Journal.java 2011-07-27 17:25:49 UTC (rev 11054)
+++ branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/Journal.java 2011-07-27 17:27:23 UTC (rev 11055)
@@ -18,11 +18,11 @@
import org.hornetq.core.server.HornetQComponent;
/**
- *
+ *
* Most methods on the journal provide a blocking version where you select the sync mode and a non blocking mode where you pass a completion callback as a parameter.
- *
+ *
* Notice also that even on the callback methods it's possible to pass the sync mode. That will only make sense on the NIO operations.
- *
+ *
* @author <a href="mailto:tim.fox@jboss.com">Tim Fox</a>
* @author <a href="mailto:clebert.suconic@jboss.com">Clebert Suconic</a>
*
@@ -84,13 +84,13 @@
*/
void appendCommitRecord(long txID, boolean sync, IOCompletion callback, boolean lineUpContext) throws Exception;
- /**
- *
- * <p>If the system crashed after a prepare was called, it should store information that is required to bring the transaction
+ /**
+ *
+ * <p>If the system crashed after a prepare was called, it should store information that is required to bring the transaction
* back to a state it could be committed. </p>
- *
+ *
* <p> transactionData allows you to store any other supporting user-data related to the transaction</p>
- *
+ *
* @param txID
* @param transactionData - extra user data for the prepare
* @throws Exception
@@ -115,7 +115,7 @@
* This is only useful if you're using the journal but not interested on the current data.
* Useful in situations where the journal is being replicated, copied... etc. */
JournalLoadInformation loadInternalOnly() throws Exception;
-
+
void lineUpContex(IOCompletion callback);
JournalLoadInformation load(List<RecordInfo> committedRecords,
Modified: branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/TestableJournal.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/TestableJournal.java 2011-07-27 17:25:49 UTC (rev 11054)
+++ branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/TestableJournal.java 2011-07-27 17:27:23 UTC (rev 11055)
@@ -51,12 +51,9 @@
void setAutoReclaim(boolean autoReclaim);
- boolean isAutoReclaim();
-
void testCompact() throws Exception;
JournalFile getCurrentFile();
-
/** This method is called automatically when a new file is opened.
* @return true if it needs to re-check due to cleanup or other factors */
Modified: branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/AbstractSequentialFile.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/AbstractSequentialFile.java 2011-07-27 17:25:49 UTC (rev 11054)
+++ branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/AbstractSequentialFile.java 2011-07-27 17:27:23 UTC (rev 11055)
@@ -104,16 +104,16 @@
file.delete();
}
-
+
public void copyTo(SequentialFile newFileName) throws Exception
{
log.debug("Copying " + this + " as " + newFileName);
newFileName.open();
this.open();
-
-
+
+
ByteBuffer buffer = ByteBuffer.allocate(10 * 1024);
-
+
for (;;)
{
buffer.rewind();
Modified: branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/JournalFilesRepository.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/JournalFilesRepository.java 2011-07-27 17:25:49 UTC (rev 11054)
+++ branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/JournalFilesRepository.java 2011-07-27 17:27:23 UTC (rev 11055)
@@ -321,10 +321,10 @@
return openedFiles.size();
}
- /**
+ /**
* <p>This method will instantly return the opened file, and schedule opening and reclaiming.</p>
* <p>In case there are no cached opened files, this method will block until the file was opened,
- * what would happen only if the system is under heavy load by another system (like a backup system, or a DB sharing the same box as HornetQ).</p>
+ * what would happen only if the system is under heavy load by another system (like a backup system, or a DB sharing the same box as HornetQ).</p>
* */
public JournalFile openFile() throws InterruptedException
{
@@ -377,8 +377,8 @@
return nextFile;
}
- /**
- *
+ /**
+ *
* Open a file and place it into the openedFiles queue
* */
public void pushOpenedFile() throws Exception
Modified: branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/JournalImpl.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/JournalImpl.java 2011-07-27 17:25:49 UTC (rev 11054)
+++ branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/JournalImpl.java 2011-07-27 17:27:23 UTC (rev 11055)
@@ -65,15 +65,11 @@
import org.hornetq.utils.DataConstants;
/**
- *
- * <p>A circular log implementation.</p
- *
- * <p>Look at {@link JournalImpl#load(LoaderCallback)} for the file layout
- *
+ * A circular log implementation.
+ * <p>
+ * Look at {@link JournalImpl#load(LoaderCallback)} for the file layout
* @author <a href="mailto:tim.fox@jboss.com">Tim Fox</a>
- *
* @author <a href="mailto:clebert.suconic@jboss.com">Clebert Suconic</a>
- *
*/
public class JournalImpl implements TestableJournal, JournalRecordProvider
{
@@ -1824,7 +1820,7 @@
compactor = null;
}
- autoReclaim = previousReclaimValue;
+ setAutoReclaim(previousReclaimValue);
}
}
@@ -2352,11 +2348,6 @@
this.autoReclaim = autoReclaim;
}
- public boolean isAutoReclaim()
- {
- return autoReclaim;
- }
-
public String debug() throws Exception
{
reclaimer.scan(getDataFiles());
@@ -3266,4 +3257,13 @@
}
}
+ public void writeLock()
+ {
+ journalLock.writeLock().lock();
+ }
+
+ public void writeUnlock()
+ {
+ journalLock.writeLock().unlock();
+ }
}
12 years, 9 months
JBoss hornetq SVN: r11054 - in branches/HORNETQ-720_Replication: hornetq-core/src/main/java/org/hornetq/core/server/impl and 3 other directories.
by do-not-reply@jboss.org
Author: borges
Date: 2011-07-27 13:25:49 -0400 (Wed, 27 Jul 2011)
New Revision: 11054
Modified:
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationManagerImpl.java
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java
branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailBackAutoTest.java
branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailoverTest.java
branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailoverTestBase.java
branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/util/TestableServer.java
branches/HORNETQ-720_Replication/tests/unit-tests/src/test/java/org/hornetq/tests/util/ServiceTestBase.java
Log:
clean up
Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationManagerImpl.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationManagerImpl.java 2011-07-27 17:24:19 UTC (rev 11053)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationManagerImpl.java 2011-07-27 17:25:49 UTC (rev 11054)
@@ -370,9 +370,6 @@
}
- /* (non-Javadoc)
- * @see org.hornetq.core.replication.ReplicationManager#compareJournals(org.hornetq.core.journal.JournalLoadInformation[])
- */
public void compareJournals(final JournalLoadInformation[] journalInfo) throws HornetQException
{
replicatingChannel.sendBlocking(new ReplicationCompareDataMessage(journalInfo));
Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java 2011-07-27 17:24:19 UTC (rev 11053)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java 2011-07-27 17:25:49 UTC (rev 11054)
@@ -153,7 +153,7 @@
public static final String GENERIC_IGNORED_FILTER = "__HQX=-1";
// Static
- // ---------------------------------------------------------------------------------------
+ // -----------------------------------------------------------------------------------
// Attributes
// -----------------------------------------------------------------------------------
@@ -2006,6 +2006,7 @@
return;
JournalStorageManager journalStorageManager = (JournalStorageManager)storageManager;
+
replicationManager = new ReplicationManagerImpl(rc, executorFactory);
replicationManager.start();
Modified: branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailBackAutoTest.java
===================================================================
--- branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailBackAutoTest.java 2011-07-27 17:24:19 UTC (rev 11053)
+++ branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailBackAutoTest.java 2011-07-27 17:25:49 UTC (rev 11054)
@@ -54,17 +54,7 @@
@Override
protected void tearDown() throws Exception
{
- if (locator != null)
- {
- try
- {
- locator.close();
- }
- catch (Exception e)
- {
- //
- }
- }
+ closeServerLocator(locator);
super.tearDown();
}
Modified: branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailoverTest.java
===================================================================
--- branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailoverTest.java 2011-07-27 17:24:19 UTC (rev 11053)
+++ branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailoverTest.java 2011-07-27 17:25:49 UTC (rev 11054)
@@ -102,7 +102,7 @@
protected void tearDown() throws Exception
{
closeSessionFactory();
- locator.close();
+ closeServerLocator(locator);
super.tearDown();
}
@@ -215,7 +215,7 @@
ClientProducer producer = session.createProducer(FailoverTestBase.ADDRESS);
- sendMessages(session, producer);
+ sendMessagesSomeDurable(session, producer);
crash(session);
@@ -346,7 +346,7 @@
ClientProducer producer = session.createProducer(FailoverTestBase.ADDRESS);
- sendMessages(session, producer);
+ sendMessagesSomeDurable(session, producer);
crash(session);
@@ -390,7 +390,7 @@
ClientProducer producer = session.createProducer(FailoverTestBase.ADDRESS);
- sendMessages(session, producer);
+ sendMessagesSomeDurable(session, producer);
crash(session);
@@ -442,7 +442,7 @@
ClientProducer producer = session.createProducer(FailoverTestBase.ADDRESS);
- sendMessages(session, producer);
+ sendMessagesSomeDurable(session, producer);
session.commit();
@@ -484,7 +484,7 @@
ClientProducer producer = session.createProducer(FailoverTestBase.ADDRESS);
- sendMessages(session, producer);
+ sendMessagesSomeDurable(session, producer);
// messages will be delivered to the consumer when the session is committed
session.commit();
@@ -524,7 +524,7 @@
ClientProducer producer = session1.createProducer(FailoverTestBase.ADDRESS);
- sendMessages(session1, producer);
+ sendMessagesSomeDurable(session1, producer);
session1.commit();
@@ -652,7 +652,7 @@
session.start(xid, XAResource.TMNOFLAGS);
- sendMessages(session, producer);
+ sendMessagesSomeDurable(session, producer);
crash(session);
@@ -696,7 +696,7 @@
session.start(xid, XAResource.TMNOFLAGS);
- sendMessages(session, producer);
+ sendMessagesSomeDurable(session, producer);
session.end(xid, XAResource.TMSUCCESS);
@@ -746,7 +746,7 @@
session.start(xid, XAResource.TMNOFLAGS);
- sendMessages(session, producer);
+ sendMessagesSomeDurable(session, producer);
session.end(xid, XAResource.TMSUCCESS);
@@ -790,7 +790,7 @@
session.start(xid, XAResource.TMNOFLAGS);
- sendMessages(session, producer);
+ sendMessagesSomeDurable(session, producer);
session.end(xid, XAResource.TMSUCCESS);
@@ -831,7 +831,7 @@
ClientProducer producer = session1.createProducer(FailoverTestBase.ADDRESS);
- sendMessages(session1, producer);
+ sendMessagesSomeDurable(session1, producer);
session1.commit();
@@ -877,7 +877,7 @@
ClientProducer producer = session1.createProducer(FailoverTestBase.ADDRESS);
- sendMessages(session1, producer);
+ sendMessagesSomeDurable(session1, producer);
session1.commit();
@@ -925,7 +925,7 @@
ClientProducer producer = session1.createProducer(FailoverTestBase.ADDRESS);
- sendMessages(session1, producer);
+ sendMessagesSomeDurable(session1, producer);
session1.commit();
@@ -1067,7 +1067,7 @@
ClientProducer producer = session.createProducer(FailoverTestBase.ADDRESS);
- sendMessages(session, producer);
+ sendMessagesSomeDurable(session, producer);
ClientConsumer consumer = session.createConsumer(FailoverTestBase.ADDRESS, true);
@@ -1093,11 +1093,13 @@
closeSessionFactory();
}
- private void sendMessages(ClientSession session, ClientProducer producer) throws Exception, HornetQException
+ private void sendMessagesSomeDurable(ClientSession session, ClientProducer producer) throws Exception, HornetQException
{
for (int i = 0; i < NUM_MESSAGES; i++)
{
- ClientMessage message = session.createMessage(isDurable(i));
+ // some are durable, some are not!
+ boolean durable = isDurable(i);
+ ClientMessage message = session.createMessage(durable);
setBody(i, message);
message.putIntProperty("counter", i);
producer.send(message);
@@ -1114,7 +1116,7 @@
ClientProducer producer = session.createProducer(FailoverTestBase.ADDRESS);
- sendMessages(session, producer);
+ sendMessagesSomeDurable(session, producer);
ClientConsumer consumer = session.createConsumer(FailoverTestBase.ADDRESS);
@@ -1179,7 +1181,7 @@
ClientProducer producer = session.createProducer(FailoverTestBase.ADDRESS);
- sendMessages(session, producer);
+ sendMessagesSomeDurable(session, producer);
ClientConsumer consumer = session.createConsumer(FailoverTestBase.ADDRESS);
@@ -1203,20 +1205,8 @@
}
// Should get the same ones after failover since we didn't ack
+ receiveMessagesAndAck(consumer, NUM_MESSAGES, NUM_MESSAGES * 2);
- for (int i = NUM_MESSAGES; i < NUM_MESSAGES * 2; i++)
- {
- ClientMessage message = consumer.receive(1000);
-
- Assert.assertNotNull(message);
-
- assertMessageBody(i, message);
-
- Assert.assertEquals(i, message.getIntProperty("counter").intValue());
-
- message.acknowledge();
- }
-
session.close();
closeSessionFactory();
@@ -1224,14 +1214,7 @@
private void receiveMessages(ClientConsumer consumer) throws HornetQException
{
- for (int i = 0; i < NUM_MESSAGES; i++)
- {
- ClientMessage message = consumer.receive(1000);
- Assert.assertNotNull(message);
- assertMessageBody(i, message);
- Assert.assertEquals(i, message.getIntProperty("counter").intValue());
- message.acknowledge();
- }
+ receiveMessagesAndAck(consumer, 0, NUM_MESSAGES);
}
public void testSimpleSendAfterFailoverDurableTemporary() throws Exception
@@ -1281,7 +1264,7 @@
crash(session);
- sendMessages(session, producer);
+ sendMessagesSomeDurable(session, producer);
receiveMessages(consumer);
Modified: branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailoverTestBase.java
===================================================================
--- branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailoverTestBase.java 2011-07-27 17:24:19 UTC (rev 11053)
+++ branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailoverTestBase.java 2011-07-27 17:25:49 UTC (rev 11054)
@@ -182,8 +182,8 @@
@Override
protected void tearDown() throws Exception
{
- backupServer.stop();
- liveServer.stop();
+ stopComponent(backupServer);
+ stopComponent(liveServer);
Assert.assertEquals(0, InVMRegistry.instance.size());
@@ -230,11 +230,11 @@
return sf;
}
- protected static void waitForBackup(ClientSessionFactoryInternal sf, long seconds) throws Exception
+ protected static void waitForBackup(ClientSessionFactoryInternal sessionFactory, long seconds) throws Exception
{
- long time = System.currentTimeMillis();
- long toWait = seconds * 1000;
- while (sf.getBackupConnector() == null)
+ final long toWait = seconds * 1000;
+ final long time = System.currentTimeMillis();
+ while (sessionFactory.getBackupConnector() == null)
{
try
{
@@ -244,7 +244,7 @@
{
//ignore
}
- if (sf.getBackupConnector() != null)
+ if (sessionFactory.getBackupConnector() != null)
{
break;
}
@@ -253,7 +253,7 @@
fail("backup server never started");
}
}
- System.out.println("sf.getBackupConnector() = " + sf.getBackupConnector());
+ System.out.println("sf.getBackupConnector() = " + sessionFactory.getBackupConnector());
}
protected TransportConfiguration getNettyAcceptorTransportConfiguration(final boolean live)
Modified: branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/util/TestableServer.java
===================================================================
--- branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/util/TestableServer.java 2011-07-27 17:24:19 UTC (rev 11053)
+++ branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/util/TestableServer.java 2011-07-27 17:25:49 UTC (rev 11054)
@@ -15,6 +15,7 @@
import org.hornetq.api.core.Interceptor;
import org.hornetq.api.core.client.ClientSession;
+import org.hornetq.core.server.HornetQComponent;
import org.hornetq.core.server.HornetQServer;
/**
@@ -24,7 +25,7 @@
*
*
*/
-public interface TestableServer
+public interface TestableServer extends HornetQComponent
{
HornetQServer getServer();
Modified: branches/HORNETQ-720_Replication/tests/unit-tests/src/test/java/org/hornetq/tests/util/ServiceTestBase.java
===================================================================
--- branches/HORNETQ-720_Replication/tests/unit-tests/src/test/java/org/hornetq/tests/util/ServiceTestBase.java 2011-07-27 17:24:19 UTC (rev 11053)
+++ branches/HORNETQ-720_Replication/tests/unit-tests/src/test/java/org/hornetq/tests/util/ServiceTestBase.java 2011-07-27 17:25:49 UTC (rev 11054)
@@ -24,7 +24,9 @@
import junit.framework.Assert;
+import org.hornetq.api.core.HornetQException;
import org.hornetq.api.core.TransportConfiguration;
+import org.hornetq.api.core.client.ClientConsumer;
import org.hornetq.api.core.client.ClientMessage;
import org.hornetq.api.core.client.ClientProducer;
import org.hornetq.api.core.client.ClientSession;
@@ -33,7 +35,6 @@
import org.hornetq.api.core.client.ServerLocator;
import org.hornetq.core.client.impl.ClientSessionFactoryImpl;
import org.hornetq.core.config.Configuration;
-import org.hornetq.core.logging.Logger;
import org.hornetq.core.remoting.impl.invm.InVMAcceptorFactory;
import org.hornetq.core.remoting.impl.invm.InVMConnectorFactory;
import org.hornetq.core.remoting.impl.invm.InVMRegistry;
@@ -70,14 +71,7 @@
{
for (ServerLocator locator : locators)
{
- try
- {
- locator.close();
- }
- catch (Exception e)
- {
- e.printStackTrace();
- }
+ closeServerLocator(locator);
}
locators.clear();
super.tearDown();
@@ -90,6 +84,20 @@
}
}
+ public static final void closeServerLocator(ServerLocator locator)
+ {
+ if (locator == null)
+ return;
+ try
+ {
+ locator.close();
+ }
+ catch (Exception e)
+ {
+ e.printStackTrace();
+ }
+ }
+
protected final static void waitForComponent(final HornetQComponent component, final long seconds) throws Exception
{
long time = System.currentTimeMillis();
@@ -111,12 +119,19 @@
}
}
- protected final void stopComponent(HornetQComponent component) throws Exception
+ protected static final void stopComponent(HornetQComponent component)
{
if (component == null)
return;
if (component.isStarted())
- component.stop();
+ try
+ {
+ component.stop();
+ }
+ catch (Exception e)
+ {
+ // no-op
+ }
}
protected static Map<String, Object> generateParams(final int node, final boolean netty)
@@ -166,9 +181,6 @@
return new TransportConfiguration(className, params);
}
- // Static --------------------------------------------------------
- private final Logger log = Logger.getLogger(this.getClass());
-
// Constructors --------------------------------------------------
public ServiceTestBase()
@@ -507,7 +519,7 @@
* @param numMessages
* @throws Exception
*/
- public void sendMessages(ClientSession session, ClientProducer producer, int numMessages) throws Exception
+ public final void sendMessages(ClientSession session, ClientProducer producer, int numMessages) throws Exception
{
for (int i = 0; i < numMessages; i++)
{
@@ -519,6 +531,19 @@
}
+ protected final
+ void receiveMessagesAndAck(ClientConsumer consumer, int start, int msgCount) throws HornetQException
+ {
+ for (int i = start; i < msgCount; i++)
+ {
+ ClientMessage message = consumer.receive(1000);
+ Assert.assertNotNull(message);
+ assertMessageBody(i, message);
+ Assert.assertEquals(i, message.getIntProperty("counter").intValue());
+ message.acknowledge();
+ }
+ }
+
/**
* Deleting a file on LargeDire is an asynchronous process. We need to keep looking for a while
* if the file hasn't been deleted yet.
12 years, 9 months
JBoss hornetq SVN: r11053 - branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover.
by do-not-reply@jboss.org
Author: borges
Date: 2011-07-27 13:24:19 -0400 (Wed, 27 Jul 2011)
New Revision: 11053
Added:
branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/BackupJournalSyncTest.java
Modified:
branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailoverTest.java
branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailoverTestBase.java
Log:
HORNETQ-720 Test for replicated backup sync
Added: branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/BackupJournalSyncTest.java
===================================================================
--- branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/BackupJournalSyncTest.java (rev 0)
+++ branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/BackupJournalSyncTest.java 2011-07-27 17:24:19 UTC (rev 11053)
@@ -0,0 +1,102 @@
+package org.hornetq.tests.integration.cluster.failover;
+
+import org.hornetq.api.core.HornetQException;
+import org.hornetq.api.core.TransportConfiguration;
+import org.hornetq.api.core.client.ClientConsumer;
+import org.hornetq.api.core.client.ClientProducer;
+import org.hornetq.api.core.client.ClientSession;
+import org.hornetq.core.client.impl.ClientSessionFactoryInternal;
+import org.hornetq.core.client.impl.ServerLocatorInternal;
+import org.hornetq.tests.util.TransportConfigurationUtils;
+
+public class BackupJournalSyncTest extends FailoverTestBase
+{
+
+ private ServerLocatorInternal locator;
+ private ClientSessionFactoryInternal sessionFactory;
+ private ClientSession session;
+ private static final int N_MSGS = 100;
+
+ @Override
+ protected void setUp() throws Exception
+ {
+ startBackupServer = false;
+ super.setUp();
+ locator = getServerLocator();
+ locator.setBlockOnNonDurableSend(true);
+ locator.setBlockOnDurableSend(true);
+ locator.setReconnectAttempts(-1);
+ sessionFactory = createSessionFactoryAndWaitForTopology(locator, 1);
+ }
+
+ public void testNodeID() throws Exception
+ {
+ backupServer.start();
+ waitForComponent(backupServer, 5);
+ assertTrue("must be running", backupServer.isStarted());
+ assertEquals("backup and live should have the same nodeID", liveServer.getServer().getNodeID(),
+ backupServer.getServer().getNodeID());
+ }
+
+ public void testMessageSync() throws Exception
+ {
+ session = sessionFactory.createSession(true, true);
+ session.createQueue(FailoverTestBase.ADDRESS, FailoverTestBase.ADDRESS, null, true);
+ ClientProducer producer = session.createProducer(FailoverTestBase.ADDRESS);
+
+ sendMessages(session, producer, N_MSGS);
+ session.start();
+
+ receiveMsgs(0, N_MSGS / 2);
+ assertFalse("backup is not started!", backupServer.isStarted());
+
+ // BLOCK ON journals
+ // SYNC, (UNSET reclaim, lock, use next file, get file list, sync) iterate
+ backupServer.start();
+
+ waitForBackup(sessionFactory, 5);
+ crash(session);
+
+
+ // consume N/2 from 'new' live (the old backup)
+ receiveMsgs(N_MSGS / 2, N_MSGS);
+ }
+
+ private void receiveMsgs(int start, int end) throws HornetQException
+ {
+ ClientConsumer consumer = session.createConsumer(FailoverTestBase.ADDRESS);
+ receiveMessagesAndAck(consumer, start, end);
+ session.commit();
+ }
+
+ @Override
+ protected void tearDown() throws Exception
+ {
+ if (sessionFactory != null)
+ sessionFactory.close();
+ if (session != null)
+ session.close();
+ closeServerLocator(locator);
+
+ super.tearDown();
+ }
+
+ @Override
+ protected void createConfigs() throws Exception
+ {
+ createReplicatedConfigs();
+ }
+
+ @Override
+ protected TransportConfiguration getAcceptorTransportConfiguration(boolean live)
+ {
+ return TransportConfigurationUtils.getInVMAcceptor(live);
+ }
+
+ @Override
+ protected TransportConfiguration getConnectorTransportConfiguration(boolean live)
+ {
+ return TransportConfigurationUtils.getInVMConnector(live);
+ }
+
+}
Modified: branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailoverTest.java
===================================================================
--- branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailoverTest.java 2011-07-27 17:23:37 UTC (rev 11052)
+++ branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailoverTest.java 2011-07-27 17:24:19 UTC (rev 11053)
@@ -1120,6 +1120,7 @@
session.start();
+ // Receive MSGs but don't ack!
for (int i = 0; i < NUM_MESSAGES; i++)
{
ClientMessage message = consumer.receive(1000);
Modified: branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailoverTestBase.java
===================================================================
--- branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailoverTestBase.java 2011-07-27 17:23:37 UTC (rev 11052)
+++ branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailoverTestBase.java 2011-07-27 17:24:19 UTC (rev 11053)
@@ -71,6 +71,8 @@
protected NodeManager nodeManager;
+ protected boolean startBackupServer = true;
+
// Static --------------------------------------------------------
// Constructors --------------------------------------------------
@@ -102,7 +104,7 @@
liveServer.start();
- if (backupServer != null)
+ if (backupServer != null && startBackupServer)
{
backupServer.start();
}
@@ -170,7 +172,6 @@
backupServer = createBackupServer();
backupServer.getServer().setIdentity("idBackup");
-
liveConfig.getAcceptorConfigurations().clear();
liveConfig.getAcceptorConfigurations().add(getAcceptorTransportConfiguration(true));
@@ -215,8 +216,8 @@
}
}
- protected ClientSessionFactoryInternal createSessionFactoryAndWaitForTopology(ServerLocator locator, int topologyMembers)
- throws Exception
+ protected ClientSessionFactoryInternal createSessionFactoryAndWaitForTopology(ServerLocator locator,
+ int topologyMembers) throws Exception
{
ClientSessionFactoryInternal sf;
CountDownLatch countDownLatch = new CountDownLatch(topologyMembers);
12 years, 9 months
JBoss hornetq SVN: r11052 - in branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests: util and 1 other directory.
by do-not-reply@jboss.org
Author: borges
Date: 2011-07-27 13:23:37 -0400 (Wed, 27 Jul 2011)
New Revision: 11052
Modified:
branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/AsynchronousFailoverTest.java
branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailBackAutoTest.java
branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/util/CountDownSessionFailureListener.java
Log:
Reduce code duplication.
Modified: branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/AsynchronousFailoverTest.java
===================================================================
--- branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/AsynchronousFailoverTest.java 2011-07-27 16:36:09 UTC (rev 11051)
+++ branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/AsynchronousFailoverTest.java 2011-07-27 17:23:37 UTC (rev 11052)
@@ -15,7 +15,6 @@
import java.util.ArrayList;
import java.util.List;
-import java.util.concurrent.CountDownLatch;
import junit.framework.Assert;
@@ -28,10 +27,10 @@
import org.hornetq.api.core.client.ClientProducer;
import org.hornetq.api.core.client.ClientSession;
import org.hornetq.api.core.client.ServerLocator;
-import org.hornetq.api.core.client.SessionFailureListener;
import org.hornetq.core.client.impl.ClientSessionFactoryInternal;
import org.hornetq.core.client.impl.DelegatingSession;
import org.hornetq.core.logging.Logger;
+import org.hornetq.tests.util.CountDownSessionFailureListener;
import org.hornetq.tests.util.TransportConfigurationUtils;
/**
@@ -47,26 +46,12 @@
{
private static final Logger log = Logger.getLogger(AsynchronousFailoverTest.class);
- private volatile MyListener listener;
+ private volatile CountDownSessionFailureListener listener;
private volatile ClientSessionFactoryInternal sf;
private final Object lockFail = new Object();
- class MyListener implements SessionFailureListener
- {
- CountDownLatch latch = new CountDownLatch(1);
-
- public void connectionFailed(final HornetQException me, boolean failedOver)
- {
- latch.countDown();
- }
-
- public void beforeReconnect(final HornetQException me)
- {
- }
- }
-
public void testNonTransactional() throws Throwable
{
runTest(new TestRunner()
@@ -256,12 +241,10 @@
ClientSession session = sf.createSession(true, true, 0);
- MyListener listener = new MyListener();
+ listener = new CountDownSessionFailureListener();
session.addFailureListener(listener);
- this.listener = listener;
-
ClientProducer producer = session.createProducer(FailoverTestBase.ADDRESS);
final int numMessages = 1000;
@@ -380,15 +363,14 @@
try
{
- MyListener listener = new MyListener();
- this.listener = listener;
boolean retry = false;
final int numMessages = 1000;
session = sf.createSession(false, false);
+ listener = new CountDownSessionFailureListener();
session.addFailureListener(listener);
do
Modified: branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailBackAutoTest.java
===================================================================
--- branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailBackAutoTest.java 2011-07-27 16:36:09 UTC (rev 11051)
+++ branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailBackAutoTest.java 2011-07-27 17:23:37 UTC (rev 11052)
@@ -20,7 +20,6 @@
import junit.framework.Assert;
-import org.hornetq.api.core.HornetQException;
import org.hornetq.api.core.SimpleString;
import org.hornetq.api.core.TransportConfiguration;
import org.hornetq.api.core.client.ClientConsumer;
@@ -28,12 +27,12 @@
import org.hornetq.api.core.client.ClientProducer;
import org.hornetq.api.core.client.ClientSession;
import org.hornetq.api.core.client.ClientSessionFactory;
-import org.hornetq.api.core.client.SessionFailureListener;
import org.hornetq.core.client.impl.ClientSessionFactoryInternal;
import org.hornetq.core.client.impl.ServerLocatorInternal;
import org.hornetq.core.config.ClusterConnectionConfiguration;
import org.hornetq.core.server.impl.InVMNodeManager;
import org.hornetq.jms.client.HornetQTextMessage;
+import org.hornetq.tests.util.CountDownSessionFailureListener;
import org.hornetq.tests.util.TransportConfigurationUtils;
/**
@@ -80,7 +79,7 @@
ClientSession session = sendAndConsume(sf, true);
- MyListener listener = new MyListener(latch);
+ CountDownSessionFailureListener listener = new CountDownSessionFailureListener(latch);
session.addFailureListener(listener);
@@ -104,7 +103,7 @@
final CountDownLatch latch2 = new CountDownLatch(1);
- listener = new MyListener(latch2);
+ listener = new CountDownSessionFailureListener(latch2);
session.addFailureListener(listener);
@@ -134,11 +133,10 @@
locator.setFailoverOnInitialConnection(true);
locator.setReconnectAttempts(-1);
ClientSessionFactoryInternal sf = createSessionFactoryAndWaitForTopology(locator, 2);
- CountDownLatch latch = new CountDownLatch(1);
ClientSession session = sendAndConsume(sf, true);
- MyListener listener = new MyListener(latch);
+ CountDownSessionFailureListener listener = new CountDownSessionFailureListener();
session.addFailureListener(listener);
@@ -148,7 +146,7 @@
backupServer.start();
- assertTrue(latch.await(5, TimeUnit.SECONDS));
+ assertTrue(listener.getLatch().await(5, TimeUnit.SECONDS));
ClientProducer producer = session.createProducer(FailoverTestBase.ADDRESS);
@@ -160,15 +158,13 @@
session.removeFailureListener(listener);
- CountDownLatch latch2 = new CountDownLatch(1);
+ listener = new CountDownSessionFailureListener();
- listener = new MyListener(latch2);
-
session.addFailureListener(listener);
liveServer.start();
- assertTrue(latch2.await(5, TimeUnit.SECONDS));
+ assertTrue(listener.getLatch().await(5, TimeUnit.SECONDS));
message = session.createMessage(true);
@@ -180,7 +176,7 @@
session.removeFailureListener(listener);
- listener = new MyListener(latch3);
+ listener = new CountDownSessionFailureListener(latch3);
session.addFailureListener(listener);
@@ -309,23 +305,4 @@
message.getBodyBuffer().writeString("message" + i);
}
- class MyListener implements SessionFailureListener
- {
- private final CountDownLatch latch;
-
- public MyListener(CountDownLatch latch)
- {
- this.latch = latch;
- }
-
- public void connectionFailed(final HornetQException me, boolean failedOver)
- {
- latch.countDown();
- }
-
- public void beforeReconnect(HornetQException exception)
- {
- System.out.println("MyListener.beforeReconnect");
- }
- }
}
Modified: branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/util/CountDownSessionFailureListener.java
===================================================================
--- branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/util/CountDownSessionFailureListener.java 2011-07-27 16:36:09 UTC (rev 11051)
+++ branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/util/CountDownSessionFailureListener.java 2011-07-27 17:23:37 UTC (rev 11052)
@@ -9,21 +9,31 @@
{
private final CountDownLatch latch;
+ public CountDownSessionFailureListener()
+ {
+ latch = new CountDownLatch(1);
+ }
+
public CountDownSessionFailureListener(CountDownLatch latch)
{
this.latch = latch;
}
+
@Override
public void connectionFailed(HornetQException exception, boolean failedOver)
{
latch.countDown();
}
+ public CountDownLatch getLatch()
+ {
+ return latch;
+ }
+
@Override
public void beforeReconnect(HornetQException exception)
{
- // TODO Auto-generated method stub
-
+ // No-op
}
}
12 years, 9 months
JBoss hornetq SVN: r11051 - branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2011-07-27 12:36:09 -0400 (Wed, 27 Jul 2011)
New Revision: 11051
Modified:
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl/Topology.java
Log:
reverting change
Modified: branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl/Topology.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl/Topology.java 2011-07-27 07:28:44 UTC (rev 11050)
+++ branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl/Topology.java 2011-07-27 16:36:09 UTC (rev 11051)
@@ -18,7 +18,6 @@
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
import org.hornetq.api.core.TransportConfiguration;
import org.hornetq.api.core.client.ClusterTopologyListener;
@@ -61,37 +60,46 @@
* keys are node IDs
* values are a pair of live/backup transport configurations
*/
- private final ConcurrentMap<String, TopologyMember> topologyMap = new ConcurrentHashMap<String, TopologyMember>();
+ private Map<String, TopologyMember> topology = new ConcurrentHashMap<String, TopologyMember>();
private boolean debug = log.isDebugEnabled();
public synchronized boolean addMember(String nodeId, TopologyMember member)
{
boolean replaced = false;
- TopologyMember oldMember = topologyMap.put(nodeId, member);
+ TopologyMember currentMember = topology.get(nodeId);
if (debug)
{
- log.debug(this + "::adding nodeId=" + nodeId + ", " + member.getConnector(), new Exception ("trace"));
+ log.debug(this + "::adding = " + nodeId + ":" + member.getConnector(), new Exception ("trace"));
log.debug(describe("Before:"));
}
-
- if(oldMember == null)
+ if(currentMember == null)
{
+ topology.put(nodeId, member);
replaced = true;
}
else
{
- if(hasChanged(oldMember.getConnector().a, member.getConnector().a) && member.getConnector().a != null)
+ if(hasChanged(currentMember.getConnector().a, member.getConnector().a) && member.getConnector().a != null)
{
+ currentMember.getConnector().a = member.getConnector().a;
replaced = true;
}
- if(hasChanged(oldMember.getConnector().b, member.getConnector().b) && member.getConnector().b != null)
+ if(hasChanged(currentMember.getConnector().b, member.getConnector().b) && member.getConnector().b != null)
{
- oldMember.getConnector().b = member.getConnector().b;
+ currentMember.getConnector().b = member.getConnector().b;
replaced = true;
}
+
+ if(member.getConnector().a == null)
+ {
+ member.getConnector().a = currentMember.getConnector().a;
+ }
+ if(member.getConnector().b == null)
+ {
+ member.getConnector().b = currentMember.getConnector().b;
+ }
}
-
if(debug)
{
log.debug(this + "::Topology updated=" + replaced);
@@ -102,7 +110,7 @@
public synchronized boolean removeMember(String nodeId)
{
- TopologyMember member = topologyMap.remove(nodeId);
+ TopologyMember member = topology.remove(nodeId);
if (log.isDebugEnabled())
{
log.debug("XXX " + this + " removing nodeID=" + nodeId + ", result=" + member, new Exception ("trace"));
@@ -116,7 +124,7 @@
Map<String, TopologyMember> copy;
synchronized (this)
{
- copy = new HashMap<String, TopologyMember>(topologyMap);
+ copy = new HashMap<String, TopologyMember>(topology);
}
for (Map.Entry<String, TopologyMember> entry : copy.entrySet())
{
@@ -126,23 +134,23 @@
public TopologyMember getMember(String nodeID)
{
- return topologyMap.get(nodeID);
+ return topology.get(nodeID);
}
public boolean isEmpty()
{
- return topologyMap.isEmpty();
+ return topology.isEmpty();
}
public Collection<TopologyMember> getMembers()
{
- return topologyMap.values();
+ return topology.values();
}
public int nodes()
{
int count = 0;
- for (TopologyMember member : topologyMap.values())
+ for (TopologyMember member : topology.values())
{
if (member.getConnector().a != null)
{
@@ -164,7 +172,7 @@
{
String desc = text + "\n";
- for (Entry<String, TopologyMember> entry : new HashMap<String, TopologyMember>(topologyMap).entrySet())
+ for (Entry<String, TopologyMember> entry : new HashMap<String, TopologyMember>(topology).entrySet())
{
desc += "\t" + entry.getKey() + " => " + entry.getValue() + "\n";
}
@@ -174,13 +182,12 @@
public void clear()
{
- // TODO: place this back
- //topologyMap.clear();
+ topology.clear();
}
public int members()
{
- return topologyMap.size();
+ return topology.size();
}
private boolean hasChanged(TransportConfiguration currentConnector, TransportConfiguration connector)
@@ -190,7 +197,7 @@
public TransportConfiguration getBackupForConnector(TransportConfiguration connectorConfiguration)
{
- for (TopologyMember member : topologyMap.values())
+ for (TopologyMember member : topology.values())
{
if(member.getConnector().a != null && member.getConnector().a.equals(connectorConfiguration))
{
12 years, 9 months
JBoss hornetq SVN: r11050 - in branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core: server/cluster/impl and 1 other directory.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2011-07-27 03:28:44 -0400 (Wed, 27 Jul 2011)
New Revision: 11050
Modified:
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl/Topology.java
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java
Log:
tweaks
Modified: branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl/Topology.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl/Topology.java 2011-07-27 06:22:54 UTC (rev 11049)
+++ branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl/Topology.java 2011-07-27 07:28:44 UTC (rev 11050)
@@ -18,6 +18,7 @@
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
import org.hornetq.api.core.TransportConfiguration;
import org.hornetq.api.core.client.ClusterTopologyListener;
@@ -60,46 +61,37 @@
* keys are node IDs
* values are a pair of live/backup transport configurations
*/
- private Map<String, TopologyMember> topology = new ConcurrentHashMap<String, TopologyMember>();
+ private final ConcurrentMap<String, TopologyMember> topologyMap = new ConcurrentHashMap<String, TopologyMember>();
private boolean debug = log.isDebugEnabled();
public synchronized boolean addMember(String nodeId, TopologyMember member)
{
boolean replaced = false;
- TopologyMember currentMember = topology.get(nodeId);
+ TopologyMember oldMember = topologyMap.put(nodeId, member);
if (debug)
{
- log.debug(this + "::adding = " + nodeId + ":" + member.getConnector(), new Exception ("trace"));
+ log.debug(this + "::adding nodeId=" + nodeId + ", " + member.getConnector(), new Exception ("trace"));
log.debug(describe("Before:"));
}
- if(currentMember == null)
+
+ if(oldMember == null)
{
- topology.put(nodeId, member);
replaced = true;
}
else
{
- if(hasChanged(currentMember.getConnector().a, member.getConnector().a) && member.getConnector().a != null)
+ if(hasChanged(oldMember.getConnector().a, member.getConnector().a) && member.getConnector().a != null)
{
- currentMember.getConnector().a = member.getConnector().a;
replaced = true;
}
- if(hasChanged(currentMember.getConnector().b, member.getConnector().b) && member.getConnector().b != null)
+ if(hasChanged(oldMember.getConnector().b, member.getConnector().b) && member.getConnector().b != null)
{
- currentMember.getConnector().b = member.getConnector().b;
+ oldMember.getConnector().b = member.getConnector().b;
replaced = true;
}
-
- if(member.getConnector().a == null)
- {
- member.getConnector().a = currentMember.getConnector().a;
- }
- if(member.getConnector().b == null)
- {
- member.getConnector().b = currentMember.getConnector().b;
- }
}
+
if(debug)
{
log.debug(this + "::Topology updated=" + replaced);
@@ -110,7 +102,7 @@
public synchronized boolean removeMember(String nodeId)
{
- TopologyMember member = topology.remove(nodeId);
+ TopologyMember member = topologyMap.remove(nodeId);
if (log.isDebugEnabled())
{
log.debug("XXX " + this + " removing nodeID=" + nodeId + ", result=" + member, new Exception ("trace"));
@@ -124,7 +116,7 @@
Map<String, TopologyMember> copy;
synchronized (this)
{
- copy = new HashMap<String, TopologyMember>(topology);
+ copy = new HashMap<String, TopologyMember>(topologyMap);
}
for (Map.Entry<String, TopologyMember> entry : copy.entrySet())
{
@@ -134,23 +126,23 @@
public TopologyMember getMember(String nodeID)
{
- return topology.get(nodeID);
+ return topologyMap.get(nodeID);
}
public boolean isEmpty()
{
- return topology.isEmpty();
+ return topologyMap.isEmpty();
}
public Collection<TopologyMember> getMembers()
{
- return topology.values();
+ return topologyMap.values();
}
public int nodes()
{
int count = 0;
- for (TopologyMember member : topology.values())
+ for (TopologyMember member : topologyMap.values())
{
if (member.getConnector().a != null)
{
@@ -172,7 +164,7 @@
{
String desc = text + "\n";
- for (Entry<String, TopologyMember> entry : new HashMap<String, TopologyMember>(topology).entrySet())
+ for (Entry<String, TopologyMember> entry : new HashMap<String, TopologyMember>(topologyMap).entrySet())
{
desc += "\t" + entry.getKey() + " => " + entry.getValue() + "\n";
}
@@ -182,12 +174,13 @@
public void clear()
{
- topology.clear();
+ // TODO: place this back
+ //topologyMap.clear();
}
public int members()
{
- return topology.size();
+ return topologyMap.size();
}
private boolean hasChanged(TransportConfiguration currentConnector, TransportConfiguration connector)
@@ -197,7 +190,7 @@
public TransportConfiguration getBackupForConnector(TransportConfiguration connectorConfiguration)
{
- for (TopologyMember member : topology.values())
+ for (TopologyMember member : topologyMap.values())
{
if(member.getConnector().a != null && member.getConnector().a.equals(connectorConfiguration))
{
Modified: branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java 2011-07-27 06:22:54 UTC (rev 11049)
+++ branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java 2011-07-27 07:28:44 UTC (rev 11050)
@@ -93,22 +93,22 @@
private volatile boolean started;
- private boolean backup;
+ private volatile boolean backup;
private final boolean clustered;
// the cluster connections which links this node to other cluster nodes
private final Map<String, ClusterConnection> clusterConnections = new HashMap<String, ClusterConnection>();
- private Set<ClusterTopologyListener> topologyListeners = new ConcurrentHashSet<ClusterTopologyListener>();
+ private final Set<ClusterTopologyListener> topologyListeners = new ConcurrentHashSet<ClusterTopologyListener>();
- private Topology topology = new Topology(this);
+ private final Topology topology = new Topology(this);
private volatile ServerLocatorInternal backupServerLocator;
private final List<ServerLocatorInternal> clusterLocators = new ArrayList<ServerLocatorInternal>();
- private Executor executor;
+ private final Executor executor;
public ClusterManagerImpl(final ExecutorFactory executorFactory,
final HornetQServer server,
@@ -229,10 +229,6 @@
managementService.unregisterCluster(clusterConnection.getName().toString());
}
- topologyListeners.clear();
- clusterConnections.clear();
- topology.clear();
-
}
for (Bridge bridge : bridges.values())
@@ -255,6 +251,11 @@
}
clusterLocators.clear();
started = false;
+
+ topologyListeners.clear();
+ clusterConnections.clear();
+ topology.clear();
+
}
public void notifyNodeDown(String nodeID)
12 years, 9 months
JBoss hornetq SVN: r11049 - in branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core: server/cluster/impl and 1 other directory.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2011-07-27 02:22:54 -0400 (Wed, 27 Jul 2011)
New Revision: 11049
Modified:
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl/Topology.java
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
Log:
fixes and debug
Modified: branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java 2011-07-27 01:57:20 UTC (rev 11048)
+++ branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java 2011-07-27 06:22:54 UTC (rev 11049)
@@ -1412,7 +1412,7 @@
SimpleString nodeID = msg.getNodeID();
if (log.isTraceEnabled())
{
- log.trace("XXX notifyDown nodeID=" + msg.getNodeID() + " on serverLocator=" + serverLocator);
+ log.trace("XXX YYY notifyDown nodeID=" + msg.getNodeID() + " on serverLocator=" + serverLocator);
}
if (nodeID != null)
{
Modified: branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java 2011-07-27 01:57:20 UTC (rev 11048)
+++ branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java 2011-07-27 06:22:54 UTC (rev 11049)
@@ -1098,9 +1098,18 @@
{
if (closed)
{
+ if (log.isDebugEnabled())
+ {
+ log.debug("YYY " + this + " is already closed when calling closed");
+ }
return;
}
+ if (log.isDebugEnabled())
+ {
+ log.debug("YYY " + this + " is calling close", new Exception ("trace"));
+ }
+
closing = true;
if (discoveryGroup != null)
@@ -1182,7 +1191,7 @@
if (log.isDebugEnabled())
{
- log.debug("XXX YYY " + this + "::Notify nodeID=" + nodeID + " as being down");
+ log.debug("XXX YYY nodeDown " + this + " nodeID=" + nodeID + " as being down", new Exception("trace"));
}
removed = topology.removeMember(nodeID);
@@ -1227,7 +1236,7 @@
if (log.isDebugEnabled())
{
- log.debug("XXX YYY " + this + "::notifyNodeUp " + nodeID + ", connctorPair=" + connectorPair);
+ log.debug("XXX YYY NodeUp " + this + "::nodeID=" + nodeID + ", connectorPair=" + connectorPair);
}
topology.addMember(nodeID, new TopologyMember(connectorPair));
Modified: branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl/Topology.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl/Topology.java 2011-07-27 01:57:20 UTC (rev 11048)
+++ branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl/Topology.java 2011-07-27 06:22:54 UTC (rev 11049)
@@ -17,6 +17,7 @@
import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;
+import java.util.concurrent.ConcurrentHashMap;
import org.hornetq.api.core.TransportConfiguration;
import org.hornetq.api.core.client.ClusterTopologyListener;
@@ -59,7 +60,7 @@
* keys are node IDs
* values are a pair of live/backup transport configurations
*/
- private Map<String, TopologyMember> topology = new HashMap<String, TopologyMember>();
+ private Map<String, TopologyMember> topology = new ConcurrentHashMap<String, TopologyMember>();
private boolean debug = log.isDebugEnabled();
@@ -112,12 +113,12 @@
TopologyMember member = topology.remove(nodeId);
if (log.isDebugEnabled())
{
- log.debug("XXX Removing member " + member, new Exception ("trace"));
+ log.debug("XXX " + this + " removing nodeID=" + nodeId + ", result=" + member, new Exception ("trace"));
}
return (member != null);
}
- public void sendTopology(ClusterTopologyListener listener)
+ public synchronized void sendTopology(ClusterTopologyListener listener)
{
int count = 0;
Map<String, TopologyMember> copy;
Modified: branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java 2011-07-27 01:57:20 UTC (rev 11048)
+++ branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java 2011-07-27 06:22:54 UTC (rev 11049)
@@ -412,7 +412,7 @@
if (serverLocator != null)
{
serverLocator.setNodeID(nodeUUID.toString());
- serverLocator.setIdentity(server.toString());
+ serverLocator.setIdentity("(main-ClusterConnection::" + server.toString() + ")");
serverLocator.setReconnectAttempts(0);
@@ -645,7 +645,7 @@
targetLocator.setBlockOnDurableSend(!useDuplicateDetection);
targetLocator.setBlockOnNonDurableSend(!useDuplicateDetection);
targetLocator.setClusterConnection(true);
- targetLocator.setIdentity("Cluster-connection-bridge on ClusterConnectionImpl=" + this.toString());
+ targetLocator.setIdentity("(Cluster-connection-bridge::" + this.toString() + ")");
targetLocator.setRetryInterval(retryInterval);
targetLocator.setMaxRetryInterval(maxRetryInterval);
12 years, 9 months
JBoss hornetq SVN: r11048 - branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2011-07-26 21:57:20 -0400 (Tue, 26 Jul 2011)
New Revision: 11048
Modified:
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl/ServerLocatorInternal.java
Log:
fix
Modified: branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java 2011-07-27 01:26:35 UTC (rev 11047)
+++ branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java 2011-07-27 01:57:20 UTC (rev 11048)
@@ -1412,7 +1412,7 @@
SimpleString nodeID = msg.getNodeID();
if (log.isTraceEnabled())
{
- log.trace("XXX notify nodeID=" + msg.getNodeID() + " on serverLocator=" + serverLocator);
+ log.trace("XXX notifyDown nodeID=" + msg.getNodeID() + " on serverLocator=" + serverLocator);
}
if (nodeID != null)
{
Modified: branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java 2011-07-27 01:26:35 UTC (rev 11047)
+++ branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java 2011-07-27 01:57:20 UTC (rev 11048)
@@ -65,6 +65,8 @@
private boolean finalizeCheck = true;
private boolean clusterConnection;
+
+ private String identity;
private final Set<ClusterTopologyListener> topologyListeners = new HashSet<ClusterTopologyListener>();
@@ -1035,6 +1037,11 @@
throw new IllegalStateException("Cannot set attribute on SessionFactory after it has been used");
}
}
+
+ public void setIdentity(String identity)
+ {
+ this.identity = identity;
+ }
public void setNodeID(String nodeID)
{
@@ -1168,14 +1175,14 @@
{
if (log.isDebugEnabled())
{
- log.debug("ignoring notifyNodeDown=" + nodeID + " as isHA=false");
+ log.debug(this + "::ignoring notifyNodeDown=" + nodeID + " as isHA=false");
}
return;
}
if (log.isDebugEnabled())
{
- log.debug("XXX " + this + "::Notify nodeID=" + nodeID + " as being down");
+ log.debug("XXX YYY " + this + "::Notify nodeID=" + nodeID + " as being down");
}
removed = topology.removeMember(nodeID);
@@ -1218,6 +1225,11 @@
return;
}
+ if (log.isDebugEnabled())
+ {
+ log.debug("XXX YYY " + this + "::notifyNodeUp " + nodeID + ", connctorPair=" + connectorPair);
+ }
+
topology.addMember(nodeID, new TopologyMember(connectorPair));
TopologyMember actMember = topology.getMember(nodeID);
@@ -1256,10 +1268,20 @@
@Override
public String toString()
{
- return "ServerLocatorImpl [initialConnectors=" + Arrays.toString(initialConnectors) +
- ", discoveryGroupConfiguration=" +
- discoveryGroupConfiguration +
- "]";
+ if (clusterConnection)
+ {
+ return "ServerLocatorImpl (clusterConnection identity=" + identity + ") [initialConnectors=" + Arrays.toString(initialConnectors) +
+ ", discoveryGroupConfiguration=" +
+ discoveryGroupConfiguration +
+ "]";
+ }
+ else
+ {
+ return "ServerLocatorImpl [initialConnectors=" + Arrays.toString(initialConnectors) +
+ ", discoveryGroupConfiguration=" +
+ discoveryGroupConfiguration +
+ "]";
+ }
}
private void updateArraysAndPairs()
@@ -1317,7 +1339,7 @@
topologyListeners.add(listener);
if (topology.members() > 0)
{
- log.debug("ServerLocatorImpl.addClusterTopologyListener");
+ log.debug(this + "::ServerLocatorImpl.addClusterTopologyListener");
}
}
@@ -1387,7 +1409,7 @@
{
if (log.isDebugEnabled())
{
- log.debug("Submitting connect towards " + conn);
+ log.debug(this + "::Submitting connect towards " + conn);
}
csf = conn.tryConnect();
@@ -1522,7 +1544,7 @@
{
if (log.isDebugEnabled())
{
- log.debug("Trying to connect to " + factory);
+ log.debug(this + "::Trying to connect to " + factory);
}
try
{
@@ -1531,7 +1553,7 @@
}
catch (HornetQException e)
{
- log.debug("Exception on establish connector initial connection", e);
+ log.debug(this + "::Exception on establish connector initial connection", e);
return null;
}
}
Modified: branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl/ServerLocatorInternal.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl/ServerLocatorInternal.java 2011-07-27 01:26:35 UTC (rev 11047)
+++ branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl/ServerLocatorInternal.java 2011-07-27 01:57:20 UTC (rev 11048)
@@ -32,6 +32,9 @@
void start(Executor executor) throws Exception;
void factoryClosed(final ClientSessionFactory factory);
+
+ /** Used to better identify Cluster Connection Locators on logs while debugging logs */
+ void setIdentity(String identity);
void setNodeID(String nodeID);
12 years, 9 months
JBoss hornetq SVN: r11047 - branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2011-07-26 21:26:35 -0400 (Tue, 26 Jul 2011)
New Revision: 11047
Modified:
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
Log:
fix
Modified: branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java 2011-07-26 22:58:42 UTC (rev 11046)
+++ branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java 2011-07-27 01:26:35 UTC (rev 11047)
@@ -412,6 +412,7 @@
if (serverLocator != null)
{
serverLocator.setNodeID(nodeUUID.toString());
+ serverLocator.setIdentity(server.toString());
serverLocator.setReconnectAttempts(0);
@@ -502,8 +503,7 @@
}
- // TODO: does it need to be sync?
- public void nodeUP(final String nodeID,
+ public synchronized void nodeUP(final String nodeID,
final Pair<TransportConfiguration, TransportConfiguration> connectorPair,
final boolean last)
{
@@ -645,6 +645,7 @@
targetLocator.setBlockOnDurableSend(!useDuplicateDetection);
targetLocator.setBlockOnNonDurableSend(!useDuplicateDetection);
targetLocator.setClusterConnection(true);
+ targetLocator.setIdentity("Cluster-connection-bridge on ClusterConnectionImpl=" + this.toString());
targetLocator.setRetryInterval(retryInterval);
targetLocator.setMaxRetryInterval(maxRetryInterval);
12 years, 9 months