JBoss hornetq SVN: r9443 - branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/client.
by do-not-reply@jboss.org
Author: ataylor
Date: 2010-07-21 05:43:15 -0400 (Wed, 21 Jul 2010)
New Revision: 9443
Modified:
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/client/OrderTest.java
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/client/RedeliveryConsumerTest.java
Log:
test fixes
Modified: branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/client/OrderTest.java
===================================================================
--- branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/client/OrderTest.java 2010-07-21 09:32:32 UTC (rev 9442)
+++ branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/client/OrderTest.java 2010-07-21 09:43:15 UTC (rev 9443)
@@ -112,6 +112,7 @@
started = true;
server.stop();
server.start();
+ sf = locator.createSessionFactory();
}
session = sf.createSession(true, true);
Modified: branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/client/RedeliveryConsumerTest.java
===================================================================
--- branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/client/RedeliveryConsumerTest.java 2010-07-21 09:32:32 UTC (rev 9442)
+++ branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/client/RedeliveryConsumerTest.java 2010-07-21 09:43:15 UTC (rev 9443)
@@ -136,6 +136,7 @@
session.close();
server.stop();
server.start();
+ factory = locator.createSessionFactory();
session = factory.createSession(false, false, false);
session.start();
consumer = session.createConsumer(ADDRESS);
@@ -158,6 +159,7 @@
session.close();
server.stop();
server.start();
+ factory = locator.createSessionFactory();
session = factory.createSession(false, false, false);
session.start();
consumer = session.createConsumer(ADDRESS);
13 years, 11 months
JBoss hornetq SVN: r9442 - in branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration: largemessage and 1 other directory.
by do-not-reply@jboss.org
Author: ataylor
Date: 2010-07-21 05:32:32 -0400 (Wed, 21 Jul 2010)
New Revision: 9442
Modified:
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/client/HeuristicXATest.java
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/client/LargeMessageTest.java
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/largemessage/LargeMessageTestBase.java
Log:
test fixes
Modified: branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/client/HeuristicXATest.java
===================================================================
--- branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/client/HeuristicXATest.java 2010-07-21 07:21:57 UTC (rev 9441)
+++ branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/client/HeuristicXATest.java 2010-07-21 09:32:32 UTC (rev 9442)
@@ -385,7 +385,8 @@
server.stop();
server.start();
-
+ //we need to recreate the session factory (the existing locator is reused)
+ sf = locator.createSessionFactory();
jmxServer = ManagementControlHelper.createHornetQServerControl(mbeanServer);
if (heuristicCommit)
{
@@ -493,7 +494,8 @@
server.stop();
server.start();
-
+ //we need to recreate the sf
+ sf = locator.createSessionFactory();
session = sf.createSession(true, false, false);
Xid[] recoveredXids = session.recover(XAResource.TMSTARTRSCAN);
Assert.assertEquals(0, recoveredXids.length);
Modified: branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/client/LargeMessageTest.java
===================================================================
--- branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/client/LargeMessageTest.java 2010-07-21 07:21:57 UTC (rev 9441)
+++ branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/client/LargeMessageTest.java 2010-07-21 09:32:32 UTC (rev 9442)
@@ -2082,7 +2082,7 @@
session.close();
server.stop();
server.start();
-
+ sf = locator.createSessionFactory();
session = sf.createSession(isXA, false, false);
session.rollback(xid);
Modified: branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/largemessage/LargeMessageTestBase.java
===================================================================
--- branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/largemessage/LargeMessageTestBase.java 2010-07-21 07:21:57 UTC (rev 9441)
+++ branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/largemessage/LargeMessageTestBase.java 2010-07-21 09:32:32 UTC (rev 9442)
@@ -192,6 +192,7 @@
{
server.stop();
server.start();
+ sf = locator.createSessionFactory();
}
session = sf.createSession(null, null, isXA, false, false, preAck, 0);
@@ -226,6 +227,8 @@
{
server.stop();
server.start();
+ //we need to recreate sf's
+ sf = locator.createSessionFactory();
}
session = sf.createSession(null, null, isXA, false, false, preAck, 0);
13 years, 11 months
JBoss hornetq SVN: r9441 - trunk/tests/src/org/hornetq/tests/stress/journal.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2010-07-21 03:21:57 -0400 (Wed, 21 Jul 2010)
New Revision: 9441
Modified:
trunk/tests/src/org/hornetq/tests/stress/journal/AllPossibilitiesCompactStressTest.java
Log:
Adding rollback to the combination of operations
Modified: trunk/tests/src/org/hornetq/tests/stress/journal/AllPossibilitiesCompactStressTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/stress/journal/AllPossibilitiesCompactStressTest.java 2010-07-21 06:38:42 UTC (rev 9440)
+++ trunk/tests/src/org/hornetq/tests/stress/journal/AllPossibilitiesCompactStressTest.java 2010-07-21 07:21:57 UTC (rev 9441)
@@ -216,6 +216,10 @@
long addRecordStay = idGen.generateID();
long addRecord5 = idGen.generateID();
+
+ long rollbackTx = idGen.generateID();
+
+ long rollbackAdd = idGen.generateID();
add(addRecordStay);
@@ -244,15 +248,21 @@
delete(addedRecord);
add(addRecord3);
+
+ addTx(rollbackTx, rollbackAdd);
long updateTX = idGen.generateID();
updateTx(updateTX, addRecord3);
commit(updateTX);
+
+ updateTx(rollbackTx, rollbackAdd);
delete(addRecord5);
-
+
+ rollback(rollbackTx);
+
checkJournalOperation();
stopJournal();
13 years, 11 months
JBoss hornetq SVN: r9440 - trunk/native/bin.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2010-07-21 02:38:42 -0400 (Wed, 21 Jul 2010)
New Revision: 9440
Modified:
trunk/native/bin/libHornetQAIO32.so
Log:
32 bits compilation
Modified: trunk/native/bin/libHornetQAIO32.so
===================================================================
(Binary files differ)
13 years, 11 months
JBoss hornetq SVN: r9439 - trunk/native/bin.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2010-07-21 02:10:12 -0400 (Wed, 21 Jul 2010)
New Revision: 9439
Modified:
trunk/native/bin/libHornetQAIO64.so
Log:
64 bits compilation
Modified: trunk/native/bin/libHornetQAIO64.so
===================================================================
(Binary files differ)
13 years, 11 months
JBoss hornetq SVN: r9438 - in trunk: src/main/org/hornetq/core/asyncio and 5 other directories.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2010-07-21 01:59:12 -0400 (Wed, 21 Jul 2010)
New Revision: 9438
Modified:
trunk/native/src/AsyncFile.cpp
trunk/native/src/AsyncFile.h
trunk/native/src/JNI_AsynchronousFileImpl.cpp
trunk/native/src/Version.h
trunk/src/main/org/hornetq/core/asyncio/AsynchronousFile.java
trunk/src/main/org/hornetq/core/asyncio/impl/AsynchronousFileImpl.java
trunk/src/main/org/hornetq/core/journal/SequentialFile.java
trunk/src/main/org/hornetq/core/journal/impl/AIOSequentialFile.java
trunk/src/main/org/hornetq/core/journal/impl/AbstractJournalUpdateTask.java
trunk/src/main/org/hornetq/core/journal/impl/NIOSequentialFile.java
trunk/tests/src/org/hornetq/tests/unit/core/asyncio/AsynchronousFileTest.java
trunk/tests/src/org/hornetq/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java
Log:
https://jira.jboss.org/browse/HORNETQ-440 - It's not necessary to use aio_write during compacting, we can just use a direct write. I will then just use a direct write to avoid possible issues.
Modified: trunk/native/src/AsyncFile.cpp
===================================================================
--- trunk/native/src/AsyncFile.cpp 2010-07-21 05:07:44 UTC (rev 9437)
+++ trunk/native/src/AsyncFile.cpp 2010-07-21 05:59:12 UTC (rev 9438)
@@ -206,6 +206,26 @@
free (preAllocBuffer);
}
+
+/** Write directly to the file without using libaio queue */
+void AsyncFile::writeInternal(THREAD_CONTEXT, long position, size_t size, void *& buffer)
+{
+ if (::lseek (fileHandle, position, SEEK_SET) < 0) throw AIOException (11, "Error positioning the file");
+
+ if (::write(fileHandle, buffer, size)<0)
+ {
+ throw AIOException (NATIVE_ERROR_IO, "Error writing file");
+ }
+
+ if (::fsync(fileHandle) < 0)
+ {
+ throw AIOException (NATIVE_ERROR_IO, "Error on synchronizing file");
+ }
+
+
+}
+
+
void AsyncFile::write(THREAD_CONTEXT threadContext, long position, size_t size, void *& buffer, CallbackAdapter *& adapter)
{
Modified: trunk/native/src/AsyncFile.h
===================================================================
--- trunk/native/src/AsyncFile.h 2010-07-21 05:07:44 UTC (rev 9437)
+++ trunk/native/src/AsyncFile.h 2010-07-21 05:59:12 UTC (rev 9438)
@@ -49,6 +49,9 @@
void write(THREAD_CONTEXT threadContext, long position, size_t size, void *& buffer, CallbackAdapter *& adapter);
+ /** Write directly to the file without using libaio queue */
+ void writeInternal(THREAD_CONTEXT threadContext, long position, size_t size, void *& buffer);
+
void read(THREAD_CONTEXT threadContext, long position, size_t size, void *& buffer, CallbackAdapter *& adapter);
int getHandle()
Modified: trunk/native/src/JNI_AsynchronousFileImpl.cpp
===================================================================
--- trunk/native/src/JNI_AsynchronousFileImpl.cpp 2010-07-21 05:07:44 UTC (rev 9437)
+++ trunk/native/src/JNI_AsynchronousFileImpl.cpp 2010-07-21 05:59:12 UTC (rev 9438)
@@ -196,8 +196,29 @@
}
}
+JNIEXPORT void JNICALL Java_org_hornetq_core_asyncio_impl_AsynchronousFileImpl_writeInternal
+ (JNIEnv * env, jobject , jobject controllerAddress, jlong positionToWrite, jlong size, jobject jbuffer)
+{
+ try
+ {
+ AIOController * controller = getController(env, controllerAddress);
+ void * buffer = env->GetDirectBufferAddress(jbuffer);
+ if (buffer == 0)
+ {
+ throwException(env, NATIVE_ERROR_INVALID_BUFFER, "Invalid Buffer used, libaio requires NativeBuffer instead of Java ByteBuffer");
+ return;
+ }
+ controller->fileOutput.writeInternal(env, positionToWrite, (size_t)size, buffer);
+ }
+ catch (AIOException& e)
+ {
+ throwException(env, e.getErrorCode(), e.what());
+ }
+}
+
+
JNIEXPORT void Java_org_hornetq_core_asyncio_impl_AsynchronousFileImpl_internalPollEvents
(JNIEnv *env, jclass, jobject controllerAddress)
{
Modified: trunk/native/src/Version.h
===================================================================
--- trunk/native/src/Version.h 2010-07-21 05:07:44 UTC (rev 9437)
+++ trunk/native/src/Version.h 2010-07-21 05:59:12 UTC (rev 9438)
@@ -3,6 +3,6 @@
// This definition needs to match org.hornetq.core.asyncio.impl.AsynchronousFileImpl.EXPECTED_NATIVE_VERSION
// Or else the native module won't be loaded because of version mismatches
-#define _VERSION_NATIVE_AIO 29
+#define _VERSION_NATIVE_AIO 30
#endif
Modified: trunk/src/main/org/hornetq/core/asyncio/AsynchronousFile.java
===================================================================
--- trunk/src/main/org/hornetq/core/asyncio/AsynchronousFile.java 2010-07-21 05:07:44 UTC (rev 9437)
+++ trunk/src/main/org/hornetq/core/asyncio/AsynchronousFile.java 2010-07-21 05:59:12 UTC (rev 9438)
@@ -43,6 +43,12 @@
/** Any error will be reported on the callback interface */
void write(long position, long size, ByteBuffer directByteBuffer, AIOCallback aioCallback);
+
+ /**
+ * Performs an internal direct write.
+ * @throws HornetQException
+ */
+ void writeInternal(long positionToWrite, long size, ByteBuffer bytes) throws HornetQException;
void read(long position, long size, ByteBuffer directByteBuffer, AIOCallback aioCallback) throws HornetQException;
@@ -54,4 +60,5 @@
String getFileName();
+
}
Modified: trunk/src/main/org/hornetq/core/asyncio/impl/AsynchronousFileImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/asyncio/impl/AsynchronousFileImpl.java 2010-07-21 05:07:44 UTC (rev 9437)
+++ trunk/src/main/org/hornetq/core/asyncio/impl/AsynchronousFileImpl.java 2010-07-21 05:59:12 UTC (rev 9438)
@@ -51,7 +51,7 @@
/** This definition needs to match Version.h on the native sources.
Or else the native module won't be loaded because of version mismatches */
- private static int EXPECTED_NATIVE_VERSION = 29;
+ private static int EXPECTED_NATIVE_VERSION = 30;
/** Used to determine the next writing sequence */
private final AtomicLong nextWritingSequence = new AtomicLong(0);
@@ -271,7 +271,18 @@
writeLock.unlock();
}
}
+
+
+ public void writeInternal(long positionToWrite, long size, ByteBuffer bytes) throws HornetQException
+ {
+ writeInternal(handler, positionToWrite, size, bytes);
+ if (bufferCallback != null)
+ {
+ bufferCallback.bufferDone(bytes);
+ }
+ }
+
public void write(final long position,
final long size,
final ByteBuffer directByteBuffer,
@@ -629,6 +640,8 @@
ByteBuffer buffer,
AIOCallback aioPackage) throws HornetQException;
+ private native void writeInternal(ByteBuffer handle, long positionToWrite, long size, ByteBuffer bytes) throws HornetQException;
+
private native void read(ByteBuffer handle, long position, long size, ByteBuffer buffer, AIOCallback aioPackage) throws HornetQException;
private static native void fill(ByteBuffer handle, long position, int blocks, long size, byte fillChar) throws HornetQException;
@@ -720,4 +733,5 @@
}
}
}
+
}
Modified: trunk/src/main/org/hornetq/core/journal/SequentialFile.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/SequentialFile.java 2010-07-21 05:07:44 UTC (rev 9437)
+++ trunk/src/main/org/hornetq/core/journal/SequentialFile.java 2010-07-21 05:59:12 UTC (rev 9438)
@@ -69,6 +69,11 @@
/** Write directly to the file without using any buffer */
void writeDirect(ByteBuffer bytes, boolean sync) throws Exception;
+
+ /** Write directly to the file.
+ * This is used by compacting and other places where we write a big buffer in a single shot.
+ * writeInternal should always block until the entire write is sync on disk */
+ void writeInternal(ByteBuffer bytes) throws Exception;
int read(ByteBuffer bytes, IOAsyncTask callback) throws Exception;
Modified: trunk/src/main/org/hornetq/core/journal/impl/AIOSequentialFile.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/impl/AIOSequentialFile.java 2010-07-21 05:07:44 UTC (rev 9437)
+++ trunk/src/main/org/hornetq/core/journal/impl/AIOSequentialFile.java 2010-07-21 05:59:12 UTC (rev 9438)
@@ -279,7 +279,17 @@
aioFile.write(positionToWrite, bytesToWrite, bytes, callback);
}
+
+ public void writeInternal(ByteBuffer bytes) throws Exception
+ {
+ final int bytesToWrite = factory.calculateBlockSize(bytes.limit());
+ final long positionToWrite = position.getAndAdd(bytesToWrite);
+
+ aioFile.writeInternal(positionToWrite, bytesToWrite, bytes);
+ }
+
+
// Protected methods
// -----------------------------------------------------------------------------------------------------
Modified: trunk/src/main/org/hornetq/core/journal/impl/AbstractJournalUpdateTask.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/impl/AbstractJournalUpdateTask.java 2010-07-21 05:07:44 UTC (rev 9437)
+++ trunk/src/main/org/hornetq/core/journal/impl/AbstractJournalUpdateTask.java 2010-07-21 05:59:12 UTC (rev 9438)
@@ -180,13 +180,11 @@
if (writingChannel != null)
{
sequentialFile.position(0);
- SimpleWaitIOCallback completion = new SimpleWaitIOCallback();
// To Fix the size of the file
writingChannel.writerIndex(writingChannel.capacity());
- sequentialFile.writeDirect(writingChannel.toByteBuffer(), true, completion);
- completion.waitCompletion();
+ sequentialFile.writeInternal(writingChannel.toByteBuffer());
sequentialFile.close();
newDataFiles.add(currentFile);
}
Modified: trunk/src/main/org/hornetq/core/journal/impl/NIOSequentialFile.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/impl/NIOSequentialFile.java 2010-07-21 05:07:44 UTC (rev 9437)
+++ trunk/src/main/org/hornetq/core/journal/impl/NIOSequentialFile.java 2010-07-21 05:59:12 UTC (rev 9438)
@@ -260,7 +260,14 @@
{
internalWrite(bytes, sync, null);
}
+
+
+ public void writeInternal(ByteBuffer bytes) throws Exception
+ {
+ internalWrite(bytes, true, null);
+ }
+
@Override
protected ByteBuffer newBuffer(int size, final int limit)
{
Modified: trunk/tests/src/org/hornetq/tests/unit/core/asyncio/AsynchronousFileTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/unit/core/asyncio/AsynchronousFileTest.java 2010-07-21 05:07:44 UTC (rev 9437)
+++ trunk/tests/src/org/hornetq/tests/unit/core/asyncio/AsynchronousFileTest.java 2010-07-21 05:59:12 UTC (rev 9438)
@@ -13,6 +13,10 @@
package org.hornetq.tests.unit.core.asyncio;
+import java.io.BufferedInputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.InputStream;
import java.lang.ref.WeakReference;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
@@ -917,7 +921,51 @@
}
}
+
+
+ public void testInternalWrite() throws Exception
+ {
+ final AsynchronousFileImpl controller = new AsynchronousFileImpl(executor, pollerExecutor);
+ controller.open(FILE_NAME, 2000);
+ ByteBuffer buffer = null;
+
+ try
+ {
+ final int SIZE = 10 * 512;
+
+ buffer = AsynchronousFileImpl.newBuffer(SIZE);
+
+ for (int i = 0 ; i < SIZE; i++)
+ {
+ buffer.put(getSamplebyte(i));
+ }
+
+ controller.writeInternal(0, SIZE, buffer);
+
+ InputStream fileInput = new BufferedInputStream(new FileInputStream(new File(FILE_NAME)));
+
+ for (int i = 0 ; i < SIZE; i++)
+ {
+ assertEquals((int)getSamplebyte(i), (int)fileInput.read());
+ }
+
+ assertEquals(-1, fileInput.read());
+
+ }
+ catch (Exception e)
+ {
+ throw e;
+ }
+ finally
+ {
+ if (buffer != null) AsynchronousFileImpl.destroyBuffer(buffer);
+ if (controller != null) controller.close();
+ }
+
+ }
+
+
public void testInvalidWrite() throws Exception
{
final AsynchronousFileImpl controller = new AsynchronousFileImpl(executor, pollerExecutor);
Modified: trunk/tests/src/org/hornetq/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java 2010-07-21 05:07:44 UTC (rev 9437)
+++ trunk/tests/src/org/hornetq/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java 2010-07-21 05:59:12 UTC (rev 9438)
@@ -498,7 +498,17 @@
{
writeDirect(bytes, sync, null);
}
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.journal.SequentialFile#writeInternal(java.nio.ByteBuffer)
+ */
+ public void writeInternal(ByteBuffer bytes) throws Exception
+ {
+ writeDirect(bytes, true);
+ }
+
+
private void checkAndResize(final int size)
{
int oldpos = data == null ? 0 : data.position();
@@ -662,8 +672,6 @@
*/
public void setTimedBuffer(final TimedBuffer buffer)
{
- // TODO Auto-generated method stub
-
}
}
13 years, 11 months
JBoss hornetq SVN: r9437 - trunk/src/main/org/hornetq/core/journal/impl.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2010-07-21 01:07:44 -0400 (Wed, 21 Jul 2010)
New Revision: 9437
Modified:
trunk/src/main/org/hornetq/core/journal/impl/TimedBuffer.java
Log:
typo
Modified: trunk/src/main/org/hornetq/core/journal/impl/TimedBuffer.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/impl/TimedBuffer.java 2010-07-21 02:34:27 UTC (rev 9436)
+++ trunk/src/main/org/hornetq/core/journal/impl/TimedBuffer.java 2010-07-21 05:07:44 UTC (rev 9437)
@@ -70,7 +70,7 @@
private volatile boolean started;
- // We use this flag to prevent flush occuring between calling checkSize and addBytes
+ // We use this flag to prevent flush occurring between calling checkSize and addBytes
// CheckSize must always be followed by it's corresponding addBytes otherwise the buffer
// can get in an inconsistent state
private boolean delayFlush;
13 years, 11 months
JBoss hornetq SVN: r9436 - in trunk/tests/src/org/hornetq/tests: unit/core/journal/impl and 1 other directory.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2010-07-20 22:34:27 -0400 (Tue, 20 Jul 2010)
New Revision: 9436
Added:
trunk/tests/src/org/hornetq/tests/stress/journal/AIOAllPossibilitiesCompactStressTest.java
trunk/tests/src/org/hornetq/tests/stress/journal/AllPossibilitiesCompactStressTest.java
Modified:
trunk/tests/src/org/hornetq/tests/unit/core/journal/impl/JournalImplTestBase.java
Log:
Adding test that will call compact on multiple combinations between journal operations
Added: trunk/tests/src/org/hornetq/tests/stress/journal/AIOAllPossibilitiesCompactStressTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/stress/journal/AIOAllPossibilitiesCompactStressTest.java (rev 0)
+++ trunk/tests/src/org/hornetq/tests/stress/journal/AIOAllPossibilitiesCompactStressTest.java 2010-07-21 02:34:27 UTC (rev 9436)
@@ -0,0 +1,65 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.tests.stress.journal;
+
+import java.io.File;
+
+import org.hornetq.core.config.impl.ConfigurationImpl;
+import org.hornetq.core.journal.SequentialFileFactory;
+import org.hornetq.core.journal.impl.AIOSequentialFileFactory;
+
+/**
+ * AIO variant of AllPossibilitiesCompactStressTest (uses AIOSequentialFileFactory)
+ *
+ * @author <a href="mailto:clebert.suconic@jboss.org">Clebert Suconic</a>
+ *
+ *
+ */
+public class AIOAllPossibilitiesCompactStressTest extends AllPossibilitiesCompactStressTest
+{
+
+ // Constants -----------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ // Public --------------------------------------------------------
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+ @Override
+ protected SequentialFileFactory getFileFactory() throws Exception
+ {
+ File file = new File(getTestDir());
+
+ deleteDirectory(file);
+
+ file.mkdir();
+
+ return new AIOSequentialFileFactory(getTestDir(),
+ ConfigurationImpl.DEFAULT_JOURNAL_BUFFER_SIZE_AIO,
+ 1000000,
+ false);
+ }
+
+
+}
Added: trunk/tests/src/org/hornetq/tests/stress/journal/AllPossibilitiesCompactStressTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/stress/journal/AllPossibilitiesCompactStressTest.java (rev 0)
+++ trunk/tests/src/org/hornetq/tests/stress/journal/AllPossibilitiesCompactStressTest.java 2010-07-21 02:34:27 UTC (rev 9436)
@@ -0,0 +1,328 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.tests.stress.journal;
+
+import java.io.File;
+import java.io.FilenameFilter;
+
+import org.hornetq.core.journal.SequentialFileFactory;
+import org.hornetq.core.journal.impl.JournalImpl;
+import org.hornetq.core.journal.impl.NIOSequentialFileFactory;
+import org.hornetq.tests.unit.core.journal.impl.JournalImplTestBase;
+import org.hornetq.utils.SimpleIDGenerator;
+import org.hornetq.utils.VariableLatch;
+
+/**
+ * Replays a fixed journal scenario, triggering compact at every combination of operation indexes
+ *
+ * @author <a href="mailto:clebert.suconic@jboss.org">Clebert Suconic</a>
+ *
+ *
+ */
+public class AllPossibilitiesCompactStressTest extends JournalImplTestBase
+{
+
+ // Constants -----------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ private VariableLatch startedCompactingLatch = null;
+
+ private VariableLatch releaseCompactingLatch = null;
+
+ private Thread tCompact = null;
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ // Public --------------------------------------------------------
+ @Override
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+
+ tCompact = null;
+
+ startedCompactingLatch = new VariableLatch();
+
+ releaseCompactingLatch = new VariableLatch();
+
+ File file = new File(getTestDir());
+
+ deleteDirectory(file);
+
+ file.mkdir();
+ }
+
+ protected void tearDown() throws Exception
+ {
+
+ File testDir = new File(getTestDir());
+
+ File files[] = testDir.listFiles(new FilenameFilter()
+ {
+
+ public boolean accept(File dir, String name)
+ {
+ return name.startsWith(filePrefix) && name.endsWith(fileExtension);
+ }
+ });
+
+ for (File file : files)
+ {
+ assertEquals("File " + file + " doesn't have the expected number of bytes", fileSize, file.length());
+ }
+
+ super.tearDown();
+ }
+
+ int startCompactAt;
+
+ int joinCompactAt;
+
+ int secondCompactAt;
+
+ int currentOperation;
+
+ SimpleIDGenerator idGen = new SimpleIDGenerator(1000);
+
+
+ public void createJournal() throws Exception
+ {
+ journal = new JournalImpl(fileSize, minFiles, 0, 0, fileFactory, filePrefix, fileExtension, maxAIO)
+ {
+
+ @Override
+ public void onCompactDone()
+ {
+ startedCompactingLatch.down();
+ try
+ {
+ releaseCompactingLatch.waitCompletion();
+ }
+ catch (InterruptedException e)
+ {
+ e.printStackTrace();
+ }
+ }
+ };
+
+ journal.setAutoReclaim(false);
+ }
+
+
+ public void testMixOperations() throws Exception
+ {
+
+ setup(2, 60 * 1024, false);
+
+ startCompactAt = joinCompactAt = secondCompactAt = -1;
+
+ currentOperation = 0;
+ internalTest();
+ int MAX_OPERATIONS = currentOperation;
+
+ System.out.println("Using MAX_OPERATIONS = " + MAX_OPERATIONS);
+
+ for (startCompactAt = 0; startCompactAt < MAX_OPERATIONS; startCompactAt++)
+ {
+ for (joinCompactAt = startCompactAt; joinCompactAt < MAX_OPERATIONS; joinCompactAt++)
+ {
+ for (secondCompactAt = joinCompactAt; secondCompactAt < MAX_OPERATIONS; secondCompactAt++)
+ {
+ System.out.println("start=" + startCompactAt + ", join=" + joinCompactAt + ", second=" + secondCompactAt);
+
+ currentOperation = 0;
+ try
+ {
+ tearDown();
+ setUp();
+ internalTest();
+ }
+ catch (Throwable e)
+ {
+ throw new Exception("Error at compact=" + startCompactAt +
+ ", joinCompactAt=" +
+ joinCompactAt +
+ ", secondCompactAt=" +
+ secondCompactAt, e);
+ }
+ }
+ }
+ }
+ }
+
+ protected void beforeJournalOperation() throws Exception
+ {
+ checkJournalOperation();
+ }
+
+ /**
+ * @throws InterruptedException
+ * @throws Exception
+ */
+ private void checkJournalOperation() throws InterruptedException, Exception
+ {
+ if (startCompactAt == currentOperation)
+ {
+ threadCompact();
+ }
+ if (joinCompactAt == currentOperation)
+ {
+ joinCompact();
+ }
+ if (secondCompactAt == currentOperation)
+ {
+ journal.compact();
+ }
+
+ currentOperation++;
+ }
+
+ public void internalTest() throws Exception
+ {
+ createJournal();
+
+ startJournal();
+
+ loadAndCheck();
+
+ long consumerTX = idGen.generateID();
+
+ long firstID = idGen.generateID();
+
+ long appendTX = idGen.generateID();
+
+ long addedRecord = idGen.generateID();
+
+ long addRecord2 = idGen.generateID();
+
+ long addRecord3 = idGen.generateID();
+
+ long addRecord4 = idGen.generateID();
+
+ long addRecordStay = idGen.generateID();
+
+ long addRecord5 = idGen.generateID();
+
+ add(addRecordStay);
+
+ add(addRecord2);
+
+ add(addRecord4);
+
+ update(addRecord2);
+
+ addTx(consumerTX, firstID);
+
+ updateTx(consumerTX, addRecord4);
+
+ addTx(consumerTX, addRecord5);
+
+ addTx(appendTX, addedRecord);
+
+ commit(appendTX);
+
+ updateTx(consumerTX, addedRecord);
+
+ commit(consumerTX);
+
+ delete(addRecord4);
+
+ delete(addedRecord);
+
+ add(addRecord3);
+
+ long updateTX = idGen.generateID();
+
+ updateTx(updateTX, addRecord3);
+
+ commit(updateTX);
+
+ delete(addRecord5);
+
+ checkJournalOperation();
+
+ stopJournal();
+
+ createJournal();
+
+ startJournal();
+
+ loadAndCheck();
+
+ stopJournal();
+ }
+
+ /**
+ * Releases the compacting thread held in onCompactDone() and
+ * joins it, then clears the thread reference.
+ * @throws InterruptedException
+ */
+ private void joinCompact() throws InterruptedException
+ {
+ releaseCompactingLatch.down();
+
+ tCompact.join();
+
+ tCompact = null;
+ }
+
+ /**
+ * Starts journal.compact() on a new thread and blocks until
+ * onCompactDone() signals startedCompactingLatch.
+ * @throws InterruptedException
+ */
+ private void threadCompact() throws InterruptedException
+ {
+ tCompact = new Thread()
+ {
+ @Override
+ public void run()
+ {
+ try
+ {
+ journal.compact();
+ }
+ catch (Exception e)
+ {
+ e.printStackTrace();
+ }
+ }
+ };
+
+ tCompact.start();
+
+ startedCompactingLatch.waitCompletion();
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.tests.unit.core.journal.impl.JournalImplTestBase#getFileFactory()
+ */
+ @Override
+ protected SequentialFileFactory getFileFactory() throws Exception
+ {
+ return new NIOSequentialFileFactory(getTestDir());
+ }
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+
+}
Modified: trunk/tests/src/org/hornetq/tests/unit/core/journal/impl/JournalImplTestBase.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/unit/core/journal/impl/JournalImplTestBase.java 2010-07-20 20:39:20 UTC (rev 9435)
+++ trunk/tests/src/org/hornetq/tests/unit/core/journal/impl/JournalImplTestBase.java 2010-07-21 02:34:27 UTC (rev 9436)
@@ -210,6 +210,10 @@
{
journal.load(null, null, null);
}
+
+ protected void beforeJournalOperation() throws Exception
+ {
+ }
protected void add(final long... arguments) throws Exception
{
@@ -222,6 +226,8 @@
{
byte[] record = generateRecord(size);
+ beforeJournalOperation();
+
journal.appendAddRecord(element, (byte)0, record, sync);
records.add(new RecordInfo(element, (byte)0, record, false));
@@ -236,6 +242,8 @@
{
byte[] updateRecord = generateRecord(recordLength);
+ beforeJournalOperation();
+
journal.appendUpdateRecord(element, (byte)0, updateRecord, sync);
records.add(new RecordInfo(element, (byte)0, updateRecord, true));
@@ -248,6 +256,8 @@
{
for (long element : arguments)
{
+ beforeJournalOperation();
+
journal.appendDeleteRecord(element, sync);
removeRecordsForID(element);
@@ -266,6 +276,8 @@
// SIZE_BYTE
byte[] record = generateRecord(recordLength - JournalImpl.SIZE_ADD_RECORD_TX);
+ beforeJournalOperation();
+
journal.appendAddRecordTransactional(txID, element, (byte)0, record);
tx.records.add(new RecordInfo(element, (byte)0, record, false));
@@ -283,6 +295,8 @@
{
byte[] updateRecord = generateRecord(recordLength - JournalImpl.SIZE_ADD_RECORD_TX);
+ beforeJournalOperation();
+
journal.appendUpdateRecordTransactional(txID, element, (byte)0, updateRecord);
tx.records.add(new RecordInfo(element, (byte)0, updateRecord, true));
@@ -296,6 +310,8 @@
for (long element : arguments)
{
+ beforeJournalOperation();
+
journal.appendDeleteRecordTransactional(txID, element);
tx.deletes.add(new RecordInfo(element, (byte)0, null, true));
@@ -318,6 +334,9 @@
{
throw new IllegalStateException("Transaction is already prepared");
}
+
+ beforeJournalOperation();
+
journal.appendPrepareRecord(txID, xid, sync);
tx.prepared = true;
@@ -327,17 +346,24 @@
protected void commit(final long txID) throws Exception
{
- TransactionHolder tx = transactions.get(txID);
+ TransactionHolder tx = transactions.remove(txID);
if (tx == null)
{
throw new IllegalStateException("Cannot find tx " + txID);
}
+ beforeJournalOperation();
+
journal.appendCommitRecord(txID, sync);
- commitTx(txID);
+ records.addAll(tx.records);
+ for (RecordInfo l : tx.deletes)
+ {
+ removeRecordsForID(l.id);
+ }
+
journal.debugWait();
}
@@ -350,28 +376,13 @@
throw new IllegalStateException("Cannot find tx " + txID);
}
+ beforeJournalOperation();
+
journal.appendRollbackRecord(txID, sync);
journal.debugWait();
}
- private void commitTx(final long txID)
- {
- TransactionHolder tx = transactions.remove(txID);
-
- if (tx == null)
- {
- throw new IllegalStateException("Cannot find tx " + txID);
- }
-
- records.addAll(tx.records);
-
- for (RecordInfo l : tx.deletes)
- {
- removeRecordsForID(l.id);
- }
- }
-
protected void removeRecordsForID(final long id)
{
for (ListIterator<RecordInfo> iter = records.listIterator(); iter.hasNext();)
13 years, 11 months
JBoss hornetq SVN: r9435 - trunk/src/main/org/hornetq/core/journal/impl.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2010-07-20 16:39:20 -0400 (Tue, 20 Jul 2010)
New Revision: 9435
Modified:
trunk/src/main/org/hornetq/core/journal/impl/JournalCompactor.java
Log:
adding id on exception message to improve debug
Modified: trunk/src/main/org/hornetq/core/journal/impl/JournalCompactor.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/impl/JournalCompactor.java 2010-07-20 15:04:06 UTC (rev 9434)
+++ trunk/src/main/org/hornetq/core/journal/impl/JournalCompactor.java 2010-07-20 20:39:20 UTC (rev 9435)
@@ -302,7 +302,7 @@
if (newRecords.get(recordID) != null)
{
// Sanity check, it should never happen
- throw new IllegalStateException("Inconsistency during compacting: Delete record being read on an existent record");
+ throw new IllegalStateException("Inconsistency during compacting: Delete record being read on an existent record (id=" + recordID + ")");
}
}
13 years, 11 months
JBoss hornetq SVN: r9434 - branches/2_2_0_HA_Improvements/examples/jms/static-clustered-queue/server1.
by do-not-reply@jboss.org
Author: jmesnil
Date: 2010-07-20 11:04:06 -0400 (Tue, 20 Jul 2010)
New Revision: 9434
Modified:
branches/2_2_0_HA_Improvements/examples/jms/static-clustered-queue/server1/hornetq-configuration.xml
Log:
removed temp configuration setting
Modified: branches/2_2_0_HA_Improvements/examples/jms/static-clustered-queue/server1/hornetq-configuration.xml
===================================================================
13 years, 11 months