[jboss-cvs] JBoss Messaging SVN: r7432 - in branches/clebert_temp_expirement: src/main/org/jboss/messaging/core/asyncio/impl and 5 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Mon Jun 22 20:55:33 EDT 2009
Author: clebert.suconic at jboss.com
Date: 2009-06-22 20:55:33 -0400 (Mon, 22 Jun 2009)
New Revision: 7432
Modified:
branches/clebert_temp_expirement/native/src/JNICallbackAdapter.cpp
branches/clebert_temp_expirement/native/src/JNICallbackAdapter.h
branches/clebert_temp_expirement/native/src/JNI_AsynchronousFileImpl.cpp
branches/clebert_temp_expirement/native/src/Version.h
branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/asyncio/impl/AsynchronousFileImpl.java
branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/Journal.java
branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/SequentialFileFactory.java
branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/TestableJournal.java
branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/AIOSequentialFileFactory.java
branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/AbstractSequentialFactory.java
branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/JournalFile.java
branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/JournalFileImpl.java
branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java
branches/clebert_temp_expirement/tests/src/org/jboss/messaging/tests/integration/journal/RealAIOJournalImplTest.java
branches/clebert_temp_expirement/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/JournalImplTestBase.java
branches/clebert_temp_expirement/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/JournalImplTestUnit.java
branches/clebert_temp_expirement/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/ReclaimerTest.java
branches/clebert_temp_expirement/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java
Log:
Uploading initial changes
Modified: branches/clebert_temp_expirement/native/src/JNICallbackAdapter.cpp
===================================================================
--- branches/clebert_temp_expirement/native/src/JNICallbackAdapter.cpp 2009-06-23 00:53:36 UTC (rev 7431)
+++ branches/clebert_temp_expirement/native/src/JNICallbackAdapter.cpp 2009-06-23 00:55:33 UTC (rev 7432)
@@ -22,12 +22,15 @@
#include <iostream>
#include "JavaUtilities.h"
-JNICallbackAdapter::JNICallbackAdapter(AIOController * _controller, jobject _callback, jobject _fileController, jobject _bufferReference) : CallbackAdapter()
+jobject nullObj = NULL;
+
+JNICallbackAdapter::JNICallbackAdapter(AIOController * _controller, jobject _callback, jobject _fileController, jobject _bufferReference, short _isRead) : CallbackAdapter()
{
controller = _controller;
callback = _callback;
fileController = _fileController;
bufferReference = _bufferReference;
+ isRead = _isRead;
}
JNICallbackAdapter::~JNICallbackAdapter()
@@ -36,7 +39,7 @@
void JNICallbackAdapter::done(THREAD_CONTEXT threadContext)
{
- JNI_ENV(threadContext)->CallVoidMethod(fileController, controller->done, callback, bufferReference);
+ JNI_ENV(threadContext)->CallVoidMethod(fileController, controller->done, callback, isRead ? nullObj : bufferReference);
release(threadContext);
}
@@ -44,7 +47,7 @@
{
controller->log(threadContext, 0, "Libaio event generated errors, callback object was informed about it");
jstring strError = JNI_ENV(threadContext)->NewStringUTF(error.data());
- JNI_ENV(threadContext)->CallVoidMethod(fileController, controller->error, callback, (jint)errorCode, strError);
+ JNI_ENV(threadContext)->CallVoidMethod(fileController, controller->error, callback, isRead ? nullObj : bufferReference, (jint)errorCode, strError);
release(threadContext);
}
Modified: branches/clebert_temp_expirement/native/src/JNICallbackAdapter.h
===================================================================
--- branches/clebert_temp_expirement/native/src/JNICallbackAdapter.h 2009-06-23 00:53:36 UTC (rev 7431)
+++ branches/clebert_temp_expirement/native/src/JNICallbackAdapter.h 2009-06-23 00:55:33 UTC (rev 7432)
@@ -34,6 +34,8 @@
jobject callback;
jobject fileController;
jobject bufferReference;
+ // Is this a read operation
+ short isRead;
void release(THREAD_CONTEXT threadContext)
{
@@ -47,7 +49,7 @@
public:
// _ob must be a global Reference (use createGloblReferente before calling the constructor)
- JNICallbackAdapter(AIOController * _controller, jobject _callback, jobject _fileController, jobject _bufferReference);
+ JNICallbackAdapter(AIOController * _controller, jobject _callback, jobject _fileController, jobject _bufferReference, short _isRead);
virtual ~JNICallbackAdapter();
void done(THREAD_CONTEXT threadContext);
Modified: branches/clebert_temp_expirement/native/src/JNI_AsynchronousFileImpl.cpp
===================================================================
--- branches/clebert_temp_expirement/native/src/JNI_AsynchronousFileImpl.cpp 2009-06-23 00:53:36 UTC (rev 7431)
+++ branches/clebert_temp_expirement/native/src/JNI_AsynchronousFileImpl.cpp 2009-06-23 00:55:33 UTC (rev 7432)
@@ -57,7 +57,7 @@
controller->done = env->GetMethodID(clazz,"callbackDone","(Lorg/jboss/messaging/core/asyncio/AIOCallback;Ljava/nio/ByteBuffer;)V");
if (!controller->done) return 0;
- controller->error = env->GetMethodID(clazz, "callbackError", "(Lorg/jboss/messaging/core/asyncio/AIOCallback;ILjava/lang/String;)V");
+ controller->error = env->GetMethodID(clazz, "callbackError", "(Lorg/jboss/messaging/core/asyncio/AIOCallback;Ljava/nio/ByteBuffer;ILjava/lang/String;)V");
if (!controller->error) return 0;
jclass loggerClass = env->GetObjectClass(logger);
@@ -103,7 +103,7 @@
return;
}
- CallbackAdapter * adapter = new JNICallbackAdapter(controller, env->NewGlobalRef(callback), env->NewGlobalRef(objThis), env->NewGlobalRef(jbuffer));
+ CallbackAdapter * adapter = new JNICallbackAdapter(controller, env->NewGlobalRef(callback), env->NewGlobalRef(objThis), env->NewGlobalRef(jbuffer), true);
controller->fileOutput.read(env, position, (size_t)size, buffer, adapter);
}
@@ -186,7 +186,7 @@
}
- CallbackAdapter * adapter = new JNICallbackAdapter(controller, env->NewGlobalRef(callback), env->NewGlobalRef(objThis), env->NewGlobalRef(jbuffer));
+ CallbackAdapter * adapter = new JNICallbackAdapter(controller, env->NewGlobalRef(callback), env->NewGlobalRef(objThis), env->NewGlobalRef(jbuffer), false);
controller->fileOutput.write(env, position, (size_t)size, buffer, adapter);
}
Modified: branches/clebert_temp_expirement/native/src/Version.h
===================================================================
--- branches/clebert_temp_expirement/native/src/Version.h 2009-06-23 00:53:36 UTC (rev 7431)
+++ branches/clebert_temp_expirement/native/src/Version.h 2009-06-23 00:55:33 UTC (rev 7432)
@@ -1,5 +1,5 @@
#ifndef _VERSION_NATIVE_AIO
-#define _VERSION_NATIVE_AIO 21
+#define _VERSION_NATIVE_AIO 22
#endif
Modified: branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/asyncio/impl/AsynchronousFileImpl.java
===================================================================
--- branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/asyncio/impl/AsynchronousFileImpl.java 2009-06-23 00:53:36 UTC (rev 7431)
+++ branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/asyncio/impl/AsynchronousFileImpl.java 2009-06-23 00:55:33 UTC (rev 7432)
@@ -55,7 +55,7 @@
private static boolean loaded = false;
- private static int EXPECTED_NATIVE_VERSION = 21;
+ private static int EXPECTED_NATIVE_VERSION = 22;
public static void addMax(final int io)
{
@@ -289,11 +289,11 @@
}
catch (MessagingException e)
{
- callbackError(aioCallback, e.getCode(), e.getMessage());
+ callbackError(aioCallback, directByteBuffer, e.getCode(), e.getMessage());
}
catch (RuntimeException e)
{
- callbackError(aioCallback, MessagingException.INTERNAL_ERROR, e.getMessage());
+ callbackError(aioCallback, directByteBuffer, MessagingException.INTERNAL_ERROR, e.getMessage());
}
}
});
@@ -308,11 +308,11 @@
}
catch (MessagingException e)
{
- callbackError(aioCallback, e.getCode(), e.getMessage());
+ callbackError(aioCallback, directByteBuffer, e.getCode(), e.getMessage());
}
catch (RuntimeException e)
{
- callbackError(aioCallback, MessagingException.INTERNAL_ERROR, e.getMessage());
+ callbackError(aioCallback, directByteBuffer, MessagingException.INTERNAL_ERROR, e.getMessage());
}
}
@@ -418,7 +418,9 @@
writeSemaphore.release();
pendingWrites.down();
callback.done();
- if (bufferCallback != null)
+
+ // The buffer is not sent on callback for read operations
+ if (bufferCallback != null && buffer != null)
{
bufferCallback.bufferDone(buffer);
}
@@ -426,12 +428,18 @@
// Called by the JNI layer.. just ignore the
// warning
- private void callbackError(final AIOCallback callback, final int errorCode, final String errorMessage)
+ private void callbackError(final AIOCallback callback, final ByteBuffer buffer, final int errorCode, final String errorMessage)
{
log.warn("CallbackError: " + errorMessage);
writeSemaphore.release();
pendingWrites.down();
callback.onError(errorCode, errorMessage);
+
+ // The buffer is not sent on callback for read operations
+ if (bufferCallback != null && buffer != null)
+ {
+ bufferCallback.bufferDone(buffer);
+ }
}
private void pollEvents()
Modified: branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/Journal.java
===================================================================
--- branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/Journal.java 2009-06-23 00:53:36 UTC (rev 7431)
+++ branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/Journal.java 2009-06-23 00:55:33 UTC (rev 7432)
@@ -81,6 +81,14 @@
void appendRollbackRecord(long txID, boolean sync) throws Exception;
+
+ /**
+ * Eliminate deleted records of the journal
+ * @throws Exception
+ */
+ void compact() throws Exception;
+
+
// Load
long load(List<RecordInfo> committedRecords, List<PreparedTransactionInfo> preparedTransactions) throws Exception;
Modified: branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/SequentialFileFactory.java
===================================================================
--- branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/SequentialFileFactory.java 2009-06-23 00:53:36 UTC (rev 7431)
+++ branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/SequentialFileFactory.java 2009-06-23 00:55:33 UTC (rev 7432)
@@ -40,13 +40,11 @@
List<String> listFiles(String extension) throws Exception;
boolean isSupportsCallbacks();
-
+
ByteBuffer newBuffer(int size);
void releaseBuffer(ByteBuffer buffer);
- void controlBuffersLifeCycle(boolean value);
-
/** The factory may need to do some initialization before the file is activated.
* this was added as a hook for AIO to initialize the Observer on TimedBuffer.
* It could be eventually done the same on NIO if we implement TimedBuffer on NIO */
@@ -71,5 +69,9 @@
* Create the directory if it doesn't exist yet
*/
void createDirs() throws Exception;
+
+ // used on tests only
+ void testFlush();
+
}
Modified: branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/TestableJournal.java
===================================================================
--- branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/TestableJournal.java 2009-06-23 00:53:36 UTC (rev 7431)
+++ branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/TestableJournal.java 2009-06-23 00:55:33 UTC (rev 7432)
@@ -70,7 +70,7 @@
void setAutoReclaim(boolean autoReclaim);
boolean isAutoReclaim();
+
-
}
Modified: branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/AIOSequentialFileFactory.java
===================================================================
--- branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/AIOSequentialFileFactory.java 2009-06-23 00:53:36 UTC (rev 7431)
+++ branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/AIOSequentialFileFactory.java 2009-06-23 00:55:33 UTC (rev 7432)
@@ -111,6 +111,11 @@
}
}
+ public void testFlush()
+ {
+ timedBuffer.flush();
+ }
+
public void deactivate(SequentialFile file)
{
timedBuffer.flush();
@@ -140,18 +145,6 @@
return AsynchronousFileImpl.isLoaded();
}
- public void controlBuffersLifeCycle(boolean value)
- {
- if (value)
- {
- buffersControl.enable();
- }
- else
- {
- buffersControl.disable();
- }
- }
-
public ByteBuffer newBuffer(int size)
{
if (size % 512 != 0)
@@ -223,23 +216,10 @@
* and ready to be reused or GCed */
private final ConcurrentLinkedQueue<ByteBuffer> reuseBuffersQueue = new ConcurrentLinkedQueue<ByteBuffer>();
- /** During reload we may disable/enable buffer reuse */
- private boolean enabled = true;
-
private boolean stopped = false;
final BufferCallback callback = new LocalBufferCallback();
- public void enable()
- {
- this.enabled = true;
- }
-
- public void disable()
- {
- this.enabled = false;
- }
-
public ByteBuffer newBuffer(final int size)
{
// if a new buffer wasn't requested in 10 seconds, we clear the queue
@@ -314,26 +294,23 @@
synchronized (ReuseBuffersController.this)
{
- if (enabled)
+ if (stopped)
{
- if (stopped)
+ releaseBuffer(buffer);
+ }
+ else
+ {
+ bufferReuseLastTime = System.currentTimeMillis();
+
+ // If a buffer has any other than the configured bufferSize, the buffer
+ // will be just sent to GC
+ if (buffer.capacity() == bufferSize)
{
- releaseBuffer(buffer);
+ reuseBuffersQueue.offer(buffer);
}
else
{
- bufferReuseLastTime = System.currentTimeMillis();
-
- // If a buffer has any other than the configured bufferSize, the buffer
- // will be just sent to GC
- if (buffer.capacity() == bufferSize)
- {
- reuseBuffersQueue.offer(buffer);
- }
- else
- {
- releaseBuffer(buffer);
- }
+ releaseBuffer(buffer);
}
}
}
Modified: branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/AbstractSequentialFactory.java
===================================================================
--- branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/AbstractSequentialFactory.java 2009-06-23 00:53:36 UTC (rev 7431)
+++ branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/AbstractSequentialFactory.java 2009-06-23 00:55:33 UTC (rev 7432)
@@ -53,10 +53,6 @@
}
- public void controlBuffersLifeCycle(boolean value)
- {
- }
-
public void stop()
{
}
@@ -76,6 +72,10 @@
public void deactivate(SequentialFile file)
{
}
+
+ public void testFlush()
+ {
+ }
/**
* Create the directory if it doesn't exist yet
Modified: branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/JournalFile.java
===================================================================
--- branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/JournalFile.java 2009-06-23 00:53:36 UTC (rev 7431)
+++ branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/JournalFile.java 2009-06-23 00:55:33 UTC (rev 7432)
@@ -44,6 +44,12 @@
void incPosCount();
void decPosCount();
+
+ void incPendingTransaction();
+
+ void decPendingTransaction();
+
+ int getPendingTransactions();
void setCanReclaim(boolean canDelete);
Modified: branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/JournalFileImpl.java
===================================================================
--- branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/JournalFileImpl.java 2009-06-23 00:53:36 UTC (rev 7431)
+++ branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/JournalFileImpl.java 2009-06-23 00:55:33 UTC (rev 7432)
@@ -48,6 +48,8 @@
private long offset;
+ private final AtomicInteger pendingTransactions = new AtomicInteger(0);
+
private final AtomicInteger posCount = new AtomicInteger(0);
private boolean canReclaim;
@@ -104,7 +106,23 @@
{
posCount.decrementAndGet();
}
+
+ public void incPendingTransaction()
+ {
+ pendingTransactions.incrementAndGet();
+ }
+
+ public void decPendingTransaction()
+ {
+ pendingTransactions.decrementAndGet();
+ }
+
+ public int getPendingTransactions()
+ {
+ return pendingTransactions.get();
+ }
+
public void extendOffset(final int delta)
{
offset += delta;
Modified: branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java
===================================================================
--- branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java 2009-06-23 00:53:36 UTC (rev 7431)
+++ branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java 2009-06-23 00:55:33 UTC (rev 7432)
@@ -47,6 +47,10 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
import org.jboss.messaging.core.buffers.ChannelBuffer;
import org.jboss.messaging.core.buffers.ChannelBuffers;
@@ -92,14 +96,16 @@
private static final Logger log = Logger.getLogger(JournalImpl.class);
- private static final boolean trace = log.isTraceEnabled();
+ //private static final boolean trace = log.isTraceEnabled();
+ private static final boolean trace = true;
// This method exists just to make debug easier.
// I could replace log.trace by log.info temporarily while I was debugging
// Journal
private static final void trace(final String message)
{
- log.trace(message);
+ System.out.println(message);
+ //log.trace(message);
}
// The sizes of primitive types
@@ -112,7 +118,7 @@
public static final int MIN_FILE_SIZE = 1024;
- public static final int SIZE_HEADER = 4;
+ public static final int SIZE_HEADER = SIZE_INT * 2;
public static final int BASIC_SIZE = SIZE_BYTE + SIZE_INT + SIZE_INT;
@@ -160,9 +166,9 @@
// Attributes ----------------------------------------------------
- private boolean autoReclaim = true;
+ private volatile boolean autoReclaim = true;
- private final AtomicInteger nextOrderingId = new AtomicInteger(0);
+ private final AtomicInteger nextFileID = new AtomicInteger(0);
// used for Asynchronous IO only (ignored on NIO).
private final int maxAIO;
@@ -183,16 +189,23 @@
private final BlockingQueue<JournalFile> openedFiles = new LinkedBlockingQueue<JournalFile>();
- private final ConcurrentMap<Long, PosFiles> posFilesMap = new ConcurrentHashMap<Long, PosFiles>();
+ // Compacting may replace this structure
+ private volatile ConcurrentMap<Long, RecordFilesRelationship> recordsRelationshipMap = new ConcurrentHashMap<Long, RecordFilesRelationship>();
private final ConcurrentMap<Long, JournalTransaction> transactionInfos = new ConcurrentHashMap<Long, JournalTransaction>();
- private final ConcurrentMap<Long, TransactionCallback> transactionCallbacks = new ConcurrentHashMap<Long, TransactionCallback>();
-
private ExecutorService filesExecutor = null;
private final Semaphore lock = new Semaphore(1);
+ private final ReadWriteLock compactingLock = new ReentrantReadWriteLock();
+
+ /** We don't lock the journal while compacting, however during a short time before we start, and after we finish,
+ * we need to rearrange the referenceCounting structures*/
+ private final Lock readLockCompact = compactingLock.readLock();
+
+ private final Lock writeLockCompact = compactingLock.writeLock();
+
private volatile JournalFile currentFile;
private volatile int state;
@@ -267,32 +280,44 @@
throw new IllegalStateException("Journal must be loaded first");
}
- int recordLength = record.getEncodeSize();
+ IOCallback callback = null;
- int size = SIZE_ADD_RECORD + recordLength;
+ readLockCompact.lock();
- ChannelBuffer bb = newBuffer(size);
+ try
+ {
- bb.writeByte(ADD_RECORD);
- bb.writeInt(-1); // skip ID part
- bb.writeLong(id);
- bb.writeInt(recordLength);
- bb.writeByte(recordType);
- record.encode(bb);
- bb.writeInt(size);
+ int recordLength = record.getEncodeSize();
- IOCallback callback = getSyncCallback(sync);
+ int size = SIZE_ADD_RECORD + recordLength;
- lock.acquire();
- try
- {
- JournalFile usedFile = appendRecord(bb, sync, callback);
+ ChannelBuffer bb = newBuffer(size);
- posFilesMap.put(id, new PosFiles(usedFile));
+ bb.writeByte(ADD_RECORD);
+ bb.writeInt(-1); // skip ID part
+ bb.writeLong(id);
+ bb.writeInt(recordLength);
+ bb.writeByte(recordType);
+ record.encode(bb);
+ bb.writeInt(size);
+
+ callback = getSyncCallback(sync);
+
+ lock.acquire();
+ try
+ {
+ JournalFile usedFile = appendRecord(bb, sync, callback);
+
+ recordsRelationshipMap.put(id, new RecordFilesRelationship(usedFile));
+ }
+ finally
+ {
+ lock.release();
+ }
}
finally
{
- lock.release();
+ readLockCompact.unlock();
}
if (callback != null)
@@ -313,37 +338,49 @@
throw new IllegalStateException("Journal must be loaded first");
}
- PosFiles posFiles = posFilesMap.get(id);
+ IOCallback callback = null;
- if (posFiles == null)
+ readLockCompact.lock();
+
+ try
{
- throw new IllegalStateException("Cannot find add info " + id);
- }
- int size = SIZE_UPDATE_RECORD + record.getEncodeSize();
+ RecordFilesRelationship posFiles = recordsRelationshipMap.get(id);
- ChannelBuffer bb = newBuffer(size);
+ if (posFiles == null)
+ {
+ throw new IllegalStateException("Cannot find add info " + id);
+ }
- bb.writeByte(UPDATE_RECORD);
- bb.writeInt(-1); // skip ID part
- bb.writeLong(id);
- bb.writeInt(record.getEncodeSize());
- bb.writeByte(recordType);
- record.encode(bb);
- bb.writeInt(size);
+ int size = SIZE_UPDATE_RECORD + record.getEncodeSize();
- IOCallback callback = getSyncCallback(sync);
+ ChannelBuffer bb = newBuffer(size);
- lock.acquire();
- try
- {
- JournalFile usedFile = appendRecord(bb, sync, callback);
+ bb.writeByte(UPDATE_RECORD);
+ bb.writeInt(-1); // skip ID part
+ bb.writeLong(id);
+ bb.writeInt(record.getEncodeSize());
+ bb.writeByte(recordType);
+ record.encode(bb);
+ bb.writeInt(size);
- posFiles.addUpdateFile(usedFile);
+ callback = getSyncCallback(sync);
+
+ lock.acquire();
+ try
+ {
+ JournalFile usedFile = appendRecord(bb, sync, callback);
+
+ posFiles.addUpdateFile(usedFile);
+ }
+ finally
+ {
+ lock.release();
+ }
}
finally
{
- lock.release();
+ readLockCompact.unlock();
}
if (callback != null)
@@ -359,34 +396,46 @@
throw new IllegalStateException("Journal must be loaded first");
}
- PosFiles posFiles = posFilesMap.remove(id);
+ readLockCompact.lock();
- if (posFiles == null)
+ IOCallback callback = null;
+
+ try
{
- throw new IllegalStateException("Cannot find add info " + id);
- }
- int size = SIZE_DELETE_RECORD;
+ RecordFilesRelationship posFiles = recordsRelationshipMap.remove(id);
- ChannelBuffer bb = newBuffer(size);
+ if (posFiles == null)
+ {
+ throw new IllegalStateException("Cannot find add info " + id);
+ }
- bb.writeByte(DELETE_RECORD);
- bb.writeInt(-1); // skip ID part
- bb.writeLong(id);
- bb.writeInt(size);
+ int size = SIZE_DELETE_RECORD;
- IOCallback callback = getSyncCallback(sync);
+ ChannelBuffer bb = newBuffer(size);
- lock.acquire();
- try
- {
- JournalFile usedFile = appendRecord(bb, sync, callback);
+ bb.writeByte(DELETE_RECORD);
+ bb.writeInt(-1); // skip ID part
+ bb.writeLong(id);
+ bb.writeInt(size);
- posFiles.addDelete(usedFile);
+ callback = getSyncCallback(sync);
+
+ lock.acquire();
+ try
+ {
+ JournalFile usedFile = appendRecord(bb, sync, callback);
+
+ posFiles.addDelete(usedFile);
+ }
+ finally
+ {
+ lock.release();
+ }
}
finally
{
- lock.release();
+ readLockCompact.unlock();
}
if (callback != null)
@@ -416,33 +465,43 @@
throw new IllegalStateException("Journal must be loaded first");
}
- int recordLength = record.getEncodeSize();
+ readLockCompact.lock();
- int size = SIZE_ADD_RECORD_TX + recordLength;
+ try
+ {
- ChannelBuffer bb = newBuffer(size);
+ int recordLength = record.getEncodeSize();
- bb.writeByte(ADD_RECORD_TX);
- bb.writeInt(-1); // skip ID part
- bb.writeLong(txID);
- bb.writeLong(id);
- bb.writeInt(recordLength);
- bb.writeByte(recordType);
- record.encode(bb);
- bb.writeInt(size);
+ int size = SIZE_ADD_RECORD_TX + recordLength;
- lock.acquire();
- try
- {
- JournalFile usedFile = appendRecord(bb, false, getTransactionCallback(txID, sync));
+ ChannelBuffer bb = newBuffer(size);
- JournalTransaction tx = getTransactionInfo(txID);
+ bb.writeByte(ADD_RECORD_TX);
+ bb.writeInt(-1); // skip ID part
+ bb.writeLong(txID);
+ bb.writeLong(id);
+ bb.writeInt(recordLength);
+ bb.writeByte(recordType);
+ record.encode(bb);
+ bb.writeInt(size);
- tx.addPositive(usedFile, id);
+ lock.acquire();
+ try
+ {
+ JournalTransaction tx = getTransactionInfo(txID);
+
+ JournalFile usedFile = appendRecord(bb, false, getTransactionCallback(tx, sync));
+
+ tx.addPositive(usedFile, id);
+ }
+ finally
+ {
+ lock.release();
+ }
}
finally
{
- lock.release();
+ readLockCompact.unlock();
}
}
@@ -466,31 +525,41 @@
throw new IllegalStateException("Journal must be loaded first");
}
- int size = SIZE_UPDATE_RECORD_TX + record.getEncodeSize();
+ readLockCompact.lock();
- ChannelBuffer bb = newBuffer(size);
-
- bb.writeByte(UPDATE_RECORD_TX);
- bb.writeInt(-1); // skip ID part
- bb.writeLong(txID);
- bb.writeLong(id);
- bb.writeInt(record.getEncodeSize());
- bb.writeByte(recordType);
- record.encode(bb);
- bb.writeInt(size);
-
- lock.acquire();
try
{
- JournalFile usedFile = appendRecord(bb, false, getTransactionCallback(txID, sync));
- JournalTransaction tx = getTransactionInfo(txID);
+ int size = SIZE_UPDATE_RECORD_TX + record.getEncodeSize();
- tx.addPositive(usedFile, id);
+ ChannelBuffer bb = newBuffer(size);
+
+ bb.writeByte(UPDATE_RECORD_TX);
+ bb.writeInt(-1); // skip ID part
+ bb.writeLong(txID);
+ bb.writeLong(id);
+ bb.writeInt(record.getEncodeSize());
+ bb.writeByte(recordType);
+ record.encode(bb);
+ bb.writeInt(size);
+
+ lock.acquire();
+ try
+ {
+ JournalTransaction tx = getTransactionInfo(txID);
+
+ JournalFile usedFile = appendRecord(bb, false, getTransactionCallback(tx, sync));
+
+ tx.addPositive(usedFile, id);
+ }
+ finally
+ {
+ lock.release();
+ }
}
finally
{
- lock.release();
+ readLockCompact.unlock();
}
}
@@ -509,33 +578,42 @@
throw new IllegalStateException("Journal must be loaded first");
}
- int size = SIZE_DELETE_RECORD_TX + (record != null ? record.getEncodeSize() : 0);
+ readLockCompact.lock();
- ChannelBuffer bb = newBuffer(size);
-
- bb.writeByte(DELETE_RECORD_TX);
- bb.writeInt(-1); // skip ID part
- bb.writeLong(txID);
- bb.writeLong(id);
- bb.writeInt(record != null ? record.getEncodeSize() : 0);
- if (record != null)
- {
- record.encode(bb);
- }
- bb.writeInt(size);
-
- lock.acquire();
try
{
- JournalFile usedFile = appendRecord(bb, false, getTransactionCallback(txID, sync));
+ int size = SIZE_DELETE_RECORD_TX + (record != null ? record.getEncodeSize() : 0);
- JournalTransaction tx = getTransactionInfo(txID);
+ ChannelBuffer bb = newBuffer(size);
- tx.addNegative(usedFile, id);
+ bb.writeByte(DELETE_RECORD_TX);
+ bb.writeInt(-1); // skip ID part
+ bb.writeLong(txID);
+ bb.writeLong(id);
+ bb.writeInt(record != null ? record.getEncodeSize() : 0);
+ if (record != null)
+ {
+ record.encode(bb);
+ }
+ bb.writeInt(size);
+
+ lock.acquire();
+ try
+ {
+ JournalTransaction tx = getTransactionInfo(txID);
+
+ JournalFile usedFile = appendRecord(bb, false, getTransactionCallback(tx, sync));
+
+ tx.addNegative(usedFile, id);
+ }
+ finally
+ {
+ lock.release();
+ }
}
finally
{
- lock.release();
+ readLockCompact.unlock();
}
}
@@ -546,30 +624,40 @@
throw new IllegalStateException("Journal must be loaded first");
}
- int size = SIZE_DELETE_RECORD_TX;
+ readLockCompact.lock();
- ChannelBuffer bb = newBuffer(size);
-
- bb.writeByte(DELETE_RECORD_TX);
- bb.writeInt(-1); // skip ID part
- bb.writeLong(txID);
- bb.writeLong(id);
- bb.writeInt(0);
- bb.writeInt(size);
-
- lock.acquire();
try
{
- JournalFile usedFile = appendRecord(bb, false, getTransactionCallback(txID, sync));
+ int size = SIZE_DELETE_RECORD_TX;
- JournalTransaction tx = getTransactionInfo(txID);
+ ChannelBuffer bb = newBuffer(size);
- tx.addNegative(usedFile, id);
+ bb.writeByte(DELETE_RECORD_TX);
+ bb.writeInt(-1); // skip ID part
+ bb.writeLong(txID);
+ bb.writeLong(id);
+ bb.writeInt(0);
+ bb.writeInt(size);
+
+ lock.acquire();
+ try
+ {
+ JournalTransaction tx = getTransactionInfo(txID);
+
+ JournalFile usedFile = appendRecord(bb, false, getTransactionCallback(tx, sync));
+
+ tx.addNegative(usedFile, id);
+ }
+ finally
+ {
+ lock.release();
+ }
}
finally
{
- lock.release();
+ readLockCompact.unlock();
}
+
}
/**
@@ -592,22 +680,34 @@
throw new IllegalStateException("Journal must be loaded first");
}
- JournalTransaction tx = getTransactionInfo(txID);
+ readLockCompact.lock();
- ChannelBuffer bb = writeTransaction(PREPARE_RECORD, txID, tx, transactionData);
+ IOCallback callback = null;
- IOCallback callback = getTransactionCallback(txID, sync);
-
- lock.acquire();
try
{
- JournalFile usedFile = appendRecord(bb, sync, callback);
+ JournalTransaction tx = getTransactionInfo(txID);
- tx.prepare(usedFile);
+ ChannelBuffer bb = writeTransaction(PREPARE_RECORD, txID, tx, transactionData);
+
+ callback = getTransactionCallback(tx, sync);
+
+ lock.acquire();
+ try
+ {
+ JournalFile usedFile = appendRecord(bb, sync, callback);
+
+ tx.prepare(usedFile);
+ }
+ finally
+ {
+ lock.release();
+ }
+
}
finally
{
- lock.release();
+ readLockCompact.unlock();
}
// We should wait this outside of the lock, to increase throughput
@@ -615,6 +715,7 @@
{
callback.waitCompletion();
}
+
}
/**
@@ -641,29 +742,40 @@
throw new IllegalStateException("Journal must be loaded first");
}
- JournalTransaction tx = transactionInfos.remove(txID);
+ readLockCompact.lock();
- if (tx == null)
+ IOCallback callback = null;
+
+ try
{
- throw new IllegalStateException("Cannot find tx with id " + txID);
- }
- ChannelBuffer bb = writeTransaction(COMMIT_RECORD, txID, tx, null);
+ JournalTransaction tx = transactionInfos.remove(txID);
- IOCallback callback = getTransactionCallback(txID, sync);
+ if (tx == null)
+ {
+ throw new IllegalStateException("Cannot find tx with id " + txID);
+ }
- lock.acquire();
- try
- {
- JournalFile usedFile = appendRecord(bb, sync, callback);
+ ChannelBuffer bb = writeTransaction(COMMIT_RECORD, txID, tx, null);
- transactionCallbacks.remove(txID);
+ callback = getTransactionCallback(tx, sync);
- tx.commit(usedFile);
+ lock.acquire();
+ try
+ {
+ JournalFile usedFile = appendRecord(bb, sync, callback);
+
+ tx.commit(usedFile);
+ }
+ finally
+ {
+ lock.release();
+ }
+
}
finally
{
- lock.release();
+ readLockCompact.unlock();
}
// We should wait this outside of the lock, to increase throuput
@@ -671,7 +783,6 @@
{
callback.waitCompletion();
}
-
}
public void appendRollbackRecord(final long txID, final boolean sync) throws Exception
@@ -681,36 +792,46 @@
throw new IllegalStateException("Journal must be loaded first");
}
- JournalTransaction tx = transactionInfos.remove(txID);
+ IOCallback callback = null;
- if (tx == null)
+ readLockCompact.lock();
+
+ try
{
- throw new IllegalStateException("Cannot find tx with id " + txID);
- }
+ JournalTransaction tx = transactionInfos.remove(txID);
- int size = SIZE_ROLLBACK_RECORD;
+ if (tx == null)
+ {
+ throw new IllegalStateException("Cannot find tx with id " + txID);
+ }
- ChannelBuffer bb = newBuffer(size);
+ int size = SIZE_ROLLBACK_RECORD;
- bb.writeByte(ROLLBACK_RECORD);
- bb.writeInt(-1); // skip ID part
- bb.writeLong(txID);
- bb.writeInt(size);
+ ChannelBuffer bb = newBuffer(size);
- IOCallback callback = getTransactionCallback(txID, sync);
+ bb.writeByte(ROLLBACK_RECORD);
+ bb.writeInt(-1); // skip ID part
+ bb.writeLong(txID);
+ bb.writeInt(size);
- lock.acquire();
- try
- {
- JournalFile usedFile = appendRecord(bb, sync, callback);
+ callback = getTransactionCallback(tx, sync);
- transactionCallbacks.remove(txID);
+ lock.acquire();
+ try
+ {
+ JournalFile usedFile = appendRecord(bb, sync, callback);
- tx.rollback(usedFile);
+ tx.rollback(usedFile);
+ }
+ finally
+ {
+ lock.release();
+ }
+
}
finally
{
- lock.release();
+ readLockCompact.unlock();
}
// We should wait this outside of the lock, to increase throuput
@@ -764,6 +885,59 @@
return maxID;
}
+ public void compact() throws Exception
+ {
+ ConcurrentMap<Long, RecordFilesRelationship> recordsSnapshot = null;
+
+ ArrayList<JournalFile> dataFilesToProcess = new ArrayList<JournalFile>(dataFiles.size());
+
+ boolean previousReclaimValue = autoReclaim;
+
+ try
+ {
+
+ // First, we replace the recordsRelationshipMap by a new one.
+ // We need to guarantee that the journal is frozen for this short time
+ // We don't freeze the journal as we compact, only for the short time where we replace recordsRelationshipMap
+ writeLockCompact.lock();
+ try
+ {
+ autoReclaim = false;
+
+ recordsSnapshot = recordsRelationshipMap;
+
+ recordsRelationshipMap = new ConcurrentHashMap<Long, RecordFilesRelationship>();
+
+ for (JournalFile file: dataFiles)
+ {
+ if (file.getPendingTransactions() == 0)
+ {
+ trace("Adding " + file + " to compact list");
+ dataFilesToProcess.add(file);
+ }
+ else
+ {
+ trace(file + " will not be compacted as it has pending transactions");
+ break;
+ }
+ }
+
+ }
+ finally
+ {
+ writeLockCompact.unlock();
+ }
+
+ }
+ finally
+ {
+ autoReclaim = previousReclaimValue;
+ }
+
+
+
+ }
+
private boolean isInvalidSize(int bufferPos, int size)
{
if (size < 0)
@@ -820,8 +994,6 @@
throw new IllegalStateException("Journal must be in started state");
}
- fileFactory.controlBuffersLifeCycle(false);
-
final Map<Long, TransactionHolder> transactions = new LinkedHashMap<Long, TransactionHolder>();
final List<JournalFile> orderedFiles = orderFiles();
@@ -849,7 +1021,7 @@
loadManager.addRecord(info);
- posFilesMap.put(info.id, new PosFiles(file));
+ recordsRelationshipMap.put(info.id, new RecordFilesRelationship(file));
}
public void updateRecord(RecordInfo info) throws Exception
@@ -862,7 +1034,7 @@
loadManager.updateRecord(info);
- PosFiles posFiles = posFilesMap.get(info.id);
+ RecordFilesRelationship posFiles = recordsRelationshipMap.get(info.id);
if (posFiles != null)
{
@@ -884,7 +1056,7 @@
loadManager.deleteRecord(recordID);
- PosFiles posFiles = posFilesMap.remove(recordID);
+ RecordFilesRelationship posFiles = recordsRelationshipMap.remove(recordID);
if (posFiles != null)
{
@@ -1123,8 +1295,6 @@
}
}
- fileFactory.controlBuffersLifeCycle(true);
-
// Create any more files we need
// FIXME - size() involves a scan
@@ -1135,7 +1305,7 @@
for (int i = 0; i < filesToCreate; i++)
{
// Keeping all files opened can be very costly (mainly on AIO)
- freeFiles.add(createFile(false));
+ freeFiles.add(createFile(false, false));
}
}
@@ -1163,7 +1333,7 @@
{
currentFile = freeFiles.remove();
- openFile(currentFile);
+ openFile(currentFile, true);
}
fileFactory.activate(currentFile.getFile());
@@ -1275,9 +1445,14 @@
* It will call waitComplete on every transaction, so any assertions on the file system will be correct after this */
public void debugWait() throws Exception
{
- for (TransactionCallback callback : transactionCallbacks.values())
+ fileFactory.testFlush();
+
+ for (JournalTransaction tx : transactionInfos.values())
{
- callback.waitCompletion();
+ if (tx.getCallback() != null)
+ {
+ tx.getCallback().waitCompletion();
+ }
}
if (filesExecutor != null && !filesExecutor.isShutdown())
@@ -1352,7 +1527,7 @@
public int getIDMapSize()
{
- return posFilesMap.size();
+ return recordsRelationshipMap.size();
}
public int getFileSize()
@@ -1486,7 +1661,7 @@
// Discard the old JournalFile and set it with a new ID
private JournalFile reinitializeFile(final JournalFile file) throws Exception
{
- int newOrderingID = generateOrderingID();
+ int newFileID = generateFileID();
SequentialFile sf = file.getFile();
@@ -1494,15 +1669,16 @@
sf.position(0);
- ByteBuffer bb = fileFactory.newBuffer(SIZE_INT);
+ ByteBuffer bb = fileFactory.newBuffer(SIZE_HEADER);
- bb.putInt(newOrderingID);
+ bb.putInt(newFileID);
+ bb.putInt(newFileID);
bb.rewind();
sf.write(bb, true);
- JournalFile jf = new JournalFileImpl(sf, newOrderingID);
+ JournalFile jf = new JournalFileImpl(sf, newFileID);
sf.position(bb.limit());
@@ -1511,7 +1687,6 @@
return jf;
}
-
private int readJournalFile(JournalFile file, JournalReader reader) throws Exception
{
ByteBuffer wholeFileBuffer = fileFactory.newBuffer(fileSize);
@@ -1729,13 +1904,13 @@
reader.addRecord(new RecordInfo(recordID, userRecordType, record, false));
break;
}
-
+
case UPDATE_RECORD:
{
reader.updateRecord(new RecordInfo(recordID, userRecordType, record, true));
break;
}
-
+
case DELETE_RECORD:
{
reader.deleteRecord(recordID);
@@ -1747,19 +1922,19 @@
reader.addRecordTX(transactionID, new RecordInfo(recordID, userRecordType, record, false));
break;
}
-
+
case UPDATE_RECORD_TX:
{
reader.updateRecordTX(transactionID, new RecordInfo(recordID, userRecordType, record, true));
break;
}
-
+
case DELETE_RECORD_TX:
{
reader.deleteRecordTX(transactionID, new RecordInfo(recordID, (byte)0, record, true));
break;
}
-
+
case PREPARE_RECORD:
{
@@ -1821,7 +1996,6 @@
}
-
/** It will read the elements-summary back from the commit/prepare transaction
* Pair<FileID, Counter> */
@SuppressWarnings("unchecked")
@@ -2041,7 +2215,7 @@
file.open(1);
- ByteBuffer bb = fileFactory.newBuffer(SIZE_INT);
+ ByteBuffer bb = fileFactory.newBuffer(SIZE_HEADER);
file.read(bb);
@@ -2051,9 +2225,9 @@
bb = null;
- if (nextOrderingId.get() < orderingID)
+ if (nextFileID.get() < orderingID)
{
- nextOrderingId.set(orderingID);
+ nextFileID.set(orderingID);
}
orderedFiles.add(new JournalFileImpl(file, orderingID));
@@ -2150,11 +2324,11 @@
* @return
* @throws Exception
*/
- private JournalFile createFile(final boolean keepOpened) throws Exception
+ private JournalFile createFile(final boolean keepOpened, final boolean multiAIO) throws Exception
{
- int orderingID = generateOrderingID();
+ int fileID = generateFileID();
- String fileName = filePrefix + "-" + orderingID + "." + fileExtension;
+ String fileName = filePrefix + "-" + fileID + "." + fileExtension;
if (trace)
{
@@ -2163,19 +2337,27 @@
SequentialFile sequentialFile = fileFactory.createSequentialFile(fileName, maxAIO);
- sequentialFile.open();
+ if (multiAIO)
+ {
+ sequentialFile.open();
+ }
+ else
+ {
+ sequentialFile.open(1);
+ }
sequentialFile.fill(0, fileSize, FILL_CHARACTER);
- ByteBuffer bb = fileFactory.newBuffer(SIZE_INT);
+ ByteBuffer bb = fileFactory.newBuffer(SIZE_HEADER);
- bb.putInt(orderingID);
+ bb.putInt(fileID);
+ bb.putInt(fileID);
bb.rewind();
sequentialFile.write(bb, true);
- JournalFile info = new JournalFileImpl(sequentialFile, orderingID);
+ JournalFile info = new JournalFileImpl(sequentialFile, fileID);
if (!keepOpened)
{
@@ -2185,16 +2367,23 @@
return info;
}
- private void openFile(final JournalFile file) throws Exception
+ private void openFile(final JournalFile file, final boolean multiAIO) throws Exception
{
- file.getFile().open();
+ if (multiAIO)
+ {
+ file.getFile().open();
+ }
+ else
+ {
+ file.getFile().open(1);
+ }
file.getFile().position(file.getFile().calculateBlockStart(SIZE_HEADER));
}
- private int generateOrderingID()
+ private int generateFileID()
{
- return nextOrderingId.incrementAndGet();
+ return nextFileID.incrementAndGet();
}
// You need to guarantee lock.acquire() before calling this method
@@ -2272,6 +2461,17 @@
* */
private void pushOpenedFile() throws Exception
{
+ JournalFile nextOpenedFile = openFile(true);
+
+ openedFiles.offer(nextOpenedFile);
+ }
+
+ /**
+ * @return
+ * @throws Exception
+ */
+ private JournalFile openFile(boolean multiAIO) throws Exception
+ {
JournalFile nextOpenedFile = null;
try
{
@@ -2283,14 +2483,13 @@
if (nextOpenedFile == null)
{
- nextOpenedFile = createFile(true);
+ nextOpenedFile = createFile(true, multiAIO);
}
else
{
- openFile(nextOpenedFile);
+ openFile(nextOpenedFile, multiAIO);
}
-
- openedFiles.offer(nextOpenedFile);
+ return nextOpenedFile;
}
private void closeFile(final JournalFile file)
@@ -2351,22 +2550,17 @@
}
}
- private IOCallback getTransactionCallback(final long transactionId, final boolean sync) throws MessagingException
+ private IOCallback getTransactionCallback(final JournalTransaction tx, final boolean sync) throws MessagingException
{
if (sync && fileFactory.isSupportsCallbacks())
{
- TransactionCallback callback = transactionCallbacks.get(transactionId);
+ TransactionCallback callback = tx.getCallback();
if (callback == null)
{
callback = new TransactionCallback();
- TransactionCallback callbackCheck = transactionCallbacks.putIfAbsent(transactionId, callback);
-
- if (callbackCheck != null)
- {
- callback = callbackCheck;
- }
+ tx.setCallback(callback);
}
if (callback.errorMessage != null)
@@ -2433,14 +2627,18 @@
}
- /** Used on the ref-count for reclaiming */
- private static class PosFiles
+ /**
+ * This holds the relationship a record has with other files in regard to reference counting.
+ * Note: This class used to be called PosFiles
+ *
+ * Used on the ref-count for reclaiming */
+ private static class RecordFilesRelationship
{
private final JournalFile addFile;
private List<JournalFile> updateFiles;
- PosFiles(final JournalFile addFile)
+ RecordFilesRelationship(final JournalFile addFile)
{
this.addFile = addFile;
@@ -2475,7 +2673,7 @@
public String toString()
{
StringBuffer buffer = new StringBuffer();
- buffer.append("PosFiles(add=" + addFile.getFile().getFileName());
+ buffer.append("RecordFilesRelationship(add=" + addFile.getFile().getFileName());
if (updateFiles != null)
{
@@ -2495,11 +2693,15 @@
private class JournalTransaction
{
+ private TransactionCallback callback;
+
private List<Pair<JournalFile, Long>> pos;
private List<Pair<JournalFile, Long>> neg;
- private Set<JournalFile> transactionPos;
+ // All the files this transaction is touching on.
+ // We can't have those files being reclaimed or compacted if there is a pending transaction
+ private Set<JournalFile> pendingFiles;
// Map of file id to number of elements participating on the transaction
// in that file
@@ -2511,11 +2713,27 @@
return numberOfElementsPerFile;
}
+ /**
+ * @param callback
+ */
+ public void setCallback(TransactionCallback callback)
+ {
+ this.callback = callback;
+ }
+
+ /**
+ * @return
+ */
+ public TransactionCallback getCallback()
+ {
+ return this.callback;
+ }
+
public void addPositive(final JournalFile file, final long id)
{
getCounter(file).incrementAndGet();
- addTXPosCount(file);
+ addFile(file);
if (pos == null)
{
@@ -2529,7 +2747,7 @@
{
getCounter(file).incrementAndGet();
- addTXPosCount(file);
+ addFile(file);
if (neg == null)
{
@@ -2539,19 +2757,22 @@
neg.add(new Pair<JournalFile, Long>(file, id));
}
+ /**
+ * The caller of this method needs to guarantee lock.acquire. (unless this is being called from load what is a single thread process).
+ * */
public void commit(final JournalFile file)
{
if (pos != null)
{
for (Pair<JournalFile, Long> p : pos)
{
- PosFiles posFiles = posFilesMap.get(p.b);
+ RecordFilesRelationship posFiles = recordsRelationshipMap.get(p.b);
if (posFiles == null)
{
- posFiles = new PosFiles(p.a);
+ posFiles = new RecordFilesRelationship(p.a);
- posFilesMap.put(p.b, posFiles);
+ recordsRelationshipMap.put(p.b, posFiles);
}
else
{
@@ -2564,7 +2785,7 @@
{
for (Pair<JournalFile, Long> n : neg)
{
- PosFiles posFiles = posFilesMap.remove(n.b);
+ RecordFilesRelationship posFiles = recordsRelationshipMap.remove(n.b);
if (posFiles != null)
{
@@ -2576,12 +2797,17 @@
// Now add negs for the pos we added in each file in which there were
// transactional operations
- for (JournalFile jf : transactionPos)
+ for (JournalFile jf : pendingFiles)
{
file.incNegCount(jf);
+ jf.decPendingTransaction();
}
}
+ /**
+ * The caller of this method needs to guarantee lock.acquire before calling this method if being used outside of the lock context.
+ * or else potFilesMap could be affected
+ * */
public void rollback(final JournalFile file)
{
// Now add negs for the pos we added in each file in which there were
@@ -2594,45 +2820,54 @@
// just left with a prepare when the tx
// has actually been rolled back
- for (JournalFile jf : transactionPos)
+ for (JournalFile jf : pendingFiles)
{
file.incNegCount(jf);
+ jf.decPendingTransaction();
}
}
+ /**
+ * The caller of this method needs to guarantee lock.acquire before calling this method if being used outside of the lock context.
+ * or else potFilesMap could be affected
+ * */
public void prepare(final JournalFile file)
{
// We don't want the prepare record getting deleted before time
- addTXPosCount(file);
+ addFile(file);
}
+ /** Used by load, when the transaction was not loaded correctly */
public void forget()
{
// The transaction was not committed or rolled back in the file, so we
// reverse any pos counts we added
-
- for (JournalFile jf : transactionPos)
+ for (JournalFile jf : pendingFiles)
{
jf.decPosCount();
+ jf.decPendingTransaction();
}
+
}
- private void addTXPosCount(final JournalFile file)
+ private void addFile(final JournalFile file)
{
- if (transactionPos == null)
+ if (pendingFiles == null)
{
- transactionPos = new HashSet<JournalFile>();
+ pendingFiles = new HashSet<JournalFile>();
}
- if (!transactionPos.contains(file))
+ if (!pendingFiles.contains(file))
{
- transactionPos.add(file);
+ pendingFiles.add(file);
// We add a pos for the transaction itself in the file - this
// prevents any transactional operations
// being deleted before a commit or rollback is written
file.incPosCount();
+
+ file.incPendingTransaction();
}
}
Modified: branches/clebert_temp_expirement/tests/src/org/jboss/messaging/tests/integration/journal/RealAIOJournalImplTest.java
===================================================================
--- branches/clebert_temp_expirement/tests/src/org/jboss/messaging/tests/integration/journal/RealAIOJournalImplTest.java 2009-06-23 00:53:36 UTC (rev 7431)
+++ branches/clebert_temp_expirement/tests/src/org/jboss/messaging/tests/integration/journal/RealAIOJournalImplTest.java 2009-06-23 00:55:33 UTC (rev 7432)
@@ -70,7 +70,12 @@
file.mkdir();
- return new AIOSequentialFileFactory(getTestDir());
+ return new AIOSequentialFileFactory(getTestDir(),
+ ConfigurationImpl.DEFAULT_JOURNAL_AIO_BUFFER_SIZE,
+ 1000000,
+ true,
+ false
+ );
}
Modified: branches/clebert_temp_expirement/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/JournalImplTestBase.java
===================================================================
--- branches/clebert_temp_expirement/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/JournalImplTestBase.java 2009-06-23 00:53:36 UTC (rev 7431)
+++ branches/clebert_temp_expirement/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/JournalImplTestBase.java 2009-06-23 00:55:33 UTC (rev 7432)
@@ -256,7 +256,7 @@
removeRecordsForID(element);
}
-
+
journal.debugWait();
}
Modified: branches/clebert_temp_expirement/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/JournalImplTestUnit.java
===================================================================
--- branches/clebert_temp_expirement/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/JournalImplTestUnit.java 2009-06-23 00:53:36 UTC (rev 7431)
+++ branches/clebert_temp_expirement/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/JournalImplTestUnit.java 2009-06-23 00:55:33 UTC (rev 7432)
@@ -22,6 +22,7 @@
package org.jboss.messaging.tests.unit.core.journal.impl;
+import java.util.ArrayList;
import java.util.List;
import org.jboss.messaging.core.journal.EncodingSupport;
@@ -2962,7 +2963,6 @@
{
addTx(transactionID, i);
- updateTx(i + 100);
if (i % 10 == 0 && i > 0)
{
journal.forceMoveNextFile();
@@ -3035,6 +3035,63 @@
assertEquals(0, journal.getDataFilesCount());
}
+ public void testCompactingWithPendingTransaction() throws Exception
+ {
+ setup(2, 60 * 1024, true);
+
+ createJournal();
+ startJournal();
+ load();
+
+ long transactionID = 0;
+
+ for (int i = 0; i < 500; i++)
+ {
+ add(i);
+ if (i % 10 == 0 && i > 0)
+ {
+ journal.forceMoveNextFile();
+ }
+ update(i);
+ }
+
+ for (int i = 500; i < 1000; i++)
+ {
+
+ addTx(transactionID, i);
+ updateTx(transactionID, i + 100);
+ if (i % 10 == 0)
+ {
+ journal.forceMoveNextFile();
+ }
+ commit(transactionID++);
+ update(i);
+ }
+
+ System.out.println("Number of Files: " + journal.getDataFilesCount());
+
+
+ for (int i = 0 ; i < 1000; i++)
+ {
+ if (!(i % 10 == 0))
+ {
+ delete(i);
+ }
+ }
+
+ journal.forceMoveNextFile();
+
+ System.out.println("Number of Files: " + journal.getDataFilesCount());
+
+ System.out.println("Before compact ****************************");
+ System.out.println(journal.debug());
+ System.out.println("*****************************************");
+
+
+ journal.compact();
+
+ }
+
protected abstract int getAlignment();
}
Modified: branches/clebert_temp_expirement/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/ReclaimerTest.java
===================================================================
--- branches/clebert_temp_expirement/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/ReclaimerTest.java 2009-06-23 00:53:36 UTC (rev 7431)
+++ branches/clebert_temp_expirement/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/ReclaimerTest.java 2009-06-23 00:55:33 UTC (rev 7432)
@@ -863,5 +863,27 @@
{
return transactionIDs;
}
+
+ /* (non-Javadoc)
+ * @see org.jboss.messaging.core.journal.impl.JournalFile#decPendingTransaction()
+ */
+ public void decPendingTransaction()
+ {
+ }
+
+ /* (non-Javadoc)
+ * @see org.jboss.messaging.core.journal.impl.JournalFile#getPendingTransactions()
+ */
+ public int getPendingTransactions()
+ {
+ return 0;
+ }
+
+ /* (non-Javadoc)
+ * @see org.jboss.messaging.core.journal.impl.JournalFile#incPendingTransaction()
+ */
+ public void incPendingTransaction()
+ {
+ }
}
}
Modified: branches/clebert_temp_expirement/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java
===================================================================
--- branches/clebert_temp_expirement/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java 2009-06-23 00:53:36 UTC (rev 7431)
+++ branches/clebert_temp_expirement/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java 2009-06-23 00:55:33 UTC (rev 7432)
@@ -680,4 +680,11 @@
{
}
+ /* (non-Javadoc)
+ * @see org.jboss.messaging.core.journal.SequentialFileFactory#testFlush()
+ */
+ public void testFlush()
+ {
+ }
+
}
More information about the jboss-cvs-commits
mailing list