From do-not-reply at jboss.org Fri Jul 30 13:20:26 2010 Content-Type: multipart/mixed; boundary="===============1711802989521207236==" MIME-Version: 1.0 From: do-not-reply at jboss.org To: hornetq-commits at lists.jboss.org Subject: [hornetq-commits] JBoss hornetq SVN: r9490 - in trunk/tests/src/org/hornetq/tests: stress/journal and 1 other directory. Date: Fri, 30 Jul 2010 13:20:26 -0400 Message-ID: <201007301720.o6UHKQvw031979@svn01.web.mwc.hst.phx2.redhat.com> --===============1711802989521207236== Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: quoted-printable Author: clebert.suconic(a)jboss.com Date: 2010-07-30 13:20:25 -0400 (Fri, 30 Jul 2010) New Revision: 9490 Added: trunk/tests/src/org/hornetq/tests/stress/journal/JournalCleanupCompactSt= ressTest.java Removed: trunk/tests/src/org/hornetq/tests/soak/journal/JournalSoakTest.java Log: Move soak test as stress test Deleted: trunk/tests/src/org/hornetq/tests/soak/journal/JournalSoakTest.java =3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D= =3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D= =3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D --- trunk/tests/src/org/hornetq/tests/soak/journal/JournalSoakTest.java 201= 0-07-30 15:01:16 UTC (rev 9489) +++ trunk/tests/src/org/hornetq/tests/soak/journal/JournalSoakTest.java 201= 0-07-30 17:20:25 UTC (rev 9490) @@ -1,424 +0,0 @@ -/* - * Copyright 2010 Red Hat, Inc. - * Red Hat licenses this file to you under the Apache License, version - * 2.0 (the "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * http://www.apache.org/licenses/LICENSE-2.0 - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or - * implied. See the License for the specific language governing - * permissions and limitations under the License. - */ - -package org.hornetq.tests.soak.journal; - -import java.io.File; -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.ThreadFactory; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; - -import org.hornetq.core.asyncio.impl.AsynchronousFileImpl; -import org.hornetq.core.config.impl.ConfigurationImpl; -import org.hornetq.core.journal.IOAsyncTask; -import org.hornetq.core.journal.PreparedTransactionInfo; -import org.hornetq.core.journal.RecordInfo; -import org.hornetq.core.journal.SequentialFileFactory; -import org.hornetq.core.journal.TransactionFailureCallback; -import org.hornetq.core.journal.impl.AIOSequentialFileFactory; -import org.hornetq.core.journal.impl.JournalImpl; -import org.hornetq.core.journal.impl.NIOSequentialFileFactory; -import org.hornetq.core.persistence.impl.journal.OperationContextImpl; -import org.hornetq.tests.util.RandomUtil; -import org.hornetq.tests.util.ServiceTestBase; -import org.hornetq.utils.HornetQThreadFactory; -import org.hornetq.utils.OrderedExecutorFactory; -import org.hornetq.utils.SimpleIDGenerator; -import org.hornetq.utils.concurrent.LinkedBlockingDeque; - -/** - * A SoakJournal - * - * @author Clebert Suconic<= /a> - * - * - */ -public class JournalSoakTest extends ServiceTestBase -{ - - public static SimpleIDGenerator idGen =3D new SimpleIDGenerator(1); - - private volatile boolean running; - - private AtomicInteger errors =3D new AtomicInteger(0); - - private AtomicInteger numberOfRecords =3D new AtomicInteger(0); - - private AtomicInteger numberOfUpdates =3D new AtomicInteger(0); - - private AtomicInteger numberOfDeletes =3D new AtomicInteger(0); - - private JournalImpl journal; - - ThreadFactory tFactory =3D new HornetQThreadFactory("SoakTest" + System= .identityHashCode(this), - false, - JournalSoakTest.class= .getClassLoader()); - - private final ExecutorService threadPool =3D Executors.newFixedThreadPo= ol(20, tFactory); - - OrderedExecutorFactory executorFactory =3D new OrderedExecutorFactory(t= hreadPool); - - @Override - public void setUp() throws Exception - { - super.setUp(); - - errors.set(0); - - File dir =3D new File(getTemporaryDir()); - dir.mkdirs(); - - SequentialFileFactory factory; - - int maxAIO; - if (AsynchronousFileImpl.isLoaded()) - { - factory =3D new AIOSequentialFileFactory(dir.getPath()); - maxAIO =3D ConfigurationImpl.DEFAULT_JOURNAL_MAX_IO_AIO; - } - else - { - factory =3D new NIOSequentialFileFactory(dir.getPath()); - maxAIO =3D ConfigurationImpl.DEFAULT_JOURNAL_MAX_IO_NIO; - } - - journal =3D new JournalImpl(ConfigurationImpl.DEFAULT_JOURNAL_FILE_S= IZE, - 10, - 15, - ConfigurationImpl.DEFAULT_JOURNAL_COMPACT_= PERCENTAGE, - factory, - "hornetq-data", - "hq", - maxAIO); - - journal.start(); - journal.loadInternalOnly(); - - } - - @Override - public void tearDown() throws Exception - { - try - { - if (journal.isStarted()) - { - journal.stop(); - } - } - catch (Exception e) - { - // don't care :-) - } - } - - public void testAppend() throws Exception - { - - running =3D true; - SlowAppenderNoTX t1 =3D new SlowAppenderNoTX(); - - int NTHREADS =3D 5; - - FastAppenderTx appenders[] =3D new FastAppenderTx[NTHREADS]; - FastUpdateTx updaters[] =3D new FastUpdateTx[NTHREADS]; - - for (int i =3D 0; i < NTHREADS; i++) - { - appenders[i] =3D new FastAppenderTx(); - updaters[i] =3D new FastUpdateTx(appenders[i].queue); - } - - t1.start(); - - Thread.sleep(1000); - - for (int i =3D 0; i < NTHREADS; i++) - { - appenders[i].start(); - updaters[i].start(); - } - - long timeToEnd =3D System.currentTimeMillis() + TimeUnit.MINUTES.toM= illis(10); - - while (System.currentTimeMillis() < timeToEnd) - { - System.out.println("Append =3D " + numberOfRecords + - ", Update =3D " + - numberOfUpdates + - ", Delete =3D " + - numberOfDeletes + - ", liveRecords =3D " + - (numberOfRecords.get() - numberOfDeletes.get()= )); - Thread.sleep(TimeUnit.SECONDS.toMillis(10)); - } - - running =3D false; - - for (Thread t : appenders) - { - t.join(); - } - - for (Thread t : updaters) - { - t.join(); - } - - t1.join(); - - assertEquals(0, errors.get()); - - journal.stop(); - - journal.start(); - - ArrayList committedRecords =3D new ArrayList= (); - ArrayList preparedTransactions =3D new Arra= yList(); - journal.load(committedRecords, preparedTransactions, new Transaction= FailureCallback() - { - public void failedTransaction(long transactionID, List records, List recordsToDelete) - { - } - }); - - long appends =3D 0, updates =3D 0; - - for (RecordInfo record : committedRecords) - { - if (record.isUpdate) - { - updates++; - } - else - { - appends++; - } - } - - assertEquals(numberOfRecords.get() - numberOfDeletes.get(), appends); - - journal.stop(); - } - - private byte[] generateRecord() - { - int size =3D RandomUtil.randomPositiveInt() % 10000; - if (size =3D=3D 0) - { - size =3D 10000; - } - return RandomUtil.randomBytes(size); - } - - class FastAppenderTx extends Thread - { - LinkedBlockingDeque queue =3D new LinkedBlockingDeque(); - - OperationContextImpl ctx =3D new OperationContextImpl(executorFactor= y.getExecutor()); - - @Override - public void run() - { - try - { - - while (running) - { - final int txSize =3D RandomUtil.randomMax(100); - - long txID =3D JournalSoakTest.idGen.generateID(); - - final long ids[] =3D new long[txSize]; - - for (int i =3D 0; i < txSize; i++) - { - long id =3D JournalSoakTest.idGen.generateID(); - ids[i] =3D id; - journal.appendAddRecordTransactional(txID, id, (byte)0, = generateRecord()); - Thread.sleep(1); - } - journal.appendCommitRecord(txID, true, ctx); - ctx.executeOnCompletion(new IOAsyncTask() - { - - public void onError(final int errorCode, final String er= rorMessage) - { - } - - public void done() - { - numberOfRecords.addAndGet(txSize); - for (Long id : ids) - { - queue.add(id); - } - } - }); - } - } - catch (Exception e) - { - e.printStackTrace(); - running =3D false; - errors.incrementAndGet(); - } - } - } - - class FastUpdateTx extends Thread - { - final LinkedBlockingDeque queue; - - OperationContextImpl ctx =3D new OperationContextImpl(executorFactor= y.getExecutor()); - - public FastUpdateTx(final LinkedBlockingDeque queue) - { - this.queue =3D queue; - } - - @Override - public void run() - { - try - { - int txSize =3D RandomUtil.randomMax(100); - int txCount =3D 0; - long ids[] =3D new long[txSize]; - - long txID =3D JournalSoakTest.idGen.generateID(); - - while (running) - { - - long id =3D queue.poll(60, TimeUnit.MINUTES); - ids[txCount] =3D id; - journal.appendUpdateRecordTransactional(txID, id, (byte)0, = generateRecord()); - if (++txCount =3D=3D txSize) - { - journal.appendCommitRecord(txID, true, ctx); - ctx.executeOnCompletion(new DeleteTask(ids)); - txCount =3D 0; - txSize =3D RandomUtil.randomMax(100); - txID =3D JournalSoakTest.idGen.generateID(); - ids =3D new long[txSize]; - } - } - } - catch (Exception e) - { - e.printStackTrace(); - running =3D false; - errors.incrementAndGet(); - } - } - } - - class DeleteTask implements IOAsyncTask - { - final long ids[]; - - DeleteTask(final long ids[]) - { - this.ids =3D ids; - } - - public void done() - { - numberOfUpdates.addAndGet(ids.length); - try - { - for (long id : ids) - { - journal.appendDeleteRecord(id, false); - numberOfDeletes.incrementAndGet(); - } - } - catch (Exception e) - { - System.err.println("Can't delete id"); - e.printStackTrace(); - running =3D false; - errors.incrementAndGet(); - } - } - - public void onError(final int errorCode, final String errorMessage) - { - } - - } - - /** Adds stuff to the journal, but it will take a long time to remove t= hem. - * This will cause cleanup and compacting to happen more often - */ - class SlowAppenderNoTX extends Thread - { - @Override - public void run() - { - try - { - while (running) - { - long ids[] =3D new long[5]; - // Append - for (int i =3D 0; running & i < ids.length; i++) - { - System.out.println("append slow"); - ids[i] =3D JournalSoakTest.idGen.generateID(); - journal.appendAddRecord(ids[i], (byte)1, generateRecord(= ), true); - numberOfRecords.incrementAndGet(); - - Thread.sleep(TimeUnit.SECONDS.toMillis(50)); - } - // Delete - for (int i =3D 0; running & i < ids.length; i++) - { - System.out.println("Deleting"); - journal.appendDeleteRecord(ids[i], false); - numberOfDeletes.incrementAndGet(); - } - } - } - catch (Exception e) - { - e.printStackTrace(); - System.exit(-1); - } - } - } - - // Constants ----------------------------------------------------- - - // Attributes ---------------------------------------------------- - - // Static -------------------------------------------------------- - - // Constructors -------------------------------------------------- - - // Public -------------------------------------------------------- - - // Package protected --------------------------------------------- - - // Protected ----------------------------------------------------- - - // Private ------------------------------------------------------- - - // Inner classes ------------------------------------------------- - -} Copied: trunk/tests/src/org/hornetq/tests/stress/journal/JournalCleanupComp= actStressTest.java (from rev 9488, trunk/tests/src/org/hornetq/tests/soak/j= ournal/JournalSoakTest.java) =3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D= =3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D= =3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D --- trunk/tests/src/org/hornetq/tests/stress/journal/JournalCleanupCompactS= tressTest.java (rev 0) +++ trunk/tests/src/org/hornetq/tests/stress/journal/JournalCleanupCompactS= tressTest.java 2010-07-30 17:20:25 UTC (rev 9490) @@ -0,0 +1,429 @@ +/* + * Copyright 2010 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package org.hornetq.tests.stress.journal; + +import java.io.File; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import org.hornetq.core.asyncio.impl.AsynchronousFileImpl; +import org.hornetq.core.config.impl.ConfigurationImpl; +import org.hornetq.core.journal.IOAsyncTask; +import org.hornetq.core.journal.PreparedTransactionInfo; +import org.hornetq.core.journal.RecordInfo; +import org.hornetq.core.journal.SequentialFileFactory; +import org.hornetq.core.journal.TransactionFailureCallback; +import org.hornetq.core.journal.impl.AIOSequentialFileFactory; +import org.hornetq.core.journal.impl.JournalImpl; +import org.hornetq.core.journal.impl.NIOSequentialFileFactory; +import org.hornetq.core.persistence.impl.journal.OperationContextImpl; +import org.hornetq.tests.util.RandomUtil; +import org.hornetq.tests.util.ServiceTestBase; +import org.hornetq.utils.HornetQThreadFactory; +import org.hornetq.utils.OrderedExecutorFactory; +import org.hornetq.utils.SimpleIDGenerator; +import org.hornetq.utils.concurrent.LinkedBlockingDeque; + +/** + * A SoakJournal + * + * @author Clebert Suconic<= /a> + * + * + */ +public class JournalCleanupCompactStressTest extends ServiceTestBase +{ + + public static SimpleIDGenerator idGen =3D new SimpleIDGenerator(1); + + private volatile boolean running; + + private AtomicInteger errors =3D new AtomicInteger(0); + + private AtomicInteger numberOfRecords =3D new AtomicInteger(0); + + private AtomicInteger numberOfUpdates =3D new AtomicInteger(0); + + private AtomicInteger numberOfDeletes =3D new AtomicInteger(0); + + private JournalImpl journal; + + ThreadFactory tFactory =3D new HornetQThreadFactory("SoakTest" + System= .identityHashCode(this), + false, + JournalCleanupCompact= StressTest.class.getClassLoader()); + + private final ExecutorService threadPool =3D Executors.newFixedThreadPo= ol(20, tFactory); + + OrderedExecutorFactory executorFactory =3D new OrderedExecutorFactory(t= hreadPool); + + @Override + public void setUp() throws Exception + { + super.setUp(); + + errors.set(0); + + File dir =3D new File(getTemporaryDir()); + dir.mkdirs(); + + SequentialFileFactory factory; + + int maxAIO; + if (AsynchronousFileImpl.isLoaded()) + { + factory =3D new AIOSequentialFileFactory(dir.getPath()); + maxAIO =3D ConfigurationImpl.DEFAULT_JOURNAL_MAX_IO_AIO; + } + else + { + factory =3D new NIOSequentialFileFactory(dir.getPath()); + maxAIO =3D ConfigurationImpl.DEFAULT_JOURNAL_MAX_IO_NIO; + } + + journal =3D new JournalImpl(ConfigurationImpl.DEFAULT_JOURNAL_FILE_S= IZE, + 10, + 15, + ConfigurationImpl.DEFAULT_JOURNAL_COMPACT_= PERCENTAGE, + factory, + "hornetq-data", + "hq", + maxAIO); + + journal.start(); + journal.loadInternalOnly(); + + } + + @Override + public void tearDown() throws Exception + { + try + { + if (journal.isStarted()) + { + journal.stop(); + } + } + catch (Exception e) + { + // don't care :-) + } + } + = + protected long getTotalTimeMilliseconds() + { + return TimeUnit.MINUTES.toMillis(10); + } + + public void testAppend() throws Exception + { + + running =3D true; + SlowAppenderNoTX t1 =3D new SlowAppenderNoTX(); + + int NTHREADS =3D 5; + + FastAppenderTx appenders[] =3D new FastAppenderTx[NTHREADS]; + FastUpdateTx updaters[] =3D new FastUpdateTx[NTHREADS]; + + for (int i =3D 0; i < NTHREADS; i++) + { + appenders[i] =3D new FastAppenderTx(); + updaters[i] =3D new FastUpdateTx(appenders[i].queue); + } + + t1.start(); + + Thread.sleep(1000); + + for (int i =3D 0; i < NTHREADS; i++) + { + appenders[i].start(); + updaters[i].start(); + } + + long timeToEnd =3D System.currentTimeMillis() + getTotalTimeMillisec= onds(); + + while (System.currentTimeMillis() < timeToEnd) + { + System.out.println("Append =3D " + numberOfRecords + + ", Update =3D " + + numberOfUpdates + + ", Delete =3D " + + numberOfDeletes + + ", liveRecords =3D " + + (numberOfRecords.get() - numberOfDeletes.get()= )); + Thread.sleep(TimeUnit.SECONDS.toMillis(10)); + } + + running =3D false; + + for (Thread t : appenders) + { + t.join(); + } + + for (Thread t : updaters) + { + t.join(); + } + + t1.join(); + + assertEquals(0, errors.get()); + + journal.stop(); + + journal.start(); + + ArrayList committedRecords =3D new ArrayList= (); + ArrayList preparedTransactions =3D new Arra= yList(); + journal.load(committedRecords, preparedTransactions, new Transaction= FailureCallback() + { + public void failedTransaction(long transactionID, List records, List recordsToDelete) + { + } + }); + + long appends =3D 0, updates =3D 0; + + for (RecordInfo record : committedRecords) + { + if (record.isUpdate) + { + updates++; + } + else + { + appends++; + } + } + + assertEquals(numberOfRecords.get() - numberOfDeletes.get(), appends); + + journal.stop(); + } + + private byte[] generateRecord() + { + int size =3D RandomUtil.randomPositiveInt() % 10000; + if (size =3D=3D 0) + { + size =3D 10000; + } + return RandomUtil.randomBytes(size); + } + + class FastAppenderTx extends Thread + { + LinkedBlockingDeque queue =3D new LinkedBlockingDeque(); + + OperationContextImpl ctx =3D new OperationContextImpl(executorFactor= y.getExecutor()); + + @Override + public void run() + { + try + { + + while (running) + { + final int txSize =3D RandomUtil.randomMax(100); + + long txID =3D JournalCleanupCompactStressTest.idGen.generat= eID(); + + final long ids[] =3D new long[txSize]; + + for (int i =3D 0; i < txSize; i++) + { + long id =3D JournalCleanupCompactStressTest.idGen.genera= teID(); + ids[i] =3D id; + journal.appendAddRecordTransactional(txID, id, (byte)0, = generateRecord()); + Thread.sleep(1); + } + journal.appendCommitRecord(txID, true, ctx); + ctx.executeOnCompletion(new IOAsyncTask() + { + + public void onError(final int errorCode, final String er= rorMessage) + { + } + + public void done() + { + numberOfRecords.addAndGet(txSize); + for (Long id : ids) + { + queue.add(id); + } + } + }); + } + } + catch (Exception e) + { + e.printStackTrace(); + running =3D false; + errors.incrementAndGet(); + } + } + } + + class FastUpdateTx extends Thread + { + final LinkedBlockingDeque queue; + + OperationContextImpl ctx =3D new OperationContextImpl(executorFactor= y.getExecutor()); + + public FastUpdateTx(final LinkedBlockingDeque queue) + { + this.queue =3D queue; + } + + @Override + public void run() + { + try + { + int txSize =3D RandomUtil.randomMax(100); + int txCount =3D 0; + long ids[] =3D new long[txSize]; + + long txID =3D JournalCleanupCompactStressTest.idGen.generateID= (); + + while (running) + { + + long id =3D queue.poll(60, TimeUnit.MINUTES); + ids[txCount] =3D id; + journal.appendUpdateRecordTransactional(txID, id, (byte)0, = generateRecord()); + if (++txCount =3D=3D txSize) + { + journal.appendCommitRecord(txID, true, ctx); + ctx.executeOnCompletion(new DeleteTask(ids)); + txCount =3D 0; + txSize =3D RandomUtil.randomMax(100); + txID =3D JournalCleanupCompactStressTest.idGen.generateI= D(); + ids =3D new long[txSize]; + } + } + } + catch (Exception e) + { + e.printStackTrace(); + running =3D false; + errors.incrementAndGet(); + } + } + } + + class DeleteTask implements IOAsyncTask + { + final long ids[]; + + DeleteTask(final long ids[]) + { + this.ids =3D ids; + } + + public void done() + { + numberOfUpdates.addAndGet(ids.length); + try + { + for (long id : ids) + { + journal.appendDeleteRecord(id, false); + numberOfDeletes.incrementAndGet(); + } + } + catch (Exception e) + { + System.err.println("Can't delete id"); + e.printStackTrace(); + running =3D false; + errors.incrementAndGet(); + } + } + + public void onError(final int errorCode, final String errorMessage) + { + } + + } + + /** Adds stuff to the journal, but it will take a long time to remove t= hem. + * This will cause cleanup and compacting to happen more often + */ + class SlowAppenderNoTX extends Thread + { + @Override + public void run() + { + try + { + while (running) + { + long ids[] =3D new long[5]; + // Append + for (int i =3D 0; running & i < ids.length; i++) + { + System.out.println("append slow"); + ids[i] =3D JournalCleanupCompactStressTest.idGen.generat= eID(); + journal.appendAddRecord(ids[i], (byte)1, generateRecord(= ), true); + numberOfRecords.incrementAndGet(); + + Thread.sleep(TimeUnit.SECONDS.toMillis(50)); + } + // Delete + for (int i =3D 0; running & i < ids.length; i++) + { + System.out.println("Deleting"); + journal.appendDeleteRecord(ids[i], false); + numberOfDeletes.incrementAndGet(); + } + } + } + catch (Exception e) + { + e.printStackTrace(); + System.exit(-1); + } + } + } + + // Constants ----------------------------------------------------- + + // Attributes ---------------------------------------------------- + + // Static -------------------------------------------------------- + + // Constructors -------------------------------------------------- + + // Public -------------------------------------------------------- + + // Package protected --------------------------------------------- + + // Protected ----------------------------------------------------- + + // Private ------------------------------------------------------- + + // Inner classes ------------------------------------------------- + +} --===============1711802989521207236==--