JBoss hornetq SVN: r9493 - trunk/tests/src/org/hornetq/tests/stress/journal.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2010-07-30 14:36:17 -0400 (Fri, 30 Jul 2010)
New Revision: 9493
Modified:
trunk/tests/src/org/hornetq/tests/stress/journal/JournalCleanupCompactStressTest.java
Log:
typo
Modified: trunk/tests/src/org/hornetq/tests/stress/journal/JournalCleanupCompactStressTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/stress/journal/JournalCleanupCompactStressTest.java 2010-07-30 18:12:47 UTC (rev 9492)
+++ trunk/tests/src/org/hornetq/tests/stress/journal/JournalCleanupCompactStressTest.java 2010-07-30 18:36:17 UTC (rev 9493)
@@ -415,6 +415,7 @@
for (int i = 0; running & i < ids.length; i++)
{
System.out.println("Deleting");
+ maxRecords.release();
journal.appendDeleteRecord(ids[i], false);
numberOfDeletes.incrementAndGet();
}
14 years, 7 months
JBoss hornetq SVN: r9492 - trunk/tests/src/org/hornetq/tests/soak/journal.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2010-07-30 14:12:47 -0400 (Fri, 30 Jul 2010)
New Revision: 9492
Modified:
trunk/tests/src/org/hornetq/tests/soak/journal/JournalCleanupCompactSoakTest.java
Log:
typo
Modified: trunk/tests/src/org/hornetq/tests/soak/journal/JournalCleanupCompactSoakTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/soak/journal/JournalCleanupCompactSoakTest.java 2010-07-30 18:12:11 UTC (rev 9491)
+++ trunk/tests/src/org/hornetq/tests/soak/journal/JournalCleanupCompactSoakTest.java 2010-07-30 18:12:47 UTC (rev 9492)
@@ -39,7 +39,7 @@
protected long getTotalTimeMilliseconds()
{
- return TimeUnit.SECONDS.toMillis(1);
+ return TimeUnit.HOURS.toMillis(2);
}
14 years, 7 months
JBoss hornetq SVN: r9491 - in trunk/tests/src/org/hornetq/tests: stress/journal and 1 other directory.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2010-07-30 14:12:11 -0400 (Fri, 30 Jul 2010)
New Revision: 9491
Added:
trunk/tests/src/org/hornetq/tests/soak/journal/JournalCleanupCompactSoakTest.java
Modified:
trunk/tests/src/org/hornetq/tests/stress/journal/JournalCleanupCompactStressTest.java
Log:
Avoid OutOfMemoryError on the soak test during very long runs (capping the number of pending records with a semaphore)
Added: trunk/tests/src/org/hornetq/tests/soak/journal/JournalCleanupCompactSoakTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/soak/journal/JournalCleanupCompactSoakTest.java (rev 0)
+++ trunk/tests/src/org/hornetq/tests/soak/journal/JournalCleanupCompactSoakTest.java 2010-07-30 18:12:11 UTC (rev 9491)
@@ -0,0 +1,54 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.tests.soak.journal;
+
+import java.util.concurrent.TimeUnit;
+
+import org.hornetq.tests.stress.journal.JournalCleanupCompactStressTest;
+
+/**
+ * A JournalCleanupCompactSoakTest
+ *
+ * @author <a href="mailto:clebert.suconic@jboss.org">Clebert Suconic</a>
+ *
+ *
+ */
+public class JournalCleanupCompactSoakTest extends JournalCleanupCompactStressTest
+{
+
+ // Constants -----------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ // Public --------------------------------------------------------
+
+ protected long getTotalTimeMilliseconds()
+ {
+ return TimeUnit.SECONDS.toMillis(1);
+ }
+
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+
+}
Modified: trunk/tests/src/org/hornetq/tests/stress/journal/JournalCleanupCompactStressTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/stress/journal/JournalCleanupCompactStressTest.java 2010-07-30 17:20:25 UTC (rev 9490)
+++ trunk/tests/src/org/hornetq/tests/stress/journal/JournalCleanupCompactStressTest.java 2010-07-30 18:12:11 UTC (rev 9491)
@@ -18,6 +18,7 @@
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import java.util.concurrent.Semaphore;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
@@ -51,6 +52,9 @@
{
public static SimpleIDGenerator idGen = new SimpleIDGenerator(1);
+
+ // We want to cap the difference between appends and deletes, or we could run out of memory
+ public Semaphore maxRecords;
private volatile boolean running;
@@ -77,6 +81,8 @@
{
super.setUp();
+ maxRecords = new Semaphore(20000);
+
errors.set(0);
File dir = new File(getTemporaryDir());
@@ -235,6 +241,11 @@
LinkedBlockingDeque<Long> queue = new LinkedBlockingDeque<Long>();
OperationContextImpl ctx = new OperationContextImpl(executorFactory.getExecutor());
+
+ public FastAppenderTx()
+ {
+ super("FastAppenderTX");
+ }
@Override
public void run()
@@ -255,7 +266,7 @@
long id = JournalCleanupCompactStressTest.idGen.generateID();
ids[i] = id;
journal.appendAddRecordTransactional(txID, id, (byte)0, generateRecord());
- Thread.sleep(1);
+ maxRecords.acquire();
}
journal.appendCommitRecord(txID, true, ctx);
ctx.executeOnCompletion(new IOAsyncTask()
@@ -293,6 +304,7 @@
public FastUpdateTx(final LinkedBlockingDeque<Long> queue)
{
+ super("FastUpdateTX");
this.queue = queue;
}
@@ -350,6 +362,7 @@
for (long id : ids)
{
journal.appendDeleteRecord(id, false);
+ maxRecords.release();
numberOfDeletes.incrementAndGet();
}
}
@@ -373,6 +386,12 @@
*/
class SlowAppenderNoTX extends Thread
{
+
+ public SlowAppenderNoTX()
+ {
+ super("SlowAppender");
+ }
+
@Override
public void run()
{
@@ -386,6 +405,7 @@
{
System.out.println("append slow");
ids[i] = JournalCleanupCompactStressTest.idGen.generateID();
+ maxRecords.acquire();
journal.appendAddRecord(ids[i], (byte)1, generateRecord(), true);
numberOfRecords.incrementAndGet();
14 years, 7 months
JBoss hornetq SVN: r9490 - in trunk/tests/src/org/hornetq/tests: stress/journal and 1 other directory.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2010-07-30 13:20:25 -0400 (Fri, 30 Jul 2010)
New Revision: 9490
Added:
trunk/tests/src/org/hornetq/tests/stress/journal/JournalCleanupCompactStressTest.java
Removed:
trunk/tests/src/org/hornetq/tests/soak/journal/JournalSoakTest.java
Log:
Move soak test as stress test
Deleted: trunk/tests/src/org/hornetq/tests/soak/journal/JournalSoakTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/soak/journal/JournalSoakTest.java 2010-07-30 15:01:16 UTC (rev 9489)
+++ trunk/tests/src/org/hornetq/tests/soak/journal/JournalSoakTest.java 2010-07-30 17:20:25 UTC (rev 9490)
@@ -1,424 +0,0 @@
-/*
- * Copyright 2010 Red Hat, Inc.
- * Red Hat licenses this file to you under the Apache License, version
- * 2.0 (the "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * http://www.apache.org/licenses/LICENSE-2.0
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
- * implied. See the License for the specific language governing
- * permissions and limitations under the License.
- */
-
-package org.hornetq.tests.soak.journal;
-
-import java.io.File;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import org.hornetq.core.asyncio.impl.AsynchronousFileImpl;
-import org.hornetq.core.config.impl.ConfigurationImpl;
-import org.hornetq.core.journal.IOAsyncTask;
-import org.hornetq.core.journal.PreparedTransactionInfo;
-import org.hornetq.core.journal.RecordInfo;
-import org.hornetq.core.journal.SequentialFileFactory;
-import org.hornetq.core.journal.TransactionFailureCallback;
-import org.hornetq.core.journal.impl.AIOSequentialFileFactory;
-import org.hornetq.core.journal.impl.JournalImpl;
-import org.hornetq.core.journal.impl.NIOSequentialFileFactory;
-import org.hornetq.core.persistence.impl.journal.OperationContextImpl;
-import org.hornetq.tests.util.RandomUtil;
-import org.hornetq.tests.util.ServiceTestBase;
-import org.hornetq.utils.HornetQThreadFactory;
-import org.hornetq.utils.OrderedExecutorFactory;
-import org.hornetq.utils.SimpleIDGenerator;
-import org.hornetq.utils.concurrent.LinkedBlockingDeque;
-
-/**
- * A SoakJournal
- *
- * @author <a href="mailto:clebert.suconic@jboss.org">Clebert Suconic</a>
- *
- *
- */
-public class JournalSoakTest extends ServiceTestBase
-{
-
- public static SimpleIDGenerator idGen = new SimpleIDGenerator(1);
-
- private volatile boolean running;
-
- private AtomicInteger errors = new AtomicInteger(0);
-
- private AtomicInteger numberOfRecords = new AtomicInteger(0);
-
- private AtomicInteger numberOfUpdates = new AtomicInteger(0);
-
- private AtomicInteger numberOfDeletes = new AtomicInteger(0);
-
- private JournalImpl journal;
-
- ThreadFactory tFactory = new HornetQThreadFactory("SoakTest" + System.identityHashCode(this),
- false,
- JournalSoakTest.class.getClassLoader());
-
- private final ExecutorService threadPool = Executors.newFixedThreadPool(20, tFactory);
-
- OrderedExecutorFactory executorFactory = new OrderedExecutorFactory(threadPool);
-
- @Override
- public void setUp() throws Exception
- {
- super.setUp();
-
- errors.set(0);
-
- File dir = new File(getTemporaryDir());
- dir.mkdirs();
-
- SequentialFileFactory factory;
-
- int maxAIO;
- if (AsynchronousFileImpl.isLoaded())
- {
- factory = new AIOSequentialFileFactory(dir.getPath());
- maxAIO = ConfigurationImpl.DEFAULT_JOURNAL_MAX_IO_AIO;
- }
- else
- {
- factory = new NIOSequentialFileFactory(dir.getPath());
- maxAIO = ConfigurationImpl.DEFAULT_JOURNAL_MAX_IO_NIO;
- }
-
- journal = new JournalImpl(ConfigurationImpl.DEFAULT_JOURNAL_FILE_SIZE,
- 10,
- 15,
- ConfigurationImpl.DEFAULT_JOURNAL_COMPACT_PERCENTAGE,
- factory,
- "hornetq-data",
- "hq",
- maxAIO);
-
- journal.start();
- journal.loadInternalOnly();
-
- }
-
- @Override
- public void tearDown() throws Exception
- {
- try
- {
- if (journal.isStarted())
- {
- journal.stop();
- }
- }
- catch (Exception e)
- {
- // don't care :-)
- }
- }
-
- public void testAppend() throws Exception
- {
-
- running = true;
- SlowAppenderNoTX t1 = new SlowAppenderNoTX();
-
- int NTHREADS = 5;
-
- FastAppenderTx appenders[] = new FastAppenderTx[NTHREADS];
- FastUpdateTx updaters[] = new FastUpdateTx[NTHREADS];
-
- for (int i = 0; i < NTHREADS; i++)
- {
- appenders[i] = new FastAppenderTx();
- updaters[i] = new FastUpdateTx(appenders[i].queue);
- }
-
- t1.start();
-
- Thread.sleep(1000);
-
- for (int i = 0; i < NTHREADS; i++)
- {
- appenders[i].start();
- updaters[i].start();
- }
-
- long timeToEnd = System.currentTimeMillis() + TimeUnit.MINUTES.toMillis(10);
-
- while (System.currentTimeMillis() < timeToEnd)
- {
- System.out.println("Append = " + numberOfRecords +
- ", Update = " +
- numberOfUpdates +
- ", Delete = " +
- numberOfDeletes +
- ", liveRecords = " +
- (numberOfRecords.get() - numberOfDeletes.get()));
- Thread.sleep(TimeUnit.SECONDS.toMillis(10));
- }
-
- running = false;
-
- for (Thread t : appenders)
- {
- t.join();
- }
-
- for (Thread t : updaters)
- {
- t.join();
- }
-
- t1.join();
-
- assertEquals(0, errors.get());
-
- journal.stop();
-
- journal.start();
-
- ArrayList<RecordInfo> committedRecords = new ArrayList<RecordInfo>();
- ArrayList<PreparedTransactionInfo> preparedTransactions = new ArrayList<PreparedTransactionInfo>();
- journal.load(committedRecords, preparedTransactions, new TransactionFailureCallback()
- {
- public void failedTransaction(long transactionID, List<RecordInfo> records, List<RecordInfo> recordsToDelete)
- {
- }
- });
-
- long appends = 0, updates = 0;
-
- for (RecordInfo record : committedRecords)
- {
- if (record.isUpdate)
- {
- updates++;
- }
- else
- {
- appends++;
- }
- }
-
- assertEquals(numberOfRecords.get() - numberOfDeletes.get(), appends);
-
- journal.stop();
- }
-
- private byte[] generateRecord()
- {
- int size = RandomUtil.randomPositiveInt() % 10000;
- if (size == 0)
- {
- size = 10000;
- }
- return RandomUtil.randomBytes(size);
- }
-
- class FastAppenderTx extends Thread
- {
- LinkedBlockingDeque<Long> queue = new LinkedBlockingDeque<Long>();
-
- OperationContextImpl ctx = new OperationContextImpl(executorFactory.getExecutor());
-
- @Override
- public void run()
- {
- try
- {
-
- while (running)
- {
- final int txSize = RandomUtil.randomMax(100);
-
- long txID = JournalSoakTest.idGen.generateID();
-
- final long ids[] = new long[txSize];
-
- for (int i = 0; i < txSize; i++)
- {
- long id = JournalSoakTest.idGen.generateID();
- ids[i] = id;
- journal.appendAddRecordTransactional(txID, id, (byte)0, generateRecord());
- Thread.sleep(1);
- }
- journal.appendCommitRecord(txID, true, ctx);
- ctx.executeOnCompletion(new IOAsyncTask()
- {
-
- public void onError(final int errorCode, final String errorMessage)
- {
- }
-
- public void done()
- {
- numberOfRecords.addAndGet(txSize);
- for (Long id : ids)
- {
- queue.add(id);
- }
- }
- });
- }
- }
- catch (Exception e)
- {
- e.printStackTrace();
- running = false;
- errors.incrementAndGet();
- }
- }
- }
-
- class FastUpdateTx extends Thread
- {
- final LinkedBlockingDeque<Long> queue;
-
- OperationContextImpl ctx = new OperationContextImpl(executorFactory.getExecutor());
-
- public FastUpdateTx(final LinkedBlockingDeque<Long> queue)
- {
- this.queue = queue;
- }
-
- @Override
- public void run()
- {
- try
- {
- int txSize = RandomUtil.randomMax(100);
- int txCount = 0;
- long ids[] = new long[txSize];
-
- long txID = JournalSoakTest.idGen.generateID();
-
- while (running)
- {
-
- long id = queue.poll(60, TimeUnit.MINUTES);
- ids[txCount] = id;
- journal.appendUpdateRecordTransactional(txID, id, (byte)0, generateRecord());
- if (++txCount == txSize)
- {
- journal.appendCommitRecord(txID, true, ctx);
- ctx.executeOnCompletion(new DeleteTask(ids));
- txCount = 0;
- txSize = RandomUtil.randomMax(100);
- txID = JournalSoakTest.idGen.generateID();
- ids = new long[txSize];
- }
- }
- }
- catch (Exception e)
- {
- e.printStackTrace();
- running = false;
- errors.incrementAndGet();
- }
- }
- }
-
- class DeleteTask implements IOAsyncTask
- {
- final long ids[];
-
- DeleteTask(final long ids[])
- {
- this.ids = ids;
- }
-
- public void done()
- {
- numberOfUpdates.addAndGet(ids.length);
- try
- {
- for (long id : ids)
- {
- journal.appendDeleteRecord(id, false);
- numberOfDeletes.incrementAndGet();
- }
- }
- catch (Exception e)
- {
- System.err.println("Can't delete id");
- e.printStackTrace();
- running = false;
- errors.incrementAndGet();
- }
- }
-
- public void onError(final int errorCode, final String errorMessage)
- {
- }
-
- }
-
- /** Adds stuff to the journal, but it will take a long time to remove them.
- * This will cause cleanup and compacting to happen more often
- */
- class SlowAppenderNoTX extends Thread
- {
- @Override
- public void run()
- {
- try
- {
- while (running)
- {
- long ids[] = new long[5];
- // Append
- for (int i = 0; running & i < ids.length; i++)
- {
- System.out.println("append slow");
- ids[i] = JournalSoakTest.idGen.generateID();
- journal.appendAddRecord(ids[i], (byte)1, generateRecord(), true);
- numberOfRecords.incrementAndGet();
-
- Thread.sleep(TimeUnit.SECONDS.toMillis(50));
- }
- // Delete
- for (int i = 0; running & i < ids.length; i++)
- {
- System.out.println("Deleting");
- journal.appendDeleteRecord(ids[i], false);
- numberOfDeletes.incrementAndGet();
- }
- }
- }
- catch (Exception e)
- {
- e.printStackTrace();
- System.exit(-1);
- }
- }
- }
-
- // Constants -----------------------------------------------------
-
- // Attributes ----------------------------------------------------
-
- // Static --------------------------------------------------------
-
- // Constructors --------------------------------------------------
-
- // Public --------------------------------------------------------
-
- // Package protected ---------------------------------------------
-
- // Protected -----------------------------------------------------
-
- // Private -------------------------------------------------------
-
- // Inner classes -------------------------------------------------
-
-}
Copied: trunk/tests/src/org/hornetq/tests/stress/journal/JournalCleanupCompactStressTest.java (from rev 9488, trunk/tests/src/org/hornetq/tests/soak/journal/JournalSoakTest.java)
===================================================================
--- trunk/tests/src/org/hornetq/tests/stress/journal/JournalCleanupCompactStressTest.java (rev 0)
+++ trunk/tests/src/org/hornetq/tests/stress/journal/JournalCleanupCompactStressTest.java 2010-07-30 17:20:25 UTC (rev 9490)
@@ -0,0 +1,429 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.tests.stress.journal;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.hornetq.core.asyncio.impl.AsynchronousFileImpl;
+import org.hornetq.core.config.impl.ConfigurationImpl;
+import org.hornetq.core.journal.IOAsyncTask;
+import org.hornetq.core.journal.PreparedTransactionInfo;
+import org.hornetq.core.journal.RecordInfo;
+import org.hornetq.core.journal.SequentialFileFactory;
+import org.hornetq.core.journal.TransactionFailureCallback;
+import org.hornetq.core.journal.impl.AIOSequentialFileFactory;
+import org.hornetq.core.journal.impl.JournalImpl;
+import org.hornetq.core.journal.impl.NIOSequentialFileFactory;
+import org.hornetq.core.persistence.impl.journal.OperationContextImpl;
+import org.hornetq.tests.util.RandomUtil;
+import org.hornetq.tests.util.ServiceTestBase;
+import org.hornetq.utils.HornetQThreadFactory;
+import org.hornetq.utils.OrderedExecutorFactory;
+import org.hornetq.utils.SimpleIDGenerator;
+import org.hornetq.utils.concurrent.LinkedBlockingDeque;
+
+/**
+ * A SoakJournal
+ *
+ * @author <a href="mailto:clebert.suconic@jboss.org">Clebert Suconic</a>
+ *
+ *
+ */
+public class JournalCleanupCompactStressTest extends ServiceTestBase
+{
+
+ public static SimpleIDGenerator idGen = new SimpleIDGenerator(1);
+
+ private volatile boolean running;
+
+ private AtomicInteger errors = new AtomicInteger(0);
+
+ private AtomicInteger numberOfRecords = new AtomicInteger(0);
+
+ private AtomicInteger numberOfUpdates = new AtomicInteger(0);
+
+ private AtomicInteger numberOfDeletes = new AtomicInteger(0);
+
+ private JournalImpl journal;
+
+ ThreadFactory tFactory = new HornetQThreadFactory("SoakTest" + System.identityHashCode(this),
+ false,
+ JournalCleanupCompactStressTest.class.getClassLoader());
+
+ private final ExecutorService threadPool = Executors.newFixedThreadPool(20, tFactory);
+
+ OrderedExecutorFactory executorFactory = new OrderedExecutorFactory(threadPool);
+
+ @Override
+ public void setUp() throws Exception
+ {
+ super.setUp();
+
+ errors.set(0);
+
+ File dir = new File(getTemporaryDir());
+ dir.mkdirs();
+
+ SequentialFileFactory factory;
+
+ int maxAIO;
+ if (AsynchronousFileImpl.isLoaded())
+ {
+ factory = new AIOSequentialFileFactory(dir.getPath());
+ maxAIO = ConfigurationImpl.DEFAULT_JOURNAL_MAX_IO_AIO;
+ }
+ else
+ {
+ factory = new NIOSequentialFileFactory(dir.getPath());
+ maxAIO = ConfigurationImpl.DEFAULT_JOURNAL_MAX_IO_NIO;
+ }
+
+ journal = new JournalImpl(ConfigurationImpl.DEFAULT_JOURNAL_FILE_SIZE,
+ 10,
+ 15,
+ ConfigurationImpl.DEFAULT_JOURNAL_COMPACT_PERCENTAGE,
+ factory,
+ "hornetq-data",
+ "hq",
+ maxAIO);
+
+ journal.start();
+ journal.loadInternalOnly();
+
+ }
+
+ @Override
+ public void tearDown() throws Exception
+ {
+ try
+ {
+ if (journal.isStarted())
+ {
+ journal.stop();
+ }
+ }
+ catch (Exception e)
+ {
+ // don't care :-)
+ }
+ }
+
+ protected long getTotalTimeMilliseconds()
+ {
+ return TimeUnit.MINUTES.toMillis(10);
+ }
+
+ public void testAppend() throws Exception
+ {
+
+ running = true;
+ SlowAppenderNoTX t1 = new SlowAppenderNoTX();
+
+ int NTHREADS = 5;
+
+ FastAppenderTx appenders[] = new FastAppenderTx[NTHREADS];
+ FastUpdateTx updaters[] = new FastUpdateTx[NTHREADS];
+
+ for (int i = 0; i < NTHREADS; i++)
+ {
+ appenders[i] = new FastAppenderTx();
+ updaters[i] = new FastUpdateTx(appenders[i].queue);
+ }
+
+ t1.start();
+
+ Thread.sleep(1000);
+
+ for (int i = 0; i < NTHREADS; i++)
+ {
+ appenders[i].start();
+ updaters[i].start();
+ }
+
+ long timeToEnd = System.currentTimeMillis() + getTotalTimeMilliseconds();
+
+ while (System.currentTimeMillis() < timeToEnd)
+ {
+ System.out.println("Append = " + numberOfRecords +
+ ", Update = " +
+ numberOfUpdates +
+ ", Delete = " +
+ numberOfDeletes +
+ ", liveRecords = " +
+ (numberOfRecords.get() - numberOfDeletes.get()));
+ Thread.sleep(TimeUnit.SECONDS.toMillis(10));
+ }
+
+ running = false;
+
+ for (Thread t : appenders)
+ {
+ t.join();
+ }
+
+ for (Thread t : updaters)
+ {
+ t.join();
+ }
+
+ t1.join();
+
+ assertEquals(0, errors.get());
+
+ journal.stop();
+
+ journal.start();
+
+ ArrayList<RecordInfo> committedRecords = new ArrayList<RecordInfo>();
+ ArrayList<PreparedTransactionInfo> preparedTransactions = new ArrayList<PreparedTransactionInfo>();
+ journal.load(committedRecords, preparedTransactions, new TransactionFailureCallback()
+ {
+ public void failedTransaction(long transactionID, List<RecordInfo> records, List<RecordInfo> recordsToDelete)
+ {
+ }
+ });
+
+ long appends = 0, updates = 0;
+
+ for (RecordInfo record : committedRecords)
+ {
+ if (record.isUpdate)
+ {
+ updates++;
+ }
+ else
+ {
+ appends++;
+ }
+ }
+
+ assertEquals(numberOfRecords.get() - numberOfDeletes.get(), appends);
+
+ journal.stop();
+ }
+
+ private byte[] generateRecord()
+ {
+ int size = RandomUtil.randomPositiveInt() % 10000;
+ if (size == 0)
+ {
+ size = 10000;
+ }
+ return RandomUtil.randomBytes(size);
+ }
+
+ class FastAppenderTx extends Thread
+ {
+ LinkedBlockingDeque<Long> queue = new LinkedBlockingDeque<Long>();
+
+ OperationContextImpl ctx = new OperationContextImpl(executorFactory.getExecutor());
+
+ @Override
+ public void run()
+ {
+ try
+ {
+
+ while (running)
+ {
+ final int txSize = RandomUtil.randomMax(100);
+
+ long txID = JournalCleanupCompactStressTest.idGen.generateID();
+
+ final long ids[] = new long[txSize];
+
+ for (int i = 0; i < txSize; i++)
+ {
+ long id = JournalCleanupCompactStressTest.idGen.generateID();
+ ids[i] = id;
+ journal.appendAddRecordTransactional(txID, id, (byte)0, generateRecord());
+ Thread.sleep(1);
+ }
+ journal.appendCommitRecord(txID, true, ctx);
+ ctx.executeOnCompletion(new IOAsyncTask()
+ {
+
+ public void onError(final int errorCode, final String errorMessage)
+ {
+ }
+
+ public void done()
+ {
+ numberOfRecords.addAndGet(txSize);
+ for (Long id : ids)
+ {
+ queue.add(id);
+ }
+ }
+ });
+ }
+ }
+ catch (Exception e)
+ {
+ e.printStackTrace();
+ running = false;
+ errors.incrementAndGet();
+ }
+ }
+ }
+
+ class FastUpdateTx extends Thread
+ {
+ final LinkedBlockingDeque<Long> queue;
+
+ OperationContextImpl ctx = new OperationContextImpl(executorFactory.getExecutor());
+
+ public FastUpdateTx(final LinkedBlockingDeque<Long> queue)
+ {
+ this.queue = queue;
+ }
+
+ @Override
+ public void run()
+ {
+ try
+ {
+ int txSize = RandomUtil.randomMax(100);
+ int txCount = 0;
+ long ids[] = new long[txSize];
+
+ long txID = JournalCleanupCompactStressTest.idGen.generateID();
+
+ while (running)
+ {
+
+ long id = queue.poll(60, TimeUnit.MINUTES);
+ ids[txCount] = id;
+ journal.appendUpdateRecordTransactional(txID, id, (byte)0, generateRecord());
+ if (++txCount == txSize)
+ {
+ journal.appendCommitRecord(txID, true, ctx);
+ ctx.executeOnCompletion(new DeleteTask(ids));
+ txCount = 0;
+ txSize = RandomUtil.randomMax(100);
+ txID = JournalCleanupCompactStressTest.idGen.generateID();
+ ids = new long[txSize];
+ }
+ }
+ }
+ catch (Exception e)
+ {
+ e.printStackTrace();
+ running = false;
+ errors.incrementAndGet();
+ }
+ }
+ }
+
+ class DeleteTask implements IOAsyncTask
+ {
+ final long ids[];
+
+ DeleteTask(final long ids[])
+ {
+ this.ids = ids;
+ }
+
+ public void done()
+ {
+ numberOfUpdates.addAndGet(ids.length);
+ try
+ {
+ for (long id : ids)
+ {
+ journal.appendDeleteRecord(id, false);
+ numberOfDeletes.incrementAndGet();
+ }
+ }
+ catch (Exception e)
+ {
+ System.err.println("Can't delete id");
+ e.printStackTrace();
+ running = false;
+ errors.incrementAndGet();
+ }
+ }
+
+ public void onError(final int errorCode, final String errorMessage)
+ {
+ }
+
+ }
+
+ /** Adds stuff to the journal, but it will take a long time to remove them.
+ * This will cause cleanup and compacting to happen more often
+ */
+ class SlowAppenderNoTX extends Thread
+ {
+ @Override
+ public void run()
+ {
+ try
+ {
+ while (running)
+ {
+ long ids[] = new long[5];
+ // Append
+ for (int i = 0; running & i < ids.length; i++)
+ {
+ System.out.println("append slow");
+ ids[i] = JournalCleanupCompactStressTest.idGen.generateID();
+ journal.appendAddRecord(ids[i], (byte)1, generateRecord(), true);
+ numberOfRecords.incrementAndGet();
+
+ Thread.sleep(TimeUnit.SECONDS.toMillis(50));
+ }
+ // Delete
+ for (int i = 0; running & i < ids.length; i++)
+ {
+ System.out.println("Deleting");
+ journal.appendDeleteRecord(ids[i], false);
+ numberOfDeletes.incrementAndGet();
+ }
+ }
+ }
+ catch (Exception e)
+ {
+ e.printStackTrace();
+ System.exit(-1);
+ }
+ }
+ }
+
+ // Constants -----------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ // Public --------------------------------------------------------
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+
+}
14 years, 7 months
JBoss hornetq SVN: r9489 - trunk/tests/src/org/hornetq/tests/util.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2010-07-30 11:01:16 -0400 (Fri, 30 Jul 2010)
New Revision: 9489
Modified:
trunk/tests/src/org/hornetq/tests/util/ListJournal.java
Log:
tweak
Modified: trunk/tests/src/org/hornetq/tests/util/ListJournal.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/util/ListJournal.java 2010-07-30 15:00:26 UTC (rev 9488)
+++ trunk/tests/src/org/hornetq/tests/util/ListJournal.java 2010-07-30 15:01:16 UTC (rev 9489)
@@ -105,10 +105,8 @@
journal.checkReclaimStatus();
System.out.println("Data = " + journal.debug());
-
+
journal.stop();
-
- journal.stop();
out.close();
}
14 years, 7 months
JBoss hornetq SVN: r9488 - in trunk: src/main/org/hornetq/core/journal/impl and 3 other directories.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2010-07-30 11:00:26 -0400 (Fri, 30 Jul 2010)
New Revision: 9488
Modified:
trunk/src/main/org/hornetq/core/journal/TestableJournal.java
trunk/src/main/org/hornetq/core/journal/impl/JournalCleaner.java
trunk/src/main/org/hornetq/core/journal/impl/JournalFile.java
trunk/src/main/org/hornetq/core/journal/impl/JournalFileImpl.java
trunk/src/main/org/hornetq/core/journal/impl/JournalImpl.java
trunk/src/main/org/hornetq/core/journal/impl/Reclaimer.java
trunk/tests/src/org/hornetq/tests/integration/journal/NIOJournalCompactTest.java
trunk/tests/src/org/hornetq/tests/soak/journal/JournalSoakTest.java
trunk/tests/src/org/hornetq/tests/unit/core/journal/impl/ReclaimerTest.java
Log:
https://jira.jboss.org/browse/HORNETQ-440 Changes after stress tests, making sure about the integrity of the journal
Modified: trunk/src/main/org/hornetq/core/journal/TestableJournal.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/TestableJournal.java 2010-07-30 08:03:33 UTC (rev 9487)
+++ trunk/src/main/org/hornetq/core/journal/TestableJournal.java 2010-07-30 15:00:26 UTC (rev 9488)
@@ -54,6 +54,11 @@
boolean isAutoReclaim();
void compact() throws Exception;
+
+ void cleanUp(final JournalFile file) throws Exception;
+
+ JournalFile getCurrentFile();
+
/** This method is called automatically when a new file is opened.
* @return true if it needs to re-check due to cleanup or other factors */
Modified: trunk/src/main/org/hornetq/core/journal/impl/JournalCleaner.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/impl/JournalCleaner.java 2010-07-30 08:03:33 UTC (rev 9487)
+++ trunk/src/main/org/hornetq/core/journal/impl/JournalCleaner.java 2010-07-30 15:00:26 UTC (rev 9488)
@@ -176,20 +176,6 @@
}
}
- /**
- * Read files that depend on this file.
- * Commits and rollbacks are also counted as negatives. We need to fix those also.
- * @param dependencies
- */
- public void fixDependencies(final JournalFile originalFile, final ArrayList<JournalFile> dependencies) throws Exception
- {
- for (JournalFile dependency : dependencies)
- {
- fixDependency(originalFile, dependency);
- }
-
- }
-
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------
@@ -220,32 +206,7 @@
}
// Private -------------------------------------------------------
- private void fixDependency(final JournalFile originalFile, final JournalFile dependency) throws Exception
- {
- JournalReaderCallback txfix = new JournalReaderCallbackAbstract()
- {
- @Override
- public void onReadCommitRecord(final long transactionID, final int numberOfRecords) throws Exception
- {
- if (transactionCounter.containsKey(transactionID))
- {
- dependency.incNegCount(originalFile);
- }
- }
- @Override
- public void onReadRollbackRecord(final long transactionID) throws Exception
- {
- if (transactionCounter.containsKey(transactionID))
- {
- dependency.incNegCount(originalFile);
- }
- }
- };
-
- JournalImpl.readJournalFile(fileFactory, dependency, txfix);
- }
-
// Inner classes -------------------------------------------------
}
Modified: trunk/src/main/org/hornetq/core/journal/impl/JournalFile.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/impl/JournalFile.java 2010-07-30 08:03:33 UTC (rev 9487)
+++ trunk/src/main/org/hornetq/core/journal/impl/JournalFile.java 2010-07-30 15:00:26 UTC (rev 9488)
@@ -45,6 +45,9 @@
void decSize(int bytes);
int getLiveSize();
+
+ /** The total number of negatives (deletes) this file holds against other files */
+ int getTotalNegativeToOthers();
void setCanReclaim(boolean canDelete);
Modified: trunk/src/main/org/hornetq/core/journal/impl/JournalFileImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/impl/JournalFileImpl.java 2010-07-30 08:03:33 UTC (rev 9487)
+++ trunk/src/main/org/hornetq/core/journal/impl/JournalFileImpl.java 2010-07-30 15:00:26 UTC (rev 9488)
@@ -49,6 +49,9 @@
private boolean needCleanup;
+ private AtomicInteger totalNegativeToOthers = new AtomicInteger(0);
+
+
private final Map<JournalFile, AtomicInteger> negCounts = new ConcurrentHashMap<JournalFile, AtomicInteger>();
public JournalFileImpl(final SequentialFile file, final long fileID)
@@ -65,6 +68,7 @@
negCounts.clear();
posCount.set(0);
liveBytes.set(0);
+ totalNegativeToOthers.set(0);
}
public int getPosCount()
@@ -94,6 +98,10 @@
public void incNegCount(final JournalFile file)
{
+ if (file != this)
+ {
+ totalNegativeToOthers.incrementAndGet();
+ }
getOrCreateNegCount(file).incrementAndGet();
}
@@ -219,5 +227,12 @@
{
return liveBytes.get();
}
+
+ public int getTotalNegativeToOthers()
+ {
+ return totalNegativeToOthers.get();
+ }
+
+
}
Modified: trunk/src/main/org/hornetq/core/journal/impl/JournalImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/impl/JournalImpl.java 2010-07-30 08:03:33 UTC (rev 9487)
+++ trunk/src/main/org/hornetq/core/journal/impl/JournalImpl.java 2010-07-30 15:00:26 UTC (rev 9488)
@@ -464,59 +464,70 @@
" sequence = " +
file.getFileID());
- JournalImpl.readJournalFile(fileFactory, file, new JournalReaderCallback()
+ listJournalFile(out, fileFactory, file);
+ }
+ }
+
+ /**
+ * @param out
+ * @param fileFactory
+ * @param file
+ * @throws Exception
+ */
+ public static void listJournalFile(final PrintStream out, SequentialFileFactory fileFactory, JournalFile file) throws Exception
+ {
+ JournalImpl.readJournalFile(fileFactory, file, new JournalReaderCallback()
+ {
+
+ public void onReadUpdateRecordTX(long transactionID, RecordInfo recordInfo) throws Exception
{
+ out.println("ReadUpdateTX, txID=" + transactionID + ", " + recordInfo);
+ }
- public void onReadUpdateRecordTX(long transactionID, RecordInfo recordInfo) throws Exception
- {
- out.println("ReadUpdateTX, txID=" + transactionID + ", " + recordInfo);
- }
+ public void onReadUpdateRecord(RecordInfo recordInfo) throws Exception
+ {
+ out.println("ReadUpdate " + recordInfo);
+ }
- public void onReadUpdateRecord(RecordInfo recordInfo) throws Exception
- {
- out.println("ReadUpdate " + recordInfo);
- }
+ public void onReadRollbackRecord(long transactionID) throws Exception
+ {
+ out.println("Rollback txID=" + transactionID);
+ }
- public void onReadRollbackRecord(long transactionID) throws Exception
- {
- out.println("Rollback txID=" + transactionID);
- }
+ public void onReadPrepareRecord(long transactionID, byte[] extraData, int numberOfRecords) throws Exception
+ {
+ out.println("Prepare txID=" + transactionID + ", numberOfRecords=" + numberOfRecords);
+ }
- public void onReadPrepareRecord(long transactionID, byte[] extraData, int numberOfRecords) throws Exception
- {
- out.println("Prepare txID=" + transactionID);
- }
+ public void onReadDeleteRecordTX(long transactionID, RecordInfo recordInfo) throws Exception
+ {
+ out.println("DeleteRecordTX txID=" + transactionID + ", " + recordInfo);
+ }
- public void onReadDeleteRecordTX(long transactionID, RecordInfo recordInfo) throws Exception
- {
- out.println("DeleteRecordTX txID=" + transactionID + ", " + recordInfo);
- }
+ public void onReadDeleteRecord(long recordID) throws Exception
+ {
+ out.println("DeleteRecord id=" + recordID);
+ }
- public void onReadDeleteRecord(long recordID) throws Exception
- {
- out.println("DeleteRecord id=" + recordID);
- }
+ public void onReadCommitRecord(long transactionID, int numberOfRecords) throws Exception
+ {
+ out.println("CommitRecord txID=" + transactionID + ", numberOfRecords=" + numberOfRecords);
+ }
- public void onReadCommitRecord(long transactionID, int numberOfRecords) throws Exception
- {
- out.println("CommitRecord txID=" + transactionID);
- }
+ public void onReadAddRecordTX(long transactionID, RecordInfo recordInfo) throws Exception
+ {
+ out.println("AddRecordTX, txID=" + transactionID + ", " + recordInfo);
+ }
- public void onReadAddRecordTX(long transactionID, RecordInfo recordInfo) throws Exception
- {
- out.println("AddRecordTX, txID=" + transactionID + ", " + recordInfo);
- }
+ public void onReadAddRecord(RecordInfo recordInfo) throws Exception
+ {
+ out.println("AddRecord " + recordInfo);
+ }
- public void onReadAddRecord(RecordInfo recordInfo) throws Exception
- {
- out.println("AddRecord " + recordInfo);
- }
-
- public void markAsDataFile(JournalFile file)
- {
- }
- });
- }
+ public void markAsDataFile(JournalFile file)
+ {
+ }
+ });
}
@@ -1621,6 +1632,7 @@
{
JournalImpl.trace("Starting compacting operation on journal");
}
+ JournalImpl.log.debug("Starting compacting operation on journal");
// We need to guarantee that the journal is frozen for this short time
// We don't freeze the journal as we compact, only for the short time where we replace records
@@ -1765,7 +1777,7 @@
if (trace)
{
- JournalImpl.trace("Finished compacting on journal");
+ JournalImpl.log.debug("Finished compacting on journal");
}
}
@@ -2305,7 +2317,7 @@
if (compactMinFiles > 0)
{
- if (nCleanup > getMinCompact())
+ if (nCleanup > 0 && needsCompact())
{
for (JournalFile file : dataFiles)
{
@@ -2357,16 +2369,9 @@
return false;
}
- /**
- * @return
- */
- private float getMinCompact()
+ // This method is public for tests
+ public synchronized void cleanUp(final JournalFile file) throws Exception
{
- return compactMinFiles * compactPercentage;
- }
-
- private synchronized void cleanUp(final JournalFile file) throws Exception
- {
if (state != JournalImpl.STATE_LOADED)
{
return;
@@ -2388,7 +2393,7 @@
JournalImpl.trace("Cleaning up file " + file);
}
JournalImpl.log.debug("Cleaning up file " + file);
-
+
if (file.getPosCount() == 0)
{
// nothing to be done
@@ -2408,6 +2413,10 @@
jrnFile.incPosCount(); // this file can't be reclaimed while cleanup is being done
}
}
+
+ currentFile.resetNegCount(file);
+ currentFile.incPosCount();
+ dependencies.add(currentFile);
cleaner = new JournalCleaner(fileFactory, this, records.keySet(), file.getFileID());
}
@@ -2420,7 +2429,10 @@
cleaner.flush();
- cleaner.fixDependencies(file, dependencies);
+ // pointcut for tests
+ // We need to test concurrent updates on the journal, as the compacting is being performed.
+ // Usually tests will use this to hold the compacting while other structures are being updated.
+ onCompactDone();
for (JournalFile jrnfile : dependencies)
{
@@ -2437,15 +2449,35 @@
file.getFile().delete();
tmpFile.renameTo(cleanedFileName);
controlFile.delete();
+
}
finally
{
compactingLock.readLock().unlock();
- JournalImpl.log.debug("Clean up on file " + file + " done");
+ JournalImpl.log.info("Clean up on file " + file + " done");
}
}
+
+ private boolean needsCompact() throws Exception
+ {
+ JournalFile[] dataFiles = getDataFiles();
+ long totalLiveSize = 0;
+
+ for (JournalFile file : dataFiles)
+ {
+ totalLiveSize += file.getLiveSize();
+ }
+
+ long totalBytes = (long)dataFiles.length * (long)fileSize;
+
+ long compactMargin = (long)(totalBytes * compactPercentage);
+
+ return (totalLiveSize < compactMargin && !compactorRunning.get() && dataFiles.length > compactMinFiles);
+
+ }
+
private void checkCompact() throws Exception
{
if (compactMinFiles == 0)
@@ -2459,21 +2491,8 @@
return;
}
- JournalFile[] dataFiles = getDataFiles();
-
- long totalLiveSize = 0;
-
- for (JournalFile file : dataFiles)
+ if (needsCompact())
{
- totalLiveSize += file.getLiveSize();
- }
-
- long totalBytes = (long)dataFiles.length * (long)fileSize;
-
- long compactMargin = (long)(totalBytes * compactPercentage);
-
- if (totalLiveSize < compactMargin && !compactorRunning.get() && dataFiles.length > compactMinFiles)
- {
if (!compactorRunning.compareAndSet(false, true))
{
return;
Modified: trunk/src/main/org/hornetq/core/journal/impl/Reclaimer.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/impl/Reclaimer.java 2010-07-30 08:03:33 UTC (rev 9487)
+++ trunk/src/main/org/hornetq/core/journal/impl/Reclaimer.java 2010-07-30 15:00:26 UTC (rev 9488)
@@ -63,20 +63,20 @@
{
Reclaimer.trace("posCount on " + currentFile + " = " + posCount);
}
-
+
for (int j = i; j < files.length; j++)
{
if (Reclaimer.trace)
{
if (files[j].getNegCount(currentFile) != 0)
{
- Reclaimer.trace("Negative from " + files[j] + " = " + files[j].getNegCount(currentFile));
+ Reclaimer.trace("Negative from " + files[j] + " into " + currentFile + " = " + files[j].getNegCount(currentFile));
}
}
totNeg += files[j].getNegCount(currentFile);
}
-
+
currentFile.setCanReclaim(true);
if (posCount <= totNeg)
@@ -101,8 +101,18 @@
{
Reclaimer.trace(currentFile + " Can't be reclaimed because " + file + " has negative values");
}
+ file.setNeedCleanup(true);
- file.setNeedCleanup(true);
+ if (file.getTotalNegativeToOthers() == 0)
+ {
+ file.setNeedCleanup(true);
+ }
+ else
+ {
+ // This file can't be cleared as the file has negatives to other files as well
+ file.setNeedCleanup(false);
+ }
+
currentFile.setCanReclaim(false);
break;
Modified: trunk/tests/src/org/hornetq/tests/integration/journal/NIOJournalCompactTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/journal/NIOJournalCompactTest.java 2010-07-30 08:03:33 UTC (rev 9487)
+++ trunk/tests/src/org/hornetq/tests/integration/journal/NIOJournalCompactTest.java 2010-07-30 15:00:26 UTC (rev 9488)
@@ -950,7 +950,108 @@
}
+
+ public void testDeleteWhileCleanup() throws Exception
+ {
+
+ setup(2, 60 * 1024, false);
+
+
+ final ReusableLatch reusableLatchDone = new ReusableLatch();
+ reusableLatchDone.countUp();
+ final ReusableLatch reusableLatchWait = new ReusableLatch();
+ reusableLatchWait.countUp();
+
+ journal = new JournalImpl(fileSize, minFiles, 0, 0, fileFactory, filePrefix, fileExtension, maxAIO)
+ {
+
+ @Override
+ public void onCompactDone()
+ {
+ reusableLatchDone.countDown();
+ System.out.println("Waiting on Compact");
+ try
+ {
+ reusableLatchWait.await();
+ }
+ catch (InterruptedException e)
+ {
+ e.printStackTrace();
+ }
+ System.out.println("Done");
+ }
+ };
+
+ journal.setAutoReclaim(false);
+
+ startJournal();
+ load();
+
+
+ Thread tCompact = new Thread()
+ {
+ @Override
+ public void run()
+ {
+ try
+ {
+ journal.cleanUp(journal.getDataFiles()[0]);
+ }
+ catch (Exception e)
+ {
+ e.printStackTrace();
+ }
+ }
+ };
+
+ for (int i = 0 ; i < 100; i++)
+ {
+ add(i);
+ }
+
+ journal.forceMoveNextFile();
+
+
+ for (int i = 10; i < 90; i++)
+ {
+ delete(i);
+ }
+
+ tCompact.start();
+
+ reusableLatchDone.await();
+
+ // Delete part of the live records while cleanup still working
+ for (int i = 1; i < 5; i++)
+ {
+ delete(i);
+ }
+
+ reusableLatchWait.countDown();
+
+ tCompact.join();
+
+ // Delete part of the live records after cleanup is done
+ for (int i = 5; i < 10; i++)
+ {
+ delete(i);
+ }
+
+ assertEquals(9, journal.getCurrentFile().getNegCount(journal.getDataFiles()[0]));
+
+ journal.forceMoveNextFile();
+
+ stopJournal();
+ createJournal();
+ startJournal();
+ loadAndCheck();
+
+ }
+
+
+
+
public void testCompactAddAndUpdateFollowedByADelete5() throws Exception
{
Modified: trunk/tests/src/org/hornetq/tests/soak/journal/JournalSoakTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/soak/journal/JournalSoakTest.java 2010-07-30 08:03:33 UTC (rev 9487)
+++ trunk/tests/src/org/hornetq/tests/soak/journal/JournalSoakTest.java 2010-07-30 15:00:26 UTC (rev 9488)
@@ -15,6 +15,7 @@
import java.io.File;
import java.util.ArrayList;
+import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
@@ -24,7 +25,10 @@
import org.hornetq.core.asyncio.impl.AsynchronousFileImpl;
import org.hornetq.core.config.impl.ConfigurationImpl;
import org.hornetq.core.journal.IOAsyncTask;
+import org.hornetq.core.journal.PreparedTransactionInfo;
+import org.hornetq.core.journal.RecordInfo;
import org.hornetq.core.journal.SequentialFileFactory;
+import org.hornetq.core.journal.TransactionFailureCallback;
import org.hornetq.core.journal.impl.AIOSequentialFileFactory;
import org.hornetq.core.journal.impl.JournalImpl;
import org.hornetq.core.journal.impl.NIOSequentialFileFactory;
@@ -49,9 +53,15 @@
public static SimpleIDGenerator idGen = new SimpleIDGenerator(1);
private volatile boolean running;
-
+
private AtomicInteger errors = new AtomicInteger(0);
+ private AtomicInteger numberOfRecords = new AtomicInteger(0);
+
+ private AtomicInteger numberOfUpdates = new AtomicInteger(0);
+
+ private AtomicInteger numberOfDeletes = new AtomicInteger(0);
+
private JournalImpl journal;
ThreadFactory tFactory = new HornetQThreadFactory("SoakTest" + System.identityHashCode(this),
@@ -68,7 +78,7 @@
super.setUp();
errors.set(0);
-
+
File dir = new File(getTemporaryDir());
dir.mkdirs();
@@ -87,8 +97,8 @@
}
journal = new JournalImpl(ConfigurationImpl.DEFAULT_JOURNAL_FILE_SIZE,
- 100,
- ConfigurationImpl.DEFAULT_JOURNAL_COMPACT_MIN_FILES,
+ 10,
+ 15,
ConfigurationImpl.DEFAULT_JOURNAL_COMPACT_PERCENTAGE,
factory,
"hornetq-data",
@@ -103,11 +113,22 @@
@Override
public void tearDown() throws Exception
{
- journal.stop();
+ try
+ {
+ if (journal.isStarted())
+ {
+ journal.stop();
+ }
+ }
+ catch (Exception e)
+ {
+ // don't care :-)
+ }
}
public void testAppend() throws Exception
{
+
running = true;
SlowAppenderNoTX t1 = new SlowAppenderNoTX();
@@ -124,15 +145,28 @@
t1.start();
+ Thread.sleep(1000);
+
for (int i = 0; i < NTHREADS; i++)
{
appenders[i].start();
updaters[i].start();
}
- // TODO: parametrize this somehow
- Thread.sleep(TimeUnit.HOURS.toMillis(24));
+ long timeToEnd = System.currentTimeMillis() + TimeUnit.MINUTES.toMillis(10);
+ while (System.currentTimeMillis() < timeToEnd)
+ {
+ System.out.println("Append = " + numberOfRecords +
+ ", Update = " +
+ numberOfUpdates +
+ ", Delete = " +
+ numberOfDeletes +
+ ", liveRecords = " +
+ (numberOfRecords.get() - numberOfDeletes.get()));
+ Thread.sleep(TimeUnit.SECONDS.toMillis(10));
+ }
+
running = false;
for (Thread t : appenders)
@@ -146,14 +180,39 @@
}
t1.join();
-
+
assertEquals(0, errors.get());
-
+
journal.stop();
-
+
journal.start();
-
- journal.loadInternalOnly();
+
+ ArrayList<RecordInfo> committedRecords = new ArrayList<RecordInfo>();
+ ArrayList<PreparedTransactionInfo> preparedTransactions = new ArrayList<PreparedTransactionInfo>();
+ journal.load(committedRecords, preparedTransactions, new TransactionFailureCallback()
+ {
+ public void failedTransaction(long transactionID, List<RecordInfo> records, List<RecordInfo> recordsToDelete)
+ {
+ }
+ });
+
+ long appends = 0, updates = 0;
+
+ for (RecordInfo record : committedRecords)
+ {
+ if (record.isUpdate)
+ {
+ updates++;
+ }
+ else
+ {
+ appends++;
+ }
+ }
+
+ assertEquals(numberOfRecords.get() - numberOfDeletes.get(), appends);
+
+ journal.stop();
}
private byte[] generateRecord()
@@ -180,16 +239,16 @@
while (running)
{
- int txSize = RandomUtil.randomMax(1000);
+ final int txSize = RandomUtil.randomMax(100);
long txID = JournalSoakTest.idGen.generateID();
- final ArrayList<Long> ids = new ArrayList<Long>();
+ final long ids[] = new long[txSize];
for (int i = 0; i < txSize; i++)
{
long id = JournalSoakTest.idGen.generateID();
- ids.add(id);
+ ids[i] = id;
journal.appendAddRecordTransactional(txID, id, (byte)0, generateRecord());
Thread.sleep(1);
}
@@ -203,6 +262,7 @@
public void done()
{
+ numberOfRecords.addAndGet(txSize);
for (Long id : ids)
{
queue.add(id);
@@ -236,7 +296,7 @@
{
try
{
- int txSize = RandomUtil.randomMax(1000);
+ int txSize = RandomUtil.randomMax(100);
int txCount = 0;
long ids[] = new long[txSize];
@@ -248,13 +308,12 @@
long id = queue.poll(60, TimeUnit.MINUTES);
ids[txCount] = id;
journal.appendUpdateRecordTransactional(txID, id, (byte)0, generateRecord());
- Thread.sleep(1);
if (++txCount == txSize)
{
journal.appendCommitRecord(txID, true, ctx);
ctx.executeOnCompletion(new DeleteTask(ids));
txCount = 0;
- txSize = RandomUtil.randomMax(1000);
+ txSize = RandomUtil.randomMax(100);
txID = JournalSoakTest.idGen.generateID();
ids = new long[txSize];
}
@@ -280,11 +339,13 @@
public void done()
{
+ numberOfUpdates.addAndGet(ids.length);
try
{
for (long id : ids)
{
- journal.appendDeleteRecord(id, true);
+ journal.appendDeleteRecord(id, false);
+ numberOfDeletes.incrementAndGet();
}
}
catch (Exception e)
@@ -314,25 +375,23 @@
{
while (running)
{
- long ids[] = new long[1000];
+ long ids[] = new long[5];
// Append
- for (int i = 0; running & i < 1000; i++)
+ for (int i = 0; running & i < ids.length; i++)
{
+ System.out.println("append slow");
ids[i] = JournalSoakTest.idGen.generateID();
journal.appendAddRecord(ids[i], (byte)1, generateRecord(), true);
- Thread.sleep(10);
+ numberOfRecords.incrementAndGet();
+
+ Thread.sleep(TimeUnit.SECONDS.toMillis(50));
}
- // Update
- for (int i = 0; running & i < 1000; i++)
- {
- journal.appendUpdateRecord(ids[i], (byte)1, generateRecord(), true);
- Thread.sleep(10);
- }
// Delete
- for (int i = 0; running & i < 1000; i++)
+ for (int i = 0; running & i < ids.length; i++)
{
- journal.appendDeleteRecord(ids[i], true);
- Thread.sleep(10);
+ System.out.println("Deleting");
+ journal.appendDeleteRecord(ids[i], false);
+ numberOfDeletes.incrementAndGet();
}
}
}
Modified: trunk/tests/src/org/hornetq/tests/unit/core/journal/impl/ReclaimerTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/unit/core/journal/impl/ReclaimerTest.java 2010-07-30 08:03:33 UTC (rev 9487)
+++ trunk/tests/src/org/hornetq/tests/unit/core/journal/impl/ReclaimerTest.java 2010-07-30 15:00:26 UTC (rev 9488)
@@ -711,6 +711,8 @@
private void setupPosNeg(final int fileNumber, final int pos, final int... neg)
{
JournalFile file = files[fileNumber];
+
+ int totalDep = file.getTotalNegativeToOthers();
for (int i = 0; i < pos; i++)
{
@@ -724,8 +726,11 @@
for (int j = 0; j < neg[i]; j++)
{
file.incNegCount(reclaimable2);
+ totalDep++;
}
}
+
+ assertEquals(totalDep, file.getTotalNegativeToOthers());
}
private void debugFiles()
@@ -777,6 +782,8 @@
private boolean canDelete;
private boolean needCleanup;
+
+ private int totalDep;
public void extendOffset(final int delta)
{
@@ -822,6 +829,8 @@
int c = count == null ? 1 : count.intValue() + 1;
negCounts.put(file, c);
+
+ totalDep++;
}
public int getPosCount()
@@ -985,5 +994,13 @@
{
return 0;
}
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.journal.impl.JournalFile#getTotalNegativeToOthers()
+ */
+ public int getTotalNegativeToOthers()
+ {
+ return totalDep;
+ }
}
}
14 years, 7 months
JBoss hornetq SVN: r9487 - projects/jopr-plugin/tags.
by do-not-reply@jboss.org
Author: ataylor
Date: 2010-07-30 04:03:33 -0400 (Fri, 30 Jul 2010)
New Revision: 9487
Added:
projects/jopr-plugin/tags/HornetQ-jopr-plugin_1_0_0_FINAL/
Log:
HornetQ jopr plugin 1.0.0.Final tag
Copied: projects/jopr-plugin/tags/HornetQ-jopr-plugin_1_0_0_FINAL (from rev 9486, projects/jopr-plugin/trunk)
14 years, 7 months
JBoss hornetq SVN: r9486 - trunk/src/main/org/hornetq/core/asyncio/impl.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2010-07-28 18:21:06 -0400 (Wed, 28 Jul 2010)
New Revision: 9486
Modified:
trunk/src/main/org/hornetq/core/asyncio/impl/AsynchronousFileImpl.java
Log:
just adding javadoc
Modified: trunk/src/main/org/hornetq/core/asyncio/impl/AsynchronousFileImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/asyncio/impl/AsynchronousFileImpl.java 2010-07-28 11:46:15 UTC (rev 9485)
+++ trunk/src/main/org/hornetq/core/asyncio/impl/AsynchronousFileImpl.java 2010-07-28 22:21:06 UTC (rev 9486)
@@ -640,6 +640,7 @@
ByteBuffer buffer,
AIOCallback aioPackage) throws HornetQException;
+ /** a direct write to the file without the use of libaio's submit. */
private native void writeInternal(ByteBuffer handle, long positionToWrite, long size, ByteBuffer bytes) throws HornetQException;
private native void read(ByteBuffer handle, long position, long size, ByteBuffer buffer, AIOCallback aioPackage) throws HornetQException;
14 years, 7 months
JBoss hornetq SVN: r9485 - trunk/docs/user-manual/en.
by do-not-reply@jboss.org
Author: timfox
Date: 2010-07-28 07:46:15 -0400 (Wed, 28 Jul 2010)
New Revision: 9485
Modified:
trunk/docs/user-manual/en/perf-tuning.xml
Log:
tweak to perf tuning docs
Modified: trunk/docs/user-manual/en/perf-tuning.xml
===================================================================
--- trunk/docs/user-manual/en/perf-tuning.xml 2010-07-28 11:21:06 UTC (rev 9484)
+++ trunk/docs/user-manual/en/perf-tuning.xml 2010-07-28 11:46:15 UTC (rev 9485)
@@ -190,8 +190,12 @@
<listitem>
<para>TCP buffer sizes. If you have a fast network and fast machines you may get a
performance boost by increasing the TCP send and receive buffer sizes. See the
- <xref linkend="configuring-transports"/> for more information on this.
- </para>
+ <xref linkend="configuring-transports"/> for more information on this. </para>
+ <note>
+ <para> Note that some operating systems, such as later versions of Linux, include TCP
+ auto-tuning; setting TCP buffer sizes manually can prevent auto-tuning from
+ working and actually give you worse performance!</para>
+ </note>
</listitem>
<listitem>
<para>Increase limit on file handles on the server. If you expect a lot of
14 years, 7 months
JBoss hornetq SVN: r9484 - in trunk/src/main/org/hornetq/core: message/impl and 1 other directories.
by do-not-reply@jboss.org
Author: timfox
Date: 2010-07-28 07:21:06 -0400 (Wed, 28 Jul 2010)
New Revision: 9484
Modified:
trunk/src/main/org/hornetq/core/buffers/impl/ResetLimitWrappedHornetQBuffer.java
trunk/src/main/org/hornetq/core/message/impl/MessageImpl.java
trunk/src/main/org/hornetq/core/remoting/impl/netty/NettyAcceptor.java
Log:
a few tweaks + fix message copy issue
Modified: trunk/src/main/org/hornetq/core/buffers/impl/ResetLimitWrappedHornetQBuffer.java
===================================================================
--- trunk/src/main/org/hornetq/core/buffers/impl/ResetLimitWrappedHornetQBuffer.java 2010-07-28 07:52:35 UTC (rev 9483)
+++ trunk/src/main/org/hornetq/core/buffers/impl/ResetLimitWrappedHornetQBuffer.java 2010-07-28 11:21:06 UTC (rev 9484)
@@ -44,9 +44,9 @@
{
writerIndex(limit);
}
+
+ buffer.readerIndex(limit);
- readerIndex(limit);
-
this.message = message;
}
@@ -74,6 +74,8 @@
@Override
public void readerIndex(int readerIndex)
{
+ changed();
+
if (readerIndex < limit)
{
readerIndex = limit;
@@ -85,6 +87,8 @@
@Override
public void resetReaderIndex()
{
+ changed();
+
buffer.readerIndex(limit);
}
Modified: trunk/src/main/org/hornetq/core/message/impl/MessageImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/message/impl/MessageImpl.java 2010-07-28 07:52:35 UTC (rev 9483)
+++ trunk/src/main/org/hornetq/core/message/impl/MessageImpl.java 2010-07-28 11:21:06 UTC (rev 9484)
@@ -422,7 +422,7 @@
}
}
- public void resetCopied()
+ public synchronized void resetCopied()
{
copied = false;
}
Modified: trunk/src/main/org/hornetq/core/remoting/impl/netty/NettyAcceptor.java
===================================================================
--- trunk/src/main/org/hornetq/core/remoting/impl/netty/NettyAcceptor.java 2010-07-28 07:52:35 UTC (rev 9483)
+++ trunk/src/main/org/hornetq/core/remoting/impl/netty/NettyAcceptor.java 2010-07-28 11:21:06 UTC (rev 9484)
@@ -337,7 +337,7 @@
*/
public ChannelPipeline getPipeline() throws Exception
{
- ChannelPipeline pipeline = new DefaultChannelPipeline();
+ List<ChannelHandler> handlers = new ArrayList<ChannelHandler>();
if (sslEnabled)
{
@@ -347,37 +347,40 @@
SslHandler handler = new SslHandler(engine);
- pipeline.addLast("ssl", handler);
+ handlers.add(handler);
}
if (httpEnabled)
{
- pipeline.addLast("http-decoder", new HttpRequestDecoder());
+ handlers.add(new HttpRequestDecoder());
- pipeline.addLast("http-encoder", new HttpResponseEncoder());
+ handlers.add(new HttpResponseEncoder());
- pipeline.addLast("http-handler", new HttpAcceptorHandler(httpKeepAliveRunnable, httpResponseTime));
+ handlers.add(new HttpAcceptorHandler(httpKeepAliveRunnable, httpResponseTime));
}
if (protocol == ProtocolType.CORE)
{
// Core protocol uses its own optimised decoder
- pipeline.addLast("hornetq-decoder", new HornetQFrameDecoder2());
+
+ handlers.add(new HornetQFrameDecoder2());
}
else if (protocol == ProtocolType.STOMP_WS)
{
- pipeline.addLast("http-decoder", new HttpRequestDecoder());
- pipeline.addLast("http-aggregator", new HttpChunkAggregator(65536));
- pipeline.addLast("http-encoder", new HttpResponseEncoder());
- pipeline.addLast("hornetq-decoder", new HornetQFrameDecoder(decoder));
- pipeline.addLast("websocket-handler", new WebSocketServerHandler());
+ handlers.add(new HttpRequestDecoder());
+ handlers.add(new HttpChunkAggregator(65536));
+ handlers.add(new HttpResponseEncoder());
+ handlers.add(new HornetQFrameDecoder(decoder));
+ handlers.add(new WebSocketServerHandler());
}
else
{
- pipeline.addLast("hornetq-decoder", new HornetQFrameDecoder(decoder));
+ handlers.add(new HornetQFrameDecoder(decoder));
}
- pipeline.addLast("handler", new HornetQServerChannelHandler(channelGroup, handler, new Listener()));
+ handlers.add(new HornetQServerChannelHandler(channelGroup, handler, new Listener()));
+
+ ChannelPipeline pipeline = new StaticChannelPipeline(handlers.toArray(new ChannelHandler[handlers.size()]));
return pipeline;
}
14 years, 7 months