JBoss hornetq SVN: r8145 - in branches/Clebert_Sync: src/main/org/hornetq/core/journal/impl and 7 other directories.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2009-10-26 22:24:40 -0400 (Mon, 26 Oct 2009)
New Revision: 8145
Added:
branches/Clebert_Sync/src/main/org/hornetq/core/journal/impl/JournalCopier.java
Modified:
branches/Clebert_Sync/src/main/org/hornetq/core/journal/Journal.java
branches/Clebert_Sync/src/main/org/hornetq/core/journal/SequentialFileFactory.java
branches/Clebert_Sync/src/main/org/hornetq/core/journal/TestableJournal.java
branches/Clebert_Sync/src/main/org/hornetq/core/journal/impl/AIOSequentialFileFactory.java
branches/Clebert_Sync/src/main/org/hornetq/core/journal/impl/AbstractSequentialFactory.java
branches/Clebert_Sync/src/main/org/hornetq/core/journal/impl/JournalImpl.java
branches/Clebert_Sync/src/main/org/hornetq/core/replication/impl/ReplicatedJournal.java
branches/Clebert_Sync/tests/src/org/hornetq/tests/integration/journal/AIOJournalCompactTest.java
branches/Clebert_Sync/tests/src/org/hornetq/tests/integration/journal/AIOSequentialFileFactoryTest.java
branches/Clebert_Sync/tests/src/org/hornetq/tests/integration/journal/RealAIOJournalImplTest.java
branches/Clebert_Sync/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java
branches/Clebert_Sync/tests/src/org/hornetq/tests/stress/journal/remote/RemoteJournalAppender.java
branches/Clebert_Sync/tests/src/org/hornetq/tests/unit/core/journal/impl/AlignedJournalImplTest.java
branches/Clebert_Sync/tests/src/org/hornetq/tests/unit/core/journal/impl/JournalImplTestBase.java
branches/Clebert_Sync/tests/src/org/hornetq/tests/unit/core/journal/impl/JournalImplTestUnit.java
branches/Clebert_Sync/tests/src/org/hornetq/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java
branches/Clebert_Sync/tests/src/org/hornetq/tests/util/UnitTestCase.java
Log:
Backup of my current changes to the branch
Modified: branches/Clebert_Sync/src/main/org/hornetq/core/journal/Journal.java
===================================================================
--- branches/Clebert_Sync/src/main/org/hornetq/core/journal/Journal.java 2009-10-27 01:52:00 UTC (rev 8144)
+++ branches/Clebert_Sync/src/main/org/hornetq/core/journal/Journal.java 2009-10-27 02:24:40 UTC (rev 8145)
@@ -85,6 +85,11 @@
int getAlignment() throws Exception;
void perfBlast(int pages) throws Exception;
+
+ /** Read the entire content of the journal and copy it to another Journal */
+ void copyTo(Journal destJournal) throws Exception;
+ /** This method will flush everything and make a hard sync on the journal. Use it with caution. (on tests mainly) */
+ void flush() throws Exception;
}
Modified: branches/Clebert_Sync/src/main/org/hornetq/core/journal/SequentialFileFactory.java
===================================================================
--- branches/Clebert_Sync/src/main/org/hornetq/core/journal/SequentialFileFactory.java 2009-10-27 01:52:00 UTC (rev 8144)
+++ branches/Clebert_Sync/src/main/org/hornetq/core/journal/SequentialFileFactory.java 2009-10-27 02:24:40 UTC (rev 8145)
@@ -61,8 +61,7 @@
*/
void createDirs() throws Exception;
- // used on tests only
- void testFlush();
+ void flushBuffers();
}
Modified: branches/Clebert_Sync/src/main/org/hornetq/core/journal/TestableJournal.java
===================================================================
--- branches/Clebert_Sync/src/main/org/hornetq/core/journal/TestableJournal.java 2009-10-27 01:52:00 UTC (rev 8144)
+++ branches/Clebert_Sync/src/main/org/hornetq/core/journal/TestableJournal.java 2009-10-27 02:24:40 UTC (rev 8145)
@@ -35,8 +35,6 @@
String debug() throws Exception;
- void debugWait() throws Exception;
-
int getFileSize();
int getMinFiles();
Modified: branches/Clebert_Sync/src/main/org/hornetq/core/journal/impl/AIOSequentialFileFactory.java
===================================================================
--- branches/Clebert_Sync/src/main/org/hornetq/core/journal/impl/AIOSequentialFileFactory.java 2009-10-27 01:52:00 UTC (rev 8144)
+++ branches/Clebert_Sync/src/main/org/hornetq/core/journal/impl/AIOSequentialFileFactory.java 2009-10-27 02:24:40 UTC (rev 8145)
@@ -102,7 +102,7 @@
}
}
- public void testFlush()
+ public void flushBuffers()
{
timedBuffer.flush();
}
Modified: branches/Clebert_Sync/src/main/org/hornetq/core/journal/impl/AbstractSequentialFactory.java
===================================================================
--- branches/Clebert_Sync/src/main/org/hornetq/core/journal/impl/AbstractSequentialFactory.java 2009-10-27 01:52:00 UTC (rev 8144)
+++ branches/Clebert_Sync/src/main/org/hornetq/core/journal/impl/AbstractSequentialFactory.java 2009-10-27 02:24:40 UTC (rev 8145)
@@ -64,7 +64,7 @@
{
}
- public void testFlush()
+ public void flushBuffers()
{
}
Added: branches/Clebert_Sync/src/main/org/hornetq/core/journal/impl/JournalCopier.java
===================================================================
--- branches/Clebert_Sync/src/main/org/hornetq/core/journal/impl/JournalCopier.java (rev 0)
+++ branches/Clebert_Sync/src/main/org/hornetq/core/journal/impl/JournalCopier.java 2009-10-27 02:24:40 UTC (rev 8145)
@@ -0,0 +1,168 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.journal.impl;
+
+import java.util.Set;
+
+import org.hornetq.core.journal.Journal;
+import org.hornetq.core.journal.RecordInfo;
+import org.hornetq.core.journal.SequentialFileFactory;
+import org.hornetq.core.journal.impl.JournalImpl.JournalRecord;
+import org.hornetq.core.logging.Logger;
+import org.hornetq.utils.DataConstants;
+
+/**
+ * A JournalCopier
+ *
+ * @author <mailto:clebert.suconic@jboss.org">Clebert Suconic</a>
+ *
+ *
+ */
+public class JournalCopier extends AbstractJournalUpdateTask
+{
+ // Constants -----------------------------------------------------
+
+ private static final Logger log = Logger.getLogger(JournalCopier.class);
+
+ // Attributes ----------------------------------------------------
+
+ private final Set<Long> pendingTransactions;
+
+ private final Journal journalTo;
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ /**
+ * @param fileFactory
+ * @param journal
+ * @param recordsSnapshot
+ * @param nextOrderingID
+ */
+ public JournalCopier(SequentialFileFactory fileFactory,
+ JournalImpl journalFrom,
+ Journal journalTo,
+ Set<Long> recordsSnapshot,
+ Set<Long> pendingTransactionsSnapshot)
+ {
+ super(fileFactory, journalFrom, recordsSnapshot, -1);
+ this.pendingTransactions = pendingTransactionsSnapshot;
+ this.journalTo = journalTo;
+ }
+
+ // Public --------------------------------------------------------
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.journal.impl.JournalReaderCallback#onReadAddRecord(org.hornetq.core.journal.RecordInfo)
+ */
+
+ public void onReadAddRecord(final RecordInfo info) throws Exception
+ {
+ if (lookupRecord(info.id))
+ {
+ journalTo.appendAddRecord(info.id, info.userRecordType, info.data, false);
+ }
+ }
+
+ public void onReadAddRecordTX(final long transactionID, final RecordInfo info) throws Exception
+ {
+ if (pendingTransactions.contains(transactionID))
+ {
+ journalTo.appendAddRecordTransactional(transactionID, info.id, info.userRecordType, info.data);
+ }
+ else
+ {
+ // Will try it as a regular record, the method addRecord will validate if this is a live record or not
+ onReadAddRecord(info);
+ }
+ }
+
+ public void onReadCommitRecord(final long transactionID, final int numberOfRecords) throws Exception
+ {
+
+ if (pendingTransactions.contains(transactionID))
+ {
+ // Sanity check, this should never happen
+ log.warn("Inconsistency during compacting: CommitRecord ID = " + transactionID +
+ " for an already committed transaction during compacting");
+ }
+ }
+
+ public void onReadDeleteRecord(final long recordID) throws Exception
+ {
+ // Nothing to be done here, we don't copy deleted records
+ }
+
+ public void onReadDeleteRecordTX(final long transactionID, final RecordInfo info) throws Exception
+ {
+ if (pendingTransactions.contains(transactionID))
+ {
+ journalTo.appendDeleteRecordTransactional(transactionID, info.id, info.data);
+ }
+ // else.. nothing to be done
+ }
+
+ public void markAsDataFile(final JournalFile file)
+ {
+ // nothing to be done here
+ }
+
+ public void onReadPrepareRecord(final long transactionID, final byte[] extraData, final int numberOfRecords) throws Exception
+ {
+ if (pendingTransactions.contains(transactionID))
+ {
+ journalTo.appendPrepareRecord(transactionID, extraData, false);
+ }
+ }
+
+ public void onReadRollbackRecord(final long transactionID) throws Exception
+ {
+ if (pendingTransactions.contains(transactionID))
+ {
+ // Sanity check, this should never happen
+ log.warn("Inconsistency during compacting: RollbackRecord ID = " + transactionID +
+ " for an already rolled back transaction during compacting");
+ }
+ }
+
+ public void onReadUpdateRecord(final RecordInfo info) throws Exception
+ {
+ if (lookupRecord(info.id))
+ {
+ journalTo.appendUpdateRecord(info.id, info.userRecordType, info.data, false);
+ }
+ }
+
+ public void onReadUpdateRecordTX(final long transactionID, final RecordInfo info) throws Exception
+ {
+ if (pendingTransactions.contains(transactionID))
+ {
+ journalTo.appendUpdateRecordTransactional(transactionID, info.id, info.userRecordType, info.data);
+ }
+ else
+ {
+ onReadUpdateRecord(info);
+ }
+ }
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+
+}
Modified: branches/Clebert_Sync/src/main/org/hornetq/core/journal/impl/JournalImpl.java
===================================================================
--- branches/Clebert_Sync/src/main/org/hornetq/core/journal/impl/JournalImpl.java 2009-10-27 01:52:00 UTC (rev 8144)
+++ branches/Clebert_Sync/src/main/org/hornetq/core/journal/impl/JournalImpl.java 2009-10-27 02:24:40 UTC (rev 8145)
@@ -44,6 +44,7 @@
import org.hornetq.core.buffers.ChannelBuffers;
import org.hornetq.core.journal.EncodingSupport;
import org.hornetq.core.journal.IOCallback;
+import org.hornetq.core.journal.Journal;
import org.hornetq.core.journal.LoaderCallback;
import org.hornetq.core.journal.PreparedTransactionInfo;
import org.hornetq.core.journal.RecordInfo;
@@ -173,7 +174,7 @@
private final float compactPercentage;
- private final int compactMinFiles;
+ private volatile int compactMinFiles;
private final SequentialFileFactory fileFactory;
@@ -209,8 +210,8 @@
// After a record is appended, the usedFile can't be changed until the positives and negatives are updated
private final ReentrantLock lockAppend = new ReentrantLock();
- /** We don't lock the journal while compacting, however we need to lock it while taking and updating snapshots */
- private final ReadWriteLock compactingLock = new ReentrantReadWriteLock();
+ /** We never lock the journal, however we need to lock it while taking and updating snapshots */
+ private final ReadWriteLock globalLock = new ReentrantReadWriteLock();
private volatile JournalFile currentFile;
@@ -852,7 +853,7 @@
IOCallback callback = null;
- compactingLock.readLock().lock();
+ globalLock.readLock().lock();
try
{
@@ -878,7 +879,7 @@
}
finally
{
- compactingLock.readLock().unlock();
+ globalLock.readLock().unlock();
}
if (callback != null)
@@ -901,7 +902,7 @@
IOCallback callback = null;
- compactingLock.readLock().lock();
+ globalLock.readLock().lock();
try
{
@@ -947,7 +948,7 @@
}
finally
{
- compactingLock.readLock().unlock();
+ globalLock.readLock().unlock();
}
if (callback != null)
@@ -963,7 +964,7 @@
throw new IllegalStateException("Journal must be loaded first");
}
- compactingLock.readLock().lock();
+ globalLock.readLock().lock();
IOCallback callback = null;
@@ -1012,7 +1013,7 @@
}
finally
{
- compactingLock.readLock().unlock();
+ globalLock.readLock().unlock();
}
if (callback != null)
@@ -1037,7 +1038,7 @@
throw new IllegalStateException("Journal must be loaded first");
}
- compactingLock.readLock().lock();
+ globalLock.readLock().lock();
try
{
@@ -1064,7 +1065,7 @@
}
finally
{
- compactingLock.readLock().unlock();
+ globalLock.readLock().unlock();
}
}
@@ -1086,7 +1087,7 @@
throw new IllegalStateException("Journal must be loaded first");
}
- compactingLock.readLock().lock();
+ globalLock.readLock().lock();
try
{
@@ -1113,7 +1114,7 @@
}
finally
{
- compactingLock.readLock().unlock();
+ globalLock.readLock().unlock();
}
}
@@ -1129,7 +1130,7 @@
throw new IllegalStateException("Journal must be loaded first");
}
- compactingLock.readLock().lock();
+ globalLock.readLock().lock();
try
{
@@ -1155,7 +1156,7 @@
}
finally
{
- compactingLock.readLock().unlock();
+ globalLock.readLock().unlock();
}
}
@@ -1163,7 +1164,7 @@
{
appendDeleteRecordTransactional(txID, id, NullEncoding.instance);
}
-
+
/* (non-Javadoc)
* @see org.hornetq.core.journal.Journal#appendPrepareRecord(long, byte[], boolean)
*/
@@ -1172,8 +1173,6 @@
appendPrepareRecord(txID, new ByteArrayEncoding(transactionData), sync);
}
-
-
/**
*
* <p>If the system crashed after a prepare was called, it should store information that is required to bring the transaction
@@ -1194,7 +1193,7 @@
throw new IllegalStateException("Journal must be loaded first");
}
- compactingLock.readLock().lock();
+ globalLock.readLock().lock();
JournalTransaction tx = getTransactionInfo(txID);
@@ -1226,7 +1225,7 @@
}
finally
{
- compactingLock.readLock().unlock();
+ globalLock.readLock().unlock();
}
// We should wait this outside of the lock, to increase throughput
@@ -1257,7 +1256,7 @@
throw new IllegalStateException("Journal must be loaded first");
}
- compactingLock.readLock().lock();
+ globalLock.readLock().lock();
JournalTransaction tx = transactions.remove(txID);
@@ -1294,7 +1293,7 @@
}
finally
{
- compactingLock.readLock().unlock();
+ globalLock.readLock().unlock();
}
if (sync)
@@ -1311,7 +1310,7 @@
throw new IllegalStateException("Journal must be loaded first");
}
- compactingLock.readLock().lock();
+ globalLock.readLock().lock();
JournalTransaction tx = null;
@@ -1343,7 +1342,7 @@
}
finally
{
- compactingLock.readLock().unlock();
+ globalLock.readLock().unlock();
}
// We should wait this outside of the lock, to increase throuput
@@ -1432,6 +1431,105 @@
return maxID;
}
+ /* (non-Javadoc)
+ * @see org.hornetq.core.journal.Journal#copyTo(org.hornetq.core.journal.Journal)
+ */
+ public void copyTo(Journal journal) throws Exception
+ {
+ if (state != STATE_LOADED)
+ {
+ return;
+ }
+
+ int originalCompact = this.compactMinFiles;
+ boolean originalAutoReclaim = this.autoReclaim;
+
+ compactMinFiles = 0;
+ autoReclaim = false;
+
+ flushExecutor();
+
+ // Wait the compactor and cleanup to finish case they are running
+ while (!compactorRunning.compareAndSet(false, true))
+ {
+ final CountDownLatch latch = new CountDownLatch(1);
+ compactorExecutor.execute(new Runnable()
+ {
+ public void run()
+ {
+ latch.countDown();
+ }
+ });
+ latch.await();
+ }
+
+ JournalCopier copier = null;
+
+ try
+ {
+
+ // begin ********************************************
+
+ List<JournalFile> dataFilesToProcess = null;
+
+ // Journal, Say cheese... I need to take a snapshot from your transactions and records now, freeze please!
+ globalLock.writeLock().lock();
+
+ try
+ {
+
+ // Take the snapshots and replace the structures
+
+ dataFilesToProcess = getSnapshotFilesToProcess();
+
+ if (dataFilesToProcess.size() == 0)
+ {
+ return;
+ }
+
+ dataFiles.clear();
+
+ HashSet<Long> txSet = new HashSet<Long>();
+
+ for (Map.Entry<Long, JournalTransaction> entry : transactions.entrySet())
+ {
+ txSet.add(entry.getKey());
+ }
+
+ copier = new JournalCopier(fileFactory, this, journal, records.keySet(), txSet);
+ }
+ finally
+ {
+ globalLock.writeLock().unlock();
+ }
+
+ Collections.sort(dataFilesToProcess, new JournalFileComparator());
+
+ // This is where most of the work is done, taking most of the time of the compacting routine.
+ // Notice there are no locks while this is being done.
+
+ // Read the files, and use the JournalCompactor class to create the new outputFiles, and the new collections as
+ // well
+ for (final JournalFile file : dataFilesToProcess)
+ {
+ readJournalFile(fileFactory, file, copier);
+ }
+
+ compactor.flush();
+
+ }
+ finally
+ {
+ this.compactMinFiles = originalCompact;
+ this.autoReclaim = originalAutoReclaim;
+ compactorRunning.set(false);
+
+ // since we disabled Reclaiming during the copy, we will do a check on everything as soon as the backup is
+ // done
+ scheduleReclaim();
+ }
+ }
+
/**
*
* Note: This method can't be called from the main executor, as it will invoke other methods depending on it.
@@ -1442,20 +1540,21 @@
if (compactor != null)
{
- throw new IllegalStateException("There is pending compacting operation");
+ throw new IllegalStateException("There is a pending compacting operation");
}
- ArrayList<JournalFile> dataFilesToProcess = new ArrayList<JournalFile>(dataFiles.size());
+ List<JournalFile> dataFilesToProcess = null;
boolean previousReclaimValue = autoReclaim;
+ globalLock.readLock().lock();
+
try
{
log.debug("Starting compacting operation on journal");
- // We need to guarantee that the journal is frozen for this short time
- // We don't freeze the journal as we compact, only for the short time where we replace records
- compactingLock.writeLock().lock();
+ // Journal, Say cheese... I need to take a snapshot from your transactions and records now, freeze please!
+ globalLock.writeLock().lock();
try
{
if (state != STATE_LOADED)
@@ -1465,34 +1564,25 @@
autoReclaim = false;
- // We need to move to the next file, as we need a clear start for negatives and positives counts
- moveNextFile(true);
-
// Take the snapshots and replace the structures
- dataFilesToProcess.addAll(dataFiles);
+ dataFilesToProcess = getSnapshotFilesToProcess();
- for (JournalFile file : pendingCloseFiles)
+ if (dataFilesToProcess.size() == 0)
{
- file.getFile().close();
+ return;
}
- dataFilesToProcess.addAll(pendingCloseFiles);
- pendingCloseFiles.clear();
-
dataFiles.clear();
- if (dataFilesToProcess.size() == 0)
- {
- return;
- }
+ List<Pair<Long, JournalTransaction>> pendingTransactions = getSnapshoPendingTransactions();
compactor = new JournalCompactor(fileFactory, this, records.keySet(), dataFilesToProcess.get(0).getFileID());
- for (Map.Entry<Long, JournalTransaction> entry : transactions.entrySet())
+ for (Pair<Long, JournalTransaction> tx : pendingTransactions)
{
- compactor.addPendingTransaction(entry.getKey(), entry.getValue().getPositiveArray());
- entry.getValue().setCompacting();
+ compactor.addPendingTransaction(tx.a, tx.b.getPositiveArray());
+ tx.b.setCompacting();
}
// We will calculate the new records during compacting, what will take the position the records will take
@@ -1501,7 +1591,7 @@
}
finally
{
- compactingLock.writeLock().unlock();
+ globalLock.writeLock().unlock();
}
Collections.sort(dataFilesToProcess, new JournalFileComparator());
@@ -1529,7 +1619,7 @@
SequentialFile controlFile = createControlFile(dataFilesToProcess, compactor.getNewDataFiles(), null);
- compactingLock.writeLock().lock();
+ globalLock.writeLock().lock();
try
{
// Need to clear the compactor here, or the replay commands will send commands back (infinite loop)
@@ -1584,7 +1674,7 @@
}
finally
{
- compactingLock.writeLock().unlock();
+ globalLock.writeLock().unlock();
}
// At this point the journal is unlocked. We keep renaming files while the journal is already operational
@@ -1596,6 +1686,8 @@
}
finally
{
+ globalLock.readLock().unlock();
+
// An Exception was probably thrown, and the compactor was not cleared
if (compactor != null)
{
@@ -1614,6 +1706,40 @@
}
+ /**
+ * @return
+ */
+ private List<Pair<Long, JournalTransaction>> getSnapshoPendingTransactions()
+ {
+ List<Pair<Long, JournalTransaction>> pendingTransactions = new ArrayList<Pair<Long, JournalTransaction>>();
+
+ for (Map.Entry<Long, JournalTransaction> entry : transactions.entrySet())
+ {
+ pendingTransactions.add(new Pair<Long, JournalTransaction>(entry.getKey(), entry.getValue()));
+ }
+ return pendingTransactions;
+ }
+
+ /**
+ * Requires full lock (WriteLock)
+ * @param dataFilesToProcess
+ */
+ private List<JournalFile> getSnapshotFilesToProcess() throws Exception
+ {
+ // We need to move to the next file, as we need a clear start for negatives and positives counts
+ moveNextFile(true);
+
+ List<JournalFile> dataFilesToProcess = new ArrayList<JournalFile>();
+ for (JournalFile file : pendingCloseFiles)
+ {
+ this.closeFile(file, true);
+ }
+
+ dataFilesToProcess.addAll(dataFiles);
+
+ return dataFilesToProcess;
+ }
+
/**
* <p>Load data accordingly to the record layouts</p>
*
@@ -2034,8 +2160,10 @@
// Remove the transactionInfo
transactions.remove(transaction.transactionID);
-
- loadManager.failedTransaction(transaction.transactionID, transaction.recordInfos, transaction.recordsToDelete);
+
+ loadManager.failedTransaction(transaction.transactionID,
+ transaction.recordInfos,
+ transaction.recordsToDelete);
}
else
{
@@ -2062,7 +2190,7 @@
public boolean checkReclaimStatus() throws Exception
{
// We can't start reclaim while compacting is working
- compactingLock.readLock().lock();
+ globalLock.readLock().lock();
try
{
reclaimer.scan(getDataFiles());
@@ -2144,7 +2272,7 @@
}
finally
{
- compactingLock.readLock().unlock();
+ globalLock.readLock().unlock();
}
return false;
@@ -2165,12 +2293,15 @@
return;
}
- compactingLock.readLock().lock();
+ // Journal, Say cheese... I need to take a snapshot from your transactions and records now, freeze please!
+ globalLock.readLock().lock();
try
{
JournalCleaner cleaner = null;
ArrayList<JournalFile> dependencies = new ArrayList<JournalFile>();
+
+ // getting the lockAppend as the counters are being changed
lockAppend.lock();
try
@@ -2229,7 +2360,7 @@
}
finally
{
- compactingLock.readLock().unlock();
+ globalLock.readLock().unlock();
log.debug("Clean up on file " + file + " done");
}
@@ -2263,7 +2394,7 @@
return;
}
- // We can't use the executor for the compacting... or we would dead lock because of file open and creation
+ // We can't use the main executor for the compacting... or we would dead lock because of file open and creation
// operations (that will use the executor)
compactorExecutor.execute(new Runnable()
{
@@ -2347,21 +2478,25 @@
return builder.toString();
}
- /** Method for use on testcases.
- * It will call waitComplete on every transaction, so any assertions on the file system will be correct after this */
- public void debugWait() throws Exception
+ public void flush() throws Exception
{
- fileFactory.testFlush();
+ fileFactory.flushBuffers();
for (JournalTransaction tx : transactions.values())
{
tx.waitCallbacks();
}
+ flushExecutor();
+ }
+
+ /**
+ * @throws InterruptedException
+ */
+ private void flushExecutor() throws InterruptedException
+ {
if (filesExecutor != null && !filesExecutor.isShutdown())
{
- // Send something to the closingExecutor, just to make sure we went
- // until its end
final CountDownLatch latch = new CountDownLatch(1);
filesExecutor.execute(new Runnable()
@@ -2374,7 +2509,6 @@
latch.await();
}
-
}
public int getDataFilesCount()
@@ -2430,7 +2564,7 @@
// In some tests we need to force the journal to move to a next file
public void forceMoveNextFile() throws Exception
{
- compactingLock.readLock().lock();
+ globalLock.readLock().lock();
try
{
lockAppend.lock();
@@ -2441,7 +2575,7 @@
{
checkReclaimStatus();
}
- debugWait();
+ flush();
}
finally
{
@@ -2450,7 +2584,7 @@
}
finally
{
- compactingLock.readLock().unlock();
+ globalLock.readLock().unlock();
}
}
@@ -2854,7 +2988,7 @@
currentFile.getFile().write(bb, sync);
}
- return currentFile;
+ return currentFile;
}
finally
{
@@ -3115,7 +3249,7 @@
{
public void run()
{
- compactingLock.readLock().lock();
+ globalLock.readLock().lock();
try
{
// The file could be closed by compacting. On this case we need to check if the close still pending
@@ -3135,7 +3269,7 @@
}
finally
{
- compactingLock.readLock().unlock();
+ globalLock.readLock().unlock();
}
}
};
@@ -3355,7 +3489,7 @@
{
private static NullEncoding instance = new NullEncoding();
-
+
public static NullEncoding getInstance()
{
return instance;
Modified: branches/Clebert_Sync/src/main/org/hornetq/core/replication/impl/ReplicatedJournal.java
===================================================================
--- branches/Clebert_Sync/src/main/org/hornetq/core/replication/impl/ReplicatedJournal.java 2009-10-27 01:52:00 UTC (rev 8144)
+++ branches/Clebert_Sync/src/main/org/hornetq/core/replication/impl/ReplicatedJournal.java 2009-10-27 02:24:40 UTC (rev 8145)
@@ -397,6 +397,23 @@
return localJournal.isStarted();
}
+ /* (non-Javadoc)
+ * @see org.hornetq.core.journal.Journal#copyTo(org.hornetq.core.journal.Journal)
+ */
+ public void copyTo(Journal destJournal) throws Exception
+ {
+ // This would be a nonsense operation. Only the real journal can copyTo
+ throw new IllegalStateException("Operation Not Implemeted!");
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.journal.Journal#flush()
+ */
+ public void flush() throws Exception
+ {
+ localJournal.flush();
+ }
+
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------
Modified: branches/Clebert_Sync/tests/src/org/hornetq/tests/integration/journal/AIOJournalCompactTest.java
===================================================================
--- branches/Clebert_Sync/tests/src/org/hornetq/tests/integration/journal/AIOJournalCompactTest.java 2009-10-27 01:52:00 UTC (rev 8144)
+++ branches/Clebert_Sync/tests/src/org/hornetq/tests/integration/journal/AIOJournalCompactTest.java 2009-10-27 02:24:40 UTC (rev 8145)
@@ -16,6 +16,8 @@
import java.io.File;
+import junit.framework.TestSuite;
+
import org.hornetq.core.config.impl.ConfigurationImpl;
import org.hornetq.core.journal.SequentialFileFactory;
import org.hornetq.core.journal.impl.AIOSequentialFileFactory;
@@ -30,6 +32,13 @@
public class AIOJournalCompactTest extends NIOJournalCompactTest
{
+
+ public static TestSuite suite()
+ {
+ return createAIOTestSuite(AIOJournalCompactTest.class);
+ }
+
+
// Constants -----------------------------------------------------
// Attributes ----------------------------------------------------
Modified: branches/Clebert_Sync/tests/src/org/hornetq/tests/integration/journal/AIOSequentialFileFactoryTest.java
===================================================================
--- branches/Clebert_Sync/tests/src/org/hornetq/tests/integration/journal/AIOSequentialFileFactoryTest.java 2009-10-27 01:52:00 UTC (rev 8144)
+++ branches/Clebert_Sync/tests/src/org/hornetq/tests/integration/journal/AIOSequentialFileFactoryTest.java 2009-10-27 02:24:40 UTC (rev 8145)
@@ -16,7 +16,8 @@
import java.io.File;
import java.nio.ByteBuffer;
-import org.hornetq.core.asyncio.impl.AsynchronousFileImpl;
+import junit.framework.TestSuite;
+
import org.hornetq.core.journal.SequentialFile;
import org.hornetq.core.journal.SequentialFileFactory;
import org.hornetq.core.journal.impl.AIOSequentialFileFactory;
@@ -31,20 +32,17 @@
*/
public class AIOSequentialFileFactoryTest extends SequentialFileFactoryTestBase
{
-
+
+ public static TestSuite suite()
+ {
+ return createAIOTestSuite(AIOSequentialFileFactoryTest.class);
+ }
+
@Override
protected void setUp() throws Exception
{
super.setUp();
- if (!AsynchronousFileImpl.isLoaded())
- {
- fail(String.format("libAIO is not loaded on %s %s %s",
- System.getProperty("os.name"),
- System.getProperty("os.arch"),
- System.getProperty("os.version")));
- }
-
File file = new File(getTestDir());
deleteDirectory(file);
Modified: branches/Clebert_Sync/tests/src/org/hornetq/tests/integration/journal/RealAIOJournalImplTest.java
===================================================================
--- branches/Clebert_Sync/tests/src/org/hornetq/tests/integration/journal/RealAIOJournalImplTest.java 2009-10-27 01:52:00 UTC (rev 8144)
+++ branches/Clebert_Sync/tests/src/org/hornetq/tests/integration/journal/RealAIOJournalImplTest.java 2009-10-27 02:24:40 UTC (rev 8145)
@@ -15,6 +15,8 @@
import java.io.File;
+import junit.framework.TestSuite;
+
import org.hornetq.core.asyncio.impl.AsynchronousFileImpl;
import org.hornetq.core.config.impl.ConfigurationImpl;
import org.hornetq.core.journal.SequentialFileFactory;
@@ -38,6 +40,12 @@
public class RealAIOJournalImplTest extends JournalImplTestUnit
{
private static final Logger log = Logger.getLogger(RealAIOJournalImplTest.class);
+
+ public static TestSuite suite()
+ {
+ // Ignore tests if AIO is not installed
+ return createAIOTestSuite(RealAIOJournalImplTest.class);
+ }
@Override
protected void setUp() throws Exception
Modified: branches/Clebert_Sync/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java
===================================================================
--- branches/Clebert_Sync/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java 2009-10-27 01:52:00 UTC (rev 8144)
+++ branches/Clebert_Sync/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java 2009-10-27 02:24:40 UTC (rev 8145)
@@ -699,5 +699,19 @@
}
+ /* (non-Javadoc)
+ * @see org.hornetq.core.journal.Journal#copyTo(org.hornetq.core.journal.Journal)
+ */
+ public void copyTo(Journal destJournal) throws Exception
+ {
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.journal.Journal#flush()
+ */
+ public void flush() throws Exception
+ {
+ }
+
}
}
Modified: branches/Clebert_Sync/tests/src/org/hornetq/tests/stress/journal/remote/RemoteJournalAppender.java
===================================================================
--- branches/Clebert_Sync/tests/src/org/hornetq/tests/stress/journal/remote/RemoteJournalAppender.java 2009-10-27 01:52:00 UTC (rev 8144)
+++ branches/Clebert_Sync/tests/src/org/hornetq/tests/stress/journal/remote/RemoteJournalAppender.java 2009-10-27 02:24:40 UTC (rev 8145)
@@ -236,7 +236,7 @@
if (transactionSize == 0)
{
- journal.debugWait();
+ journal.flush();
}
}
catch (Exception e)
Modified: branches/Clebert_Sync/tests/src/org/hornetq/tests/unit/core/journal/impl/AlignedJournalImplTest.java
===================================================================
--- branches/Clebert_Sync/tests/src/org/hornetq/tests/unit/core/journal/impl/AlignedJournalImplTest.java 2009-10-27 01:52:00 UTC (rev 8144)
+++ branches/Clebert_Sync/tests/src/org/hornetq/tests/unit/core/journal/impl/AlignedJournalImplTest.java 2009-10-27 02:24:40 UTC (rev 8145)
@@ -280,7 +280,7 @@
journalImpl.checkReclaimStatus();
- journalImpl.debugWait();
+ journalImpl.flush();
assertEquals(2, factory.listFiles("tt").size());
@@ -297,7 +297,7 @@
// as the request to a new file is asynchronous, we need to make sure the
// async requests are done
- journalImpl.debugWait();
+ journalImpl.flush();
assertEquals(3, factory.listFiles("tt").size());
@@ -306,7 +306,7 @@
journalImpl.appendDeleteRecord(i, false);
}
- journalImpl.debugWait();
+ journalImpl.flush();
setupAndLoadJournal(JOURNAL_SIZE, 100);
@@ -326,7 +326,7 @@
journalImpl.checkReclaimStatus();
- journalImpl.debugWait();
+ journalImpl.flush();
assertEquals(2, factory.listFiles("tt").size());
@@ -341,7 +341,7 @@
// as the request to a new file is asynchronous, we need to make sure the
// async requests are done
- journalImpl.debugWait();
+ journalImpl.flush();
assertEquals(2, factory.listFiles("tt").size());
@@ -354,7 +354,7 @@
journalImpl.appendAddRecord(1000, (byte)1, new SimpleEncoding(1, (byte)'x'), false);
- journalImpl.debugWait();
+ journalImpl.flush();
assertEquals(3, factory.listFiles("tt").size());
@@ -368,7 +368,7 @@
log.debug(journalImpl.debug());
- journalImpl.debugWait();
+ journalImpl.flush();
log.debug("Final:--> " + journalImpl.debug());
@@ -432,7 +432,7 @@
journalImpl.forceMoveNextFile();
}
- journalImpl.debugWait();
+ journalImpl.flush();
assertEquals(12, factory.listFiles("tt").size());
@@ -486,7 +486,7 @@
journalImpl.appendCommitRecord(1l, false);
- journalImpl.debugWait();
+ journalImpl.flush();
assertEquals(12, factory.listFiles("tt").size());
@@ -542,7 +542,7 @@
journalImpl.appendCommitRecord(1l, false);
- journalImpl.debugWait();
+ journalImpl.flush();
setupAndLoadJournal(JOURNAL_SIZE, 100);
@@ -872,7 +872,7 @@
journalImpl.appendPrepareRecord(1, xid, false);
- journalImpl.debugWait();
+ journalImpl.flush();
setupAndLoadJournal(JOURNAL_SIZE, 1);
@@ -899,7 +899,7 @@
journalImpl.appendCommitRecord(1l, false);
- journalImpl.debugWait();
+ journalImpl.flush();
setupAndLoadJournal(JOURNAL_SIZE, 1);
@@ -923,7 +923,7 @@
journalImpl.forceMoveNextFile();
}
- journalImpl.debugWait();
+ journalImpl.flush();
SimpleEncoding xid1 = new SimpleEncoding(10, (byte)1);
@@ -1157,7 +1157,7 @@
setupAndLoadJournal(JOURNAL_SIZE, 0);
journalImpl.forceMoveNextFile();
- journalImpl.debugWait();
+ journalImpl.flush();
journalImpl.checkReclaimStatus();
assertEquals(0, transactions.size());
@@ -1243,11 +1243,11 @@
assertEquals(2, finishedOK.intValue());
- journalImpl.debugWait();
+ journalImpl.flush();
journalImpl.forceMoveNextFile();
- journalImpl.debugWait();
+ journalImpl.flush();
journalImpl.checkReclaimStatus();
Modified: branches/Clebert_Sync/tests/src/org/hornetq/tests/unit/core/journal/impl/JournalImplTestBase.java
===================================================================
--- branches/Clebert_Sync/tests/src/org/hornetq/tests/unit/core/journal/impl/JournalImplTestBase.java 2009-10-27 01:52:00 UTC (rev 8144)
+++ branches/Clebert_Sync/tests/src/org/hornetq/tests/unit/core/journal/impl/JournalImplTestBase.java 2009-10-27 02:24:40 UTC (rev 8145)
@@ -103,9 +103,9 @@
protected void checkAndReclaimFiles() throws Exception
{
- journal.debugWait();
+ journal.flush();
journal.checkReclaimStatus();
- journal.debugWait();
+ journal.flush();
}
protected abstract SequentialFileFactory getFileFactory() throws Exception;
@@ -218,7 +218,7 @@
records.add(new RecordInfo(element, (byte)0, record, false));
}
- journal.debugWait();
+ journal.flush();
}
protected void update(final long... arguments) throws Exception
@@ -232,7 +232,7 @@
records.add(new RecordInfo(element, (byte)0, updateRecord, true));
}
- journal.debugWait();
+ journal.flush();
}
protected void delete(final long... arguments) throws Exception
@@ -244,7 +244,7 @@
removeRecordsForID(element);
}
- journal.debugWait();
+ journal.flush();
}
protected void addTx(final long txID, final long... arguments) throws Exception
@@ -263,7 +263,7 @@
}
- journal.debugWait();
+ journal.flush();
}
protected void updateTx(final long txID, final long... arguments) throws Exception
@@ -278,7 +278,7 @@
tx.records.add(new RecordInfo(element, (byte)0, updateRecord, true));
}
- journal.debugWait();
+ journal.flush();
}
protected void deleteTx(final long txID, final long... arguments) throws Exception
@@ -292,7 +292,7 @@
tx.deletes.add(new RecordInfo(element, (byte)0, null, true));
}
- journal.debugWait();
+ journal.flush();
}
protected void prepare(final long txID, final EncodingSupport xid) throws Exception
@@ -313,7 +313,7 @@
tx.prepared = true;
- journal.debugWait();
+ journal.flush();
}
protected void commit(final long txID) throws Exception
@@ -329,7 +329,7 @@
commitTx(txID);
- journal.debugWait();
+ journal.flush();
}
protected void rollback(final long txID) throws Exception
@@ -343,7 +343,7 @@
journal.appendRollbackRecord(txID, sync);
- journal.debugWait();
+ journal.flush();
}
private void commitTx(final long txID)
Modified: branches/Clebert_Sync/tests/src/org/hornetq/tests/unit/core/journal/impl/JournalImplTestUnit.java
===================================================================
--- branches/Clebert_Sync/tests/src/org/hornetq/tests/unit/core/journal/impl/JournalImplTestUnit.java 2009-10-27 01:52:00 UTC (rev 8144)
+++ branches/Clebert_Sync/tests/src/org/hornetq/tests/unit/core/journal/impl/JournalImplTestUnit.java 2009-10-27 02:24:40 UTC (rev 8145)
@@ -985,7 +985,7 @@
deleteTx(1, 1); // in file 1
- journal.debugWait();
+ journal.flush();
System.out.println("journal tmp :" + journal.debug());
@@ -1002,7 +1002,7 @@
addWithSize(recordLength - JournalImpl.SIZE_ADD_RECORD, 2); // in file 2
- journal.debugWait();
+ journal.flush();
System.out.println("journal tmp2 :" + journal.debug());
@@ -1875,7 +1875,7 @@
journal.forceMoveNextFile();
- journal.debugWait();
+ journal.flush();
addWithSize(recordLength - JournalImpl.SIZE_ADD_RECORD, 2); // in file 1
Modified: branches/Clebert_Sync/tests/src/org/hornetq/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java
===================================================================
--- branches/Clebert_Sync/tests/src/org/hornetq/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java 2009-10-27 01:52:00 UTC (rev 8144)
+++ branches/Clebert_Sync/tests/src/org/hornetq/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java 2009-10-27 02:24:40 UTC (rev 8145)
@@ -693,7 +693,7 @@
/* (non-Javadoc)
* @see org.hornetq.core.journal.SequentialFileFactory#testFlush()
*/
- public void testFlush()
+ public void flushBuffers()
{
}
Modified: branches/Clebert_Sync/tests/src/org/hornetq/tests/util/UnitTestCase.java
===================================================================
--- branches/Clebert_Sync/tests/src/org/hornetq/tests/util/UnitTestCase.java 2009-10-27 01:52:00 UTC (rev 8144)
+++ branches/Clebert_Sync/tests/src/org/hornetq/tests/util/UnitTestCase.java 2009-10-27 02:24:40 UTC (rev 8145)
@@ -41,12 +41,14 @@
import javax.transaction.xa.Xid;
import junit.framework.TestCase;
+import junit.framework.TestSuite;
import org.hornetq.core.asyncio.impl.AsynchronousFileImpl;
import org.hornetq.core.buffers.ChannelBuffers;
import org.hornetq.core.client.ClientMessage;
import org.hornetq.core.client.ClientSession;
import org.hornetq.core.exception.HornetQException;
+import org.hornetq.core.journal.impl.AIOSequentialFileFactory;
import org.hornetq.core.logging.Logger;
import org.hornetq.core.postoffice.Binding;
import org.hornetq.core.postoffice.Bindings;
@@ -189,6 +191,23 @@
return str.toString();
}
+ protected static TestSuite createAIOTestSuite(Class<?> clazz)
+ {
+ TestSuite suite = new TestSuite(clazz.getName() + " testsuite");
+
+ if (AIOSequentialFileFactory.isSupported())
+ {
+ suite.addTestSuite(clazz);
+ }
+ else
+ {
+ // System.out goes towards JUnit report
+ System.out.println("Test " + clazz.getName() + " ignored as AIO is not available");
+ }
+
+ return suite;
+ }
+
public static String dumpBytes(byte[] bytes)
{
StringBuffer buff = new StringBuffer();
15 years, 2 months
JBoss hornetq SVN: r8144 - trunk/src/main/org/hornetq/core/settings/impl.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2009-10-26 21:52:00 -0400 (Mon, 26 Oct 2009)
New Revision: 8144
Modified:
trunk/src/main/org/hornetq/core/settings/impl/AddressSettings.java
Log:
changing maxSize to long
Modified: trunk/src/main/org/hornetq/core/settings/impl/AddressSettings.java
===================================================================
--- trunk/src/main/org/hornetq/core/settings/impl/AddressSettings.java 2009-10-26 22:16:17 UTC (rev 8143)
+++ trunk/src/main/org/hornetq/core/settings/impl/AddressSettings.java 2009-10-27 01:52:00 UTC (rev 8144)
@@ -52,7 +52,7 @@
public static final boolean DEFAULT_SEND_TO_DLA_ON_NO_ROUTE = false;
- private Integer maxSizeBytes = null;
+ private Long maxSizeBytes = null;
private Integer pageSizeBytes = null;
@@ -106,12 +106,12 @@
pageSizeBytes = pageSize;
}
- public int getMaxSizeBytes()
+ public long getMaxSizeBytes()
{
return maxSizeBytes != null ? maxSizeBytes : DEFAULT_MAX_SIZE_BYTES;
}
- public void setMaxSizeBytes(final int maxSizeBytes)
+ public void setMaxSizeBytes(final long maxSizeBytes)
{
this.maxSizeBytes = maxSizeBytes;
}
15 years, 2 months
JBoss hornetq SVN: r8143 - trunk/src/main/org/hornetq/core/persistence/impl/journal.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2009-10-26 18:16:17 -0400 (Mon, 26 Oct 2009)
New Revision: 8143
Modified:
trunk/src/main/org/hornetq/core/persistence/impl/journal/JournalLargeServerMessage.java
Log:
Removing file.open left by accident
Modified: trunk/src/main/org/hornetq/core/persistence/impl/journal/JournalLargeServerMessage.java
===================================================================
--- trunk/src/main/org/hornetq/core/persistence/impl/journal/JournalLargeServerMessage.java 2009-10-26 09:04:15 UTC (rev 8142)
+++ trunk/src/main/org/hornetq/core/persistence/impl/journal/JournalLargeServerMessage.java 2009-10-26 22:16:17 UTC (rev 8143)
@@ -287,6 +287,7 @@
@Override
public synchronized ServerMessage copy(final long newID) throws Exception
{
+ // Incrementing the reference counting to avoid deletion of the linkedMessage
incrementRefCount();
long idToUse = messageID;
@@ -298,8 +299,6 @@
SequentialFile newfile = storageManager.createFileForLargeMessage(idToUse, isStored());
- file.open();
-
JournalLargeServerMessage newMessage = new JournalLargeServerMessage(linkMessage == null ? this : (JournalLargeServerMessage)linkMessage, newfile, newID);
return newMessage;
15 years, 2 months
JBoss hornetq SVN: r8142 - in trunk: tests/src/org/hornetq/tests/integration/cluster/failover and 1 other directory.
by do-not-reply@jboss.org
Author: jmesnil
Date: 2009-10-26 05:04:15 -0400 (Mon, 26 Oct 2009)
New Revision: 8142
Modified:
trunk/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java
trunk/tests/src/org/hornetq/tests/integration/cluster/failover/FailoverTest.java
Log:
clear ClientConsumer when failover occurs
Modified: trunk/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java 2009-10-23 22:31:39 UTC (rev 8141)
+++ trunk/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java 2009-10-26 09:04:15 UTC (rev 8142)
@@ -881,7 +881,6 @@
if ((!autoCommitAcks || !autoCommitSends) && workDone)
{
// Session is transacted - set for rollback only
-
// FIXME - there is a race condition here - a commit could sneak in before this is set
rollbackOnly = true;
}
@@ -891,6 +890,7 @@
{
for (ClientConsumerInternal consumer : consumers.values())
{
+ consumer.clearAtFailover();
consumer.start();
}
Modified: trunk/tests/src/org/hornetq/tests/integration/cluster/failover/FailoverTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/cluster/failover/FailoverTest.java 2009-10-23 22:31:39 UTC (rev 8141)
+++ trunk/tests/src/org/hornetq/tests/integration/cluster/failover/FailoverTest.java 2009-10-26 09:04:15 UTC (rev 8142)
@@ -271,6 +271,10 @@
session.addFailureListener(new MyListener());
+ ClientConsumer consumer = session.createConsumer(ADDRESS);
+
+ session.start();
+
ClientProducer producer = session.createProducer(ADDRESS);
final int numMessages = 100;
@@ -294,10 +298,7 @@
session.commit();
- ClientConsumer consumer = session.createConsumer(ADDRESS);
- session.start();
-
for (int i = 0; i < numMessages; i++)
{
// Only the persistent messages will survive
@@ -316,6 +317,8 @@
}
}
+ assertNull(consumer.receive(1000));
+
session.commit();
session.close();
@@ -488,6 +491,8 @@
session2.commit();
+ assertNull(consumer.receive(1000));
+
session1.close();
session2.close();
15 years, 2 months
JBoss hornetq SVN: r8141 - trunk/tests/src/org/hornetq/tests/integration/journal.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2009-10-23 18:31:39 -0400 (Fri, 23 Oct 2009)
New Revision: 8141
Modified:
trunk/tests/src/org/hornetq/tests/integration/journal/MultiThreadCompactorTest.java
Log:
This test was supposed to use asynchronous on the SessionFactory
Modified: trunk/tests/src/org/hornetq/tests/integration/journal/MultiThreadCompactorTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/journal/MultiThreadCompactorTest.java 2009-10-23 15:33:51 UTC (rev 8140)
+++ trunk/tests/src/org/hornetq/tests/integration/journal/MultiThreadCompactorTest.java 2009-10-23 22:31:39 UTC (rev 8141)
@@ -130,6 +130,8 @@
private void addEmptyTransaction(Xid xid) throws HornetQException, XAException
{
ClientSessionFactory sf = createInVMFactory();
+ sf.setBlockOnNonPersistentSend(false);
+ sf.setBlockOnAcknowledge(false);
ClientSession session = sf.createSession(true, false, false);
session.start(xid, XAResource.TMNOFLAGS);
session.end(xid, XAResource.TMSUCCESS);
@@ -141,6 +143,8 @@
private void checkEmptyXID(Xid xid) throws HornetQException, XAException
{
ClientSessionFactory sf = createInVMFactory();
+ sf.setBlockOnNonPersistentSend(false);
+ sf.setBlockOnAcknowledge(false);
ClientSession session = sf.createSession(true, false, false);
Xid[] xids = session.recover(XAResource.TMSTARTRSCAN);
@@ -343,7 +347,9 @@
server.start();
sf = createNettyFactory();
-
+ sf.setBlockOnPersistentSend(false);
+ sf.setBlockOnAcknowledge(false);
+
ClientSession sess = sf.createSession();
try
15 years, 3 months
JBoss hornetq SVN: r8140 - branches.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2009-10-23 11:33:51 -0400 (Fri, 23 Oct 2009)
New Revision: 8140
Added:
branches/Clebert_Sync/
Log:
Creating a temporary work area as I want to have access to some history during a refactory. (If I were using git I wouldnt need this branch now)
Copied: branches/Clebert_Sync (from rev 8139, trunk)
15 years, 3 months
JBoss hornetq SVN: r8139 - branches.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2009-10-22 13:42:57 -0400 (Thu, 22 Oct 2009)
New Revision: 8139
Removed:
branches/Replication_Clebert/
Log:
Removing temporary branch after merge
15 years, 3 months
JBoss hornetq SVN: r8138 - trunk/examples/common.
by do-not-reply@jboss.org
Author: jmesnil
Date: 2009-10-22 10:57:03 -0400 (Thu, 22 Oct 2009)
New Revision: 8138
Added:
trunk/examples/common/build.bat
trunk/examples/common/build.sh
Log:
added missing build scripts
Added: trunk/examples/common/build.bat
===================================================================
--- trunk/examples/common/build.bat (rev 0)
+++ trunk/examples/common/build.bat 2009-10-22 14:57:03 UTC (rev 8138)
@@ -0,0 +1,13 @@
+@echo off
+
+set "OVERRIDE_ANT_HOME=..\..\tools\ant"
+
+if exist "..\..\src\bin\build.bat" (
+ rem running from TRUNK
+ call ..\..\src\bin\build.bat %*
+) else (
+ rem running from the distro
+ call ..\..\bin\build.bat %*
+)
+
+set "OVERRIDE_ANT_HOME="
Added: trunk/examples/common/build.sh
===================================================================
--- trunk/examples/common/build.sh (rev 0)
+++ trunk/examples/common/build.sh 2009-10-22 14:57:03 UTC (rev 8138)
@@ -0,0 +1,15 @@
+#!/bin/sh
+
+OVERRIDE_ANT_HOME=../../tools/ant
+export OVERRIDE_ANT_HOME
+
+if [ -f "../../src/bin/build.sh" ]; then
+ # running from TRUNK
+ ../../src/bin/build.sh "$@"
+else
+ # running from the distro
+ ../../bin/build.sh "$@"
+fi
+
+
+
Property changes on: trunk/examples/common/build.sh
___________________________________________________________________
Name: svn:executable
+ *
15 years, 3 months
JBoss hornetq SVN: r8137 - trunk/src/main/org/hornetq/core/management.
by do-not-reply@jboss.org
Author: jmesnil
Date: 2009-10-22 10:37:03 -0400 (Thu, 22 Oct 2009)
New Revision: 8137
Modified:
trunk/src/main/org/hornetq/core/management/Operation.java
Log:
@Operation annotation javadoc
* used only for operations invokable from a GUI
Modified: trunk/src/main/org/hornetq/core/management/Operation.java
===================================================================
--- trunk/src/main/org/hornetq/core/management/Operation.java 2009-10-22 13:20:02 UTC (rev 8136)
+++ trunk/src/main/org/hornetq/core/management/Operation.java 2009-10-22 14:37:03 UTC (rev 8137)
@@ -24,6 +24,9 @@
/**
* Info for a MBean Operation.
*
+ * This annotation is used only for methods which can be invoked
+ * through a GUI.
+ *
* @author <a href="mailto:jmesnil@redhat.com">Jeff Mesnil</a>
*
* @version <tt>$Revision$</tt>
15 years, 3 months
JBoss hornetq SVN: r8136 - trunk/tests/src/org/hornetq/tests/integration.
by do-not-reply@jboss.org
Author: timfox
Date: 2009-10-22 09:20:02 -0400 (Thu, 22 Oct 2009)
New Revision: 8136
Added:
trunk/tests/src/org/hornetq/tests/integration/InterceptorTest.java
Log:
added interceptor test
Added: trunk/tests/src/org/hornetq/tests/integration/InterceptorTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/InterceptorTest.java (rev 0)
+++ trunk/tests/src/org/hornetq/tests/integration/InterceptorTest.java 2009-10-22 13:20:02 UTC (rev 8136)
@@ -0,0 +1,605 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.hornetq.tests.integration;
+
+import org.hornetq.core.client.ClientConsumer;
+import org.hornetq.core.client.ClientMessage;
+import org.hornetq.core.client.ClientProducer;
+import org.hornetq.core.client.ClientSession;
+import org.hornetq.core.client.ClientSessionFactory;
+import org.hornetq.core.exception.HornetQException;
+import org.hornetq.core.logging.Logger;
+import org.hornetq.core.message.Message;
+import org.hornetq.core.remoting.Interceptor;
+import org.hornetq.core.remoting.Packet;
+import org.hornetq.core.remoting.RemotingConnection;
+import org.hornetq.core.remoting.impl.wireformat.PacketImpl;
+import org.hornetq.core.remoting.impl.wireformat.SessionReceiveMessage;
+import org.hornetq.core.remoting.impl.wireformat.SessionSendMessage;
+import org.hornetq.core.server.HornetQServer;
+import org.hornetq.core.server.ServerMessage;
+import org.hornetq.tests.util.ServiceTestBase;
+import org.hornetq.utils.SimpleString;
+
+/**
+ *
+ * A InterceptorTest
+ *
+ * @author tim fox
+ *
+ *
+ */
+public class InterceptorTest extends ServiceTestBase
+{
+ private static final Logger log = Logger.getLogger(InterceptorTest.class);
+
+ private HornetQServer server;
+
+ private final SimpleString QUEUE = new SimpleString("InterceptorTestQueue");
+
+ @Override
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+
+ server = createServer(false);
+
+ server.start();
+ }
+
+ @Override
+ protected void tearDown() throws Exception
+ {
+ server.stop();
+
+ server = null;
+
+ super.tearDown();
+ }
+
+ private static final String key = "fruit";
+
+ private class MyInterceptor1 implements Interceptor
+ {
+ public boolean intercept(Packet packet, RemotingConnection connection) throws HornetQException
+ {
+ if (packet.getType() == PacketImpl.SESS_SEND)
+ {
+ SessionSendMessage p = (SessionSendMessage)packet;
+
+ ServerMessage sm = p.getServerMessage();
+
+ sm.putStringProperty(key, "orange");
+ }
+
+ return true;
+ }
+
+ }
+
+ private class MyInterceptor2 implements Interceptor
+ {
+ public boolean intercept(Packet packet, RemotingConnection connection) throws HornetQException
+ {
+ if (packet.getType() == PacketImpl.SESS_SEND)
+ {
+ return false;
+ }
+
+ return true;
+ }
+
+ }
+
+ private class MyInterceptor3 implements Interceptor
+ {
+ public boolean intercept(Packet packet, RemotingConnection connection) throws HornetQException
+ {
+ if (packet.getType() == PacketImpl.SESS_RECEIVE_MSG)
+ {
+ SessionReceiveMessage p = (SessionReceiveMessage)packet;
+
+ ClientMessage cm = p.getClientMessage();
+
+ cm.putStringProperty(key, "orange");
+ }
+
+ return true;
+ }
+
+ }
+
+ private class MyInterceptor4 implements Interceptor
+ {
+ public boolean intercept(Packet packet, RemotingConnection connection) throws HornetQException
+ {
+ if (packet.getType() == PacketImpl.SESS_RECEIVE_MSG)
+ {
+ return false;
+ }
+
+ return true;
+ }
+
+ }
+
+ private class MyInterceptor5 implements Interceptor
+ {
+ private final String key;
+
+ private final int num;
+
+ private volatile boolean reject;
+
+ private volatile boolean wasCalled;
+
+ MyInterceptor5(final String key, final int num)
+ {
+ this.key = key;
+
+ this.num = num;
+ }
+
+ public void setReject(final boolean reject)
+ {
+ this.reject = reject;
+ }
+
+ public boolean wasCalled()
+ {
+ return wasCalled;
+ }
+
+ public void setWasCalled(final boolean wasCalled)
+ {
+ this.wasCalled = wasCalled;
+ }
+
+ public boolean intercept(Packet packet, RemotingConnection connection) throws HornetQException
+ {
+ if (packet.getType() == PacketImpl.SESS_SEND)
+ {
+ SessionSendMessage p = (SessionSendMessage)packet;
+
+ ServerMessage sm = p.getServerMessage();
+
+ sm.putIntProperty(key, num);
+
+ wasCalled = true;
+
+ return !reject;
+ }
+
+ return true;
+
+ }
+
+ }
+
+ private class MyInterceptor6 implements Interceptor
+ {
+ private final String key;
+
+ private final int num;
+
+ private volatile boolean reject;
+
+ private volatile boolean wasCalled;
+
+ MyInterceptor6(final String key, final int num)
+ {
+ this.key = key;
+
+ this.num = num;
+ }
+
+ public void setReject(final boolean reject)
+ {
+ this.reject = reject;
+ }
+
+ public boolean wasCalled()
+ {
+ return wasCalled;
+ }
+
+ public void setWasCalled(final boolean wasCalled)
+ {
+ this.wasCalled = wasCalled;
+ }
+
+ public boolean intercept(Packet packet, RemotingConnection connection) throws HornetQException
+ {
+ if (packet.getType() == PacketImpl.SESS_RECEIVE_MSG)
+ {
+ SessionReceiveMessage p = (SessionReceiveMessage)packet;
+
+ Message sm = p.getClientMessage();
+
+ sm.putIntProperty(key, num);
+
+ wasCalled = true;
+
+ return !reject;
+ }
+
+ return true;
+
+ }
+
+ }
+
+ public void testServerInterceptorChangeProperty() throws Exception
+ {
+ MyInterceptor1 interceptor = new MyInterceptor1();
+
+ server.getRemotingService().addInterceptor(interceptor);
+
+ ClientSessionFactory sf = createInVMFactory();
+
+ ClientSession session = sf.createSession(false, true, true, true);
+
+ session.createQueue(QUEUE, QUEUE, null, false);
+
+ ClientProducer producer = session.createProducer(QUEUE);
+
+ final int numMessages = 10;
+
+ for (int i = 0; i < numMessages; i++)
+ {
+ ClientMessage message = session.createClientMessage(false);
+
+ message.putStringProperty(key, "apple");
+
+ producer.send(message);
+ }
+
+ ClientConsumer consumer = session.createConsumer(QUEUE);
+
+ session.start();
+
+ for (int i = 0; i < numMessages; i++)
+ {
+ ClientMessage message = consumer.receive(1000);
+
+ assertEquals("orange", ((SimpleString)message.getProperty(key)).toString());
+ }
+
+ server.getRemotingService().removeInterceptor(interceptor);
+
+ for (int i = 0; i < numMessages; i++)
+ {
+ ClientMessage message = session.createClientMessage(false);
+
+ message.putStringProperty(key, "apple");
+
+ producer.send(message);
+ }
+
+ for (int i = 0; i < numMessages; i++)
+ {
+ ClientMessage message = consumer.receive(1000);
+
+ assertEquals("apple", ((SimpleString)message.getProperty(key)).toString());
+ }
+
+ session.close();
+ }
+
+ public void testServerInterceptorRejectPacket() throws Exception
+ {
+ MyInterceptor2 interceptor = new MyInterceptor2();
+
+ server.getRemotingService().addInterceptor(interceptor);
+
+ ClientSessionFactory sf = createInVMFactory();
+
+ sf.setBlockOnNonPersistentSend(false);
+
+ ClientSession session = sf.createSession(false, true, true, true);
+
+ session.createQueue(QUEUE, QUEUE, null, false);
+
+ ClientProducer producer = session.createProducer(QUEUE);
+
+ final int numMessages = 10;
+
+ for (int i = 0; i < numMessages; i++)
+ {
+ ClientMessage message = session.createClientMessage(false);
+
+ producer.send(message);
+ }
+
+ ClientConsumer consumer = session.createConsumer(QUEUE);
+
+ session.start();
+
+ ClientMessage message = consumer.receive(250);
+
+ assertNull(message);
+
+ session.close();
+ }
+
+ public void testClientInterceptorChangeProperty() throws Exception
+ {
+ ClientSessionFactory sf = createInVMFactory();
+
+ MyInterceptor3 interceptor = new MyInterceptor3();
+
+ sf.addInterceptor(interceptor);
+
+ ClientSession session = sf.createSession(false, true, true, true);
+
+ session.createQueue(QUEUE, QUEUE, null, false);
+
+ ClientProducer producer = session.createProducer(QUEUE);
+
+ final int numMessages = 10;
+
+ for (int i = 0; i < numMessages; i++)
+ {
+ ClientMessage message = session.createClientMessage(false);
+
+ message.putStringProperty(key, "apple");
+
+ producer.send(message);
+ }
+
+ ClientConsumer consumer = session.createConsumer(QUEUE);
+
+ session.start();
+
+ for (int i = 0; i < numMessages; i++)
+ {
+ ClientMessage message = consumer.receive(1000);
+
+ assertEquals("orange", ((SimpleString)message.getProperty(key)).toString());
+ }
+
+ sf.removeInterceptor(interceptor);
+
+ for (int i = 0; i < numMessages; i++)
+ {
+ ClientMessage message = session.createClientMessage(false);
+
+ message.putStringProperty(key, "apple");
+
+ producer.send(message);
+ }
+
+ for (int i = 0; i < numMessages; i++)
+ {
+ ClientMessage message = consumer.receive(1000);
+
+ assertEquals("apple", ((SimpleString)message.getProperty(key)).toString());
+ }
+
+ session.close();
+ }
+
+ public void testClientInterceptorRejectPacket() throws Exception
+ {
+ ClientSessionFactory sf = createInVMFactory();
+
+ MyInterceptor4 interceptor = new MyInterceptor4();
+
+ sf.addInterceptor(interceptor);
+
+ ClientSession session = sf.createSession(false, true, true, true);
+
+ session.createQueue(QUEUE, QUEUE, null, false);
+
+ ClientProducer producer = session.createProducer(QUEUE);
+
+ final int numMessages = 10;
+
+ for (int i = 0; i < numMessages; i++)
+ {
+ ClientMessage message = session.createClientMessage(false);
+
+ producer.send(message);
+ }
+
+ ClientConsumer consumer = session.createConsumer(QUEUE);
+
+ session.start();
+
+ ClientMessage message = consumer.receive(250);
+
+ assertNull(message);
+
+ session.close();
+ }
+
+ public void testServerMultipleInterceptors() throws Exception
+ {
+ MyInterceptor5 interceptor1 = new MyInterceptor5("a", 1);
+ MyInterceptor5 interceptor2 = new MyInterceptor5("b", 2);
+ MyInterceptor5 interceptor3 = new MyInterceptor5("c", 3);
+ MyInterceptor5 interceptor4 = new MyInterceptor5("d", 4);
+
+ server.getRemotingService().addInterceptor(interceptor1);
+ server.getRemotingService().addInterceptor(interceptor2);
+ server.getRemotingService().addInterceptor(interceptor3);
+ server.getRemotingService().addInterceptor(interceptor4);
+
+ ClientSessionFactory sf = createInVMFactory();
+
+ ClientSession session = sf.createSession(false, true, true, true);
+
+ session.createQueue(QUEUE, QUEUE, null, false);
+
+ ClientProducer producer = session.createProducer(QUEUE);
+
+ final int numMessages = 10;
+
+ for (int i = 0; i < numMessages; i++)
+ {
+ ClientMessage message = session.createClientMessage(false);
+
+ producer.send(message);
+ }
+
+ ClientConsumer consumer = session.createConsumer(QUEUE);
+
+ session.start();
+
+ for (int i = 0; i < numMessages; i++)
+ {
+ ClientMessage message = consumer.receive(1000);
+
+ assertEquals(1, ((Integer)message.getProperty("a")).intValue());
+ assertEquals(2, ((Integer)message.getProperty("b")).intValue());
+ assertEquals(3, ((Integer)message.getProperty("c")).intValue());
+ assertEquals(4, ((Integer)message.getProperty("d")).intValue());
+ }
+
+ server.getRemotingService().removeInterceptor(interceptor2);
+
+ for (int i = 0; i < numMessages; i++)
+ {
+ ClientMessage message = session.createClientMessage(false);
+
+ producer.send(message);
+ }
+
+ for (int i = 0; i < numMessages; i++)
+ {
+ ClientMessage message = consumer.receive(1000);
+
+ assertEquals(1, ((Integer)message.getProperty("a")).intValue());
+ assertNull(message.getProperty("b"));
+ assertEquals(3, ((Integer)message.getProperty("c")).intValue());
+ assertEquals(4, ((Integer)message.getProperty("d")).intValue());
+
+ }
+
+ interceptor3.setReject(true);
+
+ interceptor1.setWasCalled(false);
+ interceptor2.setWasCalled(false);
+ interceptor3.setWasCalled(false);
+ interceptor4.setWasCalled(false);
+
+ for (int i = 0; i < numMessages; i++)
+ {
+ ClientMessage message = session.createClientMessage(false);
+
+ producer.send(message);
+ }
+
+ ClientMessage message = consumer.receive(250);
+
+ assertNull(message);
+
+ assertTrue(interceptor1.wasCalled());
+ assertFalse(interceptor2.wasCalled());
+ assertTrue(interceptor3.wasCalled());
+ assertFalse(interceptor4.wasCalled());
+
+ session.close();
+ }
+
+ public void testClientMultipleInterceptors() throws Exception
+ {
+ MyInterceptor6 interceptor1 = new MyInterceptor6("a", 1);
+ MyInterceptor6 interceptor2 = new MyInterceptor6("b", 2);
+ MyInterceptor6 interceptor3 = new MyInterceptor6("c", 3);
+ MyInterceptor6 interceptor4 = new MyInterceptor6("d", 4);
+
+ ClientSessionFactory sf = createInVMFactory();
+
+ sf.addInterceptor(interceptor1);
+ sf.addInterceptor(interceptor2);
+ sf.addInterceptor(interceptor3);
+ sf.addInterceptor(interceptor4);
+
+ ClientSession session = sf.createSession(false, true, true, true);
+
+ session.createQueue(QUEUE, QUEUE, null, false);
+
+ ClientProducer producer = session.createProducer(QUEUE);
+
+ final int numMessages = 10;
+
+ for (int i = 0; i < numMessages; i++)
+ {
+ ClientMessage message = session.createClientMessage(false);
+
+ producer.send(message);
+ }
+
+ ClientConsumer consumer = session.createConsumer(QUEUE);
+
+ session.start();
+
+ for (int i = 0; i < numMessages; i++)
+ {
+ ClientMessage message = consumer.receive(1000);
+
+ assertEquals(1, ((Integer)message.getProperty("a")).intValue());
+ assertEquals(2, ((Integer)message.getProperty("b")).intValue());
+ assertEquals(3, ((Integer)message.getProperty("c")).intValue());
+ assertEquals(4, ((Integer)message.getProperty("d")).intValue());
+ }
+
+ sf.removeInterceptor(interceptor2);
+
+ for (int i = 0; i < numMessages; i++)
+ {
+ ClientMessage message = session.createClientMessage(false);
+
+ producer.send(message);
+ }
+
+ for (int i = 0; i < numMessages; i++)
+ {
+ ClientMessage message = consumer.receive(1000);
+
+ assertEquals(1, ((Integer)message.getProperty("a")).intValue());
+ assertNull(message.getProperty("b"));
+ assertEquals(3, ((Integer)message.getProperty("c")).intValue());
+ assertEquals(4, ((Integer)message.getProperty("d")).intValue());
+
+ }
+
+ interceptor3.setReject(true);
+
+ interceptor1.setWasCalled(false);
+ interceptor2.setWasCalled(false);
+ interceptor3.setWasCalled(false);
+ interceptor4.setWasCalled(false);
+
+ for (int i = 0; i < numMessages; i++)
+ {
+ ClientMessage message = session.createClientMessage(false);
+
+ producer.send(message);
+ }
+
+ ClientMessage message = consumer.receive(250);
+
+ assertNull(message);
+
+ assertTrue(interceptor1.wasCalled());
+ assertFalse(interceptor2.wasCalled());
+ assertTrue(interceptor3.wasCalled());
+ assertFalse(interceptor4.wasCalled());
+
+ session.close();
+ }
+
+
+}
15 years, 3 months