JBoss hornetq SVN: r10946 - branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl.
by do-not-reply@jboss.org
Author: borges
Date: 2011-07-07 06:48:36 -0400 (Thu, 07 Jul 2011)
New Revision: 10946
Modified:
branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/JournalImpl.java
Log:
Remove duplicated code.
Modified: branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/JournalImpl.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/JournalImpl.java 2011-07-07 10:22:56 UTC (rev 10945)
+++ branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/JournalImpl.java 2011-07-07 10:48:36 UTC (rev 10946)
@@ -2876,19 +2876,16 @@
}
/**
- *
- * @param completeTransaction If the appendRecord is for a prepare or commit, where we should update the number of pendingTransactions on the current file
- * */
+ * @param completeTransaction If the appendRecord is for a prepare or commit, where we should
+ * update the number of pendingTransactions on the current file
+ */
private JournalFile appendRecord(final JournalInternalRecord encoder,
final boolean completeTransaction,
final boolean sync,
final JournalTransaction tx,
final IOAsyncTask parameterCallback) throws Exception
{
- if (state != JournalState.LOADED)
- {
- throw new IllegalStateException("The journal is not loaded " + state);
- }
+ checkJournalIsLoaded();
final IOAsyncTask callback;
12 years, 11 months
JBoss hornetq SVN: r10945 - branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/server/impl.
by do-not-reply@jboss.org
Author: borges
Date: 2011-07-07 06:22:56 -0400 (Thu, 07 Jul 2011)
New Revision: 10945
Modified:
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java
Log:
HORNETQ-720 Start the replication end point before contacting the 'live'
Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java 2011-07-07 10:22:25 UTC (rev 10944)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java 2011-07-07 10:22:56 UTC (rev 10945)
@@ -553,6 +553,7 @@
Channel liveChannel = liveConnection.getChannel(CHANNEL_ID.PING.id, -1);
liveChannel.send(new HaBackupRegistrationMessage(getNodeID().toString(), config));
liveConnection.getChannel(CHANNEL_ID.REPLICATION.id, -1).setHandler(replicationEndpoint);
+ replicationEndpoint.start();
liveChannel.send(new HaBackupRegistrationMessage(getNodeID().toString(), config));
@@ -566,6 +567,7 @@
// XXX this really belongs to this point?
initialisePart2();
+
configuration.setBackup(false);
}
12 years, 11 months
JBoss hornetq SVN: r10944 - branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl.
by do-not-reply@jboss.org
Author: borges
Date: 2011-07-07 06:22:25 -0400 (Thu, 07 Jul 2011)
New Revision: 10944
Modified:
branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/JournalImpl.java
Log:
Use enum for state instead of numeric codes, reduce code duplication.
Modified: branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/JournalImpl.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/JournalImpl.java 2011-07-07 10:21:52 UTC (rev 10943)
+++ branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/JournalImpl.java 2011-07-07 10:22:25 UTC (rev 10944)
@@ -65,26 +65,25 @@
import org.hornetq.utils.DataConstants;
/**
- *
+ *
* <p>A circular log implementation.</p
- *
+ *
* <p>Look at {@link JournalImpl#load(LoaderCallback)} for the file layout
- *
+ *
* @author <a href="mailto:tim.fox@jboss.com">Tim Fox</a>
- *
+ *
* @author <a href="mailto:clebert.suconic@jboss.com">Clebert Suconic</a>
*
*/
public class JournalImpl implements TestableJournal, JournalRecordProvider
{
+ private enum JournalState
+ {
+ STOPPED, STARTED, LOADED;
+ }
// Constants -----------------------------------------------------
- private static final int STATE_STOPPED = 0;
- private static final int STATE_STARTED = 1;
-
- private static final int STATE_LOADED = 2;
-
public static final int FORMAT_VERSION = 2;
private static final int COMPATIBLE_VERSIONS[] = new int[] { 1 };
@@ -210,7 +209,7 @@
private volatile JournalFile currentFile;
- private volatile int state;
+ private volatile JournalState state = JournalState.STOPPED;
private final Reclaimer reclaimer = new Reclaimer();
@@ -280,7 +279,7 @@
}
else
{
- this.compactPercentage = (float)compactPercentage / 100f;
+ this.compactPercentage = compactPercentage / 100f;
}
this.compactMinFiles = compactMinFiles;
@@ -378,7 +377,7 @@
return compactor;
}
- /** this method is used internally only however tools may use it to maintenance.
+ /** this method is used internally only however tools may use it to maintenance.
* It won't be part of the interface as the tools should be specific to the implementation */
public List<JournalFile> orderFiles() throws Exception
{
@@ -424,7 +423,7 @@
{
final int filesize = (int)file.getFile().size();
- wholeFileBuffer = fileFactory.newBuffer((int)filesize);
+ wholeFileBuffer = fileFactory.newBuffer(filesize);
final int journalFileSize = file.getFile().read(wholeFileBuffer);
@@ -816,10 +815,7 @@
final boolean sync,
final IOCompletion callback) throws Exception
{
- if (state != JournalImpl.STATE_LOADED)
- {
- throw new IllegalStateException("Journal must be loaded first");
- }
+ checkJournalIsLoaded();
compactingLock.readLock().lock();
@@ -891,10 +887,7 @@
final boolean sync,
final IOCompletion callback) throws Exception
{
- if (state != JournalImpl.STATE_LOADED)
- {
- throw new IllegalStateException("Journal must be loaded first");
- }
+ checkJournalIsLoaded();
compactingLock.readLock().lock();
@@ -967,10 +960,7 @@
public void appendDeleteRecord(final long id, final boolean sync, final IOCompletion callback) throws Exception
{
- if (state != JournalImpl.STATE_LOADED)
- {
- throw new IllegalStateException("Journal must be loaded first");
- }
+ checkJournalIsLoaded();
compactingLock.readLock().lock();
try
@@ -1046,10 +1036,7 @@
final byte recordType,
final EncodingSupport record) throws Exception
{
- if (state != JournalImpl.STATE_LOADED)
- {
- throw new IllegalStateException("Journal must be loaded first");
- }
+ checkJournalIsLoaded();
compactingLock.readLock().lock();
@@ -1088,6 +1075,14 @@
}
}
+ private void checkJournalIsLoaded()
+ {
+ if (state != JournalState.LOADED)
+ {
+ throw new IllegalStateException("Journal must be in state=" + JournalState.LOADED + ", was [" + state + "]");
+ }
+ }
+
public void appendUpdateRecordTransactional(final long txID,
final long id,
final byte recordType,
@@ -1101,10 +1096,7 @@
final byte recordType,
final EncodingSupport record) throws Exception
{
- if (state != JournalImpl.STATE_LOADED)
- {
- throw new IllegalStateException("Journal must be loaded first");
- }
+ checkJournalIsLoaded();
compactingLock.readLock().lock();
@@ -1150,10 +1142,7 @@
public void appendDeleteRecordTransactional(final long txID, final long id, final EncodingSupport record) throws Exception
{
- if (state != JournalImpl.STATE_LOADED)
- {
- throw new IllegalStateException("Journal must be loaded first");
- }
+ checkJournalIsLoaded();
compactingLock.readLock().lock();
@@ -1223,15 +1212,15 @@
}
}
- /**
- *
- * <p>If the system crashed after a prepare was called, it should store information that is required to bring the transaction
+ /**
+ *
+ * <p>If the system crashed after a prepare was called, it should store information that is required to bring the transaction
* back to a state it could be committed. </p>
- *
+ *
* <p> transactionData allows you to store any other supporting user-data related to the transaction</p>
- *
+ *
* <p> This method also uses the same logic applied on {@link JournalImpl#appendCommitRecord(long, boolean)}
- *
+ *
* @param txID
* @param transactionData extra user data for the prepare
* @throws Exception
@@ -1242,10 +1231,7 @@
final IOCompletion callback) throws Exception
{
- if (state != JournalImpl.STATE_LOADED)
- {
- throw new IllegalStateException("Journal must be loaded first");
- }
+ checkJournalIsLoaded();
compactingLock.readLock().lock();
@@ -1310,30 +1296,27 @@
appendCommitRecord(txID, sync, callback, true);
}
-
+
/**
* <p>A transaction record (Commit or Prepare), will hold the number of elements the transaction has on each file.</p>
- * <p>For example, a transaction was spread along 3 journal files with 10 pendingTransactions on each file.
+ * <p>For example, a transaction was spread along 3 journal files with 10 pendingTransactions on each file.
* (What could happen if there are too many pendingTransactions, or if an user event delayed pendingTransactions to come in time to a single file).</p>
* <p>The element-summary will then have</p>
* <p>FileID1, 10</p>
* <p>FileID2, 10</p>
* <p>FileID3, 10</p>
- *
+ *
* <br>
* <p> During the load, the transaction needs to have 30 pendingTransactions spread across the files as originally written.</p>
* <p> If for any reason there are missing pendingTransactions, that means the transaction was not completed and we should ignore the whole transaction </p>
- * <p> We can't just use a global counter as reclaiming could delete files after the transaction was successfully committed.
+ * <p> We can't just use a global counter as reclaiming could delete files after the transaction was successfully committed.
* That also means not having a whole file on journal-reload doesn't mean we have to invalidate the transaction </p>
*
*/
public void appendCommitRecord(final long txID, final boolean sync, final IOCompletion callback, boolean lineUpContext) throws Exception
{
- if (state != JournalImpl.STATE_LOADED)
- {
- throw new IllegalStateException("Journal must be loaded first");
- }
+ checkJournalIsLoaded();
compactingLock.readLock().lock();
@@ -1392,10 +1375,7 @@
public void appendRollbackRecord(final long txID, final boolean sync, final IOCompletion callback) throws Exception
{
- if (state != JournalImpl.STATE_LOADED)
- {
- throw new IllegalStateException("Journal must be loaded first");
- }
+ checkJournalIsLoaded();
compactingLock.readLock().lock();
@@ -1566,16 +1546,16 @@
return info;
}
-
-
+
+
public void testCompact() throws Exception
{
final AtomicInteger errors = new AtomicInteger(0);
-
+
final CountDownLatch latch = new CountDownLatch(1);
-
+
compactorRunning.set(true);
-
+
// We can't use the executor for the compacting... or we would dead lock because of file open and creation
// operations (that will use the executor)
compactorExecutor.execute(new Runnable()
@@ -1599,14 +1579,14 @@
}
}
});
-
+
try
{
if (!latch.await(60, TimeUnit.SECONDS))
{
throw new RuntimeException("Didn't finish compact timely");
}
-
+
if (errors.get() > 0)
{
throw new RuntimeException("Error during testCompact, look at the logs");
@@ -1619,12 +1599,12 @@
}
/**
- *
+ *
* Note: This method can't be called from the main executor, as it will invoke other methods depending on it.
- *
+ *
* Note: only synchronized methods on journal are methods responsible for the life-cycle such as stop, start
* records will still come as this is being executed
- *
+ *
*/
protected synchronized void compact() throws Exception
{
@@ -1657,7 +1637,7 @@
compactingLock.writeLock().lock();
try
{
- if (state != JournalImpl.STATE_LOADED)
+ if (state != JournalState.LOADED)
{
return;
}
@@ -1837,9 +1817,9 @@
}
- /**
+ /**
* <p>Load data accordingly to the record layouts</p>
- *
+ *
* <p>Basic record layout:</p>
* <table border=1>
* <tr><td><b>Field Name</b></td><td><b>Size</b></td></tr>
@@ -1853,9 +1833,9 @@
* <tr><td>RecordBody</td><td>Byte Array (size=BodySize)</td></tr>
* <tr><td>Check Size</td><td>Integer (4 bytes)</td></tr>
* </table>
- *
+ *
* <p> The check-size is used to validate if the record is valid and complete </p>
- *
+ *
* <p>Commit/Prepare record layout:</p>
* <table border=1>
* <tr><td><b>Field Name</b></td><td><b>Size</b></td></tr>
@@ -1870,18 +1850,18 @@
* <tr><td>* NumberOfElements(n)</td><td>Integer (4 bytes)</td></tr>
* <tr><td>CheckSize</td><td>Integer (4 bytes)</td</tr>
* </table>
- *
- * <p> * FileID and NumberOfElements are the transaction summary, and they will be repeated (N)umberOfFiles times </p>
- *
+ *
+ * <p> * FileID and NumberOfElements are the transaction summary, and they will be repeated (N)umberOfFiles times </p>
+ *
* */
public JournalLoadInformation load(final LoaderCallback loadManager) throws Exception
{
return load(loadManager, true);
}
-
+
public synchronized JournalLoadInformation load(final LoaderCallback loadManager, boolean fixFailingTransactions) throws Exception
{
- if (state != JournalImpl.STATE_STARTED)
+ if (state != JournalState.STARTED)
{
throw new IllegalStateException("Journal must be in started state");
}
@@ -2197,7 +2177,7 @@
filesRepository.pushOpenedFile();
- state = JournalImpl.STATE_LOADED;
+ state = JournalState.LOADED;
for (TransactionHolder transaction : loadTransactions.values())
{
@@ -2241,7 +2221,7 @@
return new JournalLoadInformation(records.size(), maxID.longValue());
}
- /**
+ /**
* @return true if cleanup was called
*/
public boolean checkReclaimStatus() throws Exception
@@ -2311,7 +2291,7 @@
return;
}
- if (state != JournalImpl.STATE_LOADED)
+ if (state != JournalState.LOADED)
{
return;
}
@@ -2528,12 +2508,12 @@
public synchronized boolean isStarted()
{
- return state != JournalImpl.STATE_STOPPED;
+ return state != JournalState.STOPPED;
}
public synchronized void start()
{
- if (state != JournalImpl.STATE_STOPPED)
+ if (state != JournalState.STOPPED)
{
throw new IllegalStateException("Journal is not stopped");
}
@@ -2560,14 +2540,14 @@
fileFactory.start();
- state = JournalImpl.STATE_STARTED;
+ state = JournalState.STARTED;
}
public synchronized void stop() throws Exception
{
JournalImpl.trace("Stopping the journal");
- if (state == JournalImpl.STATE_STOPPED)
+ if (state == JournalState.STOPPED)
{
throw new IllegalStateException("Journal is already stopped");
}
@@ -2577,7 +2557,7 @@
try
{
- state = JournalImpl.STATE_STOPPED;
+ state = JournalState.STOPPED;
compactorExecutor.shutdown();
@@ -2722,9 +2702,9 @@
/**
* <p> Check for holes on the transaction (a commit written but with an incomplete transaction) </p>
* <p>This method will validate if the transaction (PREPARE/COMMIT) is complete as stated on the COMMIT-RECORD.</p>
- *
- * <p>Look at the javadoc on {@link JournalImpl#appendCommitRecord(long)} about how the transaction-summary is recorded</p>
- *
+ *
+ * <p>Look at the javadoc on {@link JournalImpl#appendCommitRecord(long)} about how the transaction-summary is recorded</p>
+ *
* @param journalTransaction
* @param orderedFiles
* @param recordedSummary
@@ -2895,8 +2875,8 @@
buffer.writeLong(fileID);
}
- /**
- *
+ /**
+ *
* @param completeTransaction If the appendRecord is for a prepare or commit, where we should update the number of pendingTransactions on the current file
* */
private JournalFile appendRecord(final JournalInternalRecord encoder,
@@ -2905,7 +2885,7 @@
final JournalTransaction tx,
final IOAsyncTask parameterCallback) throws Exception
{
- if (state != JournalImpl.STATE_LOADED)
+ if (state != JournalState.LOADED)
{
throw new IllegalStateException("The journal is not loaded " + state);
}
@@ -3006,7 +2986,7 @@
private void scheduleReclaim()
{
- if (state != JournalImpl.STATE_LOADED)
+ if (state != JournalState.LOADED)
{
return;
}
@@ -3239,6 +3219,7 @@
this.pages = pages;
}
+ @Override
public void run()
{
try
12 years, 11 months
JBoss hornetq SVN: r10943 - branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl.
by do-not-reply@jboss.org
Author: borges
Date: 2011-07-07 06:21:52 -0400 (Thu, 07 Jul 2011)
New Revision: 10943
Modified:
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java
Log:
Clean up
Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java 2011-07-07 05:33:22 UTC (rev 10942)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java 2011-07-07 10:21:52 UTC (rev 10943)
@@ -66,13 +66,8 @@
// Attributes ----------------------------------------------------
- private static final boolean trace = ReplicationEndpointImpl.log.isTraceEnabled();
+ private static final boolean trace = log.isTraceEnabled();
- private static void trace(final String msg)
- {
- ReplicationEndpointImpl.log.trace(msg);
- }
-
private final HornetQServer server;
private Channel channel;
@@ -181,7 +176,7 @@
}
else
{
- ReplicationEndpointImpl.log.warn("Packet " + packet + " can't be processed by the ReplicationEndpoint");
+ log.warn("Packet " + packet + " can't be processed by the ReplicationEndpoint");
}
}
catch (HornetQException e)
@@ -259,7 +254,7 @@
}
catch (Exception e)
{
- ReplicationEndpointImpl.log.warn("Error while closing the page on backup", e);
+ log.warn("Error while closing the page on backup", e);
}
}
}
@@ -304,7 +299,7 @@
{
if (!journalInformation[i].equals(journalLoadInformation[i]))
{
- ReplicationEndpointImpl.log.warn("Journal comparisson mismatch:\n" + journalParametersToString(journalInformation));
+ log.warn("Journal comparisson mismatch:\n" + journalParametersToString(journalInformation));
throw new HornetQException(HornetQException.ILLEGAL_STATE,
"Backup node can't connect to the live node as the data differs");
}
@@ -363,7 +358,7 @@
}
catch (Exception e)
{
- ReplicationEndpointImpl.log.warn("Error deleting large message ID = " + packet.getMessageId(), e);
+ log.warn("Error deleting large message ID = " + packet.getMessageId(), e);
}
}
}
@@ -403,7 +398,7 @@
if (message == null)
{
- ReplicationEndpointImpl.log.warn("Large MessageID " + messageId +
+ log.warn("Large MessageID " + messageId +
" is not available on backup server. Ignoring replication message");
}
@@ -419,7 +414,7 @@
LargeServerMessage largeMessage = storage.createLargeMessage();
largeMessage.setDurable(true);
largeMessage.setMessageID(packet.getMessageId());
- ReplicationEndpointImpl.trace("Receiving Large Message " + largeMessage.getMessageID() + " on backup");
+ log.trace("Receiving Large Message " + largeMessage.getMessageID() + " on backup");
largeMessages.put(largeMessage.getMessageID(), largeMessage);
}
@@ -505,7 +500,7 @@
{
if (ReplicationEndpointImpl.trace)
{
- ReplicationEndpointImpl.trace("Endpoint appendUpdate id = " + packet.getId());
+ log.trace("Endpoint appendUpdate id = " + packet.getId());
}
journalToUse.appendUpdateRecord(packet.getId(), packet.getRecordType(), packet.getRecordData(), false);
}
@@ -513,7 +508,7 @@
{
if (ReplicationEndpointImpl.trace)
{
- ReplicationEndpointImpl.trace("Endpoint append id = " + packet.getId());
+ log.trace("Endpoint append id = " + packet.getId());
}
journalToUse.appendAddRecord(packet.getId(), packet.getRecordType(), packet.getRecordData(), false);
}
@@ -620,6 +615,6 @@
*/
private Journal getJournal(final byte journalID)
{
- return this.journals[journalID];
+ return journals[journalID];
}
}
12 years, 11 months
JBoss hornetq SVN: r10942 - in branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/cluster: util and 1 other directory.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2011-07-07 01:33:22 -0400 (Thu, 07 Jul 2011)
New Revision: 10942
Modified:
branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/cluster/failover/FailoverOnFlowControlTest.java
branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/cluster/failover/FailoverTestBase.java
branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/cluster/util/RemoteProcessHornetQServer.java
branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/cluster/util/SameProcessHornetQServer.java
branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/cluster/util/TestableServer.java
Log:
Fixing tests
Modified: branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/cluster/failover/FailoverOnFlowControlTest.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/cluster/failover/FailoverOnFlowControlTest.java 2011-07-07 00:15:00 UTC (rev 10941)
+++ branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/cluster/failover/FailoverOnFlowControlTest.java 2011-07-07 05:33:22 UTC (rev 10942)
@@ -86,7 +86,7 @@
InVMConnection.flushEnabled = false;
try
{
- crash(sessionList.get(0));
+ crash(false, sessionList.get(0));
}
finally
{
Modified: branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/cluster/failover/FailoverTestBase.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/cluster/failover/FailoverTestBase.java 2011-07-07 00:15:00 UTC (rev 10941)
+++ branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/cluster/failover/FailoverTestBase.java 2011-07-07 05:33:22 UTC (rev 10942)
@@ -400,6 +400,11 @@
liveServer.crash(sessions);
}
+ protected void crash(final boolean waitFailure, final ClientSession... sessions) throws Exception
+ {
+ liveServer.crash(waitFailure, sessions);
+ }
+
// Private -------------------------------------------------------
// Inner classes -------------------------------------------------
Modified: branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/cluster/util/RemoteProcessHornetQServer.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/cluster/util/RemoteProcessHornetQServer.java 2011-07-07 00:15:00 UTC (rev 10941)
+++ branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/cluster/util/RemoteProcessHornetQServer.java 2011-07-07 05:33:22 UTC (rev 10942)
@@ -116,6 +116,11 @@
public void crash(ClientSession... sessions) throws Exception
{
+ crash(true, sessions);
+ }
+
+ public void crash(final boolean waitFailure, ClientSession... sessions) throws Exception
+ {
final CountDownLatch latch = new CountDownLatch(sessions.length);
class MyListener implements SessionFailureListener
@@ -140,10 +145,13 @@
serverProcess = null;
}
- // Wait to be informed of failure
- boolean ok = latch.await(10000, TimeUnit.MILLISECONDS);
-
- Assert.assertTrue(ok);
+ if (waitFailure)
+ {
+ // Wait to be informed of failure
+ boolean ok = latch.await(10000, TimeUnit.MILLISECONDS);
+
+ Assert.assertTrue(ok);
+ }
}
Modified: branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/cluster/util/SameProcessHornetQServer.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/cluster/util/SameProcessHornetQServer.java 2011-07-07 00:15:00 UTC (rev 10941)
+++ branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/cluster/util/SameProcessHornetQServer.java 2011-07-07 05:33:22 UTC (rev 10942)
@@ -81,6 +81,11 @@
public void crash(ClientSession... sessions) throws Exception
{
+ crash(true, sessions);
+ }
+
+ public void crash(boolean waitFailure, ClientSession... sessions) throws Exception
+ {
final CountDownLatch latch = new CountDownLatch(sessions.length);
class MyListener implements SessionFailureListener
@@ -105,6 +110,12 @@
clusterManager.clear();
server.stop(true);
+ if (waitFailure)
+ {
+ // Wait to be informed of failure
+ boolean ok = latch.await(10000, TimeUnit.MILLISECONDS);
+ Assert.assertTrue(ok);
+ }
}
/* (non-Javadoc)
Modified: branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/cluster/util/TestableServer.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/cluster/util/TestableServer.java 2011-07-07 00:15:00 UTC (rev 10941)
+++ branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/cluster/util/TestableServer.java 2011-07-07 05:33:22 UTC (rev 10942)
@@ -34,6 +34,8 @@
public void stop() throws Exception;
public void crash(ClientSession... sessions) throws Exception;
+
+ public void crash(boolean waitFailure, ClientSession... sessions) throws Exception;
public boolean isInitialised();
12 years, 11 months
JBoss hornetq SVN: r10941 - in branches/Branch_2_2_EAP_cluster_clean2: src/main/org/hornetq/core/remoting/impl/invm and 5 other directories.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2011-07-06 20:15:00 -0400 (Wed, 06 Jul 2011)
New Revision: 10941
Added:
branches/Branch_2_2_EAP_cluster_clean2/tmphsperfdata_clebertsuconic/
Modified:
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/remoting/impl/invm/InVMConnection.java
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/remoting/server/impl/RemotingServiceImpl.java
branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeTest.java
branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/cluster/distribution/OnewayTwoNodeClusterTest.java
branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/cluster/failover/FailoverOnFlowControlTest.java
branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/cluster/util/SameProcessHornetQServer.java
Log:
Fixing tests
Modified: branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/remoting/impl/invm/InVMConnection.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/remoting/impl/invm/InVMConnection.java 2011-07-06 20:52:54 UTC (rev 10940)
+++ branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/remoting/impl/invm/InVMConnection.java 2011-07-07 00:15:00 UTC (rev 10941)
@@ -37,6 +37,8 @@
{
private static final Logger log = Logger.getLogger(InVMConnection.class);
+
+ private static final boolean isTrace = log.isTraceEnabled();
private final BufferHandler handler;
@@ -45,6 +47,9 @@
private final String id;
private boolean closed;
+
+ // Used on tests
+ public static boolean flushEnabled = true;
private final int serverID;
@@ -135,7 +140,10 @@
if (!closed)
{
copied.readInt(); // read and discard
-
+ if (isTrace)
+ {
+ log.trace(InVMConnection.this + "::Sending inVM packet");
+ }
handler.bufferReceived(id, copied);
}
}
@@ -145,10 +153,17 @@
InVMConnection.log.error(msg, e);
throw new IllegalStateException(msg, e);
}
+ finally
+ {
+ if (isTrace)
+ {
+ log.trace(InVMConnection.this + "::packet sent done");
+ }
+ }
}
});
- if (flush)
+ if (flush && flushEnabled)
{
final CountDownLatch latch = new CountDownLatch(1);
executor.execute(new Runnable(){
@@ -160,7 +175,10 @@
try
{
- latch.await(10, TimeUnit.SECONDS);
+ if (!latch.await(10, TimeUnit.SECONDS))
+ {
+ log.warn("Timed out flushing channel on InVMConnection");
+ }
}
catch (InterruptedException e)
{
@@ -193,6 +211,11 @@
{
}
+ public void disableFlush()
+ {
+ flushEnabled = false;
+ }
+
public Executor getExecutor()
{
return executor;
Modified: branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/remoting/server/impl/RemotingServiceImpl.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/remoting/server/impl/RemotingServiceImpl.java 2011-07-06 20:52:54 UTC (rev 10940)
+++ branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/remoting/server/impl/RemotingServiceImpl.java 2011-07-07 00:15:00 UTC (rev 10941)
@@ -267,18 +267,33 @@
}
failureCheckAndFlushThread.close();
+
// We need to stop them accepting first so no new connections are accepted after we send the disconnect message
for (Acceptor acceptor : acceptors)
{
+ if (log.isDebugEnabled())
+ {
+ log.debug("Pausing acceptor " + acceptor);
+ }
acceptor.pause();
}
+ if (log.isDebugEnabled())
+ {
+ log.debug("Sending disconnect on live connections");
+ }
+
// Now we ensure that no connections will process any more packets after this method is complete
// then send a disconnect packet
for (ConnectionEntry entry : connections.values())
{
RemotingConnection conn = entry.connection;
+
+ if (log.isTraceEnabled())
+ {
+ log.trace("Sending connection.disconnection packet to " + conn);
+ }
conn.disconnect();
}
Modified: branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeTest.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeTest.java 2011-07-06 20:52:54 UTC (rev 10940)
+++ branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeTest.java 2011-07-07 00:15:00 UTC (rev 10941)
@@ -1043,7 +1043,7 @@
server1.start();
server0.start();
- final int numMessages = 1000;
+ final int numMessages = 300;
final int totalrepeats = 3;
Modified: branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/cluster/distribution/OnewayTwoNodeClusterTest.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/cluster/distribution/OnewayTwoNodeClusterTest.java 2011-07-06 20:52:54 UTC (rev 10940)
+++ branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/cluster/distribution/OnewayTwoNodeClusterTest.java 2011-07-07 00:15:00 UTC (rev 10941)
@@ -99,20 +99,6 @@
verifyNotReceive(0);
}
- public void testLoop() throws Exception
- {
- for (int i = 0 ; i < 10; i++)
- {
- log.info("#test " + i);
- testStartSourceServerBeforeTargetServer();
- if (i + 1 < 100000)
- {
- tearDown();
- setUp();
- }
- }
-
- }
public void testStartSourceServerBeforeTargetServer() throws Exception
{
startServers(0, 1);
Modified: branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/cluster/failover/FailoverOnFlowControlTest.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/cluster/failover/FailoverOnFlowControlTest.java 2011-07-06 20:52:54 UTC (rev 10940)
+++ branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/cluster/failover/FailoverOnFlowControlTest.java 2011-07-07 00:15:00 UTC (rev 10941)
@@ -25,9 +25,12 @@
import org.hornetq.api.core.client.ServerLocator;
import org.hornetq.core.client.impl.ClientSessionFactoryInternal;
import org.hornetq.core.client.impl.ServerLocatorInternal;
+import org.hornetq.core.logging.Logger;
import org.hornetq.core.protocol.core.Packet;
import org.hornetq.core.protocol.core.impl.wireformat.SessionProducerCreditsMessage;
+import org.hornetq.core.remoting.impl.invm.InVMConnection;
import org.hornetq.spi.core.protocol.RemotingConnection;
+import org.hornetq.spi.core.remoting.Connection;
/**
* A FailoverOnFlowControlTest
@@ -39,6 +42,8 @@
public class FailoverOnFlowControlTest extends FailoverTestBase
{
+
+ private static Logger log = Logger.getLogger(FailoverOnFlowControlTest.class);
// Constants -----------------------------------------------------
@@ -58,24 +63,35 @@
locator.setBlockOnDurableSend(true);
locator.setReconnectAttempts(-1);
locator.setProducerWindowSize(1000);
+ locator.setRetryInterval(123);
final ArrayList<ClientSession> sessionList = new ArrayList<ClientSession>();
Interceptor interceptorClient = new Interceptor()
{
AtomicInteger count = new AtomicInteger(0);
public boolean intercept(Packet packet, RemotingConnection connection) throws HornetQException
{
- System.out.println("Intercept..." + packet.getClass().getName());
+ log.debug("Intercept..." + packet.getClass().getName());
if (packet instanceof SessionProducerCreditsMessage )
{
SessionProducerCreditsMessage credit = (SessionProducerCreditsMessage)packet;
- System.out.println("Credits: " + credit.getCredits());
+ log.debug("Credits: " + credit.getCredits());
if (count.incrementAndGet() == 2)
{
try
{
- crash(sessionList.get(0));
+ log.debug("### crashing server");
+
+ InVMConnection.flushEnabled = false;
+ try
+ {
+ crash(sessionList.get(0));
+ }
+ finally
+ {
+ InVMConnection.flushEnabled = true;
+ }
}
catch (Exception e)
{
Modified: branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/cluster/util/SameProcessHornetQServer.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/cluster/util/SameProcessHornetQServer.java 2011-07-06 20:52:54 UTC (rev 10940)
+++ branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/cluster/util/SameProcessHornetQServer.java 2011-07-07 00:15:00 UTC (rev 10941)
@@ -22,6 +22,7 @@
import org.hornetq.api.core.Interceptor;
import org.hornetq.api.core.client.ClientSession;
import org.hornetq.api.core.client.SessionFailureListener;
+import org.hornetq.core.logging.Logger;
import org.hornetq.core.server.HornetQServer;
import org.hornetq.core.server.cluster.impl.ClusterManagerImpl;
@@ -34,6 +35,7 @@
*/
public class SameProcessHornetQServer implements TestableServer
{
+ private static Logger log = Logger.getLogger(SameProcessHornetQServer.class);
private HornetQServer server;
@@ -85,12 +87,13 @@
{
public void connectionFailed(final HornetQException me, boolean failedOver)
{
+ log.debug("MyListener.connectionFailed failedOver=" + failedOver, me);
latch.countDown();
}
public void beforeReconnect(HornetQException exception)
{
- System.out.println("MyListener.beforeReconnect");
+ log.debug("MyListener.beforeReconnect", exception);
}
}
for (ClientSession session : sessions)
@@ -102,11 +105,6 @@
clusterManager.clear();
server.stop(true);
-
- // Wait to be informed of failure
- boolean ok = latch.await(10000, TimeUnit.MILLISECONDS);
-
- Assert.assertTrue(ok);
}
/* (non-Javadoc)
12 years, 11 months
JBoss hornetq SVN: r10940 - branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2011-07-06 16:52:54 -0400 (Wed, 06 Jul 2011)
New Revision: 10940
Modified:
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionBridge.java
Log:
fixing more tests
Modified: branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionBridge.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionBridge.java 2011-07-06 20:07:47 UTC (rev 10939)
+++ branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionBridge.java 2011-07-06 20:52:54 UTC (rev 10940)
@@ -139,40 +139,6 @@
}
}
- public void connectionFailed(final HornetQException me, boolean failedOver)
- {
-
- log.warn(this + "::Connection failed with failedOver=" + failedOver + "-" + me, me);
-
- try
- {
- // csf.cleanup();
- }
- catch (Throwable dontCare)
- {
- }
-
- try
- {
- // session.cleanUp(false);
- }
- catch (Throwable dontCare)
- {
- }
-
- if (me.getCode() == HornetQException.DISCONNECTED)
- {
- fail(true);
- }
- else
- {
- fail(false);
- scheduleRetryConnect();
- }
- }
-
-
-
@Override
protected ServerMessage beforeForward(ServerMessage message)
{
12 years, 11 months
JBoss hornetq SVN: r10939 - branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2011-07-06 16:07:47 -0400 (Wed, 06 Jul 2011)
New Revision: 10939
Modified:
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionBridge.java
Log:
trying to fix more tests
Modified: branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java 2011-07-06 19:32:56 UTC (rev 10938)
+++ branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java 2011-07-06 20:07:47 UTC (rev 10939)
@@ -516,7 +516,7 @@
// FailureListener implementation --------------------------------
- public final void connectionFailed(final HornetQException me, boolean failedOver)
+ public void connectionFailed(final HornetQException me, boolean failedOver)
{
log.warn(this + "::Connection failed with failedOver=" + failedOver + "-" + me, me);
@@ -544,8 +544,9 @@
else
{
fail(false);
- scheduleRetryConnect();
}
+
+ scheduleRetryConnect();
}
public void beforeReconnect(final HornetQException exception)
Modified: branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionBridge.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionBridge.java 2011-07-06 19:32:56 UTC (rev 10938)
+++ branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionBridge.java 2011-07-06 20:07:47 UTC (rev 10939)
@@ -138,7 +138,41 @@
log.debug("Setting up bridge between " + clusterConnection.getConnector() + " and " + targetLocator, new Exception ("trace"));
}
}
+
+ public void connectionFailed(final HornetQException me, boolean failedOver)
+ {
+
+ log.warn(this + "::Connection failed with failedOver=" + failedOver + "-" + me, me);
+
+ try
+ {
+ // csf.cleanup();
+ }
+ catch (Throwable dontCare)
+ {
+ }
+ try
+ {
+ // session.cleanUp(false);
+ }
+ catch (Throwable dontCare)
+ {
+ }
+
+ if (me.getCode() == HornetQException.DISCONNECTED)
+ {
+ fail(true);
+ }
+ else
+ {
+ fail(false);
+ scheduleRetryConnect();
+ }
+ }
+
+
+
@Override
protected ServerMessage beforeForward(ServerMessage message)
{
12 years, 11 months
JBoss hornetq SVN: r10938 - branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2011-07-06 15:32:56 -0400 (Wed, 06 Jul 2011)
New Revision: 10938
Modified:
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java
Log:
tweaks
Modified: branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java 2011-07-06 19:31:26 UTC (rev 10937)
+++ branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java 2011-07-06 19:32:56 UTC (rev 10938)
@@ -1379,7 +1379,10 @@
{
final DisconnectMessage msg = (DisconnectMessage)packet;
- log.info("PUTZ10 Disconnect arrived: " + msg);
+ if (log.isTraceEnabled())
+ {
+ log.trace("Disconnect being called on client:" + msg);
+ }
closeExecutor.execute(new Runnable()
{
@@ -1393,7 +1396,6 @@
serverLocator.notifyNodeDown(msg.getNodeID().toString());
}
- log.info("Disconnect being called on connection");
conn.fail(new HornetQException(HornetQException.DISCONNECTED,
"The connection was disconnected because of server shutdown"));
12 years, 11 months
JBoss hornetq SVN: r10937 - branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/persistence/impl/journal.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2011-07-06 15:31:26 -0400 (Wed, 06 Jul 2011)
New Revision: 10937
Modified:
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
Log:
removing logs
Modified: branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2011-07-06 19:19:41 UTC (rev 10936)
+++ branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2011-07-06 19:31:26 UTC (rev 10937)
@@ -1890,7 +1890,6 @@
if (removed != null)
{
- log.info("PUTZ Adding referencesToACK: " + removed);
referencesToAck.add(removed);
}
@@ -1898,8 +1897,6 @@
for (MessageReference ack : referencesToAck)
{
- log.info("PUTZ Ack = " + ack);
- log.info("PUTZ ACK.getQueue() = " + ack.getQueue());
ack.getQueue().reacknowledge(tx, ack);
}
12 years, 11 months