JBoss hornetq SVN: r11096 - in branches/HORNETQ-720_Replication: hornetq-journal/src/main/java/org/hornetq/core/journal/impl and 1 other directory.
by do-not-reply@jboss.org
Author: borges
Date: 2011-08-02 10:37:41 -0400 (Tue, 02 Aug 2011)
New Revision: 11096
Modified:
branches/HORNETQ-720_Replication/
branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/AIOSequentialFile.java
branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/JournalImpl.java
Log:
merge from trunk
Property changes on: branches/HORNETQ-720_Replication
___________________________________________________________________
Modified: svn:mergeinfo
- /trunk:10878-11041
+ /trunk:10878-11095
Modified: branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/AIOSequentialFile.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/AIOSequentialFile.java 2011-08-02 14:18:22 UTC (rev 11095)
+++ branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/AIOSequentialFile.java 2011-08-02 14:37:41 UTC (rev 11096)
@@ -204,9 +204,14 @@
public int read(final ByteBuffer bytes, final IOAsyncTask callback) throws Exception
{
int bytesToRead = bytes.limit();
-
long positionToRead = position.getAndAdd(bytesToRead);
+ long size = size();
+ if (size < (positionToRead + bytesToRead))
+ {
+ bytesToRead = (int)(size - positionToRead);
+ }
+
bytes.rewind();
aioFile.read(positionToRead, bytesToRead, bytes, callback);
Modified: branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/JournalImpl.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/JournalImpl.java 2011-08-02 14:18:22 UTC (rev 11095)
+++ branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/JournalImpl.java 2011-08-02 14:37:41 UTC (rev 11096)
@@ -1902,17 +1902,9 @@
int resultLastPost = JournalImpl.readJournalFile(fileFactory, file, new JournalReaderCallback()
{
- private void checkID(final long id)
- {
- if (id > maxID.longValue())
- {
- maxID.set(id);
- }
- }
-
public void onReadAddRecord(final RecordInfo info) throws Exception
{
- checkID(info.id);
+ setAtomicLong(info.id, maxID);
hasData.set(true);
@@ -1923,7 +1915,7 @@
public void onReadUpdateRecord(final RecordInfo info) throws Exception
{
- checkID(info.id);
+ setAtomicLong(info.id, maxID);
hasData.set(true);
@@ -1964,7 +1956,7 @@
public void onReadAddRecordTX(final long transactionID, final RecordInfo info) throws Exception
{
- checkID(info.id);
+ setAtomicLong(info.id, maxID);
hasData.set(true);
@@ -2209,10 +2201,7 @@
{
for (RecordInfo info : transaction.recordInfos)
{
- if (info.id > maxID.get())
- {
- maxID.set(info.id);
- }
+ setAtomicLong(info.id, maxID);
}
PreparedTransactionInfo info = new PreparedTransactionInfo(transaction.transactionID, transaction.extraData);
@@ -2674,6 +2663,23 @@
}
+ private static final void setAtomicLong(final long target, AtomicLong atomic)
+ {
+ while (true)
+ {
+ long value = atomic.get();
+ if (target > value)
+ {
+ if (atomic.compareAndSet(value, target))
+ return;
+ }
+ else
+ {
+ return;
+ }
+ }
+ }
+
/**
* @param name
* @return
12 years, 10 months
JBoss hornetq SVN: r11095 - trunk/hornetq-journal/src/main/java/org/hornetq/core/journal/impl.
by do-not-reply@jboss.org
Author: borges
Date: 2011-08-02 10:18:22 -0400 (Tue, 02 Aug 2011)
New Revision: 11095
Modified:
trunk/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/JournalImpl.java
Log:
Fix race condition when manipulating AtomicLong.
Modified: trunk/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/JournalImpl.java
===================================================================
--- trunk/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/JournalImpl.java 2011-08-02 10:18:08 UTC (rev 11094)
+++ trunk/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/JournalImpl.java 2011-08-02 14:18:22 UTC (rev 11095)
@@ -1913,17 +1913,9 @@
int resultLastPost = JournalImpl.readJournalFile(fileFactory, file, new JournalReaderCallback()
{
- private void checkID(final long id)
- {
- if (id > maxID.longValue())
- {
- maxID.set(id);
- }
- }
-
public void onReadAddRecord(final RecordInfo info) throws Exception
{
- checkID(info.id);
+ setAtomicLong(info.id, maxID);
hasData.set(true);
@@ -1934,7 +1926,7 @@
public void onReadUpdateRecord(final RecordInfo info) throws Exception
{
- checkID(info.id);
+ setAtomicLong(info.id, maxID);
hasData.set(true);
@@ -1975,7 +1967,7 @@
public void onReadAddRecordTX(final long transactionID, final RecordInfo info) throws Exception
{
- checkID(info.id);
+ setAtomicLong(info.id, maxID);
hasData.set(true);
@@ -2220,10 +2212,7 @@
{
for (RecordInfo info : transaction.recordInfos)
{
- if (info.id > maxID.get())
- {
- maxID.set(info.id);
- }
+ setAtomicLong(info.id, maxID);
}
PreparedTransactionInfo info = new PreparedTransactionInfo(transaction.transactionID, transaction.extraData);
@@ -2690,6 +2679,23 @@
}
+ private static final void setAtomicLong(final long target, AtomicLong atomic)
+ {
+ while (true)
+ {
+ long value = atomic.get();
+ if (target > value)
+ {
+ if (atomic.compareAndSet(value, target))
+ return;
+ }
+ else
+ {
+ return;
+ }
+ }
+ }
+
/**
* @param name
* @return
12 years, 10 months
JBoss hornetq SVN: r11094 - in branches/HORNETQ-720_Replication: hornetq-core/src/main/java/org/hornetq/core/replication/impl and 2 other directories.
by do-not-reply@jboss.org
Author: borges
Date: 2011-08-02 06:18:08 -0400 (Tue, 02 Aug 2011)
New Revision: 11094
Modified:
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/ReplicationFileIdMessage.java
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationManagerImpl.java
branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/JournalImpl.java
branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/BackupJournalSyncTest.java
Log:
HORNETQ-720 FIX sending of replicated packets & document mechanism.
Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/ReplicationFileIdMessage.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/ReplicationFileIdMessage.java 2011-08-02 03:27:37 UTC (rev 11093)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/ReplicationFileIdMessage.java 2011-08-02 10:18:08 UTC (rev 11094)
@@ -6,7 +6,11 @@
import org.hornetq.core.protocol.core.impl.PacketImpl;
/**
- * Send all fileIDs used in the live server to the backup.
+ * Sends all fileIDs used in the live server to the backup. This is done so that we:
+ * <ol>
+ * <li>reserve those IDs in the backup;
+ * <li>start replicating while the journal synchronization is taking place.
+ * </ol>
*/
public class ReplicationFileIdMessage extends PacketImpl
{
Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java 2011-08-02 03:27:37 UTC (rev 11093)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java 2011-08-02 10:18:08 UTC (rev 11094)
@@ -410,7 +410,6 @@
{
throw new HornetQException(HornetQException.INTERNAL_ERROR, "RemoteBackup can not be up-to-date!");
}
-
final Journal journalIf = journals[packet.getJournalContentType().typeByte];
JournalImpl journal = assertJournalImpl(journalIf);
Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationManagerImpl.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationManagerImpl.java 2011-08-02 03:27:37 UTC (rev 11093)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationManagerImpl.java 2011-08-02 10:18:08 UTC (rev 11094)
@@ -69,10 +69,6 @@
// Attributes ----------------------------------------------------
private final ResponseHandler responseHandler = new ResponseHandler();
-
-// private final ClientSessionFactoryInternal sessionFactory;
-// private CoreRemotingConnection replicatingConnection;
-
private final Channel replicatingChannel;
private boolean started;
@@ -424,6 +420,11 @@
}
}
+ /**
+ * @throws IllegalStateException By default, all replicated packets generate a replicated
+ * response. If your packets are triggering this exception, it may be because the
+ * packets were not sent with {@link #sendReplicatePacket(Packet)}.
+ */
private void replicated()
{
OperationContext ctx = pendingTokens.poll();
@@ -523,7 +524,7 @@
buffer.rewind();
// sending -1 or 0 bytes will close the file at the backup
- replicatingChannel.send(new ReplicationJournalFileMessage(bytesRead, buffer, content, id));
+ sendReplicatePacket(new ReplicationJournalFileMessage(bytesRead, buffer, content, id));
if (bytesRead == -1 || bytesRead == 0)
break;
}
@@ -532,13 +533,13 @@
@Override
public void reserveFileIds(JournalFile[] datafiles, JournalContent contentType) throws HornetQException
{
- replicatingChannel.send(new ReplicationFileIdMessage(datafiles, contentType));
+ sendReplicatePacket(new ReplicationFileIdMessage(datafiles, contentType));
}
@Override
public void sendSynchronizationDone()
{
- replicatingChannel.send(new ReplicationJournalFileMessage(-1, null, null, -1));
+ sendReplicatePacket(new ReplicationJournalFileMessage(-1, null, null, -1));
}
}
Modified: branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/JournalImpl.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/JournalImpl.java 2011-08-02 03:27:37 UTC (rev 11093)
+++ branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/JournalImpl.java 2011-08-02 10:18:08 UTC (rev 11094)
@@ -15,6 +15,7 @@
import java.nio.ByteBuffer;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashSet;
@@ -3276,6 +3277,7 @@
writeLock();
try
{
+ log.info("Reserving fileIDs before synchronization: " + Arrays.toString(fileIds));
long maxID = -1;
for (long id : fileIds)
{
@@ -3321,7 +3323,7 @@
{
filesRepository.addDataFileOnTop(file);
}
- // XXX HORNETQ-720 still missing a "reload" call
+ // XXX HORNETQ-720 still missing a "reload" call?
}
finally
{
Modified: branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/BackupJournalSyncTest.java
===================================================================
--- branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/BackupJournalSyncTest.java 2011-08-02 03:27:37 UTC (rev 11093)
+++ branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/BackupJournalSyncTest.java 2011-08-02 10:18:08 UTC (rev 11094)
@@ -58,6 +58,10 @@
}
backupServer.start();
waitForBackup(sessionFactory, 10);
+
+ // SEND more messages, now with the backup replicating
+ sendMessages(session, producer, N_MSGS);
+
Set<Long> liveIds = getFileIds(messageJournal);
assertFalse("should not be initialized", backupServer.getServer().isInitialised());
crash(session);
@@ -65,7 +69,7 @@
JournalImpl backupMsgJournal = getMessageJournalFromServer(backupServer);
Set<Long> backupIds = getFileIds(backupMsgJournal);
- assertEquals("sets must match! " + liveIds, liveIds, backupIds);
+ assertEquals("File IDs must match!", liveIds, backupIds);
}
private static void waitForServerInitialization(HornetQServer server, int seconds)
12 years, 10 months
JBoss hornetq SVN: r11093 - in branches/Branch_2_2_EAP_cluster_clean2: src/main/org/hornetq/core/journal/impl and 4 other directories.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2011-08-01 23:27:37 -0400 (Mon, 01 Aug 2011)
New Revision: 11093
Modified:
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/journal/impl/JournalImpl.java
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/transaction/impl/TransactionImpl.java
branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/cluster/failover/AsynchronousFailoverTest.java
Log:
tweaks for tests
Modified: branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java 2011-08-01 21:08:32 UTC (rev 11092)
+++ branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java 2011-08-02 03:27:37 UTC (rev 11093)
@@ -516,6 +516,11 @@
{
checkClosed();
+ if (log.isTraceEnabled())
+ {
+ log.trace("Sending commit");
+ }
+
if (rollbackOnly)
{
rollbackOnFailover();
@@ -928,6 +933,10 @@
if (response.isReattached())
{
+ if (log.isDebugEnabled())
+ {
+ log.debug("ClientSession reattached fine, replaying commands");
+ }
// The session was found on the server - we reattached transparently ok
channel.replayCommands(response.getLastConfirmedCommandID());
@@ -935,6 +944,11 @@
else
{
+ if (log.isDebugEnabled())
+ {
+ log.debug("ClientSession couldn't be reattached, creating a new session");
+ }
+
// The session wasn't found on the server - probably we're failing over onto a backup server where the
// session won't exist or the target server has been restarted - in this case the session will need to be
// recreated,
Modified: branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/journal/impl/JournalImpl.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/journal/impl/JournalImpl.java 2011-08-01 21:08:32 UTC (rev 11092)
+++ branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/journal/impl/JournalImpl.java 2011-08-02 03:27:37 UTC (rev 11093)
@@ -97,7 +97,7 @@
// This is useful at debug time...
// if you set it to true, all the appends, deletes, rollbacks, commits, etc.. are sent to System.out
- private static final boolean TRACE_RECORDS = false;
+ private static final boolean TRACE_RECORDS = trace;
// This method exists just to make debug easier.
// I could replace log.trace by log.info temporarily while I was debugging
Modified: branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2011-08-01 21:08:32 UTC (rev 11092)
+++ branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2011-08-02 03:27:37 UTC (rev 11093)
@@ -2560,6 +2560,13 @@
@Override
public String toString()
{
+ // this would be useful when testing. Most tests on the testsuite will use a SimpleString on the duplicate ID
+ // and this may be useful to validate the journal on those tests
+ // You may uncomment these two lines on that case and replcate the toString for the PrintData
+
+ // SimpleString simpleStr = new SimpleString(duplID);
+ // return "DuplicateIDEncoding [address=" + address + ", duplID=" + simpleStr + "]";
+
return "DuplicateIDEncoding [address=" + address + ", duplID=" + Arrays.toString(duplID) + "]";
}
Modified: branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java 2011-08-01 21:08:32 UTC (rev 11092)
+++ branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java 2011-08-02 03:27:37 UTC (rev 11093)
@@ -1131,14 +1131,12 @@
if (rejectDuplicates && isDuplicate)
{
- StringBuffer warnMessage = new StringBuffer();
- warnMessage.append("Duplicate message detected - message will not be routed. Message information:\n");
- warnMessage.append(message.toString());
- PostOfficeImpl.log.warn(warnMessage.toString());
+ String warnMessage = "Duplicate message detected - message will not be routed. Message information:" + message.toString();
+ PostOfficeImpl.log.warn(warnMessage);
if (context.getTransaction() != null)
{
- context.getTransaction().markAsRollbackOnly(new HornetQException(HornetQException.DUPLICATE_ID_REJECTED, warnMessage.toString()));
+ context.getTransaction().markAsRollbackOnly(new HornetQException(HornetQException.DUPLICATE_ID_REJECTED, warnMessage));
}
return false;
Modified: branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/transaction/impl/TransactionImpl.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/transaction/impl/TransactionImpl.java 2011-08-01 21:08:32 UTC (rev 11092)
+++ branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/transaction/impl/TransactionImpl.java 2011-08-02 03:27:37 UTC (rev 11093)
@@ -424,6 +424,10 @@
public void markAsRollbackOnly(final HornetQException exception)
{
+ if (log.isDebugEnabled())
+ {
+ log.debug("Marking Transaction " + this.id + " as rollback only");
+ }
state = State.ROLLBACK_ONLY;
this.exception = exception;
Modified: branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/cluster/failover/AsynchronousFailoverTest.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/cluster/failover/AsynchronousFailoverTest.java 2011-08-01 21:08:32 UTC (rev 11092)
+++ branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/cluster/failover/AsynchronousFailoverTest.java 2011-08-02 03:27:37 UTC (rev 11093)
@@ -181,6 +181,7 @@
locator.setBlockOnNonDurableSend(true);
locator.setBlockOnDurableSend(true);
locator.setReconnectAttempts(-1);
+ locator.setConfirmationWindowSize(10 * 1024 * 1024);
sf = (ClientSessionFactoryInternal) createSessionFactoryAndWaitForTopology(locator, 2);
@@ -209,6 +210,10 @@
// Simulate failure on connection
synchronized (lockFail)
{
+ if (log.isDebugEnabled())
+ {
+ log.debug("#test crashing test");
+ }
crash((ClientSession) createSession);
}
@@ -381,14 +386,14 @@
{
// For duplication detection
int executionId = 0;
-
- log.info("#test doTestTransactional starting now");
-
+
while (!runner.isFailed())
{
ClientSession session = null;
executionId++;
+
+ log.info("#test doTestTransactional starting now. Execution " + executionId);
try
{
@@ -441,13 +446,13 @@
{
if (e.getCode() == HornetQException.DUPLICATE_ID_REJECTED)
{
- logAndSystemOut("#test duplicate id rejected");
+ logAndSystemOut("#test duplicate id rejected on sending");
break;
}
else
if (e.getCode() == HornetQException.TRANSACTION_ROLLED_BACK || e.getCode() == HornetQException.UNBLOCKED)
{
- log.info("#test transaction rollback retrying");
+ log.info("#test transaction rollback retrying on sending");
// OK
retry = true;
}
@@ -460,7 +465,7 @@
}
while (retry);
- logAndSystemOut("Finished sending, starting consumption now");
+ logAndSystemOut("#test Finished sending, starting consumption now");
boolean blocked = false;
@@ -482,6 +487,10 @@
for (int i = 0; i < numMessages; i++)
{
+ if (log.isDebugEnabled())
+ {
+ log.debug("Consumer receiving message " + i);
+ }
ClientMessage message = consumer.receive(10000);
if (message == null)
{
@@ -494,12 +503,18 @@
}
int count = message.getIntProperty("counter");
+
+ if (count != i)
+ {
+ log.warn("count was received out of order, " + count + "!=" + i);
+ }
msgs.add(count);
message.acknowledge();
}
+ log.info("#test commit");
session.commit();
try
@@ -515,6 +530,7 @@
}
catch (Throwable e)
{
+ log.info(threadDump("Thread dump, messagesReceived = " + msgs.size()));
logAndSystemOut(e.getMessage() + " messages received");
for (Integer msg : msgs)
{
12 years, 10 months
JBoss hornetq SVN: r11092 - branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/client.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2011-08-01 17:08:32 -0400 (Mon, 01 Aug 2011)
New Revision: 11092
Modified:
branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/client/JMSPagingFileDeleteTest.java
Log:
oops
Modified: branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/client/JMSPagingFileDeleteTest.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/client/JMSPagingFileDeleteTest.java 2011-08-01 19:39:43 UTC (rev 11091)
+++ branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/client/JMSPagingFileDeleteTest.java 2011-08-01 21:08:32 UTC (rev 11092)
@@ -15,15 +15,12 @@
import javax.jms.BytesMessage;
import javax.jms.Connection;
-import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.Topic;
-import junit.framework.Assert;
-
import org.hornetq.api.core.SimpleString;
import org.hornetq.core.logging.Logger;
import org.hornetq.core.paging.PagingStore;
@@ -33,9 +30,19 @@
public class JMSPagingFileDeleteTest extends JMSTestBase
{
static Logger log = Logger.getLogger(JMSPagingFileDeleteTest.class);
-
+
Topic topic1;
+ Connection connection;
+
+ Session session;
+
+ MessageConsumer subscriber1;
+
+ MessageConsumer subscriber2;
+
+ PagingStore pagingStore;
+
private static final int MESSAGE_SIZE = 1024;
private static final int PAGE_SIZE = 10 * 1024;
@@ -73,195 +80,113 @@
topic1 = null;
super.tearDown();
}
-
+
public void testTopics() throws Exception
{
- Connection connection = null;
+ connection = null;
try
{
connection = cf.createConnection();
connection.setClientID("cid");
- Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageProducer producer = session.createProducer(topic1);
- MessageConsumer subscriber1 = session.createDurableSubscriber(topic1, "subscriber-1");
- MessageConsumer subscriber2 = session.createDurableSubscriber(topic1, "subscriber-2");
+ subscriber1 = session.createDurableSubscriber(topic1, "subscriber-1");
+ subscriber2 = session.createDurableSubscriber(topic1, "subscriber-2");
- int numMessages = sendMessages(createMessage(session), producer);
+ // -----------------(Step1) Publish Messages to make Paging Files. --------------------
+ System.out.println("---------- Send messages. ----------");
+ BytesMessage bytesMessage = session.createBytesMessage();
+ bytesMessage.writeBytes(new byte[JMSPagingFileDeleteTest.MESSAGE_SIZE]);
+ for (int i = 0; i < JMSPagingFileDeleteTest.MESSAGE_NUM; i++)
+ {
+ producer.send(bytesMessage);
+ }
+ System.out.println("Sent " + JMSPagingFileDeleteTest.MESSAGE_NUM + " messages.");
- printPageStoreInfo();
+ pagingStore = server.getPagingManager().getPageStore(new SimpleString("jms.topic.topic1"));
+ printPageStoreInfo(pagingStore);
- Assert.assertTrue(getPagingStore().isPaging());
+ assertTrue(pagingStore.isPaging());
connection.start();
// -----------------(Step2) Restart the server. --------------------------------------
- // If try this test without restarting server, please comment out this section;
- close(connection);
- stopAndStartServer();
+ stopAndStartServer(); // If try this test without restarting server, please comment out this line;
- connection = cf.createConnection();
- connection.setClientID("cid");
- session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- subscriber1 = session.createDurableSubscriber(topic1, "subscriber-1");
- subscriber2 = session.createDurableSubscriber(topic1, "subscriber-2");
- connection.start();
-
// -----------------(Step3) Subscribe to all the messages from the topic.--------------
System.out.println("---------- Receive all messages. ----------");
- for (int i = 0; i < numMessages; i++)
+ for (int i = 0; i < JMSPagingFileDeleteTest.MESSAGE_NUM; i++)
{
- Assert.assertNotNull(subscriber1.receive(JMSPagingFileDeleteTest.RECEIVE_TIMEOUT));
- Assert.assertNotNull(subscriber2.receive(JMSPagingFileDeleteTest.RECEIVE_TIMEOUT));
+ Message message1 = subscriber1.receive(JMSPagingFileDeleteTest.RECEIVE_TIMEOUT);
+ assertNotNull(message1);
+ Message message2 = subscriber2.receive(JMSPagingFileDeleteTest.RECEIVE_TIMEOUT);
+ assertNotNull(message2);
}
- waitUntilPagingStops(5000);
+ pagingStore = server.getPagingManager().getPageStore(new SimpleString("jms.topic.topic1"));
+ long timeout = System.currentTimeMillis() + 5000;
+ while (timeout > System.currentTimeMillis() && pagingStore.isPaging())
+ {
+ Thread.sleep(100);
+ }
+ assertFalse(pagingStore.isPaging());
+
+ printPageStoreInfo(pagingStore);
- printPageStoreInfo();
-
- Assert.assertEquals(0, getPagingStore().getAddressSize());
+ assertEquals(0, pagingStore.getAddressSize());
// assertEquals(1, pagingStore.getNumberOfPages()); //I expected number of the page is 1, but It was not.
- Assert.assertFalse(getPagingStore().isPaging()); // I expected IsPaging is false, but It was true.
+ assertFalse(pagingStore.isPaging()); // I expected IsPaging is false, but It was true.
// If the server is not restart, this test pass.
// -----------------(Step4) Publish a message. the message is stored in the paging file.
producer = session.createProducer(topic1);
- sendMessage(createMessage(session), producer);
+ bytesMessage = session.createBytesMessage();
+ bytesMessage.writeBytes(new byte[JMSPagingFileDeleteTest.MESSAGE_SIZE]);
+ producer.send(bytesMessage);
- printPageStoreInfo();
+ printPageStoreInfo(pagingStore);
- Assert.assertEquals(1, getPagingStore().getNumberOfPages()); // I expected number of the page is 1, but It was
- // not.
+ assertEquals(1, pagingStore.getNumberOfPages()); //I expected number of the page is 1, but It was not.
}
finally
{
- close(connection);
- }
- }
-
- public void testTopics_nonDurable() throws Exception
- {
- Connection connection = null;
-
- try
- {
- connection = cf.createConnection();
-
- Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-
- MessageProducer producer = session.createProducer(topic1);
-
- printPageStoreInfo();
-
- connection.start();
-
- MessageConsumer subscriber = session.createConsumer(topic1);
- final Message message = createMessage(session);
- int numMessages = sendJustEnoughMessagesForPaging(message, producer);
-
- // ###Works if uncomment this to send one extra message or if use sendMessages instead above
- // printPageStoreInfo();
- sendMessage(message, producer);
- // numMessages++;
-
- printPageStoreInfo();
-
- for (int i = 0; i < numMessages; i++)
- {
- Assert.assertNotNull(subscriber.receive(JMSPagingFileDeleteTest.RECEIVE_TIMEOUT));
- System.out.println("Messages recd:" + (i + 1));
- }
-
- assertNull(subscriber.receive(1000));
-
- waitUntilPagingStops(5000);
-
- printPageStoreInfo();
- }
- finally
- {
- close(connection);
- }
- }
-
- private void close(final Connection connection)
- {
- try
- {
if (connection != null)
{
connection.close();
}
}
- catch (JMSException e)
- {
- e.printStackTrace();
- }
}
- private int sendMessages(final Message message, final MessageProducer producer) throws JMSException
+ private void stopAndStartServer() throws Exception
{
- System.out.println("---------- Send messages. ----------");
- for (int i = 0; i < JMSPagingFileDeleteTest.MESSAGE_NUM; i++)
- {
- sendMessage(message, producer);
- }
- System.out.println("Sent " + JMSPagingFileDeleteTest.MESSAGE_NUM + " messages.");
+ System.out.println("---------- Restart server. ----------");
+ connection.close();
- return JMSPagingFileDeleteTest.MESSAGE_NUM;
- }
+ jmsServer.stop();
- private int sendJustEnoughMessagesForPaging(final Message message, final MessageProducer producer) throws Exception
- {
- int messagesSendCount = 0;
- while (!getPagingStore().isPaging())
- {
- sendMessage(message, producer);
- messagesSendCount++;
- }
+ jmsServer.start();
+ jmsServer.activated();
+ registerConnectionFactory();
- System.out.println(messagesSendCount + " messages sent before paging started");
-
- return messagesSendCount;
+ printPageStoreInfo(pagingStore);
+ reconnect();
}
- private void sendMessage(final Message message, final MessageProducer producer) throws JMSException
+ private void reconnect() throws Exception
{
- producer.send(message);
+ connection = cf.createConnection();
+ connection.setClientID("cid");
+ session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ subscriber1 = session.createDurableSubscriber(topic1, "subscriber-1");
+ subscriber2 = session.createDurableSubscriber(topic1, "subscriber-2");
+ connection.start();
}
- private Message createMessage(final Session session) throws JMSException
+ private void printPageStoreInfo(PagingStore pagingStore) throws Exception
{
- BytesMessage bytesMessage = session.createBytesMessage();
- bytesMessage.writeBytes(new byte[JMSPagingFileDeleteTest.MESSAGE_SIZE]);
- return bytesMessage;
- }
-
- private void waitUntilPagingStops(final int timeoutMillis) throws Exception, InterruptedException
- {
- long timeout = System.currentTimeMillis() + timeoutMillis;
- while (timeout > System.currentTimeMillis() && getPagingStore().isPaging())
- {
- Thread.sleep(100);
- }
-
- if (!getPagingStore().isPaging())
- {
- System.exit(-1);
- }
- Assert.assertFalse("Paging should have stopped", getPagingStore().isPaging());
- }
-
- private PagingStore getPagingStore() throws Exception
- {
- return server.getPagingManager().getPageStore(new SimpleString("jms.topic.topic1"));
- }
-
- private void printPageStoreInfo() throws Exception
- {
- PagingStore pagingStore = getPagingStore();
System.out.println("---------- Paging Store Info ----------");
System.out.println(" CurrentPage = " + pagingStore.getCurrentPage());
System.out.println(" FirstPage = " + pagingStore.getFirstPage());
@@ -269,17 +194,4 @@
System.out.println(" Address Size = " + pagingStore.getAddressSize());
System.out.println(" Is Paging = " + pagingStore.isPaging());
}
-
- private void stopAndStartServer() throws Exception
- {
- System.out.println("---------- Restart server. ----------");
-
- jmsServer.stop();
-
- jmsServer.start();
- jmsServer.activated();
- registerConnectionFactory();
-
- printPageStoreInfo();
- }
}
\ No newline at end of file
12 years, 10 months
JBoss hornetq SVN: r11091 - in branches/Branch_2_2_EAP_cluster_clean2: tests/src/org/hornetq/tests/integration/client and 3 other directories.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2011-08-01 15:39:43 -0400 (Mon, 01 Aug 2011)
New Revision: 11091
Modified:
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java
branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/client/JMSPagingFileDeleteTest.java
branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/cluster/distribution/NettyFileStorageSymmetricClusterWithDiscoveryTest.java
branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/cluster/distribution/NettySymmetricClusterWithBackupTest.java
branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/cluster/distribution/OneWayChainClusterTest.java
branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/cluster/failover/AsynchronousFailoverTest.java
branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/cluster/failover/FailoverTestBase.java
branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/cluster/failover/NettyAsynchronousReattachTest.java
branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/util/UnitTestCase.java
Log:
log and debug for a test
Modified: branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java 2011-08-01 16:25:50 UTC (rev 11090)
+++ branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java 2011-08-01 19:39:43 UTC (rev 11091)
@@ -1087,13 +1087,6 @@
{
DelegatingBufferHandler handler = new DelegatingBufferHandler();
- if (log.isDebugEnabled())
- {
- log.debug("Trying to connect with connector = " + connectorFactory +
- ", parameters = " +
- connectorConfig.getParams());
- }
-
connector = connectorFactory.createConnector(connectorConfig.getParams(),
handler,
this,
@@ -1101,6 +1094,15 @@
threadPool,
scheduledThreadPool);
+ if (log.isDebugEnabled())
+ {
+ log.debug("Trying to connect with connector = " + connectorFactory +
+ ", parameters = " +
+ connectorConfig.getParams() + " connector = " + connector);
+ }
+
+
+
if (connector != null)
{
connector.start();
Modified: branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/client/JMSPagingFileDeleteTest.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/client/JMSPagingFileDeleteTest.java 2011-08-01 16:25:50 UTC (rev 11090)
+++ branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/client/JMSPagingFileDeleteTest.java 2011-08-01 19:39:43 UTC (rev 11091)
@@ -15,12 +15,15 @@
import javax.jms.BytesMessage;
import javax.jms.Connection;
+import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.Topic;
+import junit.framework.Assert;
+
import org.hornetq.api.core.SimpleString;
import org.hornetq.core.logging.Logger;
import org.hornetq.core.paging.PagingStore;
@@ -30,19 +33,9 @@
public class JMSPagingFileDeleteTest extends JMSTestBase
{
static Logger log = Logger.getLogger(JMSPagingFileDeleteTest.class);
-
+
Topic topic1;
- Connection connection;
-
- Session session;
-
- MessageConsumer subscriber1;
-
- MessageConsumer subscriber2;
-
- PagingStore pagingStore;
-
private static final int MESSAGE_SIZE = 1024;
private static final int PAGE_SIZE = 10 * 1024;
@@ -80,113 +73,195 @@
topic1 = null;
super.tearDown();
}
-
+
public void testTopics() throws Exception
{
- connection = null;
+ Connection connection = null;
try
{
connection = cf.createConnection();
connection.setClientID("cid");
- session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageProducer producer = session.createProducer(topic1);
- subscriber1 = session.createDurableSubscriber(topic1, "subscriber-1");
- subscriber2 = session.createDurableSubscriber(topic1, "subscriber-2");
+ MessageConsumer subscriber1 = session.createDurableSubscriber(topic1, "subscriber-1");
+ MessageConsumer subscriber2 = session.createDurableSubscriber(topic1, "subscriber-2");
- // -----------------(Step1) Publish Messages to make Paging Files. --------------------
- System.out.println("---------- Send messages. ----------");
- BytesMessage bytesMessage = session.createBytesMessage();
- bytesMessage.writeBytes(new byte[JMSPagingFileDeleteTest.MESSAGE_SIZE]);
- for (int i = 0; i < JMSPagingFileDeleteTest.MESSAGE_NUM; i++)
- {
- producer.send(bytesMessage);
- }
- System.out.println("Sent " + JMSPagingFileDeleteTest.MESSAGE_NUM + " messages.");
+ int numMessages = sendMessages(createMessage(session), producer);
- pagingStore = server.getPagingManager().getPageStore(new SimpleString("jms.topic.topic1"));
- printPageStoreInfo(pagingStore);
+ printPageStoreInfo();
- assertTrue(pagingStore.isPaging());
+ Assert.assertTrue(getPagingStore().isPaging());
connection.start();
// -----------------(Step2) Restart the server. --------------------------------------
- stopAndStartServer(); // If try this test without restarting server, please comment out this line;
+ // If try this test without restarting server, please comment out this section;
+ close(connection);
+ stopAndStartServer();
+ connection = cf.createConnection();
+ connection.setClientID("cid");
+ session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ subscriber1 = session.createDurableSubscriber(topic1, "subscriber-1");
+ subscriber2 = session.createDurableSubscriber(topic1, "subscriber-2");
+ connection.start();
+
// -----------------(Step3) Subscribe to all the messages from the topic.--------------
System.out.println("---------- Receive all messages. ----------");
- for (int i = 0; i < JMSPagingFileDeleteTest.MESSAGE_NUM; i++)
+ for (int i = 0; i < numMessages; i++)
{
- Message message1 = subscriber1.receive(JMSPagingFileDeleteTest.RECEIVE_TIMEOUT);
- assertNotNull(message1);
- Message message2 = subscriber2.receive(JMSPagingFileDeleteTest.RECEIVE_TIMEOUT);
- assertNotNull(message2);
+ Assert.assertNotNull(subscriber1.receive(JMSPagingFileDeleteTest.RECEIVE_TIMEOUT));
+ Assert.assertNotNull(subscriber2.receive(JMSPagingFileDeleteTest.RECEIVE_TIMEOUT));
}
- pagingStore = server.getPagingManager().getPageStore(new SimpleString("jms.topic.topic1"));
- long timeout = System.currentTimeMillis() + 5000;
- while (timeout > System.currentTimeMillis() && pagingStore.isPaging())
- {
- Thread.sleep(100);
- }
- assertFalse(pagingStore.isPaging());
-
- printPageStoreInfo(pagingStore);
+ waitUntilPagingStops(5000);
- assertEquals(0, pagingStore.getAddressSize());
+ printPageStoreInfo();
+
+ Assert.assertEquals(0, getPagingStore().getAddressSize());
// assertEquals(1, pagingStore.getNumberOfPages()); //I expected number of the page is 1, but It was not.
- assertFalse(pagingStore.isPaging()); // I expected IsPaging is false, but It was true.
+ Assert.assertFalse(getPagingStore().isPaging()); // I expected IsPaging is false, but It was true.
// If the server is not restart, this test pass.
// -----------------(Step4) Publish a message. the message is stored in the paging file.
producer = session.createProducer(topic1);
- bytesMessage = session.createBytesMessage();
- bytesMessage.writeBytes(new byte[JMSPagingFileDeleteTest.MESSAGE_SIZE]);
- producer.send(bytesMessage);
+ sendMessage(createMessage(session), producer);
- printPageStoreInfo(pagingStore);
+ printPageStoreInfo();
- assertEquals(1, pagingStore.getNumberOfPages()); //I expected number of the page is 1, but It was not.
+ Assert.assertEquals(1, getPagingStore().getNumberOfPages()); // I expected number of the page is 1, but It was
+ // not.
}
finally
{
+ close(connection);
+ }
+ }
+
+ public void testTopics_nonDurable() throws Exception
+ {
+ Connection connection = null;
+
+ try
+ {
+ connection = cf.createConnection();
+
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ MessageProducer producer = session.createProducer(topic1);
+
+ printPageStoreInfo();
+
+ connection.start();
+
+ MessageConsumer subscriber = session.createConsumer(topic1);
+ final Message message = createMessage(session);
+ int numMessages = sendJustEnoughMessagesForPaging(message, producer);
+
+ // ###Works if uncomment this to send one extra message or if use sendMessages instead above
+ // printPageStoreInfo();
+ sendMessage(message, producer);
+ // numMessages++;
+
+ printPageStoreInfo();
+
+ for (int i = 0; i < numMessages; i++)
+ {
+ Assert.assertNotNull(subscriber.receive(JMSPagingFileDeleteTest.RECEIVE_TIMEOUT));
+ System.out.println("Messages recd:" + (i + 1));
+ }
+
+ assertNull(subscriber.receive(1000));
+
+ waitUntilPagingStops(5000);
+
+ printPageStoreInfo();
+ }
+ finally
+ {
+ close(connection);
+ }
+ }
+
+ private void close(final Connection connection)
+ {
+ try
+ {
if (connection != null)
{
connection.close();
}
}
+ catch (JMSException e)
+ {
+ e.printStackTrace();
+ }
}
- private void stopAndStartServer() throws Exception
+ private int sendMessages(final Message message, final MessageProducer producer) throws JMSException
{
- System.out.println("---------- Restart server. ----------");
- connection.close();
+ System.out.println("---------- Send messages. ----------");
+ for (int i = 0; i < JMSPagingFileDeleteTest.MESSAGE_NUM; i++)
+ {
+ sendMessage(message, producer);
+ }
+ System.out.println("Sent " + JMSPagingFileDeleteTest.MESSAGE_NUM + " messages.");
- jmsServer.stop();
+ return JMSPagingFileDeleteTest.MESSAGE_NUM;
+ }
- jmsServer.start();
- jmsServer.activated();
- registerConnectionFactory();
+ private int sendJustEnoughMessagesForPaging(final Message message, final MessageProducer producer) throws Exception
+ {
+ int messagesSendCount = 0;
+ while (!getPagingStore().isPaging())
+ {
+ sendMessage(message, producer);
+ messagesSendCount++;
+ }
- printPageStoreInfo(pagingStore);
- reconnect();
+ System.out.println(messagesSendCount + " messages sent before paging started");
+
+ return messagesSendCount;
}
- private void reconnect() throws Exception
+ private void sendMessage(final Message message, final MessageProducer producer) throws JMSException
{
- connection = cf.createConnection();
- connection.setClientID("cid");
- session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- subscriber1 = session.createDurableSubscriber(topic1, "subscriber-1");
- subscriber2 = session.createDurableSubscriber(topic1, "subscriber-2");
- connection.start();
+ producer.send(message);
}
- private void printPageStoreInfo(PagingStore pagingStore) throws Exception
+ private Message createMessage(final Session session) throws JMSException
{
+ BytesMessage bytesMessage = session.createBytesMessage();
+ bytesMessage.writeBytes(new byte[JMSPagingFileDeleteTest.MESSAGE_SIZE]);
+ return bytesMessage;
+ }
+
+ private void waitUntilPagingStops(final int timeoutMillis) throws Exception, InterruptedException
+ {
+ long timeout = System.currentTimeMillis() + timeoutMillis;
+ while (timeout > System.currentTimeMillis() && getPagingStore().isPaging())
+ {
+ Thread.sleep(100);
+ }
+
+ if (!getPagingStore().isPaging())
+ {
+ System.exit(-1);
+ }
+ Assert.assertFalse("Paging should have stopped", getPagingStore().isPaging());
+ }
+
+ private PagingStore getPagingStore() throws Exception
+ {
+ return server.getPagingManager().getPageStore(new SimpleString("jms.topic.topic1"));
+ }
+
+ private void printPageStoreInfo() throws Exception
+ {
+ PagingStore pagingStore = getPagingStore();
System.out.println("---------- Paging Store Info ----------");
System.out.println(" CurrentPage = " + pagingStore.getCurrentPage());
System.out.println(" FirstPage = " + pagingStore.getFirstPage());
@@ -194,4 +269,17 @@
System.out.println(" Address Size = " + pagingStore.getAddressSize());
System.out.println(" Is Paging = " + pagingStore.isPaging());
}
+
+ private void stopAndStartServer() throws Exception
+ {
+ System.out.println("---------- Restart server. ----------");
+
+ jmsServer.stop();
+
+ jmsServer.start();
+ jmsServer.activated();
+ registerConnectionFactory();
+
+ printPageStoreInfo();
+ }
}
\ No newline at end of file
Modified: branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/cluster/distribution/NettyFileStorageSymmetricClusterWithDiscoveryTest.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/cluster/distribution/NettyFileStorageSymmetricClusterWithDiscoveryTest.java 2011-08-01 16:25:50 UTC (rev 11090)
+++ branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/cluster/distribution/NettyFileStorageSymmetricClusterWithDiscoveryTest.java 2011-08-01 19:39:43 UTC (rev 11091)
@@ -13,6 +13,8 @@
package org.hornetq.tests.integration.cluster.distribution;
+import org.hornetq.core.logging.Logger;
+
/**
* A NettyFileStorageSymmetricClusterWithDiscoveryTest
*
@@ -22,6 +24,8 @@
*/
public class NettyFileStorageSymmetricClusterWithDiscoveryTest extends SymmetricClusterWithDiscoveryTest
{
+ Logger log = Logger.getLogger(NettyFileStorageSymmetricClusterWithDiscoveryTest.class);
+
@Override
protected boolean isNetty()
{
@@ -32,6 +36,6 @@
{
return true;
}
+
-
}
Modified: branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/cluster/distribution/NettySymmetricClusterWithBackupTest.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/cluster/distribution/NettySymmetricClusterWithBackupTest.java 2011-08-01 16:25:50 UTC (rev 11090)
+++ branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/cluster/distribution/NettySymmetricClusterWithBackupTest.java 2011-08-01 19:39:43 UTC (rev 11091)
@@ -22,6 +22,8 @@
package org.hornetq.tests.integration.cluster.distribution;
+import org.hornetq.core.logging.Logger;
+
/**
* A NettySymmetricClusterWithBackupTest
*
@@ -31,6 +33,8 @@
*/
public class NettySymmetricClusterWithBackupTest extends SymmetricClusterWithBackupTest
{
+ private Logger log = Logger.getLogger(NettySymmetricClusterWithBackupTest.class);
+
@Override
protected boolean isNetty()
{
@@ -39,10 +43,10 @@
public void _test() throws Exception
{
- for (int i = 0; i < 50; i++)
+ for (int i = 0; i < 500; i++)
{
- System.out.println("\n\n" + i + "\n\n");
- _testStartStopServers();
+ log.info("#test " + i);
+ testNoLocalQueueLoadBalancedQueues ();
tearDown();
setUp();
}
Modified: branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/cluster/distribution/OneWayChainClusterTest.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/cluster/distribution/OneWayChainClusterTest.java 2011-08-01 16:25:50 UTC (rev 11090)
+++ branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/cluster/distribution/OneWayChainClusterTest.java 2011-08-01 19:39:43 UTC (rev 11091)
@@ -402,4 +402,18 @@
assertEquals(records.size(), 1);
System.out.println("OneWayChainClusterTest.testChainClusterConnections");
}
+
+
+ public void _testLoop() throws Throwable
+ {
+ for (int i = 0 ; i < 500; i++)
+ {
+ log.info("#test " + i);
+ testRoundRobinForwardWhenNoConsumersTrue();
+ tearDown();
+ setUp();
+ }
+ }
+
+
}
Modified: branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/cluster/failover/AsynchronousFailoverTest.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/cluster/failover/AsynchronousFailoverTest.java 2011-08-01 16:25:50 UTC (rev 11090)
+++ branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/cluster/failover/AsynchronousFailoverTest.java 2011-08-01 19:39:43 UTC (rev 11091)
@@ -81,6 +81,17 @@
}
});
}
+
+ public void _testLoop() throws Throwable
+ {
+ for (int i = 0 ; i < 1000; i++)
+ {
+ log.info("#test " + i);
+ testTransactional();
+ tearDown();
+ setUp();
+ }
+ }
public void testTransactional() throws Throwable
{
@@ -366,10 +377,12 @@
}
}
- private void doTestTransactional(final TestRunner runner) throws Exception
+ private void doTestTransactional(final TestRunner runner) throws Throwable
{
// For duplication detection
int executionId = 0;
+
+ log.info("#test doTestTransactional starting now");
while (!runner.isFailed())
{
@@ -411,10 +424,15 @@
addPayload(message);
-
+ if (log.isDebugEnabled())
+ {
+ log.debug("Sending message " + message);
+ }
+
producer.send(message);
}
+ log.debug("Sending commit");
session.commit();
retry = false;
@@ -423,32 +441,37 @@
{
if (e.getCode() == HornetQException.DUPLICATE_ID_REJECTED)
{
+ logAndSystemOut("#test duplicate id rejected");
break;
}
else
if (e.getCode() == HornetQException.TRANSACTION_ROLLED_BACK || e.getCode() == HornetQException.UNBLOCKED)
{
+ log.info("#test transaction rollback retrying");
// OK
retry = true;
}
else
{
+ log.info("#test Exception " + e, e);
throw e;
}
}
}
while (retry);
+ logAndSystemOut("Finished sending, starting consumption now");
boolean blocked = false;
retry = false;
+ ArrayList<Integer> msgs = new ArrayList<Integer>();
ClientConsumer consumer = null;
do
{
- ArrayList<Integer> msgs = new ArrayList<Integer>();
+ msgs.clear();
try
{
if (consumer == null)
@@ -459,11 +482,16 @@
for (int i = 0; i < numMessages; i++)
{
- ClientMessage message = consumer.receive(500);
+ ClientMessage message = consumer.receive(10000);
if (message == null)
{
break;
}
+
+ if (log.isDebugEnabled())
+ {
+ log.debug("Received message " + message);
+ }
int count = message.getIntProperty("counter");
@@ -473,14 +501,26 @@
}
session.commit();
-
- if (blocked)
+
+ try
{
- assertTrue("msgs.size is expected to be 0 or " + numMessages + " but it was " + msgs.size(), msgs.size() == 0 || msgs.size() == numMessages);
+ if (blocked)
+ {
+ assertTrue("msgs.size is expected to be 0 or " + numMessages + " but it was " + msgs.size(), msgs.size() == 0 || msgs.size() == numMessages);
+ }
+ else
+ {
+ assertTrue("msgs.size is expected to be " + numMessages + " but it was " + msgs.size(), msgs.size() == numMessages);
+ }
}
- else
+ catch (Throwable e)
{
- assertTrue("msgs.size is expected to be " + numMessages + " but it was " + msgs.size(), msgs.size() == numMessages);
+ logAndSystemOut(e.getMessage() + " messages received");
+ for (Integer msg : msgs)
+ {
+ logAndSystemOut(msg.toString());
+ }
+ throw e;
}
int i = 0;
@@ -496,6 +536,7 @@
{
if (e.getCode() == HornetQException.TRANSACTION_ROLLED_BACK)
{
+ logAndSystemOut("Transaction rolled back with " + msgs.size(), e);
// TODO: https://jira.jboss.org/jira/browse/HORNETQ-369
// ATM RolledBack exception is being called with the transaction is committed.
// the test will fail if you remove this next line
@@ -503,6 +544,7 @@
}
else if (e.getCode() == HornetQException.UNBLOCKED)
{
+ logAndSystemOut("Unblocked with " + msgs.size(), e);
// TODO: https://jira.jboss.org/jira/browse/HORNETQ-369
// This part of the test is never being called.
blocked = true;
@@ -514,6 +556,7 @@
}
else
{
+ logAndSystemOut(e.getMessage(), e);
throw e;
}
}
Modified: branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/cluster/failover/FailoverTestBase.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/cluster/failover/FailoverTestBase.java 2011-08-01 16:25:50 UTC (rev 11090)
+++ branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/cluster/failover/FailoverTestBase.java 2011-08-01 19:39:43 UTC (rev 11091)
@@ -192,6 +192,7 @@
@Override
protected void tearDown() throws Exception
{
+ logAndSystemOut("#test tearDown");
backupServer.stop();
liveServer.stop();
Modified: branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/cluster/failover/NettyAsynchronousReattachTest.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/cluster/failover/NettyAsynchronousReattachTest.java 2011-08-01 16:25:50 UTC (rev 11090)
+++ branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/cluster/failover/NettyAsynchronousReattachTest.java 2011-08-01 19:39:43 UTC (rev 11091)
@@ -16,6 +16,7 @@
import org.hornetq.api.core.HornetQException;
import org.hornetq.api.core.client.ClientSession;
import org.hornetq.core.client.impl.ClientSessionInternal;
+import org.hornetq.core.logging.Logger;
/**
* A NettyAsynchronousReattachTest
@@ -40,12 +41,15 @@
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------
-
+ private final Logger log = Logger.getLogger(NettyAsynchronousReattachTest.class);
+
protected void crash(final ClientSession... sessions) throws Exception
{
+ log.debug("Crashing sessions");
for (ClientSession session : sessions)
{
+ log.debug("Crashing session " + session);
ClientSessionInternal internalSession = (ClientSessionInternal) session;
internalSession.getConnection().fail(new HornetQException(HornetQException.NOT_CONNECTED, "oops"));
}
Modified: branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/util/UnitTestCase.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/util/UnitTestCase.java 2011-08-01 16:25:50 UTC (rev 11090)
+++ branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/util/UnitTestCase.java 2011-08-01 19:39:43 UTC (rev 11091)
@@ -363,7 +363,24 @@
return str.toString();
}
+
+ /** Sends the message to both logger and System.out (for unit report) */
+ public void logAndSystemOut(String message, Exception e)
+ {
+ Logger log = Logger.getLogger(this.getClass());
+ log.info(message, e);
+ System.out.println(message);
+ e.printStackTrace(System.out);
+ }
+ /** Sends the message to both logger and System.out (for unit report) */
+ public void logAndSystemOut(String message)
+ {
+ Logger log = Logger.getLogger(this.getClass());
+ log.info(message);
+ System.out.println(message);
+ }
+
protected static TestSuite createAIOTestSuite(final Class<?> clazz)
{
TestSuite suite = new TestSuite(clazz.getName() + " testsuite");
12 years, 10 months
JBoss hornetq SVN: r11090 - branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover.
by do-not-reply@jboss.org
Author: borges
Date: 2011-08-01 12:25:50 -0400 (Mon, 01 Aug 2011)
New Revision: 11090
Modified:
branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/BackupJournalSyncTest.java
Log:
HORNETQ-720 Get live's files, crash live, wait for backup to take over.
Modified: branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/BackupJournalSyncTest.java
===================================================================
--- branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/BackupJournalSyncTest.java 2011-08-01 15:27:51 UTC (rev 11089)
+++ branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/BackupJournalSyncTest.java 2011-08-01 16:25:50 UTC (rev 11090)
@@ -13,6 +13,7 @@
import org.hornetq.core.journal.impl.JournalFile;
import org.hornetq.core.journal.impl.JournalImpl;
import org.hornetq.core.persistence.impl.journal.JournalStorageManager;
+import org.hornetq.core.server.HornetQServer;
import org.hornetq.tests.integration.cluster.util.TestableServer;
import org.hornetq.tests.util.TransportConfigurationUtils;
@@ -57,18 +58,37 @@
}
backupServer.start();
waitForBackup(sessionFactory, 10);
- // XXX HORNETQ-720 must wait for backup to sync!
+ Set<Long> liveIds = getFileIds(messageJournal);
+ assertFalse("should not be initialized", backupServer.getServer().isInitialised());
+ crash(session);
+ waitForServerInitialization(backupServer.getServer(), 5);
JournalImpl backupMsgJournal = getMessageJournalFromServer(backupServer);
Set<Long> backupIds = getFileIds(backupMsgJournal);
- Set<Long> liveIds = getFileIds(messageJournal);
assertEquals("sets must match! " + liveIds, liveIds, backupIds);
}
- /**
- * @param backupMsgJournal
- * @return
- */
+ private static void waitForServerInitialization(HornetQServer server, int seconds)
+ {
+ long time = System.currentTimeMillis();
+ long toWait = seconds * 1000;
+ while (!server.isInitialised())
+ {
+ try
+ {
+ Thread.sleep(50);
+ }
+ catch (InterruptedException e)
+ {
+ // ignore
+ }
+ if (System.currentTimeMillis() > (time + toWait))
+ {
+ fail("component did not start within timeout of " + seconds);
+ }
+ }
+ }
+
private Set<Long> getFileIds(JournalImpl journal)
{
Set<Long> results = new HashSet<Long>();
12 years, 10 months
JBoss hornetq SVN: r11089 - trunk/hornetq-journal/src/main/java/org/hornetq/core/journal/impl.
by do-not-reply@jboss.org
Author: borges
Date: 2011-08-01 11:27:51 -0400 (Mon, 01 Aug 2011)
New Revision: 11089
Modified:
trunk/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/AIOSequentialFile.java
Log:
HORNETQ-749 AIOSequentialFile.read(,) to return correct number of bytes read
Modified: trunk/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/AIOSequentialFile.java
===================================================================
--- trunk/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/AIOSequentialFile.java 2011-08-01 14:25:42 UTC (rev 11088)
+++ trunk/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/AIOSequentialFile.java 2011-08-01 15:27:51 UTC (rev 11089)
@@ -204,9 +204,14 @@
public int read(final ByteBuffer bytes, final IOAsyncTask callback) throws Exception
{
int bytesToRead = bytes.limit();
-
long positionToRead = position.getAndAdd(bytesToRead);
+ long size = size();
+ if (size < (positionToRead + bytesToRead))
+ {
+ bytesToRead = (int)(size - positionToRead);
+ }
+
bytes.rewind();
aioFile.read(positionToRead, bytesToRead, bytes, callback);
12 years, 10 months
JBoss hornetq SVN: r11088 - branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2011-08-01 10:25:42 -0400 (Mon, 01 Aug 2011)
New Revision: 11088
Modified:
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java
Log:
tweak
Modified: branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java 2011-08-01 14:25:18 UTC (rev 11087)
+++ branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java 2011-08-01 14:25:42 UTC (rev 11088)
@@ -1202,10 +1202,8 @@
closed = true;
}
- public synchronized void notifyNodeDown(final String nodeID)
+ public void notifyNodeDown(final String nodeID)
{
- boolean removed = false;
-
if (!clusterConnection && !ha)
{
if (log.isDebugEnabled())
@@ -1220,7 +1218,7 @@
log.debug("XXX ZZZ nodeDown " + this + " nodeID=" + nodeID + " as being down", new Exception("trace"));
}
- removed = topology.removeMember(nodeID);
+ topology.removeMember(nodeID);
if (!topology.isEmpty())
{
@@ -1240,7 +1238,7 @@
}
- public synchronized void notifyNodeUp(final String nodeID,
+ public void notifyNodeUp(final String nodeID,
final Pair<TransportConfiguration, TransportConfiguration> connectorPair,
final boolean last)
{
@@ -1280,13 +1278,16 @@
updateArraysAndPairs();
}
- if (last)
+ synchronized (this)
{
- receivedTopology = true;
+ if (last)
+ {
+ receivedTopology = true;
+ }
+
+ // Notify if waiting on getting topology
+ notifyAll();
}
-
- // Notify if waiting on getting topology
- notify();
}
/* (non-Javadoc)
@@ -1313,7 +1314,7 @@
}
}
- private void updateArraysAndPairs()
+ private synchronized void updateArraysAndPairs()
{
Collection<TopologyMember> membersCopy = topology.getMembers();
12 years, 10 months
JBoss hornetq SVN: r11087 - branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/cluster/distribution.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2011-08-01 10:25:18 -0400 (Mon, 01 Aug 2011)
New Revision: 11087
Modified:
branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterWithBackupTest.java
Log:
tweak for debug test
Modified: branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterWithBackupTest.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterWithBackupTest.java 2011-08-01 13:51:31 UTC (rev 11086)
+++ branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterWithBackupTest.java 2011-08-01 14:25:18 UTC (rev 11087)
@@ -58,38 +58,47 @@
{
return false;
}
-
- public void testBasicRoundRobin() throws Exception
+
+ public void testBasicRoundRobin() throws Throwable
{
- setupCluster();
-
- startServers(0, 1, 2, 3, 4, 5);
-
- setupSessionFactory(3, isNetty());
- setupSessionFactory(4, isNetty());
- setupSessionFactory(5, isNetty());
-
- createQueue(3, "queues.testaddress", "queue0", null, false);
- createQueue(4, "queues.testaddress", "queue0", null, false);
- createQueue(5, "queues.testaddress", "queue0", null, false);
-
- addConsumer(0, 3, "queue0", null);
- addConsumer(1, 4, "queue0", null);
- addConsumer(2, 5, "queue0", null);
-
- waitForBindings(3, "queues.testaddress", 1, 1, true);
- waitForBindings(4, "queues.testaddress", 1, 1, true);
- waitForBindings(5, "queues.testaddress", 1, 1, true);
-
- waitForBindings(3, "queues.testaddress", 2, 2, false);
- waitForBindings(4, "queues.testaddress", 2, 2, false);
- waitForBindings(5, "queues.testaddress", 2, 2, false);
-
- send(3, "queues.testaddress", 100, false, null);
-
- verifyReceiveRoundRobinInSomeOrder(100, 0, 1, 2);
-
- verifyNotReceive(0, 0, 1, 2);
+ try
+ {
+ setupCluster();
+
+ startServers(0, 1, 2, 3, 4, 5);
+
+ setupSessionFactory(3, isNetty());
+ setupSessionFactory(4, isNetty());
+ setupSessionFactory(5, isNetty());
+
+ createQueue(3, "queues.testaddress", "queue0", null, false);
+ createQueue(4, "queues.testaddress", "queue0", null, false);
+ createQueue(5, "queues.testaddress", "queue0", null, false);
+
+ addConsumer(0, 3, "queue0", null);
+ addConsumer(1, 4, "queue0", null);
+ addConsumer(2, 5, "queue0", null);
+
+ waitForBindings(3, "queues.testaddress", 1, 1, true);
+ waitForBindings(4, "queues.testaddress", 1, 1, true);
+ waitForBindings(5, "queues.testaddress", 1, 1, true);
+
+ waitForBindings(3, "queues.testaddress", 2, 2, false);
+ waitForBindings(4, "queues.testaddress", 2, 2, false);
+ waitForBindings(5, "queues.testaddress", 2, 2, false);
+
+ send(3, "queues.testaddress", 100, false, null);
+
+ verifyReceiveRoundRobinInSomeOrder(100, 0, 1, 2);
+
+ verifyNotReceive(0, 0, 1, 2);
+ }
+ catch (Throwable e)
+ {
+ e.printStackTrace();
+ log.error(e.getMessage(), e);
+ throw e;
+ }
}
protected void setupCluster() throws Exception
12 years, 10 months