[jboss-cvs] JBoss Messaging SVN: r7499 - in branches/clebert_temp_expirement: tests/src/org/jboss/messaging/tests/integration/journal and 1 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Mon Jun 29 21:13:00 EDT 2009
Author: clebert.suconic at jboss.com
Date: 2009-06-29 21:12:59 -0400 (Mon, 29 Jun 2009)
New Revision: 7499
Added:
branches/clebert_temp_expirement/tests/src/org/jboss/messaging/tests/integration/journal/NIOJournalCompactTest.java
Removed:
branches/clebert_temp_expirement/tests/src/org/jboss/messaging/tests/integration/journal/JournalCompactTest.java
Modified:
branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/JournalCompactor.java
branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java
branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/JournalTransaction.java
branches/clebert_temp_expirement/tests/src/org/jboss/messaging/tests/integration/journal/AIOJournalCompactTest.java
branches/clebert_temp_expirement/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/JournalImplTestUnit.java
Log:
changes
Modified: branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/JournalCompactor.java
===================================================================
--- branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/JournalCompactor.java 2009-06-29 23:47:11 UTC (rev 7498)
+++ branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/JournalCompactor.java 2009-06-30 01:12:59 UTC (rev 7499)
@@ -39,7 +39,6 @@
import org.jboss.messaging.core.logging.Logger;
import org.jboss.messaging.utils.ConcurrentHashSet;
import org.jboss.messaging.utils.DataConstants;
-import org.jboss.messaging.utils.Pair;
/**
* A JournalCompactor
@@ -69,7 +68,7 @@
final List<JournalFile> newDataFiles = new ArrayList<JournalFile>();
- final Map<Long, JournalRecord> recordsSnapshot;
+ final Set<Long> recordsSnapshot = new ConcurrentHashSet<Long>();;
// Snapshot of transactions that were pending when the compactor started
final Set<Long> pendingTransactions = new ConcurrentHashSet<Long>();
@@ -83,15 +82,16 @@
public JournalCompactor(final SequentialFileFactory fileFactory,
final JournalImpl journal,
- Map<Long, JournalRecord> recordsSnapshot,
+ Set<Long> recordsSnapshot,
int firstFileID)
{
this.fileFactory = fileFactory;
this.journal = journal;
- this.recordsSnapshot = recordsSnapshot;
+ this.recordsSnapshot.addAll(recordsSnapshot);
this.nextOrderingID = firstFileID;
}
+ /** This methods informs the Compactor about the existence of a pending (non committed) transaction */
public void addPendingTransaction(long transactionID)
{
pendingTransactions.add(transactionID);
@@ -117,7 +117,7 @@
public boolean lookupRecord(long id)
{
- return recordsSnapshot.get(id) != null;
+ return recordsSnapshot.contains(id);
}
private void checkSize(int size) throws Exception
@@ -173,7 +173,7 @@
public void addRecord(RecordInfo info) throws Exception
{
- if (recordsSnapshot.get(info.id) != null)
+ if (recordsSnapshot.contains(info.id))
{
int size = JournalImpl.SIZE_ADD_RECORD + info.data.length;
@@ -301,7 +301,7 @@
public void updateRecord(RecordInfo info) throws Exception
{
- if (recordsSnapshot.get(info.id) != null)
+ if (recordsSnapshot.contains(info.id))
{
int size = JournalImpl.SIZE_UPDATE_RECORD + info.data.length;
@@ -388,12 +388,12 @@
channelWrapper.writeInt(fileID);
}
- static abstract class CompactCommand
+ private static abstract class CompactCommand
{
abstract void execute() throws Exception;
}
- class DeleteCompactCommand extends CompactCommand
+ private class DeleteCompactCommand extends CompactCommand
{
long id;
@@ -414,7 +414,7 @@
}
}
- class UpdateCompactCommand extends CompactCommand
+ private class UpdateCompactCommand extends CompactCommand
{
long id;
@@ -427,6 +427,7 @@
this.usedFile = usedFile;
}
+
@Override
void execute() throws Exception
{
Modified: branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java
===================================================================
--- branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java 2009-06-29 23:47:11 UTC (rev 7498)
+++ branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java 2009-06-30 01:12:59 UTC (rev 7499)
@@ -61,6 +61,7 @@
import org.jboss.messaging.core.journal.TestableJournal;
import org.jboss.messaging.core.logging.Logger;
import org.jboss.messaging.core.remoting.spi.MessagingBuffer;
+import org.jboss.messaging.utils.ConcurrentHashSet;
import org.jboss.messaging.utils.DataConstants;
import org.jboss.messaging.utils.Pair;
import org.jboss.messaging.utils.concurrent.LinkedBlockingDeque;
@@ -186,10 +187,10 @@
private final BlockingQueue<JournalFile> openedFiles = new LinkedBlockingQueue<JournalFile>();
// Compacting may replace this structure
- private volatile ConcurrentMap<Long, JournalRecord> records = new ConcurrentHashMap<Long, JournalRecord>();
+ private final ConcurrentMap<Long, JournalRecord> records = new ConcurrentHashMap<Long, JournalRecord>();
// Compacting may replace this structure
- private volatile ConcurrentMap<Long, JournalTransaction> pendingTransactions = new ConcurrentHashMap<Long, JournalTransaction>();
+ private final ConcurrentMap<Long, JournalTransaction> transactions = new ConcurrentHashMap<Long, JournalTransaction>();
// This will be set only while the JournalCompactor is being executed
private volatile JournalCompactor compactor;
@@ -688,7 +689,7 @@
throw new IllegalStateException("Journal must be loaded first");
}
- JournalTransaction tx = pendingTransactions.remove(txID);
+ JournalTransaction tx = transactions.remove(txID);
compactingLock.readLock().lock();
@@ -742,7 +743,7 @@
try
{
- tx = pendingTransactions.remove(txID);
+ tx = transactions.remove(txID);
if (tx == null)
{
@@ -836,12 +837,9 @@
throw new IllegalStateException("There is pending compacting operation");
}
- ConcurrentMap<Long, JournalRecord> recordsSnapshot = null;
ArrayList<JournalFile> dataFilesToProcess = new ArrayList<JournalFile>(dataFiles.size());
- Map<Long, JournalTransaction> pendingTransactions;
-
boolean previousReclaimValue = autoReclaim;
try
@@ -852,32 +850,29 @@
compactingLock.writeLock().lock();
try
{
+
// We need to move to the next file, as we need a clear start for negatives and positives counts
moveNextFile();
autoReclaim = false;
// Take the snapshots and replace the structures
-
- recordsSnapshot = JournalImpl.this.records;
- pendingTransactions = JournalImpl.this.pendingTransactions;
- pendingTransactions.putAll(this.pendingTransactions);
-
- this.records = new ConcurrentHashMap<Long, JournalRecord>();
-
+
dataFilesToProcess.addAll(dataFiles);
dataFiles.clear();
- this.compactor = new JournalCompactor(fileFactory, this, recordsSnapshot, dataFilesToProcess.get(0).getFileID());
+ this.compactor = new JournalCompactor(fileFactory, this, this.records.keySet(), dataFilesToProcess.get(0).getFileID());
- for (Map.Entry<Long, JournalTransaction> entry : pendingTransactions.entrySet())
+ for (Map.Entry<Long, JournalTransaction> entry : transactions.entrySet())
{
System.out.println("TransactionID = " + entry.getKey());
entry.getValue().setCompacting();
compactor.addPendingTransaction(entry.getKey());
}
+ // We will calculate the new records during compacting, what will take the position the records will take after compacting
+ this.records.clear();
}
finally
{
@@ -930,13 +925,11 @@
dataFiles.addFirst(newDatafiles.get(i));
}
- // Replay pending commands
+ // Replay pending commands (including updates, deletes and commits)
compactor.replayPendingCommands();
- // Restore relationshipMap
// Deal with transactions commits that happend during the compacting
- // Deal with updates and deletes that happened during the compacting
this.compactor = null;
@@ -1111,7 +1104,7 @@
throw new IllegalStateException("Journal must be in started state");
}
- final Map<Long, TransactionHolder> transactions = new LinkedHashMap<Long, TransactionHolder>();
+ final Map<Long, TransactionHolder> loadTransactions = new LinkedHashMap<Long, TransactionHolder>();
final List<JournalFile> orderedFiles = orderFiles();
@@ -1196,24 +1189,24 @@
hasData.set(true);
- TransactionHolder tx = transactions.get(transactionID);
+ TransactionHolder tx = loadTransactions.get(transactionID);
if (tx == null)
{
tx = new TransactionHolder(transactionID);
- transactions.put(transactionID, tx);
+ loadTransactions.put(transactionID, tx);
}
tx.recordInfos.add(info);
- JournalTransaction tnp = pendingTransactions.get(transactionID);
+ JournalTransaction tnp = transactions.get(transactionID);
if (tnp == null)
{
tnp = new JournalTransaction(JournalImpl.this);
- pendingTransactions.put(transactionID, tnp);
+ transactions.put(transactionID, tnp);
}
tnp.addPositive(file, info.id);
@@ -1228,24 +1221,24 @@
hasData.set(true);
- TransactionHolder tx = transactions.get(transactionID);
+ TransactionHolder tx = loadTransactions.get(transactionID);
if (tx == null)
{
tx = new TransactionHolder(transactionID);
- transactions.put(transactionID, tx);
+ loadTransactions.put(transactionID, tx);
}
tx.recordsToDelete.add(info);
- JournalTransaction tnp = pendingTransactions.get(transactionID);
+ JournalTransaction tnp = transactions.get(transactionID);
if (tnp == null)
{
tnp = new JournalTransaction(JournalImpl.this);
- pendingTransactions.put(transactionID, tnp);
+ transactions.put(transactionID, tnp);
}
tnp.addNegative(file, info.id);
@@ -1261,27 +1254,27 @@
hasData.set(true);
- TransactionHolder tx = transactions.get(transactionID);
+ TransactionHolder tx = loadTransactions.get(transactionID);
if (tx == null)
{
// The user could choose to prepare empty transactions
tx = new TransactionHolder(transactionID);
- transactions.put(transactionID, tx);
+ loadTransactions.put(transactionID, tx);
}
tx.prepared = true;
tx.extraData = extraData;
- JournalTransaction journalTransaction = pendingTransactions.get(transactionID);
+ JournalTransaction journalTransaction = transactions.get(transactionID);
if (journalTransaction == null)
{
journalTransaction = new JournalTransaction(JournalImpl.this);
- pendingTransactions.put(transactionID, journalTransaction);
+ transactions.put(transactionID, journalTransaction);
}
boolean healthy = checkTransactionHealth(file, journalTransaction, orderedFiles, numberOfRecords);
@@ -1304,7 +1297,7 @@
trace("commitRecord: txid = " + transactionID);
}
- TransactionHolder tx = transactions.remove(transactionID);
+ TransactionHolder tx = loadTransactions.remove(transactionID);
// The commit could be alone on its own journal-file and the
// whole transaction body was reclaimed but not the
@@ -1315,7 +1308,7 @@
// ignore this
if (tx != null)
{
- JournalTransaction journalTransaction = pendingTransactions.remove(transactionID);
+ JournalTransaction journalTransaction = transactions.remove(transactionID);
if (journalTransaction == null)
{
@@ -1365,7 +1358,7 @@
trace("rollbackRecord: txid = " + transactionID);
}
- TransactionHolder tx = transactions.remove(transactionID);
+ TransactionHolder tx = loadTransactions.remove(transactionID);
// The rollback could be alone on its own journal-file and the
// whole transaction body was reclaimed but the commit-record
@@ -1373,7 +1366,7 @@
// point
if (tx != null)
{
- JournalTransaction tnp = pendingTransactions.remove(transactionID);
+ JournalTransaction tnp = transactions.remove(transactionID);
if (tnp == null)
{
@@ -1457,13 +1450,13 @@
pushOpenedFile();
- for (TransactionHolder transaction : transactions.values())
+ for (TransactionHolder transaction : loadTransactions.values())
{
if (!transaction.prepared || transaction.invalid)
{
log.warn("Uncommitted transaction with id " + transaction.transactionID + " found and discarded");
- JournalTransaction transactionInfo = pendingTransactions.get(transaction.transactionID);
+ JournalTransaction transactionInfo = transactions.get(transaction.transactionID);
if (transactionInfo == null)
{
@@ -1474,7 +1467,7 @@
transactionInfo.forget();
// Remove the transactionInfo
- pendingTransactions.remove(transaction.transactionID);
+ transactions.remove(transaction.transactionID);
}
else
{
@@ -1564,7 +1557,7 @@
{
fileFactory.testFlush();
- for (JournalTransaction tx : pendingTransactions.values())
+ for (JournalTransaction tx : transactions.values())
{
tx.waitCallbacks();
}
@@ -2685,7 +2678,7 @@
* @return
* @throws Exception
*/
- public JournalFile getFile(boolean keepOpened, boolean multiAIO) throws Exception
+ JournalFile getFile(boolean keepOpened, boolean multiAIO) throws Exception
{
JournalFile nextOpenedFile = null;
try
@@ -2733,13 +2726,13 @@
private JournalTransaction getTransactionInfo(final long txID)
{
- JournalTransaction tx = pendingTransactions.get(txID);
+ JournalTransaction tx = transactions.get(txID);
if (tx == null)
{
tx = new JournalTransaction(this);
- JournalTransaction trans = pendingTransactions.putIfAbsent(txID, tx);
+ JournalTransaction trans = transactions.putIfAbsent(txID, tx);
if (trans != null)
{
Modified: branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/JournalTransaction.java
===================================================================
--- branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/JournalTransaction.java 2009-06-29 23:47:11 UTC (rev 7498)
+++ branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/JournalTransaction.java 2009-06-30 01:12:59 UTC (rev 7499)
@@ -56,6 +56,8 @@
private Set<JournalFile> pendingFiles;
private TransactionCallback currentCallback;
+
+ private boolean compacting = false;
private Map<JournalFile, TransactionCallback> callbackList;
@@ -80,6 +82,8 @@
public void setCompacting()
{
+ compacting = true;
+
// / Compacting is recreating all the previous files and everything
// / so we just clear the list of previous files, previous pos and previous adds
// / The transaction may be working at the top from now
@@ -214,6 +218,7 @@
public void commit(final JournalFile file)
{
JournalCompactor compactor = journal.getCompactor();
+
if (pos != null)
{
for (Pair<JournalFile, Long> trUpdate : pos)
Modified: branches/clebert_temp_expirement/tests/src/org/jboss/messaging/tests/integration/journal/AIOJournalCompactTest.java
===================================================================
--- branches/clebert_temp_expirement/tests/src/org/jboss/messaging/tests/integration/journal/AIOJournalCompactTest.java 2009-06-29 23:47:11 UTC (rev 7498)
+++ branches/clebert_temp_expirement/tests/src/org/jboss/messaging/tests/integration/journal/AIOJournalCompactTest.java 2009-06-30 01:12:59 UTC (rev 7499)
@@ -36,7 +36,7 @@
*
*
*/
-public class AIOJournalCompactTest extends JournalCompactTest
+public class AIOJournalCompactTest extends NIOJournalCompactTest
{
// Constants -----------------------------------------------------
Deleted: branches/clebert_temp_expirement/tests/src/org/jboss/messaging/tests/integration/journal/JournalCompactTest.java
===================================================================
--- branches/clebert_temp_expirement/tests/src/org/jboss/messaging/tests/integration/journal/JournalCompactTest.java 2009-06-29 23:47:11 UTC (rev 7498)
+++ branches/clebert_temp_expirement/tests/src/org/jboss/messaging/tests/integration/journal/JournalCompactTest.java 2009-06-30 01:12:59 UTC (rev 7499)
@@ -1,443 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source Copyright 2005-2008, Red Hat
- * Middleware LLC, and individual contributors by the @authors tag. See the
- * copyright.txt in the distribution for a full listing of individual
- * contributors.
- *
- * This is free software; you can redistribute it and/or modify it under the
- * terms of the GNU Lesser General Public License as published by the Free
- * Software Foundation; either version 2.1 of the License, or (at your option)
- * any later version.
- *
- * This software is distributed in the hope that it will be useful, but WITHOUT
- * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
- * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
- * details.
- *
- * You should have received a copy of the GNU Lesser General Public License
- * along with this software; if not, write to the Free Software Foundation,
- * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA, or see the FSF
- * site: http://www.fsf.org.
- */
-
-package org.jboss.messaging.tests.integration.journal;
-
-import java.io.File;
-import java.util.ArrayList;
-import java.util.concurrent.CountDownLatch;
-
-import org.jboss.messaging.core.journal.SequentialFileFactory;
-import org.jboss.messaging.core.journal.impl.JournalImpl;
-import org.jboss.messaging.core.journal.impl.NIOSequentialFileFactory;
-import org.jboss.messaging.core.logging.Logger;
-import org.jboss.messaging.tests.unit.core.journal.impl.JournalImplTestBase;
-import org.jboss.messaging.utils.IDGenerator;
-import org.jboss.messaging.utils.TimeAndCounterIDGenerator;
-
-/**
- *
- * A JournalImplTestBase
- *
- * @author <a href="mailto:clebert.suconic at jboss.com">Clebert Suconic</a>
- *
- */
-public class JournalCompactTest extends JournalImplTestBase
-{
- private static final Logger log = Logger.getLogger(JournalCompactTest.class);
-
- protected String journalDir = System.getProperty("user.home") + "/journal-test";
-
- private static final int NUMBER_OF_RECORDS = 1000;
-
- IDGenerator idGenerator = new TimeAndCounterIDGenerator();
-
- // General tests
- // =============
-
- public void testCrashRenamingFiles() throws Exception
- {
- }
-
- public void testCompactwithPendingXACommit() throws Exception
- {
- }
-
- public void testCompactwithPendingXAPrepareAndCommit() throws Exception
- {
- }
-
- public void testCompactwithPendingCommit() throws Exception
- {
- InternalCompactTest(false, false, false, false, true);
- }
-
- public void testCompactwithConcurrentUpdateAndDeletes() throws Exception
- {
- InternalCompactTest(true, false, true, true, false);
- }
-
- public void testCompactwithConcurrentDeletes() throws Exception
- {
- InternalCompactTest(true, false, false, true, false);
- }
-
- public void testCompactwithConcurrentUpdates() throws Exception
- {
- InternalCompactTest(true, false, true, false, false);
- }
-
- public void testCompactWithConcurrentAppend() throws Exception
- {
- InternalCompactTest(true, true, false, false, false);
- }
-
- private void InternalCompactTest(final boolean regularAdd,
- final boolean performAppend,
- final boolean performUpdate,
- final boolean performDelete,
- final boolean pendingTransactions) throws Exception
- {
- setup(50, 60 * 1024, true);
-
- ArrayList<Long> liveIDs = new ArrayList<Long>();
-
- ArrayList<Long> listPendingTransactions = new ArrayList<Long>();
-
- final CountDownLatch latchDone = new CountDownLatch(1);
- final CountDownLatch latchWait = new CountDownLatch(1);
- journal = new JournalImpl(fileSize, minFiles, fileFactory, filePrefix, fileExtension, maxAIO)
- {
- @Override
- public void onCompactDone()
- {
- latchDone.countDown();
- System.out.println("Waiting on Compact");
- try
- {
- latchWait.await();
- }
- catch (InterruptedException e)
- {
- e.printStackTrace();
- }
- System.out.println("Done");
- }
- };
- startJournal();
- load();
-
- long transactionID = 0;
-
- if (regularAdd)
- {
-
- for (int i = 0; i < NUMBER_OF_RECORDS / 2; i++)
- {
- add(i);
- if (i % 10 == 0 && i > 0)
- {
- journal.forceMoveNextFile();
- }
- update(i);
- }
-
- for (int i = NUMBER_OF_RECORDS / 2; i < NUMBER_OF_RECORDS; i++)
- {
-
- addTx(transactionID, i);
- updateTx(transactionID, i);
- if (i % 10 == 0)
- {
- journal.forceMoveNextFile();
- }
- commit(transactionID++);
- update(i);
- }
- }
-
- if (pendingTransactions)
- {
- for (long i = 0; i < 100; i++)
- {
- addTx(transactionID, idGenerator.generateID());
- updateTx(transactionID, idGenerator.generateID());
- listPendingTransactions.add(transactionID++);
- }
- }
-
- System.out.println("Number of Files: " + journal.getDataFilesCount());
-
- if (regularAdd)
- {
- for (int i = 0; i < NUMBER_OF_RECORDS; i++)
- {
- if (!(i % 10 == 0))
- {
- delete(i);
- }
- else
- {
- liveIDs.add((long)i);
- }
- }
- }
-
- journal.forceMoveNextFile();
-
- Thread t = new Thread()
- {
- @Override
- public void run()
- {
- try
- {
- journal.compact();
- }
- catch (Exception e)
- {
- e.printStackTrace();
- }
- }
- };
-
- t.start();
-
- latchDone.await();
-
- int nextID = NUMBER_OF_RECORDS;
-
- if (performAppend)
- {
- for (int i = 0; i < 50; i++)
- {
- add(nextID++);
- if (i % 10 == 0)
- {
- journal.forceMoveNextFile();
- }
- }
-
- for (int i = 0; i < 50; i++)
- {
- // A Total new transaction (that was created after the compact started) to add new record while compacting is still working
- addTx(transactionID, nextID++);
- commit(transactionID++);
- if (i % 10 == 0)
- {
- journal.forceMoveNextFile();
- }
- }
- }
-
- if (performUpdate)
- {
- int count = 0;
- for (Long liveID : liveIDs)
- {
- if (count++ % 2 == 0)
- {
- update(liveID);
- }
- else
- {
- // A Total new transaction (that was created after the compact started) to update a record that is being compacted
- updateTx(transactionID, liveID);
- commit(transactionID++);
- }
- }
- }
-
- if (performDelete)
- {
- int count = 0;
- for (long liveID : liveIDs)
- {
- if (count++ % 2 == 0)
- {
- delete(liveID);
- }
- else
- {
- // A Total new transaction (that was created after the compact started) to delete a record that is being compacted
- deleteTx(transactionID, liveID);
- commit(transactionID++);
- }
-
- }
- }
-
- if (pendingTransactions)
- {
- for (long tx : listPendingTransactions)
- {
- if (tx % 2 == 0)
- {
- commit(tx);
- }
- else
- {
- rollback(tx);
- }
- }
- }
-
- /** Some independent adds and updates */
- for (int i = 0; i < 1000; i++)
- {
- long id = idGenerator.generateID();
- add(id);
- delete(id);
-
- if (i % 100 == 0)
- {
- journal.forceMoveNextFile();
- }
- }
-
- journal.forceMoveNextFile();
-
- latchWait.countDown();
-
- t.join();
-
- add(idGenerator.generateID());
-
- // journal.compact();
-
- stopJournal();
- createJournal();
- startJournal();
- loadAndCheck();
-
- }
-
- public void testCompactwithConcurrentAppendAndUpdate() throws Exception
- {
- }
-
- public void testCompactWithPendingTransactionAndDelete() throws Exception
- {
- }
-
- public void testCompactingWithPendingTransaction() throws Exception
- {
-
- }
-
- public void testSimpleCompacting() throws Exception
- {
- setup(50, 60 * 1024, true);
-
- createJournal();
- startJournal();
- load();
-
- int NUMBER_OF_RECORDS = 1000;
-
- // add and remove some data to force reclaiming
- {
- ArrayList<Long> ids = new ArrayList<Long>();
- for (int i = 0; i < NUMBER_OF_RECORDS; i++)
- {
- long id = idGenerator.generateID();
- ids.add(id);
- add(id);
- if (i > 0 && i % 100 == 0)
- {
- journal.forceMoveNextFile();
- }
- }
-
- for (Long id : ids)
- {
- delete(id);
- }
-
- journal.forceMoveNextFile();
-
- journal.checkAndReclaimFiles();
- }
-
- long transactionID = 0;
-
- for (int i = 0; i < NUMBER_OF_RECORDS / 2; i++)
- {
- add(i);
- if (i % 10 == 0 && i > 0)
- {
- journal.forceMoveNextFile();
- }
- update(i);
- }
-
- for (int i = NUMBER_OF_RECORDS / 2; i < NUMBER_OF_RECORDS; i++)
- {
-
- addTx(transactionID, i);
- updateTx(transactionID, i);
- if (i % 10 == 0)
- {
- journal.forceMoveNextFile();
- }
- commit(transactionID++);
- update(i);
- }
-
- System.out.println("Number of Files: " + journal.getDataFilesCount());
-
- for (int i = 0; i < NUMBER_OF_RECORDS; i++)
- {
- if (!(i % 10 == 0))
- {
- delete(i);
- }
- }
-
- journal.forceMoveNextFile();
-
- System.out.println("Number of Files: " + journal.getDataFilesCount());
-
- System.out.println("Before compact ****************************");
- System.out.println(journal.debug());
- System.out.println("*****************************************");
-
- journal.compact();
-
- add(idGenerator.generateID());
-
- journal.compact();
-
- stopJournal();
- createJournal();
- startJournal();
- loadAndCheck();
-
- }
-
- protected int getAlignment()
- {
- return 1;
- }
-
- @Override
- protected void setUp() throws Exception
- {
- super.setUp();
-
- File file = new File(journalDir);
-
- deleteDirectory(file);
-
- file.mkdir();
- }
-
- /* (non-Javadoc)
- * @see org.jboss.messaging.tests.unit.core.journal.impl.JournalImplTestBase#getFileFactory()
- */
- @Override
- protected SequentialFileFactory getFileFactory() throws Exception
- {
- return new NIOSequentialFileFactory(journalDir);
- }
-
-}
Copied: branches/clebert_temp_expirement/tests/src/org/jboss/messaging/tests/integration/journal/NIOJournalCompactTest.java (from rev 7498, branches/clebert_temp_expirement/tests/src/org/jboss/messaging/tests/integration/journal/JournalCompactTest.java)
===================================================================
--- branches/clebert_temp_expirement/tests/src/org/jboss/messaging/tests/integration/journal/NIOJournalCompactTest.java (rev 0)
+++ branches/clebert_temp_expirement/tests/src/org/jboss/messaging/tests/integration/journal/NIOJournalCompactTest.java 2009-06-30 01:12:59 UTC (rev 7499)
@@ -0,0 +1,448 @@
+/*
+ * JBoss, Home of Professional Open Source Copyright 2005-2008, Red Hat
+ * Middleware LLC, and individual contributors by the @authors tag. See the
+ * copyright.txt in the distribution for a full listing of individual
+ * contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it under the
+ * terms of the GNU Lesser General Public License as published by the Free
+ * Software Foundation; either version 2.1 of the License, or (at your option)
+ * any later version.
+ *
+ * This software is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
+ * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
+ * details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this software; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA, or see the FSF
+ * site: http://www.fsf.org.
+ */
+
+package org.jboss.messaging.tests.integration.journal;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.concurrent.CountDownLatch;
+
+import org.jboss.messaging.core.journal.SequentialFileFactory;
+import org.jboss.messaging.core.journal.impl.JournalImpl;
+import org.jboss.messaging.core.journal.impl.NIOSequentialFileFactory;
+import org.jboss.messaging.core.logging.Logger;
+import org.jboss.messaging.tests.unit.core.journal.impl.JournalImplTestBase;
+import org.jboss.messaging.utils.IDGenerator;
+import org.jboss.messaging.utils.TimeAndCounterIDGenerator;
+
+/**
+ *
+ * A JournalImplTestBase
+ *
+ * @author <a href="mailto:clebert.suconic at jboss.com">Clebert Suconic</a>
+ *
+ */
+public class NIOJournalCompactTest extends JournalImplTestBase
+{
+ private static final Logger log = Logger.getLogger(NIOJournalCompactTest.class);
+
+ protected String journalDir = System.getProperty("user.home") + "/journal-test";
+
+ private static final int NUMBER_OF_RECORDS = 1000;
+
+ IDGenerator idGenerator = new TimeAndCounterIDGenerator();
+
+ // General tests
+ // =============
+
+ public void testCrashRenamingFiles() throws Exception
+ {
+ }
+
+ public void testCompactwithPendingXACommit() throws Exception
+ {
+ }
+
+ public void testCompactwithPendingXAPrepareAndCommit() throws Exception
+ {
+ }
+
+ public void testCompactwithPendingCommit() throws Exception
+ {
+ InternalCompactTest(false, false, false, false, true);
+ }
+
+ public void testCompactwithPendingCommitFollowedByDelete() throws Exception
+ {
+ }
+
+
+ public void testCompactwithConcurrentUpdateAndDeletes() throws Exception
+ {
+ InternalCompactTest(true, false, true, true, false);
+ }
+
+ public void testCompactwithConcurrentDeletes() throws Exception
+ {
+ InternalCompactTest(true, false, false, true, false);
+ }
+
+ public void testCompactwithConcurrentUpdates() throws Exception
+ {
+ InternalCompactTest(true, false, true, false, false);
+ }
+
+ public void testCompactWithConcurrentAppend() throws Exception
+ {
+ InternalCompactTest(true, true, false, false, false);
+ }
+
+ private void InternalCompactTest(final boolean regularAdd,
+ final boolean performAppend,
+ final boolean performUpdate,
+ final boolean performDelete,
+ final boolean pendingTransactions) throws Exception
+ {
+ setup(50, 60 * 1024, true);
+
+ ArrayList<Long> liveIDs = new ArrayList<Long>();
+
+ ArrayList<Long> listPendingTransactions = new ArrayList<Long>();
+
+ final CountDownLatch latchDone = new CountDownLatch(1);
+ final CountDownLatch latchWait = new CountDownLatch(1);
+ journal = new JournalImpl(fileSize, minFiles, fileFactory, filePrefix, fileExtension, maxAIO)
+ {
+ @Override
+ public void onCompactDone()
+ {
+ latchDone.countDown();
+ System.out.println("Waiting on Compact");
+ try
+ {
+ latchWait.await();
+ }
+ catch (InterruptedException e)
+ {
+ e.printStackTrace();
+ }
+ System.out.println("Done");
+ }
+ };
+ startJournal();
+ load();
+
+ long transactionID = 0;
+
+ if (regularAdd)
+ {
+
+ for (int i = 0; i < NUMBER_OF_RECORDS / 2; i++)
+ {
+ add(i);
+ if (i % 10 == 0 && i > 0)
+ {
+ journal.forceMoveNextFile();
+ }
+ update(i);
+ }
+
+ for (int i = NUMBER_OF_RECORDS / 2; i < NUMBER_OF_RECORDS; i++)
+ {
+
+ addTx(transactionID, i);
+ updateTx(transactionID, i);
+ if (i % 10 == 0)
+ {
+ journal.forceMoveNextFile();
+ }
+ commit(transactionID++);
+ update(i);
+ }
+ }
+
+ if (pendingTransactions)
+ {
+ for (long i = 0; i < 100; i++)
+ {
+ addTx(transactionID, idGenerator.generateID());
+ updateTx(transactionID, idGenerator.generateID());
+ listPendingTransactions.add(transactionID++);
+ }
+ }
+
+ System.out.println("Number of Files: " + journal.getDataFilesCount());
+
+ if (regularAdd)
+ {
+ for (int i = 0; i < NUMBER_OF_RECORDS; i++)
+ {
+ if (!(i % 10 == 0))
+ {
+ delete(i);
+ }
+ else
+ {
+ liveIDs.add((long)i);
+ }
+ }
+ }
+
+ journal.forceMoveNextFile();
+
+ Thread t = new Thread()
+ {
+ @Override
+ public void run()
+ {
+ try
+ {
+ journal.compact();
+ }
+ catch (Exception e)
+ {
+ e.printStackTrace();
+ }
+ }
+ };
+
+ t.start();
+
+ latchDone.await();
+
+ int nextID = NUMBER_OF_RECORDS;
+
+ if (performAppend)
+ {
+ for (int i = 0; i < 50; i++)
+ {
+ add(nextID++);
+ if (i % 10 == 0)
+ {
+ journal.forceMoveNextFile();
+ }
+ }
+
+ for (int i = 0; i < 50; i++)
+ {
+ // A Total new transaction (that was created after the compact started) to add new record while compacting is still working
+ addTx(transactionID, nextID++);
+ commit(transactionID++);
+ if (i % 10 == 0)
+ {
+ journal.forceMoveNextFile();
+ }
+ }
+ }
+
+ if (performUpdate)
+ {
+ int count = 0;
+ for (Long liveID : liveIDs)
+ {
+ if (count++ % 2 == 0)
+ {
+ update(liveID);
+ }
+ else
+ {
+ // A Total new transaction (that was created after the compact started) to update a record that is being compacted
+ updateTx(transactionID, liveID);
+ commit(transactionID++);
+ }
+ }
+ }
+
+ if (performDelete)
+ {
+ int count = 0;
+ for (long liveID : liveIDs)
+ {
+ if (count++ % 2 == 0)
+ {
+ delete(liveID);
+ }
+ else
+ {
+ // A Total new transaction (that was created after the compact started) to delete a record that is being compacted
+ deleteTx(transactionID, liveID);
+ commit(transactionID++);
+ }
+
+ }
+ }
+
+ if (pendingTransactions)
+ {
+ for (long tx : listPendingTransactions)
+ {
+ if (tx % 2 == 0)
+ {
+ commit(tx);
+ }
+ else
+ {
+ rollback(tx);
+ }
+ }
+ }
+
+ /** Some independent adds and updates */
+ for (int i = 0; i < 1000; i++)
+ {
+ long id = idGenerator.generateID();
+ add(id);
+ delete(id);
+
+ if (i % 100 == 0)
+ {
+ journal.forceMoveNextFile();
+ }
+ }
+
+ journal.forceMoveNextFile();
+
+ latchWait.countDown();
+
+ t.join();
+
+ add(idGenerator.generateID());
+
+ // journal.compact();
+
+ stopJournal();
+ createJournal();
+ startJournal();
+ loadAndCheck();
+
+ }
+
+ public void testCompactwithConcurrentAppendAndUpdate() throws Exception
+ {
+ }
+
+ public void testCompactWithPendingTransactionAndDelete() throws Exception
+ {
+ }
+
+ public void testCompactingWithPendingTransaction() throws Exception
+ {
+
+ }
+
+ public void testSimpleCompacting() throws Exception
+ {
+ setup(50, 60 * 1024, true);
+
+ createJournal();
+ startJournal();
+ load();
+
+ int NUMBER_OF_RECORDS = 1000;
+
+ // add and remove some data to force reclaiming
+ {
+ ArrayList<Long> ids = new ArrayList<Long>();
+ for (int i = 0; i < NUMBER_OF_RECORDS; i++)
+ {
+ long id = idGenerator.generateID();
+ ids.add(id);
+ add(id);
+ if (i > 0 && i % 100 == 0)
+ {
+ journal.forceMoveNextFile();
+ }
+ }
+
+ for (Long id : ids)
+ {
+ delete(id);
+ }
+
+ journal.forceMoveNextFile();
+
+ journal.checkAndReclaimFiles();
+ }
+
+ long transactionID = 0;
+
+ for (int i = 0; i < NUMBER_OF_RECORDS / 2; i++)
+ {
+ add(i);
+ if (i % 10 == 0 && i > 0)
+ {
+ journal.forceMoveNextFile();
+ }
+ update(i);
+ }
+
+ for (int i = NUMBER_OF_RECORDS / 2; i < NUMBER_OF_RECORDS; i++)
+ {
+
+ addTx(transactionID, i);
+ updateTx(transactionID, i);
+ if (i % 10 == 0)
+ {
+ journal.forceMoveNextFile();
+ }
+ commit(transactionID++);
+ update(i);
+ }
+
+ System.out.println("Number of Files: " + journal.getDataFilesCount());
+
+ for (int i = 0; i < NUMBER_OF_RECORDS; i++)
+ {
+ if (!(i % 10 == 0))
+ {
+ delete(i);
+ }
+ }
+
+ journal.forceMoveNextFile();
+
+ System.out.println("Number of Files: " + journal.getDataFilesCount());
+
+ System.out.println("Before compact ****************************");
+ System.out.println(journal.debug());
+ System.out.println("*****************************************");
+
+ journal.compact();
+
+ add(idGenerator.generateID());
+
+ journal.compact();
+
+ stopJournal();
+ createJournal();
+ startJournal();
+ loadAndCheck();
+
+ }
+
+ protected int getAlignment()
+ {
+ return 1;
+ }
+
+ @Override
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+
+ File file = new File(journalDir);
+
+ deleteDirectory(file);
+
+ file.mkdir();
+ }
+
+ /* (non-Javadoc)
+ * @see org.jboss.messaging.tests.unit.core.journal.impl.JournalImplTestBase#getFileFactory()
+ */
+ @Override
+ protected SequentialFileFactory getFileFactory() throws Exception
+ {
+ return new NIOSequentialFileFactory(journalDir);
+ }
+
+}
Modified: branches/clebert_temp_expirement/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/JournalImplTestUnit.java
===================================================================
--- branches/clebert_temp_expirement/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/JournalImplTestUnit.java 2009-06-29 23:47:11 UTC (rev 7498)
+++ branches/clebert_temp_expirement/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/JournalImplTestUnit.java 2009-06-30 01:12:59 UTC (rev 7499)
@@ -22,22 +22,11 @@
package org.jboss.messaging.tests.unit.core.journal.impl;
-import java.util.ArrayList;
import java.util.List;
-import java.util.concurrent.Executor;
-import org.jboss.messaging.core.asyncio.BufferCallback;
-import org.jboss.messaging.core.asyncio.impl.AsynchronousFileImpl;
import org.jboss.messaging.core.journal.EncodingSupport;
import org.jboss.messaging.core.journal.RecordInfo;
-import org.jboss.messaging.core.journal.SequentialFile;
-import org.jboss.messaging.core.journal.SequentialFileFactory;
-import org.jboss.messaging.core.journal.impl.AIOSequentialFile;
-import org.jboss.messaging.core.journal.impl.JournalFile;
-import org.jboss.messaging.core.journal.impl.JournalFileImpl;
import org.jboss.messaging.core.journal.impl.JournalImpl;
-import org.jboss.messaging.core.journal.impl.JournalReaderCallback;
-import org.jboss.messaging.core.journal.impl.NIOSequentialFile;
import org.jboss.messaging.core.logging.Logger;
import org.jboss.messaging.tests.unit.core.journal.impl.fakes.SimpleEncoding;
@@ -3099,142 +3088,6 @@
}
- public void testSimpleCompacting() throws Exception
- {
- String tmp = "file.jbm.cmp";
-
- System.out.println("index = " + tmp.lastIndexOf(".cmp"));
-
- System.out.println("new name = " + tmp.substring(0, tmp.lastIndexOf(".cmp")));
- setup(2, 60 * 1024, true);
-
- createJournal();
- startJournal();
- load();
-
- int NUMBER_OF_RECORDS = 100;
-
- long transactionID = 0;
-
- for (int i = 0; i < NUMBER_OF_RECORDS / 2; i++)
- {
- add(i);
- if (i % 10 == 0 && i > 0)
- {
- journal.forceMoveNextFile();
- }
- update(i);
- }
-
- for (int i = NUMBER_OF_RECORDS / 2; i < NUMBER_OF_RECORDS; i++)
- {
-
- addTx(transactionID, i);
- updateTx(transactionID, i);
- if (i % 10 == 0)
- {
- journal.forceMoveNextFile();
- }
- commit(transactionID++);
- update(i);
- }
-
- System.out.println("Number of Files: " + journal.getDataFilesCount());
-
- for (int i = 0; i < NUMBER_OF_RECORDS; i++)
- {
- if (!(i % 10 == 0))
- {
- delete(i);
- }
- }
-
- journal.forceMoveNextFile();
-
- System.out.println("Number of Files: " + journal.getDataFilesCount());
-
- System.out.println("Before compact ****************************");
- System.out.println(journal.debug());
- System.out.println("*****************************************");
-
- journal.compact();
-
- stopJournal();
- createJournal();
- startJournal();
- loadAndCheck();
-
- for (String fileName : fileFactory.listFiles("cmp"))
- {
- System.out.println("File = " + fileName);
-
- SequentialFile readFile = fileFactory.createSequentialFile(fileName, 1);
-
- ((JournalImpl)journal).readJournalFile(new JournalFileImpl(readFile, 13, 1), new JournalReaderCallback()
- {
- public void addRecord(RecordInfo info) throws Exception
- {
- System.out.println("AddrecordID = " + info.id);
- }
-
- public void addRecordTX(long transactionID, RecordInfo recordInfo) throws Exception
- {
- System.out.println("UpdRecordTX = " + transactionID + ", recordID=" + recordInfo.id);
- }
-
- public void commitRecord(long transactionID, int numberOfRecords) throws Exception
- {
- // TODO Auto-generated method stub
-
- }
-
- public void deleteRecord(long recordID) throws Exception
- {
- // TODO Auto-generated method stub
-
- }
-
- public void deleteRecordTX(long transactionID, RecordInfo recordInfo) throws Exception
- {
- // TODO Auto-generated method stub
-
- }
-
- public void markAsDataFile(JournalFile file)
- {
- // TODO Auto-generated method stub
-
- }
-
- public void prepareRecord(long transactionID, byte[] extraData, int numberOfRecords) throws Exception
- {
- // TODO Auto-generated method stub
-
- }
-
- public void rollbackRecord(long transactionID) throws Exception
- {
- // TODO Auto-generated method stub
-
- }
-
- public void updateRecord(RecordInfo recordInfo) throws Exception
- {
- System.out.println("UpdRecordID : " + recordInfo.id);
-
- }
-
- public void updateRecordTX(long transactionID, RecordInfo recordInfo) throws Exception
- {
- System.out.println("UpdRecordID : " + recordInfo.id);
-
- }
-
- });
- }
-
- }
-
protected abstract int getAlignment();
}
More information about the jboss-cvs-commits
mailing list