[jboss-cvs] JBoss Messaging SVN: r7485 - in branches/clebert_temp_expirement: tests/src/org/jboss/messaging/tests/integration/journal and 1 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Sat Jun 27 23:52:47 EDT 2009
Author: clebert.suconic at jboss.com
Date: 2009-06-27 23:52:47 -0400 (Sat, 27 Jun 2009)
New Revision: 7485
Added:
branches/clebert_temp_expirement/tests/src/org/jboss/messaging/tests/integration/journal/JournalCompactTest.java
Removed:
branches/clebert_temp_expirement/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/JournalCompactTest.java
Modified:
branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java
Log:
concurrent updates
Modified: branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java
===================================================================
--- branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java 2009-06-28 03:00:49 UTC (rev 7484)
+++ branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java 2009-06-28 03:52:47 UTC (rev 7485)
@@ -30,6 +30,7 @@
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
+import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
@@ -343,7 +344,10 @@
if (posFiles == null)
{
- throw new IllegalStateException("Cannot find add info " + id);
+ if (!(compactor != null && compactor.lookupRecord(id)))
+ {
+ throw new IllegalStateException("Cannot find add info " + id);
+ }
}
int size = SIZE_UPDATE_RECORD + record.getEncodeSize();
@@ -359,7 +363,14 @@
{
JournalFile usedFile = appendRecord(bb, false, sync, null, callback);
- posFiles.addUpdateFile(usedFile);
+ if (posFiles == null)
+ {
+ compactor.addPendingUpdate(id, usedFile);
+ }
+ else
+ {
+ posFiles.addUpdateFile(usedFile);
+ }
}
finally
{
@@ -798,7 +809,6 @@
return maxID;
}
-
// Note: This method can't be called from the executor, as it will invoke other methods depending on it.
public void compact() throws Exception
{
@@ -838,7 +848,7 @@
records = new ConcurrentHashMap<Long, JournalRecord>();
dataFilesToProcess.addAll(dataFiles);
-
+
dataFiles.clear();
this.compactor = new Compactor(recordsSnapshot, pendingTransactions, dataFilesToProcess.get(0).getFileID());
@@ -848,7 +858,7 @@
{
compactingLock.writeLock().unlock();
}
-
+
// Read the files, and use the Compactor class to create the new outputFiles, and the new collections as well
JournalFile previousFile = null;
for (final JournalFile file : dataFilesToProcess)
@@ -875,29 +885,34 @@
onCompactDone();
SequentialFile controlFile = createControlFile(dataFilesToProcess, compactor.newDataFiles);
-
-
+
List<JournalFile> newDatafiles = null;
compactingLock.writeLock().lock();
try
{
- for ( Map.Entry<Long, JournalRecord> newRecordEntry: compactor.newRecords.entrySet())
+ newDatafiles = compactor.newDataFiles;
+
+ for (Map.Entry<Long, JournalRecord> newRecordEntry : compactor.newRecords.entrySet())
{
records.put(newRecordEntry.getKey(), newRecordEntry.getValue());
}
-
- for (JournalFile data: compactor.newDataFiles)
+
+ for (int i = newDatafiles.size() - 1; i >= 0; i--)
{
- dataFiles.addFirst(data);
+ dataFiles.addFirst(newDatafiles.get(i));
}
-
+
+ for (Pair<Long, JournalFile> pendingRecord : compactor.pendingUpdates)
+ {
+ JournalRecord updateRecord = this.records.get(pendingRecord.a);
+ updateRecord.addUpdateFile(pendingRecord.b);
+ }
+
// Restore relationshipMap
// Deal with transactions commits that happend during the compacting
// Deal with updates and deletes that happened during the compacting
- newDatafiles = compactor.newDataFiles;
-
this.compactor = null;
}
@@ -1014,6 +1029,8 @@
final Map<Long, JournalTransaction> newTransactions = new HashMap<Long, JournalTransaction>();
+ final LinkedList<Pair<Long, JournalFile>> pendingUpdates = new LinkedList<Pair<Long, JournalFile>>();
+
public Compactor(Map<Long, JournalRecord> recordsSnapshot,
Map<Long, JournalTransaction> pendingTransactions,
int firstFileID)
@@ -1023,6 +1040,20 @@
this.pendingTransactions = pendingTransactions;
}
+ /**
+ * @param id
+ * @param usedFile
+ */
+ public void addPendingUpdate(long id, JournalFile usedFile)
+ {
+ pendingUpdates.add(new Pair<Long, JournalFile>(id, usedFile));
+ }
+
+ public boolean lookupRecord(long id)
+ {
+ return recordsSnapshot.get(id) != null;
+ }
+
private void checkSize(int size) throws Exception
{
if (channelWrapper == null)
Copied: branches/clebert_temp_expirement/tests/src/org/jboss/messaging/tests/integration/journal/JournalCompactTest.java (from rev 7484, branches/clebert_temp_expirement/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/JournalCompactTest.java)
===================================================================
--- branches/clebert_temp_expirement/tests/src/org/jboss/messaging/tests/integration/journal/JournalCompactTest.java (rev 0)
+++ branches/clebert_temp_expirement/tests/src/org/jboss/messaging/tests/integration/journal/JournalCompactTest.java 2009-06-28 03:52:47 UTC (rev 7485)
@@ -0,0 +1,358 @@
+/*
+ * JBoss, Home of Professional Open Source Copyright 2005-2008, Red Hat
+ * Middleware LLC, and individual contributors by the @authors tag. See the
+ * copyright.txt in the distribution for a full listing of individual
+ * contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it under the
+ * terms of the GNU Lesser General Public License as published by the Free
+ * Software Foundation; either version 2.1 of the License, or (at your option)
+ * any later version.
+ *
+ * This software is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
+ * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
+ * details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this software; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA, or see the FSF
+ * site: http://www.fsf.org.
+ */
+
+package org.jboss.messaging.tests.integration.journal;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.concurrent.CountDownLatch;
+
+import org.jboss.messaging.core.journal.SequentialFileFactory;
+import org.jboss.messaging.core.journal.impl.JournalImpl;
+import org.jboss.messaging.core.journal.impl.NIOSequentialFileFactory;
+import org.jboss.messaging.core.logging.Logger;
+import org.jboss.messaging.tests.unit.core.journal.impl.JournalImplTestBase;
+import org.jboss.messaging.utils.IDGenerator;
+import org.jboss.messaging.utils.TimeAndCounterIDGenerator;
+
+/**
+ *
+ * A JournalImplTestBase
+ *
+ * @author <a href="mailto:clebert.suconic at jboss.com">Clebert Suconic</a>
+ *
+ */
+public class JournalCompactTest extends JournalImplTestBase
+{
+ private static final Logger log = Logger.getLogger(JournalCompactTest.class);
+
+ protected String journalDir = System.getProperty("user.home") + "/journal-test";
+
+ private static final int NUMBER_OF_RECORDS = 1000;
+
+ IDGenerator idGenerator = new TimeAndCounterIDGenerator();
+
+ // General tests
+ // =============
+
+ public void testCompactwithPendingXACommit() throws Exception
+ {
+ }
+
+ public void testCompactwithPendingXAPrepareAndCommit() throws Exception
+ {
+ }
+
+ public void testCompactwithPendingCommit() throws Exception
+ {
+ }
+
+ public void testCompactwithConcurrentDeletes() throws Exception
+ {
+ }
+
+ public void testCompactwithConcurrentUpdates() throws Exception
+ {
+ InternalCompactTest(false, true);
+ }
+
+ public void testCompactWithConcurrentAppend() throws Exception
+ {
+ InternalCompactTest(true, false);
+ }
+
+ private void InternalCompactTest(boolean performAppend, boolean performUpdate) throws Exception
+ {
+ setup(50, 60 * 1024, true);
+
+ ArrayList<Long> liveIDs = new ArrayList<Long>();
+
+ final CountDownLatch latchDone = new CountDownLatch(1);
+ final CountDownLatch latchWait = new CountDownLatch(1);
+ journal = new JournalImpl(fileSize, minFiles, fileFactory, filePrefix, fileExtension, maxAIO)
+ {
+ public void onCompactDone()
+ {
+ latchDone.countDown();
+ System.out.println("Waiting on Compact");
+ try
+ {
+ latchWait.await();
+ }
+ catch (InterruptedException e)
+ {
+ e.printStackTrace();
+ }
+ System.out.println("Done");
+ }
+ };
+ startJournal();
+ load();
+
+ long transactionID = 0;
+
+ for (int i = 0; i < NUMBER_OF_RECORDS / 2; i++)
+ {
+ add(i);
+ if (i % 10 == 0 && i > 0)
+ {
+ journal.forceMoveNextFile();
+ }
+ update(i);
+ }
+
+ for (int i = NUMBER_OF_RECORDS / 2; i < NUMBER_OF_RECORDS; i++)
+ {
+
+ addTx(transactionID, i);
+ updateTx(transactionID, i);
+ if (i % 10 == 0)
+ {
+ journal.forceMoveNextFile();
+ }
+ commit(transactionID++);
+ update(i);
+ }
+
+ System.out.println("Number of Files: " + journal.getDataFilesCount());
+
+ for (int i = 0; i < NUMBER_OF_RECORDS; i++)
+ {
+ if (!(i % 10 == 0))
+ {
+ delete(i);
+ }
+ else
+ {
+ liveIDs.add((long)i);
+ }
+ }
+
+ journal.forceMoveNextFile();
+
+ Thread t = new Thread()
+ {
+ public void run()
+ {
+ try
+ {
+ journal.compact();
+ }
+ catch (Exception e)
+ {
+ e.printStackTrace();
+ }
+ }
+ };
+
+ t.start();
+
+ latchDone.await();
+
+ int nextID = NUMBER_OF_RECORDS;
+
+ if (performAppend)
+ {
+ for (int i = 0; i < 100; i++)
+ {
+ add(nextID++);
+ if (i % 10 == 0)
+ {
+ journal.forceMoveNextFile();
+ }
+ }
+ }
+
+ if (performUpdate)
+ {
+ for (Long liveID : liveIDs)
+ {
+ update(liveID);
+ }
+ }
+
+ /** Some independent adds and updates */
+ for (int i = 0; i < 1000; i++)
+ {
+ long id = idGenerator.generateID();
+ add(id);
+ delete(id);
+
+ if (i % 100 == 0)
+ {
+ journal.forceMoveNextFile();
+ }
+ }
+
+ journal.forceMoveNextFile();
+
+ latchWait.countDown();
+
+ t.join();
+
+ delete(0);
+ add(idGenerator.generateID());
+
+ journal.compact();
+
+ stopJournal();
+ createJournal();
+ startJournal();
+ loadAndCheck();
+
+ }
+
+ public void testCompactwithConcurrentAppendAndUpdate() throws Exception
+ {
+ }
+
+ public void testCompactWithPendingTransactionAndDelete() throws Exception
+ {
+ }
+
+ public void testCompactingWithPendingTransaction() throws Exception
+ {
+
+ }
+
+ public void testSimpleCompacting() throws Exception
+ {
+ setup(50, 60 * 1024, true);
+
+ createJournal();
+ startJournal();
+ load();
+
+ int NUMBER_OF_RECORDS = 1000;
+
+ // add and remove some data to force reclaiming
+ {
+ ArrayList<Long> ids = new ArrayList<Long>();
+ for (int i = 0; i < NUMBER_OF_RECORDS; i++)
+ {
+ long id = idGenerator.generateID();
+ ids.add(id);
+ add(id);
+ if (i > 0 && (i % 100 == 0))
+ {
+ journal.forceMoveNextFile();
+ }
+ }
+
+ for (Long id : ids)
+ {
+ delete(id);
+ }
+
+ journal.forceMoveNextFile();
+
+ journal.checkAndReclaimFiles();
+ }
+
+ long transactionID = 0;
+
+ for (int i = 0; i < NUMBER_OF_RECORDS / 2; i++)
+ {
+ add(i);
+ if (i % 10 == 0 && i > 0)
+ {
+ journal.forceMoveNextFile();
+ }
+ update(i);
+ }
+
+ for (int i = NUMBER_OF_RECORDS / 2; i < NUMBER_OF_RECORDS; i++)
+ {
+
+ addTx(transactionID, i);
+ updateTx(transactionID, i);
+ if (i % 10 == 0)
+ {
+ journal.forceMoveNextFile();
+ }
+ commit(transactionID++);
+ update(i);
+ }
+
+ System.out.println("Number of Files: " + journal.getDataFilesCount());
+
+ for (int i = 0; i < NUMBER_OF_RECORDS; i++)
+ {
+ if (!(i % 10 == 0))
+ {
+ delete(i);
+ }
+ }
+
+ journal.forceMoveNextFile();
+
+ System.out.println("Number of Files: " + journal.getDataFilesCount());
+
+ System.out.println("Before compact ****************************");
+ System.out.println(journal.debug());
+ System.out.println("*****************************************");
+
+ journal.compact();
+
+ add(idGenerator.generateID());
+
+ journal.compact();
+
+ stopJournal();
+ createJournal();
+ startJournal();
+ loadAndCheck();
+
+ }
+
+ protected int getAlignment()
+ {
+ return 1;
+ }
+
+ @Override
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+
+ File file = new File(journalDir);
+
+ deleteDirectory(file);
+
+ file.mkdir();
+ }
+
+ protected SequentialFileFactory createFactory()
+ {
+ return new NIOSequentialFileFactory(journalDir);
+ }
+
+ /* (non-Javadoc)
+ * @see org.jboss.messaging.tests.unit.core.journal.impl.JournalImplTestBase#getFileFactory()
+ */
+ @Override
+ protected SequentialFileFactory getFileFactory() throws Exception
+ {
+ return createFactory();
+ }
+
+}
Property changes on: branches/clebert_temp_expirement/tests/src/org/jboss/messaging/tests/integration/journal/JournalCompactTest.java
___________________________________________________________________
Name: svn:mergeinfo
+
Deleted: branches/clebert_temp_expirement/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/JournalCompactTest.java
===================================================================
--- branches/clebert_temp_expirement/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/JournalCompactTest.java 2009-06-28 03:00:49 UTC (rev 7484)
+++ branches/clebert_temp_expirement/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/JournalCompactTest.java 2009-06-28 03:52:47 UTC (rev 7485)
@@ -1,326 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source Copyright 2005-2008, Red Hat
- * Middleware LLC, and individual contributors by the @authors tag. See the
- * copyright.txt in the distribution for a full listing of individual
- * contributors.
- *
- * This is free software; you can redistribute it and/or modify it under the
- * terms of the GNU Lesser General Public License as published by the Free
- * Software Foundation; either version 2.1 of the License, or (at your option)
- * any later version.
- *
- * This software is distributed in the hope that it will be useful, but WITHOUT
- * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
- * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
- * details.
- *
- * You should have received a copy of the GNU Lesser General Public License
- * along with this software; if not, write to the Free Software Foundation,
- * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA, or see the FSF
- * site: http://www.fsf.org.
- */
-
-package org.jboss.messaging.tests.unit.core.journal.impl;
-
-import java.io.File;
-import java.util.ArrayList;
-import java.util.concurrent.CountDownLatch;
-
-import org.jboss.messaging.core.journal.SequentialFileFactory;
-import org.jboss.messaging.core.journal.impl.JournalImpl;
-import org.jboss.messaging.core.journal.impl.NIOSequentialFileFactory;
-import org.jboss.messaging.core.logging.Logger;
-import org.jboss.messaging.utils.IDGenerator;
-import org.jboss.messaging.utils.TimeAndCounterIDGenerator;
-
-/**
- *
- * A JournalImplTestBase
- *
- * @author <a href="mailto:clebert.suconic at jboss.com">Clebert Suconic</a>
- *
- */
-public class JournalCompactTest extends JournalImplTestBase
-{
- private static final Logger log = Logger.getLogger(JournalCompactTest.class);
-
- protected String journalDir = System.getProperty("user.home") + "/journal-test";
-
- private static final int NUMBER_OF_RECORDS = 1000;
-
- IDGenerator idGenerator = new TimeAndCounterIDGenerator();
-
- // General tests
- // =============
-
- public void testCompactwithPendingXACommit() throws Exception
- {
- }
-
- public void testCompactwithPendingXAPrepareAndCommit() throws Exception
- {
- }
-
- public void testCompactwithPendingCommit() throws Exception
- {
- }
-
- public void testCompactwithConcurrentDeletes() throws Exception
- {
- }
-
- public void testCompactWithConcurrentAppend() throws Exception
- {
- setup(50, 60 * 1024, true);
-
- final CountDownLatch latchDone = new CountDownLatch(1);
- final CountDownLatch latchWait = new CountDownLatch(1);
- journal = new JournalImpl(fileSize, minFiles, fileFactory, filePrefix, fileExtension, maxAIO)
- {
- public void onCompactDone()
- {
- latchDone.countDown();
- System.out.println("Waiting on Compact");
- try
- {
- latchWait.await();
- }
- catch (InterruptedException e)
- {
- e.printStackTrace();
- }
- System.out.println("Done");
- }
- };
- startJournal();
- load();
-
- long transactionID = 0;
-
- for (int i = 0; i < NUMBER_OF_RECORDS / 2; i++)
- {
- add(i);
- if (i % 10 == 0 && i > 0)
- {
- journal.forceMoveNextFile();
- }
- update(i);
- }
-
- for (int i = NUMBER_OF_RECORDS / 2; i < NUMBER_OF_RECORDS; i++)
- {
-
- addTx(transactionID, i);
- updateTx(transactionID, i);
- if (i % 10 == 0)
- {
- journal.forceMoveNextFile();
- }
- commit(transactionID++);
- update(i);
- }
-
- System.out.println("Number of Files: " + journal.getDataFilesCount());
-
- for (int i = 0; i < NUMBER_OF_RECORDS; i++)
- {
- if (!(i % 10 == 0))
- {
- delete(i);
- }
- }
-
- journal.forceMoveNextFile();
-
- Thread t = new Thread()
- {
- public void run()
- {
- try
- {
- journal.compact();
- }
- catch (Exception e)
- {
- e.printStackTrace();
- }
- }
- };
-
- t.start();
-
- latchDone.await();
-
- int nextID = NUMBER_OF_RECORDS;
-
- for (int i = 0; i < 100; i++)
- {
- add(nextID++);
- if (i % 10 == 0)
- {
- journal.forceMoveNextFile();
- }
- }
-
- latchWait.countDown();
-
- t.join();
-
-
- for (int i = 0 ; i < 1000; i++)
- {
- long id = idGenerator.generateID();
- add(id);
- delete(id);
-
- if (i % 100 == 0)
- {
- journal.forceMoveNextFile();
- }
- }
-
- journal.forceMoveNextFile();
-
- delete(0);
- add(idGenerator.generateID());
-
- journal.compact();
-
- stopJournal();
- createJournal();
- startJournal();
- loadAndCheck();
-
- }
-
- public void testCompactwithConcurrentAppendAndUpdate() throws Exception
- {
- }
-
- public void testCompactWithPendingTransactionAndDelete() throws Exception
- {
- }
-
- public void testCompactingWithPendingTransaction() throws Exception
- {
-
- }
-
- public void testSimpleCompacting() throws Exception
- {
- setup(50, 60 * 1024, true);
-
- createJournal();
- startJournal();
- load();
-
- int NUMBER_OF_RECORDS = 1000;
-
- // add and remove some data to force reclaiming
- {
- ArrayList<Long> ids = new ArrayList<Long>();
- for (int i = 0; i < NUMBER_OF_RECORDS; i++)
- {
- long id = idGenerator.generateID();
- ids.add(id);
- add(id);
- if (i > 0 && (i % 100 == 0))
- {
- journal.forceMoveNextFile();
- }
- }
-
- for (Long id : ids)
- {
- delete(id);
- }
-
- journal.forceMoveNextFile();
-
- journal.checkAndReclaimFiles();
- }
-
- long transactionID = 0;
-
- for (int i = 0; i < NUMBER_OF_RECORDS / 2; i++)
- {
- add(i);
- if (i % 10 == 0 && i > 0)
- {
- journal.forceMoveNextFile();
- }
- update(i);
- }
-
- for (int i = NUMBER_OF_RECORDS / 2; i < NUMBER_OF_RECORDS; i++)
- {
-
- addTx(transactionID, i);
- updateTx(transactionID, i);
- if (i % 10 == 0)
- {
- journal.forceMoveNextFile();
- }
- commit(transactionID++);
- update(i);
- }
-
- System.out.println("Number of Files: " + journal.getDataFilesCount());
-
- for (int i = 0; i < NUMBER_OF_RECORDS; i++)
- {
- if (!(i % 10 == 0))
- {
- delete(i);
- }
- }
-
- journal.forceMoveNextFile();
-
- System.out.println("Number of Files: " + journal.getDataFilesCount());
-
- System.out.println("Before compact ****************************");
- System.out.println(journal.debug());
- System.out.println("*****************************************");
-
- journal.compact();
-
- stopJournal();
- createJournal();
- startJournal();
- loadAndCheck();
-
- }
-
- protected int getAlignment()
- {
- return 1;
- }
-
- @Override
- protected void setUp() throws Exception
- {
- super.setUp();
-
- File file = new File(journalDir);
-
- deleteDirectory(file);
-
- file.mkdir();
- }
-
- protected SequentialFileFactory createFactory()
- {
- return new NIOSequentialFileFactory(journalDir);
- }
-
- /* (non-Javadoc)
- * @see org.jboss.messaging.tests.unit.core.journal.impl.JournalImplTestBase#getFileFactory()
- */
- @Override
- protected SequentialFileFactory getFileFactory() throws Exception
- {
- return createFactory();
- }
-
-}
More information about the jboss-cvs-commits
mailing list