JBoss hornetq SVN: r8381 - in branches/ClebertCallback: tests/src/org/hornetq/tests/unit/core/journal/impl and 1 other directory.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2009-11-23 14:59:38 -0500 (Mon, 23 Nov 2009)
New Revision: 8381
Modified:
branches/ClebertCallback/src/main/org/hornetq/core/journal/impl/NIOSequentialFile.java
branches/ClebertCallback/tests/src/org/hornetq/tests/unit/core/journal/impl/SequentialFileFactoryTestBase.java
Log:
small tweak
Modified: branches/ClebertCallback/src/main/org/hornetq/core/journal/impl/NIOSequentialFile.java
===================================================================
--- branches/ClebertCallback/src/main/org/hornetq/core/journal/impl/NIOSequentialFile.java 2009-11-23 18:34:17 UTC (rev 8380)
+++ branches/ClebertCallback/src/main/org/hornetq/core/journal/impl/NIOSequentialFile.java 2009-11-23 19:59:38 UTC (rev 8381)
@@ -155,6 +155,8 @@
log.warn("Couldn't get lock after 60 seconds on closing AsynchronousFileImpl::" + this.getFileName());
}
}
+
+ maxIOSemaphore = null;
notifyAll();
}
@@ -252,6 +254,19 @@
private void internalWrite(final ByteBuffer bytes, final boolean sync, final IOAsyncTask callback) throws Exception
{
+ if (!isOpen())
+ {
+ if (callback != null)
+ {
+ callback.onError(HornetQException.IO_ERROR, "File not opened");
+ }
+ else
+ {
+ throw new HornetQException(HornetQException.IO_ERROR, "File not opened");
+ }
+ return;
+ }
+
if (writerExecutor == null)
{
doInternalWrite(bytes, sync, callback);
@@ -271,7 +286,7 @@
{
doInternalWrite(bytes, sync, callback);
}
- catch (Exception e)
+ catch (Throwable e)
{
log.warn("Exception on submitting write", e);
callback.onError(HornetQException.IO_ERROR, e.getMessage());
Modified: branches/ClebertCallback/tests/src/org/hornetq/tests/unit/core/journal/impl/SequentialFileFactoryTestBase.java
===================================================================
--- branches/ClebertCallback/tests/src/org/hornetq/tests/unit/core/journal/impl/SequentialFileFactoryTestBase.java 2009-11-23 18:34:17 UTC (rev 8380)
+++ branches/ClebertCallback/tests/src/org/hornetq/tests/unit/core/journal/impl/SequentialFileFactoryTestBase.java 2009-11-23 19:59:38 UTC (rev 8381)
@@ -19,9 +19,11 @@
import java.util.UUID;
import org.hornetq.core.asyncio.impl.AsynchronousFileImpl;
+import org.hornetq.core.buffers.ChannelBuffers;
import org.hornetq.core.journal.SequentialFile;
import org.hornetq.core.journal.SequentialFileFactory;
import org.hornetq.core.logging.Logger;
+import org.hornetq.core.remoting.spi.HornetQBuffer;
import org.hornetq.tests.util.UnitTestCase;
/**
@@ -224,19 +226,19 @@
ByteBuffer bb3 = factory.wrapBuffer(bytes3);
long initialPos = sf.position();
- sf.writeDirect(bb1, true);
+ sf.write(wrapBuffer(bb1), true);
long bytesWritten = sf.position() - initialPos;
assertEquals(calculateRecordSize(bytes1.length, sf.getAlignment()), bytesWritten);
initialPos = sf.position();
- sf.writeDirect(bb2, true);
+ sf.write(wrapBuffer(bb2), true);
bytesWritten = sf.position() - initialPos;
assertEquals(calculateRecordSize(bytes2.length, sf.getAlignment()), bytesWritten);
initialPos = sf.position();
- sf.writeDirect(bb3, true);
+ sf.write(wrapBuffer(bb3), true);
bytesWritten = sf.position() - initialPos;
assertEquals(calculateRecordSize(bytes3.length, sf.getAlignment()), bytesWritten);
@@ -296,20 +298,20 @@
ByteBuffer bb3 = factory.wrapBuffer(bytes3);
long initialPos = sf.position();
- sf.writeDirect(bb1, true);
+ sf.write(wrapBuffer(bb1), true);
long bytesWritten = sf.position() - initialPos;
assertEquals(bb1.limit(), bytesWritten);
initialPos = sf.position();
- sf.writeDirect(bb2, true);
+ sf.write(wrapBuffer(bb2), true);
bytesWritten = sf.position() - initialPos;
assertEquals(bb2.limit(), bytesWritten);
initialPos = sf.position();
- sf.writeDirect(bb3, true);
+ sf.write(wrapBuffer(bb3), true);
bytesWritten = sf.position() - initialPos;
assertEquals(bb3.limit(), bytesWritten);
@@ -373,7 +375,7 @@
ByteBuffer bb1 = factory.wrapBuffer(bytes1);
long initialPos = sf.position();
- sf.writeDirect(bb1, true);
+ sf.write(wrapBuffer(bb1), true);
long bytesWritten = sf.position() - initialPos;
assertEquals(bb1.limit(), bytesWritten);
@@ -385,23 +387,27 @@
bb1 = factory.wrapBuffer(bytes1);
- sf.writeDirect(bb1, true);
+ sf.write(wrapBuffer(bb1), true);
fail("Should throw exception");
}
catch (Exception e)
{
- // OK
}
sf.open();
- sf.writeDirect(bb1, true);
+ sf.write(wrapBuffer(bb1), true);
sf.close();
}
// Private ---------------------------------
+
+ private HornetQBuffer wrapBuffer(ByteBuffer buffer)
+ {
+ return ChannelBuffers.wrappedBuffer(buffer);
+ }
protected void checkFill(final SequentialFile file, final int pos, final int size, final byte fillChar) throws Exception
{
15 years, 1 month
JBoss hornetq SVN: r8380 - branches/ClebertCallback/src/main/org/hornetq/core/server/impl.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2009-11-23 13:34:17 -0500 (Mon, 23 Nov 2009)
New Revision: 8380
Modified:
branches/ClebertCallback/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
Log:
tweaks
Modified: branches/ClebertCallback/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
===================================================================
--- branches/ClebertCallback/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java 2009-11-23 18:15:23 UTC (rev 8379)
+++ branches/ClebertCallback/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java 2009-11-23 18:34:17 UTC (rev 8380)
@@ -1078,7 +1078,6 @@
public void handleXARollback(final SessionXARollbackMessage packet)
{
- System.out.println("XARollback");
Packet response = null;
Xid xid = packet.getXid();
15 years, 1 month
JBoss hornetq SVN: r8379 - branches/ClebertCallback/src/main/org/hornetq/core/journal/impl.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2009-11-23 13:15:23 -0500 (Mon, 23 Nov 2009)
New Revision: 8379
Modified:
branches/ClebertCallback/src/main/org/hornetq/core/journal/impl/JournalImpl.java
Log:
tweaks
Modified: branches/ClebertCallback/src/main/org/hornetq/core/journal/impl/JournalImpl.java
===================================================================
--- branches/ClebertCallback/src/main/org/hornetq/core/journal/impl/JournalImpl.java 2009-11-23 17:46:20 UTC (rev 8378)
+++ branches/ClebertCallback/src/main/org/hornetq/core/journal/impl/JournalImpl.java 2009-11-23 18:15:23 UTC (rev 8379)
@@ -876,11 +876,6 @@
throw new IllegalStateException("Journal must be loaded first");
}
- if (callback != null)
- {
- callback.lineUp();
- }
-
compactingLock.readLock().lock();
try
@@ -891,6 +886,11 @@
writeAddRecord(-1, id, recordType, record, size, bb); // fileID will be filled later
+ if (callback != null)
+ {
+ callback.lineUp();
+ }
+
lockAppend.lock();
try
{
@@ -958,17 +958,17 @@
}
}
- if (callback != null)
- {
- callback.lineUp();
- }
-
int size = SIZE_UPDATE_RECORD + record.getEncodeSize();
ChannelBuffer bb = newBuffer(size);
writeUpdateRecord(-1, id, recordType, record, size, bb);
+ if (callback != null)
+ {
+ callback.lineUp();
+ }
+
lockAppend.lock();
try
{
@@ -1021,11 +1021,6 @@
throw new IllegalStateException("Journal must be loaded first");
}
- if (callback != null)
- {
- callback.lineUp();
- }
-
compactingLock.readLock().lock();
try
@@ -1040,13 +1035,18 @@
throw new IllegalStateException("Cannot find add info " + id);
}
}
-
+
int size = SIZE_DELETE_RECORD;
ChannelBuffer bb = newBuffer(size);
writeDeleteRecord(-1, id, size, bb);
+ if (callback != null)
+ {
+ callback.lineUp();
+ }
+
lockAppend.lock();
try
{
@@ -1282,11 +1282,6 @@
throw new IllegalStateException("Journal must be loaded first");
}
- if (callback != null)
- {
- callback.lineUp();
- }
-
compactingLock.readLock().lock();
JournalTransaction tx = getTransactionInfo(txID);
@@ -1299,6 +1294,11 @@
writeTransaction(-1, PREPARE_RECORD, txID, transactionData, size, -1, bb);
+ if (callback != null)
+ {
+ callback.lineUp();
+ }
+
lockAppend.lock();
try
{
@@ -1359,11 +1359,6 @@
throw new IllegalStateException("Journal must be loaded first");
}
- if (callback != null)
- {
- callback.lineUp();
- }
-
compactingLock.readLock().lock();
JournalTransaction tx = transactions.remove(txID);
@@ -1378,7 +1373,7 @@
callback.done();
return;
}
-
+
ChannelBuffer bb = newBuffer(SIZE_COMPLETE_TRANSACTION_RECORD);
writeTransaction(-1,
@@ -1389,6 +1384,11 @@
-1 /* number of records on this transaction will be filled later inside append record */,
bb);
+ if (callback != null)
+ {
+ callback.lineUp();
+ }
+
lockAppend.lock();
try
{
@@ -1429,11 +1429,6 @@
throw new IllegalStateException("Journal must be loaded first");
}
- if (callback != null)
- {
- callback.lineUp();
- }
-
compactingLock.readLock().lock();
JournalTransaction tx = null;
@@ -1446,11 +1441,16 @@
{
throw new IllegalStateException("Cannot find tx with id " + txID);
}
-
+
ChannelBuffer bb = newBuffer(SIZE_ROLLBACK_RECORD);
writeRollback(-1, txID, bb);
+ if (callback != null)
+ {
+ callback.lineUp();
+ }
+
lockAppend.lock();
try
{
15 years, 1 month
JBoss hornetq SVN: r8378 - branches/ClebertCallback/native/bin.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2009-11-23 12:46:20 -0500 (Mon, 23 Nov 2009)
New Revision: 8378
Modified:
branches/ClebertCallback/native/bin/libHornetQAIO64.so
Log:
64 bits compilation for a tweak
Modified: branches/ClebertCallback/native/bin/libHornetQAIO64.so
===================================================================
(Binary files differ)
15 years, 1 month
JBoss hornetq SVN: r8377 - in branches/ClebertCallback: src/main/org/hornetq/core/asyncio/impl and 3 other directories.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2009-11-23 12:44:05 -0500 (Mon, 23 Nov 2009)
New Revision: 8377
Modified:
branches/ClebertCallback/native/src/AIOException.h
branches/ClebertCallback/src/main/org/hornetq/core/asyncio/impl/AsynchronousFileImpl.java
branches/ClebertCallback/src/main/org/hornetq/core/exception/HornetQException.java
branches/ClebertCallback/src/main/org/hornetq/core/journal/SequentialFile.java
branches/ClebertCallback/src/main/org/hornetq/core/journal/impl/AIOSequentialFile.java
branches/ClebertCallback/src/main/org/hornetq/core/journal/impl/AIOSequentialFileFactory.java
branches/ClebertCallback/src/main/org/hornetq/core/journal/impl/AbstractSequentialFactory.java
branches/ClebertCallback/src/main/org/hornetq/core/journal/impl/AbstractSequentialFile.java
branches/ClebertCallback/src/main/org/hornetq/core/journal/impl/NIOSequentialFile.java
branches/ClebertCallback/src/main/org/hornetq/core/journal/impl/NIOSequentialFileFactory.java
Log:
NIO should be totally Asynchronous now
Modified: branches/ClebertCallback/native/src/AIOException.h
===================================================================
--- branches/ClebertCallback/native/src/AIOException.h 2009-11-23 14:51:41 UTC (rev 8376)
+++ branches/ClebertCallback/native/src/AIOException.h 2009-11-23 17:44:05 UTC (rev 8377)
@@ -29,7 +29,7 @@
#define NATIVE_ERROR_CANT_ALLOCATE_QUEUE 206
#define NATIVE_ERROR_PREALLOCATE_FILE 208
#define NATIVE_ERROR_ALLOCATE_MEMORY 209
-#define NATIVE_ERROR_IO 210
+#define NATIVE_ERROR_IO 006
#define NATIVE_ERROR_AIO_FULL 211
Modified: branches/ClebertCallback/src/main/org/hornetq/core/asyncio/impl/AsynchronousFileImpl.java
===================================================================
--- branches/ClebertCallback/src/main/org/hornetq/core/asyncio/impl/AsynchronousFileImpl.java 2009-11-23 14:51:41 UTC (rev 8376)
+++ branches/ClebertCallback/src/main/org/hornetq/core/asyncio/impl/AsynchronousFileImpl.java 2009-11-23 17:44:05 UTC (rev 8377)
@@ -49,7 +49,7 @@
private static boolean loaded = false;
- private static int EXPECTED_NATIVE_VERSION = 26;
+ private static int EXPECTED_NATIVE_VERSION = 27;
/** Used to determine the next writing sequence */
private AtomicLong nextWritingSequence = new AtomicLong(0);
@@ -154,7 +154,7 @@
private final VariableLatch pendingWrites = new VariableLatch();
- private Semaphore writeSemaphore;
+ private Semaphore maxIOSemaphore;
private BufferCallback bufferCallback;
@@ -195,7 +195,7 @@
}
this.maxIO = maxIO;
- writeSemaphore = new Semaphore(this.maxIO);
+ maxIOSemaphore = new Semaphore(this.maxIO);
this.fileName = fileName;
@@ -245,12 +245,12 @@
log.warn("Couldn't get lock after 60 seconds on closing AsynchronousFileImpl::" + this.fileName);
}
- while (!writeSemaphore.tryAcquire(maxIO, 60, TimeUnit.SECONDS))
+ while (!maxIOSemaphore.tryAcquire(maxIO, 60, TimeUnit.SECONDS))
{
log.warn("Couldn't get lock after 60 seconds on closing AsynchronousFileImpl::" + this.fileName);
}
- writeSemaphore = null;
+ maxIOSemaphore = null;
if (poller != null)
{
stopPoller();
@@ -294,7 +294,7 @@
{
public void run()
{
- writeSemaphore.acquireUninterruptibly();
+ maxIOSemaphore.acquireUninterruptibly();
long sequence = nextWritingSequence.getAndIncrement();
@@ -319,7 +319,7 @@
}
else
{
- writeSemaphore.acquireUninterruptibly();
+ maxIOSemaphore.acquireUninterruptibly();
long sequence = nextWritingSequence.getAndIncrement();
@@ -350,7 +350,7 @@
startPoller();
}
pendingWrites.up();
- writeSemaphore.acquireUninterruptibly();
+ maxIOSemaphore.acquireUninterruptibly();
try
{
read(handler, position, size, directByteBuffer, aioPackage);
@@ -358,14 +358,14 @@
catch (HornetQException e)
{
// Release only if an exception happened
- writeSemaphore.release();
+ maxIOSemaphore.release();
pendingWrites.down();
throw e;
}
catch (RuntimeException e)
{
// Release only if an exception happened
- writeSemaphore.release();
+ maxIOSemaphore.release();
pendingWrites.down();
throw e;
}
@@ -444,7 +444,7 @@
@SuppressWarnings("unused")
private void callbackDone(final AIOCallback callback, final long sequence, final ByteBuffer buffer)
{
- writeSemaphore.release();
+ maxIOSemaphore.release();
pendingWrites.down();
@@ -511,7 +511,7 @@
{
log.warn("CallbackError: " + errorMessage);
- writeSemaphore.release();
+ maxIOSemaphore.release();
pendingWrites.down();
Modified: branches/ClebertCallback/src/main/org/hornetq/core/exception/HornetQException.java
===================================================================
--- branches/ClebertCallback/src/main/org/hornetq/core/exception/HornetQException.java 2009-11-23 14:51:41 UTC (rev 8376)
+++ branches/ClebertCallback/src/main/org/hornetq/core/exception/HornetQException.java 2009-11-23 17:44:05 UTC (rev 8377)
@@ -38,6 +38,9 @@
public static final int UNBLOCKED = 005;
+ public static final int IO_ERROR = 006;
+
+
public static final int QUEUE_DOES_NOT_EXIST = 100;
public static final int QUEUE_EXISTS = 101;
@@ -85,8 +88,6 @@
public static final int NATIVE_ERROR_ALLOCATE_MEMORY = 209;
- public static final int NATIVE_ERROR_IO = 210;
-
public static final int NATIVE_ERROR_AIO_FULL = 211;
Modified: branches/ClebertCallback/src/main/org/hornetq/core/journal/SequentialFile.java
===================================================================
--- branches/ClebertCallback/src/main/org/hornetq/core/journal/SequentialFile.java 2009-11-23 14:51:41 UTC (rev 8376)
+++ branches/ClebertCallback/src/main/org/hornetq/core/journal/SequentialFile.java 2009-11-23 17:44:05 UTC (rev 8377)
@@ -38,7 +38,7 @@
boolean exists();
/**
- * For certain operations (like loading) we don't need open the file with full maxIO
+ * The maximum number of simultaneous writes accepted
* @param maxIO
* @throws Exception
*/
Modified: branches/ClebertCallback/src/main/org/hornetq/core/journal/impl/AIOSequentialFile.java
===================================================================
--- branches/ClebertCallback/src/main/org/hornetq/core/journal/impl/AIOSequentialFile.java 2009-11-23 14:51:41 UTC (rev 8376)
+++ branches/ClebertCallback/src/main/org/hornetq/core/journal/impl/AIOSequentialFile.java 2009-11-23 17:44:05 UTC (rev 8377)
@@ -15,9 +15,7 @@
import java.io.File;
import java.nio.ByteBuffer;
-import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
-import java.util.concurrent.TimeUnit;
import org.hornetq.core.asyncio.AsynchronousFile;
import org.hornetq.core.asyncio.BufferCallback;
@@ -49,9 +47,6 @@
/** The pool for Thread pollers */
private final Executor pollerExecutor;
- /** Context switch on AIO could fire unnecessary flushes, so we use a single thread for write */
- private final Executor writerExecutor;
-
public AIOSequentialFile(final SequentialFileFactory factory,
final int bufferSize,
@@ -63,9 +58,8 @@
final Executor writerExecutor,
final Executor pollerExecutor)
{
- super(directory, new File(directory + "/" + fileName), factory);
+ super(directory, new File(directory + "/" + fileName), factory, writerExecutor);
this.maxIO = maxIO;
- this.writerExecutor = writerExecutor;
this.bufferCallback = bufferCallback;
this.pollerExecutor = pollerExecutor;
}
@@ -102,27 +96,13 @@
{
return;
}
+
+ super.close();
+
opened = false;
timedBuffer = null;
- final CountDownLatch donelatch = new CountDownLatch(1);
-
- writerExecutor.execute(new Runnable()
- {
- public void run()
- {
- donelatch.countDown();
- }
- });
-
- while (!donelatch.await(60, TimeUnit.SECONDS))
- {
- log.warn("Executor on file " + getFile().getName() + " couldn't complete its tasks in 60 seconds.",
- new Exception("Warning: Executor on file " + getFile().getName() +
- " couldn't complete its tasks in 60 seconds."));
- }
-
aioFile.close();
aioFile = null;
@@ -193,13 +173,18 @@
open(maxIO);
}
- public synchronized void open(final int currentMaxIO) throws Exception
+ public synchronized void open(final int maxIO) throws Exception
{
opened = true;
+
aioFile = new AsynchronousFileImpl(writerExecutor, pollerExecutor);
- aioFile.open(getFile().getAbsolutePath(), currentMaxIO);
+
+ aioFile.open(getFile().getAbsolutePath(), maxIO);
+
position.set(0);
+
aioFile.setBufferCallback(bufferCallback);
+
this.fileSize = aioFile.size();
}
Modified: branches/ClebertCallback/src/main/org/hornetq/core/journal/impl/AIOSequentialFileFactory.java
===================================================================
--- branches/ClebertCallback/src/main/org/hornetq/core/journal/impl/AIOSequentialFileFactory.java 2009-11-23 14:51:41 UTC (rev 8376)
+++ branches/ClebertCallback/src/main/org/hornetq/core/journal/impl/AIOSequentialFileFactory.java 2009-11-23 17:44:05 UTC (rev 8377)
@@ -42,11 +42,6 @@
private final ReuseBuffersController buffersControl = new ReuseBuffersController();
- /** A single AIO write executor for every AIO File.
- * This is used only for AIO & instant operations. We only need one executor-thread for the entire journal as we always have only one active file.
- * And even if we had multiple files at a given moment, this should still be ok, as we control max-io in a semaphore, guaranteeing AIO calls don't block on disk calls */
- private ExecutorService writeExecutor;
-
private ExecutorService pollerExecutor;
// This method exists just to make debug easier.
@@ -149,9 +144,6 @@
{
super.start();
- writeExecutor = Executors.newSingleThreadExecutor(new HornetQThreadFactory("HornetQ-AIO-writer-pool" + System.identityHashCode(this),
- true));
-
pollerExecutor = Executors.newCachedThreadPool(new HornetQThreadFactory("HornetQ-AIO-poller-pool" + System.identityHashCode(this),
true));
@@ -162,19 +154,6 @@
{
buffersControl.stop();
- writeExecutor.shutdown();
-
- try
- {
- if (!writeExecutor.awaitTermination(EXECUTOR_TIMEOUT, TimeUnit.SECONDS))
- {
- log.warn("Timed out on AIO writer shutdown", new Exception("Timed out on AIO writer shutdown"));
- }
- }
- catch (InterruptedException e)
- {
- }
-
pollerExecutor.shutdown();
try
Modified: branches/ClebertCallback/src/main/org/hornetq/core/journal/impl/AbstractSequentialFactory.java
===================================================================
--- branches/ClebertCallback/src/main/org/hornetq/core/journal/impl/AbstractSequentialFactory.java 2009-11-23 14:51:41 UTC (rev 8376)
+++ branches/ClebertCallback/src/main/org/hornetq/core/journal/impl/AbstractSequentialFactory.java 2009-11-23 17:44:05 UTC (rev 8377)
@@ -44,13 +44,6 @@
private static final Logger log = Logger.getLogger(AbstractSequentialFactory.class);
- /**
- *
- * We can't execute callbacks directly from any of the IO module. We need to do it through another thread,
- * So, we will use an executor for this.
- * */
- protected ExecutorService callbacksExecutor;
-
protected final String journalDir;
protected final TimedBuffer timedBuffer;
@@ -58,7 +51,16 @@
protected final int bufferSize;
protected final long bufferTimeout;
+
+ /**
+ * Asynchronous writes need to be done at another executor.
+ * This needs to be done at NIO, or else we would have the callers thread blocking for the return.
+ * At AIO this is necessary as context switches on writes would fire flushes at the kernel.
+ * */
+ protected ExecutorService writeExecutor;
+
+
public AbstractSequentialFactory(final String journalDir,
final boolean buffered,
final int bufferSize,
@@ -86,13 +88,13 @@
timedBuffer.stop();
}
- if (callbacksExecutor != null)
+ if (isSupportsCallbacks())
{
- callbacksExecutor.shutdown();
-
+ writeExecutor.shutdown();
+
try
{
- if (!callbacksExecutor.awaitTermination(EXECUTOR_TIMEOUT, TimeUnit.SECONDS))
+ if (!writeExecutor.awaitTermination(EXECUTOR_TIMEOUT, TimeUnit.SECONDS))
{
log.warn("Timed out on AIO writer shutdown", new Exception("Timed out on AIO writer shutdown"));
}
@@ -101,6 +103,8 @@
{
}
}
+
+
}
public void start()
@@ -109,17 +113,14 @@
{
timedBuffer.start();
}
-
+
if (isSupportsCallbacks())
{
- callbacksExecutor = Executors.newSingleThreadExecutor(new HornetQThreadFactory("HornetQ-callbacks" + System.identityHashCode(this),
+ writeExecutor = Executors.newSingleThreadExecutor(new HornetQThreadFactory("HornetQ-Asynchronous-Persistent-Writes" + System.identityHashCode(this),
true));
}
- else
- {
- callbacksExecutor = null;
- }
+
}
/* (non-Javadoc)
Modified: branches/ClebertCallback/src/main/org/hornetq/core/journal/impl/AbstractSequentialFile.java
===================================================================
--- branches/ClebertCallback/src/main/org/hornetq/core/journal/impl/AbstractSequentialFile.java 2009-11-23 14:51:41 UTC (rev 8376)
+++ branches/ClebertCallback/src/main/org/hornetq/core/journal/impl/AbstractSequentialFile.java 2009-11-23 17:44:05 UTC (rev 8377)
@@ -16,7 +16,9 @@
import java.io.File;
import java.nio.ByteBuffer;
import java.util.List;
+import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.hornetq.core.journal.IOAsyncTask;
@@ -43,7 +45,7 @@
private File file;
private final String directory;
-
+
protected final SequentialFileFactory factory;
protected long fileSize = 0;
@@ -56,6 +58,9 @@
* This is the class returned to the factory when the file is being activated. */
protected final TimedBufferObserver timedBufferObserver = new LocalBufferObserver();
+ /** Used for asynchronous writes */
+ protected final Executor writerExecutor;
+
// Static --------------------------------------------------------
// Constructors --------------------------------------------------
@@ -64,12 +69,16 @@
* @param file
* @param directory
*/
- public AbstractSequentialFile(final String directory, final File file, final SequentialFileFactory factory)
+ public AbstractSequentialFile(final String directory,
+ final File file,
+ final SequentialFileFactory factory,
+ final Executor writerExecutor)
{
super();
this.file = file;
this.directory = directory;
this.factory = factory;
+ this.writerExecutor = writerExecutor;
}
// Public --------------------------------------------------------
@@ -116,6 +125,29 @@
}
}
+ public synchronized void close() throws Exception
+ {
+ final CountDownLatch donelatch = new CountDownLatch(1);
+
+ if (writerExecutor != null)
+ {
+ writerExecutor.execute(new Runnable()
+ {
+ public void run()
+ {
+ donelatch.countDown();
+ }
+ });
+
+ while (!donelatch.await(60, TimeUnit.SECONDS))
+ {
+ log.warn("Executor on file " + getFile().getName() + " couldn't complete its tasks in 60 seconds.",
+ new Exception("Warning: Executor on file " + getFile().getName() +
+ " couldn't complete its tasks in 60 seconds."));
+ }
+ }
+ }
+
public final boolean fits(final int size)
{
if (timedBuffer == null)
Modified: branches/ClebertCallback/src/main/org/hornetq/core/journal/impl/NIOSequentialFile.java
===================================================================
--- branches/ClebertCallback/src/main/org/hornetq/core/journal/impl/NIOSequentialFile.java 2009-11-23 14:51:41 UTC (rev 8376)
+++ branches/ClebertCallback/src/main/org/hornetq/core/journal/impl/NIOSequentialFile.java 2009-11-23 17:44:05 UTC (rev 8377)
@@ -19,7 +19,10 @@
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.util.concurrent.Executor;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+import org.hornetq.core.exception.HornetQException;
import org.hornetq.core.journal.IOAsyncTask;
import org.hornetq.core.journal.SequentialFile;
import org.hornetq.core.journal.SequentialFileFactory;
@@ -41,14 +44,23 @@
private RandomAccessFile rfile;
- public NIOSequentialFile(final SequentialFileFactory factory, final String directory, final String fileName)
+ /** The write semaphore here is only used when writing asynchronously */
+ private Semaphore maxIOSemaphore;
+
+ private final int defaultMaxIO;
+
+ private int maxIO;
+
+ public NIOSequentialFile(final SequentialFileFactory factory, final String directory, final String fileName, final int maxIO, final Executor writerExecutor)
{
- super(directory, new File(directory + "/" + fileName), factory);
+ super(directory, new File(directory + "/" + fileName), factory, writerExecutor);
+ this.defaultMaxIO = maxIO;
}
- public NIOSequentialFile(final SequentialFileFactory factory, final File file)
+ public NIOSequentialFile(final SequentialFileFactory factory, final File file, final int maxIO, final Executor writerExecutor)
{
- super(file.getParent(), new File(file.getPath()), factory);
+ super(file.getParent(), new File(file.getPath()), factory, writerExecutor);
+ this.defaultMaxIO = maxIO;
}
public int getAlignment()
@@ -66,20 +78,28 @@
return channel != null;
}
+ /** this.maxIO represents the default maxIO.
+ * Some operations while initializing files on the journal may require a different maxIO */
public synchronized void open() throws Exception
{
+ open(this.defaultMaxIO);
+ }
+
+ public void open(final int maxIO) throws Exception
+ {
rfile = new RandomAccessFile(getFile(), "rw");
channel = rfile.getChannel();
fileSize = channel.size();
+
+ if (writerExecutor != null)
+ {
+ this.maxIOSemaphore = new Semaphore(maxIO);
+ this.maxIO = maxIO;
+ }
}
- public void open(final int currentMaxIO) throws Exception
- {
- open();
- }
-
public void fill(final int position, final int size, final byte fillCharacter) throws Exception
{
ByteBuffer bb = ByteBuffer.allocateDirect(size);
@@ -112,6 +132,8 @@
public synchronized void close() throws Exception
{
+ super.close();
+
if (channel != null)
{
channel.close();
@@ -126,6 +148,14 @@
rfile = null;
+ if (maxIOSemaphore != null)
+ {
+ while (!maxIOSemaphore.tryAcquire(maxIO, 60, TimeUnit.SECONDS))
+ {
+ log.warn("Couldn't get lock after 60 seconds on closing AsynchronousFileImpl::" + this.getFileName());
+ }
+ }
+
notifyAll();
}
@@ -153,7 +183,7 @@
{
if (callback != null)
{
- callback.onError(-1, e.getLocalizedMessage());
+ callback.onError(HornetQException.IO_ERROR, e.getLocalizedMessage());
}
throw e;
@@ -195,7 +225,7 @@
public SequentialFile copy()
{
- return new NIOSequentialFile(factory, getFile());
+ return new NIOSequentialFile(factory, getFile(), maxIO, writerExecutor);
}
public void writeDirect(final ByteBuffer bytes, final boolean sync, final IOAsyncTask callback)
@@ -220,6 +250,42 @@
internalWrite(bytes, sync, null);
}
+ private void internalWrite(final ByteBuffer bytes, final boolean sync, final IOAsyncTask callback) throws Exception
+ {
+ if (writerExecutor == null)
+ {
+ doInternalWrite(bytes, sync, callback);
+ }
+ else
+ {
+ // This is a flow control on writing, just like maxAIO on libaio
+ maxIOSemaphore.acquire();
+
+ writerExecutor.execute(new Runnable()
+ {
+ public void run()
+ {
+ try
+ {
+ try
+ {
+ doInternalWrite(bytes, sync, callback);
+ }
+ catch (Exception e)
+ {
+ log.warn("Exception on submitting write", e);
+ callback.onError(HornetQException.IO_ERROR, e.getMessage());
+ }
+ }
+ finally
+ {
+ maxIOSemaphore.release();
+ }
+ }
+ });
+ }
+ }
+
/**
* @param bytes
* @param sync
@@ -227,8 +293,9 @@
* @throws IOException
* @throws Exception
*/
- private void internalWrite(final ByteBuffer bytes, final boolean sync, final IOAsyncTask callback) throws Exception
+ private void doInternalWrite(final ByteBuffer bytes, final boolean sync, final IOAsyncTask callback) throws Exception
{
+
position.addAndGet(bytes.limit());
channel.write(bytes);
Modified: branches/ClebertCallback/src/main/org/hornetq/core/journal/impl/NIOSequentialFileFactory.java
===================================================================
--- branches/ClebertCallback/src/main/org/hornetq/core/journal/impl/NIOSequentialFileFactory.java 2009-11-23 14:51:41 UTC (rev 8376)
+++ branches/ClebertCallback/src/main/org/hornetq/core/journal/impl/NIOSequentialFileFactory.java 2009-11-23 17:44:05 UTC (rev 8377)
@@ -64,9 +64,15 @@
}
// maxIO is ignored on NIO
- public SequentialFile createSequentialFile(final String fileName, final int maxIO)
+ public SequentialFile createSequentialFile(final String fileName, int maxIO)
{
- return new NIOSequentialFile(this, journalDir, fileName);
+ if (maxIO < 0)
+ {
+ // A single threaded IO
+ maxIO = 1;
+ }
+
+ return new NIOSequentialFile(this, journalDir, fileName, maxIO, writeExecutor);
}
public boolean isSupportsCallbacks()
15 years, 1 month
JBoss hornetq SVN: r8376 - branches/ClebertCallback/tests/src/org/hornetq/tests/unit/core/journal/impl.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2009-11-23 09:51:41 -0500 (Mon, 23 Nov 2009)
New Revision: 8376
Modified:
branches/ClebertCallback/tests/src/org/hornetq/tests/unit/core/journal/impl/JournalAsyncTest.java
Log:
Replacting invalid test by a new one
Modified: branches/ClebertCallback/tests/src/org/hornetq/tests/unit/core/journal/impl/JournalAsyncTest.java
===================================================================
--- branches/ClebertCallback/tests/src/org/hornetq/tests/unit/core/journal/impl/JournalAsyncTest.java 2009-11-23 05:45:35 UTC (rev 8375)
+++ branches/ClebertCallback/tests/src/org/hornetq/tests/unit/core/journal/impl/JournalAsyncTest.java 2009-11-23 14:51:41 UTC (rev 8376)
@@ -13,7 +13,6 @@
package org.hornetq.tests.unit.core.journal.impl;
-import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
@@ -57,6 +56,16 @@
public void testAsynchronousCommit() throws Exception
{
+ doAsynchronousTest(true);
+ }
+
+ public void testAsynchronousRollback() throws Exception
+ {
+ doAsynchronousTest(false);
+ }
+
+ public void doAsynchronousTest(final boolean isCommit) throws Exception
+ {
final int JOURNAL_SIZE = 20000;
setupJournal(JOURNAL_SIZE, 100, 5);
@@ -81,7 +90,14 @@
latch.countDown();
factory.setHoldCallbacks(false, null);
- journalImpl.appendCommitRecord(1l, true);
+ if (isCommit)
+ {
+ journalImpl.appendCommitRecord(1l, true);
+ }
+ else
+ {
+ journalImpl.appendRollbackRecord(1l, true);
+ }
}
catch (Exception e)
{
@@ -97,6 +113,8 @@
assertTrue(latch.await(5, TimeUnit.SECONDS));
Thread.yield();
+
+ Thread.sleep(100);
assertTrue(t.isAlive());
@@ -109,145 +127,7 @@
throw t.e;
}
}
-
- public void testAsynchronousRollbackWithError() throws Exception
- {
- final int JOURNAL_SIZE = 20000;
-
- setupJournal(JOURNAL_SIZE, 100, 5);
-
- final CountDownLatch latch = new CountDownLatch(11);
-
- factory.setHoldCallbacks(true, new FakeSequentialFileFactory.ListenerHoldCallback()
- {
-
- public void callbackAdded(final ByteBuffer bytes)
- {
- latch.countDown();
- }
- });
-
- class LocalThread extends Thread
- {
- Exception e;
-
- @Override
- public void run()
- {
- try
- {
- for (int i = 0; i < 10; i++)
- {
- journalImpl.appendAddRecordTransactional(1l, i, (byte)1, new SimpleEncoding(1, (byte)0));
- }
-
- journalImpl.appendRollbackRecord(1l, true);
- }
- catch (Exception e)
- {
- this.e = e;
- }
- }
- };
-
- LocalThread t = new LocalThread();
- t.start();
-
- latch.await();
-
- Thread.yield();
-
- assertTrue(t.isAlive());
-
- factory.setCallbackAsError(0);
-
- factory.flushCallback(0);
-
- Thread.yield();
-
- assertTrue(t.isAlive());
-
- factory.flushAllCallbacks();
-
- t.join();
-
- assertNotNull(t.e);
- }
-
- public void testAsynchronousCommitWithError() throws Exception
- {
- final int JOURNAL_SIZE = 20000;
-
- setupJournal(JOURNAL_SIZE, 100, 5);
-
- final CountDownLatch latch = new CountDownLatch(11);
-
- factory.setHoldCallbacks(true, new FakeSequentialFileFactory.ListenerHoldCallback()
- {
-
- public void callbackAdded(final ByteBuffer bytes)
- {
- latch.countDown();
- }
- });
-
- class LocalThread extends Thread
- {
- Exception e;
-
- @Override
- public void run()
- {
- try
- {
- for (int i = 0; i < 10; i++)
- {
- journalImpl.appendAddRecordTransactional(1l, i, (byte)1, new SimpleEncoding(1, (byte)0));
- }
-
- journalImpl.appendCommitRecord(1l, true);
- }
- catch (Exception e)
- {
- this.e = e;
- }
- }
- };
-
- LocalThread t = new LocalThread();
- t.start();
-
- latch.await();
-
- Thread.yield();
-
- assertTrue(t.isAlive());
-
- factory.setCallbackAsError(0);
-
- factory.flushCallback(0);
-
- Thread.yield();
-
- assertTrue(t.isAlive());
-
- factory.flushAllCallbacks();
-
- t.join();
-
- assertNotNull(t.e);
-
- try
- {
- journalImpl.appendRollbackRecord(1l, false);
- fail("Supposed to throw an exception");
- }
- catch (Exception e)
- {
-
- }
- }
-
+
// If a callback error already arrived, we should just throw the exception
// right away
public void testPreviousError() throws Exception
15 years, 1 month
JBoss hornetq SVN: r8375 - in branches/ClebertCallback: tests/src/org/hornetq/tests/integration/replication and 1 other directory.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2009-11-23 00:45:35 -0500 (Mon, 23 Nov 2009)
New Revision: 8375
Modified:
branches/ClebertCallback/src/main/org/hornetq/core/persistence/impl/journal/OperationContextImpl.java
branches/ClebertCallback/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java
Log:
treating exceptions on the OperationContext
Modified: branches/ClebertCallback/src/main/org/hornetq/core/persistence/impl/journal/OperationContextImpl.java
===================================================================
--- branches/ClebertCallback/src/main/org/hornetq/core/persistence/impl/journal/OperationContextImpl.java 2009-11-23 04:57:31 UTC (rev 8374)
+++ branches/ClebertCallback/src/main/org/hornetq/core/persistence/impl/journal/OperationContextImpl.java 2009-11-23 05:45:35 UTC (rev 8375)
@@ -111,6 +111,12 @@
/** You may have several actions to be done after a replication operation is completed. */
public void executeOnCompletion(final IOAsyncTask completion)
{
+ if (errorCode != -1)
+ {
+ completion.onError(errorCode, errorMessage);
+ return;
+ }
+
boolean executeNow = false;
synchronized (this)
@@ -175,10 +181,8 @@
while (iter.hasNext())
{
TaskHolder holder = iter.next();
- if (!holder.executed && stored >= holder.storeLined && replicated >= holder.replicationLined)
+ if (stored >= holder.storeLined && replicated >= holder.replicationLined)
{
- holder.executed = true;
-
if (executor != null)
{
// If set, we use an executor to avoid the server being single threaded
@@ -224,15 +228,6 @@
*/
public void complete()
{
- // TODO: test and fix exceptions on the Context
- if (tasks != null && errorMessage != null)
- {
- for (TaskHolder run : tasks)
- {
- run.task.onError(errorCode, errorMessage);
- }
- }
-
// We hold errors until the complete is set, or the callbacks will never get informed
errorCode = -1;
errorMessage = null;
@@ -241,10 +236,21 @@
/* (non-Javadoc)
* @see org.hornetq.core.asyncio.AIOCallback#onError(int, java.lang.String)
*/
- public void onError(int errorCode, String errorMessage)
+ public synchronized void onError(int errorCode, String errorMessage)
{
this.errorCode = errorCode;
this.errorMessage = errorMessage;
+
+ if (tasks != null)
+ {
+ Iterator<TaskHolder> iter = tasks.iterator();
+ while (iter.hasNext())
+ {
+ TaskHolder holder = iter.next();
+ holder.task.onError(errorCode, errorMessage);
+ iter.remove();
+ }
+ }
}
class TaskHolder
@@ -253,8 +259,6 @@
int replicationLined;
- boolean executed;
-
IOAsyncTask task;
TaskHolder(IOAsyncTask task)
Modified: branches/ClebertCallback/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java
===================================================================
--- branches/ClebertCallback/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java 2009-11-23 04:57:31 UTC (rev 8374)
+++ branches/ClebertCallback/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java 2009-11-23 05:45:35 UTC (rev 8375)
@@ -26,6 +26,7 @@
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
import org.hornetq.core.buffers.ChannelBuffers;
import org.hornetq.core.client.ClientSessionFactory;
@@ -54,6 +55,7 @@
import org.hornetq.core.persistence.OperationContext;
import org.hornetq.core.persistence.StorageManager;
import org.hornetq.core.persistence.impl.journal.JournalStorageManager;
+import org.hornetq.core.persistence.impl.journal.OperationContextImpl;
import org.hornetq.core.remoting.Interceptor;
import org.hornetq.core.remoting.Packet;
import org.hornetq.core.remoting.RemotingConnection;
@@ -412,7 +414,96 @@
server.stop();
}
}
+
+ public void testExceptionSettingActionBefore() throws Exception
+ {
+ OperationContext ctx = OperationContextImpl.getContext(factory);
+
+ ctx.lineUp();
+
+ String msg = "I'm an exception";
+
+ ctx.onError(5, msg);
+
+ final AtomicInteger lastError = new AtomicInteger(0);
+
+ final List<String> msgsResult = new ArrayList<String>();
+
+ final CountDownLatch latch = new CountDownLatch(1);
+
+ ctx.executeOnCompletion(new IOAsyncTask()
+ {
+ public void onError(int errorCode, String errorMessage)
+ {
+ lastError.set(errorCode);
+ msgsResult.add(errorMessage);
+ latch.countDown();
+ }
+
+ public void done()
+ {
+ }
+ });
+
+ assertTrue(latch.await(5, TimeUnit.SECONDS));
+
+ assertEquals(5, lastError.get());
+
+ assertEquals(1, msgsResult.size());
+
+ assertEquals(msg, msgsResult.get(0));
+
+ final CountDownLatch latch2 = new CountDownLatch(1);
+
+ // Adding the Task after the exception should still throw an exception
+ ctx.executeOnCompletion(new IOAsyncTask()
+ {
+ public void onError(int errorCode, String errorMessage)
+ {
+ lastError.set(errorCode);
+ msgsResult.add(errorMessage);
+ latch2.countDown();
+ }
+
+ public void done()
+ {
+ }
+ });
+
+ assertTrue(latch2.await(5, TimeUnit.SECONDS));
+
+ assertEquals(2, msgsResult.size());
+ assertEquals(msg, msgsResult.get(0));
+
+ assertEquals(msg, msgsResult.get(1));
+
+ // Clearing any exception from the Context, so we can use the context again
+ ctx.complete();
+
+
+ final CountDownLatch latch3 = new CountDownLatch(1);
+
+ ctx.executeOnCompletion(new IOAsyncTask()
+ {
+ public void onError(int errorCode, String errorMessage)
+ {
+ }
+
+ public void done()
+ {
+ latch3.countDown();
+ }
+ });
+
+
+ assertTrue(latch2.await(5, TimeUnit.SECONDS));
+
+
+
+
+ }
+
/**
* @return
*/
15 years, 1 month
JBoss hornetq SVN: r8374 - in branches/ClebertCallback/src/main/org/hornetq/core: management/impl and 3 other directories.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2009-11-22 23:57:31 -0500 (Sun, 22 Nov 2009)
New Revision: 8374
Modified:
branches/ClebertCallback/src/main/org/hornetq/core/asyncio/impl/AsynchronousFileImpl.java
branches/ClebertCallback/src/main/org/hornetq/core/management/impl/ManagementServiceImpl.java
branches/ClebertCallback/src/main/org/hornetq/core/remoting/impl/RemotingConnectionImpl.java
branches/ClebertCallback/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
branches/ClebertCallback/src/main/org/hornetq/core/transaction/impl/TransactionImpl.java
Log:
Clean up debug logs
Modified: branches/ClebertCallback/src/main/org/hornetq/core/asyncio/impl/AsynchronousFileImpl.java
===================================================================
--- branches/ClebertCallback/src/main/org/hornetq/core/asyncio/impl/AsynchronousFileImpl.java 2009-11-23 04:43:08 UTC (rev 8373)
+++ branches/ClebertCallback/src/main/org/hornetq/core/asyncio/impl/AsynchronousFileImpl.java 2009-11-23 04:57:31 UTC (rev 8374)
@@ -467,7 +467,6 @@
}
else
{
- // System.out.println("Buffering callback");
pendingCallbacks.add(new CallbackHolder(sequence, callback));
}
}
Modified: branches/ClebertCallback/src/main/org/hornetq/core/management/impl/ManagementServiceImpl.java
===================================================================
--- branches/ClebertCallback/src/main/org/hornetq/core/management/impl/ManagementServiceImpl.java 2009-11-23 04:43:08 UTC (rev 8373)
+++ branches/ClebertCallback/src/main/org/hornetq/core/management/impl/ManagementServiceImpl.java 2009-11-23 04:57:31 UTC (rev 8374)
@@ -747,15 +747,9 @@
if (storageManager != null)
{
- System.out.println("Waiting on management...");
storageManager.waitOnOperations(managementRequestTimeout);
storageManager.clearContext();
- System.out.println("Done");
}
- else
- {
- new Exception("storagemanager is null, can't wait on operations").printStackTrace();
- }
}
public void enableNotifications(boolean enabled)
Modified: branches/ClebertCallback/src/main/org/hornetq/core/remoting/impl/RemotingConnectionImpl.java
===================================================================
--- branches/ClebertCallback/src/main/org/hornetq/core/remoting/impl/RemotingConnectionImpl.java 2009-11-23 04:43:08 UTC (rev 8373)
+++ branches/ClebertCallback/src/main/org/hornetq/core/remoting/impl/RemotingConnectionImpl.java 2009-11-23 04:57:31 UTC (rev 8374)
@@ -422,8 +422,6 @@
for (final CloseListener listener : listenersClone)
{
- System.out.println("Calling listener -> " + listener);
- System.out.println("Calling listener " + listener.getClass().getName());
try
{
listener.connectionClosed();
Modified: branches/ClebertCallback/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
===================================================================
--- branches/ClebertCallback/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java 2009-11-23 04:43:08 UTC (rev 8373)
+++ branches/ClebertCallback/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java 2009-11-23 04:57:31 UTC (rev 8374)
@@ -766,7 +766,6 @@
public void handleRollback(final RollbackMessage packet)
{
- new Exception("Rollback").printStackTrace();
Packet response = null;
try
Modified: branches/ClebertCallback/src/main/org/hornetq/core/transaction/impl/TransactionImpl.java
===================================================================
--- branches/ClebertCallback/src/main/org/hornetq/core/transaction/impl/TransactionImpl.java 2009-11-23 04:43:08 UTC (rev 8373)
+++ branches/ClebertCallback/src/main/org/hornetq/core/transaction/impl/TransactionImpl.java 2009-11-23 04:57:31 UTC (rev 8374)
@@ -162,7 +162,6 @@
{
if (operations != null)
{
- System.out.println("Prepare was executed fine");
for (TransactionOperation operation : operations)
{
try
@@ -213,7 +212,6 @@
// Why do we need a prepare record on the onePhase optimization?
// Why we can't just go straight to commit, if we are doing one phase anyway?
state = State.PREPARED;
-// System.out.println("Adding Prepare");
// prepare();
}
}
@@ -240,7 +238,6 @@
if (containsPersistent || (xid != null && state == State.PREPARED))
{
- System.out.println("Adding commit");
storageManager.commit(id);
state = State.COMMITTED;
@@ -259,7 +256,6 @@
public void done()
{
- System.out.println("Commit was executed fine");
if (operations != null)
{
for (TransactionOperation operation : operations)
@@ -328,7 +324,6 @@
{
if (operations != null)
{
- System.out.println("Rollback was executed fine");
for (TransactionOperation operation : operations)
{
try
15 years, 1 month
JBoss hornetq SVN: r8373 - branches/ClebertCallback/src/main/org/hornetq/core/journal/impl.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2009-11-22 23:43:08 -0500 (Sun, 22 Nov 2009)
New Revision: 8373
Modified:
branches/ClebertCallback/src/main/org/hornetq/core/journal/impl/JournalImpl.java
Log:
tweak
Modified: branches/ClebertCallback/src/main/org/hornetq/core/journal/impl/JournalImpl.java
===================================================================
--- branches/ClebertCallback/src/main/org/hornetq/core/journal/impl/JournalImpl.java 2009-11-23 03:38:18 UTC (rev 8372)
+++ branches/ClebertCallback/src/main/org/hornetq/core/journal/impl/JournalImpl.java 2009-11-23 04:43:08 UTC (rev 8373)
@@ -943,11 +943,6 @@
throw new IllegalStateException("Journal must be loaded first");
}
- if (callback != null)
- {
- callback.lineUp();
- }
-
compactingLock.readLock().lock();
try
@@ -963,6 +958,11 @@
}
}
+ if (callback != null)
+ {
+ callback.lineUp();
+ }
+
int size = SIZE_UPDATE_RECORD + record.getEncodeSize();
ChannelBuffer bb = newBuffer(size);
15 years, 1 month
JBoss hornetq SVN: r8372 - branches/ClebertCallback/src/main/org/hornetq/core/server/impl.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2009-11-22 22:38:18 -0500 (Sun, 22 Nov 2009)
New Revision: 8372
Modified:
branches/ClebertCallback/src/main/org/hornetq/core/server/impl/ServerSessionPacketHandler.java
Log:
Tweaks
Modified: branches/ClebertCallback/src/main/org/hornetq/core/server/impl/ServerSessionPacketHandler.java
===================================================================
--- branches/ClebertCallback/src/main/org/hornetq/core/server/impl/ServerSessionPacketHandler.java 2009-11-23 03:30:52 UTC (rev 8371)
+++ branches/ClebertCallback/src/main/org/hornetq/core/server/impl/ServerSessionPacketHandler.java 2009-11-23 03:38:18 UTC (rev 8372)
@@ -113,10 +113,7 @@
{
byte type = packet.getType();
- if (sessionContext != null)
- {
- storageManager.setContext(sessionContext);
- }
+ storageManager.setContext(sessionContext);
try
{
@@ -308,10 +305,8 @@
}
finally
{
- if (sessionContext != null)
- {
- storageManager.completeOperations();
- }
+ storageManager.completeOperations();
+ storageManager.clearContext();
}
}
}
15 years, 1 month