JBoss hornetq SVN: r9473 - trunk/tests/src/org/hornetq/tests/soak/journal.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2010-07-27 16:56:21 -0400 (Tue, 27 Jul 2010)
New Revision: 9473
Modified:
trunk/tests/src/org/hornetq/tests/soak/journal/JournalSoakTest.java
Log:
Changing soak test for journal
Modified: trunk/tests/src/org/hornetq/tests/soak/journal/JournalSoakTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/soak/journal/JournalSoakTest.java 2010-07-27 20:37:56 UTC (rev 9472)
+++ trunk/tests/src/org/hornetq/tests/soak/journal/JournalSoakTest.java 2010-07-27 20:56:21 UTC (rev 9473)
@@ -20,10 +20,14 @@
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
+import org.hornetq.core.asyncio.impl.AsynchronousFileImpl;
+import org.hornetq.core.config.impl.ConfigurationImpl;
import org.hornetq.core.config.impl.FileConfiguration;
import org.hornetq.core.journal.IOAsyncTask;
+import org.hornetq.core.journal.SequentialFileFactory;
import org.hornetq.core.journal.impl.AIOSequentialFileFactory;
import org.hornetq.core.journal.impl.JournalImpl;
+import org.hornetq.core.journal.impl.NIOSequentialFileFactory;
import org.hornetq.core.persistence.impl.journal.OperationContextImpl;
import org.hornetq.tests.util.RandomUtil;
import org.hornetq.tests.util.ServiceTestBase;
@@ -63,27 +67,29 @@
File dir = new File(getTemporaryDir());
dir.mkdirs();
+
+ SequentialFileFactory factory;
+
+ int maxAIO;
+ if (AsynchronousFileImpl.isLoaded())
+ {
+ factory = new AIOSequentialFileFactory(dir.getPath());
+ maxAIO = ConfigurationImpl.DEFAULT_JOURNAL_MAX_IO_AIO;
+ }
+ else
+ {
+ factory = new NIOSequentialFileFactory(dir.getPath());
+ maxAIO = ConfigurationImpl.DEFAULT_JOURNAL_MAX_IO_NIO;
+ }
- FileConfiguration fileConf = new FileConfiguration();
-
- fileConf.setJournalDirectory(getTemporaryDir());
-
- fileConf.setCreateJournalDir(true);
-
- fileConf.setCreateBindingsDir(true);
-
- fileConf.start();
-
- fileConf.setJournalMinFiles(10);
-
- journal = new JournalImpl(fileConf.getJournalFileSize(),
- fileConf.getJournalMinFiles(),
- fileConf.getJournalCompactMinFiles(),
- fileConf.getJournalCompactPercentage(),
- new AIOSequentialFileFactory(fileConf.getJournalDirectory()),
+ journal = new JournalImpl(ConfigurationImpl.DEFAULT_JOURNAL_FILE_SIZE,
+ 10,
+ ConfigurationImpl.DEFAULT_JOURNAL_COMPACT_MIN_FILES,
+ ConfigurationImpl.DEFAULT_JOURNAL_COMPACT_PERCENTAGE,
+ factory,
"hornetq-data",
"hq",
- fileConf.getJournalMaxIO_NIO());
+ maxAIO);
journal.start();
journal.loadInternalOnly();
13 years, 10 months
JBoss hornetq SVN: r9472 - trunk/tests/src/org/hornetq/tests/soak/journal.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2010-07-27 16:37:56 -0400 (Tue, 27 Jul 2010)
New Revision: 9472
Added:
trunk/tests/src/org/hornetq/tests/soak/journal/JournalSoakTest.java
Removed:
trunk/tests/src/org/hornetq/tests/soak/journal/SoakJournal.java
Log:
Rename test
Copied: trunk/tests/src/org/hornetq/tests/soak/journal/JournalSoakTest.java (from rev 9471, trunk/tests/src/org/hornetq/tests/soak/journal/SoakJournal.java)
===================================================================
--- trunk/tests/src/org/hornetq/tests/soak/journal/JournalSoakTest.java (rev 0)
+++ trunk/tests/src/org/hornetq/tests/soak/journal/JournalSoakTest.java 2010-07-27 20:37:56 UTC (rev 9472)
@@ -0,0 +1,344 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.tests.soak.journal;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+
+import org.hornetq.core.config.impl.FileConfiguration;
+import org.hornetq.core.journal.IOAsyncTask;
+import org.hornetq.core.journal.impl.AIOSequentialFileFactory;
+import org.hornetq.core.journal.impl.JournalImpl;
+import org.hornetq.core.persistence.impl.journal.OperationContextImpl;
+import org.hornetq.tests.util.RandomUtil;
+import org.hornetq.tests.util.ServiceTestBase;
+import org.hornetq.utils.HornetQThreadFactory;
+import org.hornetq.utils.OrderedExecutorFactory;
+import org.hornetq.utils.SimpleIDGenerator;
+import org.hornetq.utils.concurrent.LinkedBlockingDeque;
+
+/**
+ * A SoakJournal
+ *
+ * @author <a href="mailto:clebert.suconic@jboss.org">Clebert Suconic</a>
+ *
+ *
+ */
+public class JournalSoakTest extends ServiceTestBase
+{
+
+ public static SimpleIDGenerator idGen = new SimpleIDGenerator(1);
+
+ private volatile boolean running;
+
+ private JournalImpl journal;
+
+ ThreadFactory tFactory = new HornetQThreadFactory("SoakTest" + System.identityHashCode(this),
+ false,
+ JournalSoakTest.class.getClassLoader());
+
+ private final ExecutorService threadPool = Executors.newFixedThreadPool(20, tFactory);
+
+ OrderedExecutorFactory executorFactory = new OrderedExecutorFactory(threadPool);
+
+ @Override
+ public void setUp() throws Exception
+ {
+ super.setUp();
+
+ File dir = new File(getTemporaryDir());
+ dir.mkdirs();
+
+ FileConfiguration fileConf = new FileConfiguration();
+
+ fileConf.setJournalDirectory(getTemporaryDir());
+
+ fileConf.setCreateJournalDir(true);
+
+ fileConf.setCreateBindingsDir(true);
+
+ fileConf.start();
+
+ fileConf.setJournalMinFiles(10);
+
+ journal = new JournalImpl(fileConf.getJournalFileSize(),
+ fileConf.getJournalMinFiles(),
+ fileConf.getJournalCompactMinFiles(),
+ fileConf.getJournalCompactPercentage(),
+ new AIOSequentialFileFactory(fileConf.getJournalDirectory()),
+ "hornetq-data",
+ "hq",
+ fileConf.getJournalMaxIO_NIO());
+
+ journal.start();
+ journal.loadInternalOnly();
+
+ }
+
+ @Override
+ public void tearDown() throws Exception
+ {
+ journal.stop();
+ }
+
+ public void testAppend() throws Exception
+ {
+ running = true;
+ SlowAppenderNoTX t1 = new SlowAppenderNoTX();
+
+ int NTHREADS = 5;
+
+ FastAppenderTx appenders[] = new FastAppenderTx[NTHREADS];
+ FastUpdateTx updaters[] = new FastUpdateTx[NTHREADS];
+
+ for (int i = 0; i < NTHREADS; i++)
+ {
+ appenders[i] = new FastAppenderTx();
+ updaters[i] = new FastUpdateTx(appenders[i].queue);
+ }
+
+ t1.start();
+
+ for (int i = 0; i < NTHREADS; i++)
+ {
+ appenders[i].start();
+ updaters[i].start();
+ }
+
+ Thread.sleep(TimeUnit.HOURS.toMillis(24));
+
+ running = false;
+
+ for (Thread t : appenders)
+ {
+ t.join();
+ }
+
+ for (Thread t : updaters)
+ {
+ t.join();
+ }
+
+ t1.join();
+ }
+
+ private byte[] generateRecord()
+ {
+ int size = RandomUtil.randomPositiveInt() % 10000;
+ if (size == 0)
+ {
+ size = 10000;
+ }
+ return RandomUtil.randomBytes(size);
+ }
+
+ class FastAppenderTx extends Thread
+ {
+ LinkedBlockingDeque<Long> queue = new LinkedBlockingDeque<Long>();
+
+ OperationContextImpl ctx = new OperationContextImpl(executorFactory.getExecutor());
+
+ @Override
+ public void run()
+ {
+ try
+ {
+
+ while (running)
+ {
+ int txSize = RandomUtil.randomMax(1000);
+
+ long txID = JournalSoakTest.idGen.generateID();
+
+ final ArrayList<Long> ids = new ArrayList<Long>();
+
+ for (int i = 0; i < txSize; i++)
+ {
+ long id = JournalSoakTest.idGen.generateID();
+ ids.add(id);
+ journal.appendAddRecordTransactional(txID, id, (byte)0, generateRecord());
+ Thread.sleep(1);
+ }
+ journal.appendCommitRecord(txID, true, ctx);
+ ctx.executeOnCompletion(new IOAsyncTask()
+ {
+
+ public void onError(final int errorCode, final String errorMessage)
+ {
+ }
+
+ public void done()
+ {
+ for (Long id : ids)
+ {
+ queue.add(id);
+ }
+ }
+ });
+ }
+ }
+ catch (Exception e)
+ {
+ e.printStackTrace();
+ System.exit(-1);
+ }
+ }
+ }
+
+ class FastUpdateTx extends Thread
+ {
+ final LinkedBlockingDeque<Long> queue;
+
+ OperationContextImpl ctx = new OperationContextImpl(executorFactory.getExecutor());
+
+ public FastUpdateTx(final LinkedBlockingDeque<Long> queue)
+ {
+ this.queue = queue;
+ }
+
+ @Override
+ public void run()
+ {
+ try
+ {
+ int txSize = RandomUtil.randomMax(1000);
+ int txCount = 0;
+ long ids[] = new long[txSize];
+
+ long txID = JournalSoakTest.idGen.generateID();
+
+ while (running)
+ {
+
+ long id = queue.poll(60, TimeUnit.MINUTES);
+ ids[txCount] = id;
+ journal.appendUpdateRecordTransactional(txID, id, (byte)0, generateRecord());
+ Thread.sleep(1);
+ if (++txCount == txSize)
+ {
+ journal.appendCommitRecord(txID, true, ctx);
+ ctx.executeOnCompletion(new DeleteTask(ids));
+ txCount = 0;
+ txSize = RandomUtil.randomMax(1000);
+ txID = JournalSoakTest.idGen.generateID();
+ ids = new long[txSize];
+ }
+ }
+ }
+ catch (Exception e)
+ {
+ e.printStackTrace();
+ System.exit(-1);
+ }
+ }
+ }
+
+ class DeleteTask implements IOAsyncTask
+ {
+ final long ids[];
+
+ DeleteTask(final long ids[])
+ {
+ this.ids = ids;
+ }
+
+ public void done()
+ {
+ try
+ {
+ for (long id : ids)
+ {
+ journal.appendDeleteRecord(id, false);
+ }
+ }
+ catch (Exception e)
+ {
+ System.err.println("Can't delete id");
+ e.printStackTrace();
+ }
+ }
+
+ public void onError(final int errorCode, final String errorMessage)
+ {
+ }
+
+ }
+
+ /** Adds stuff to the journal, but it will take a long time to remove them.
+ * This will cause cleanup and compacting to happen more often
+ */
+ class SlowAppenderNoTX extends Thread
+ {
+ @Override
+ public void run()
+ {
+ try
+ {
+ while (running)
+ {
+ long ids[] = new long[1000];
+ // Append
+ for (int i = 0; running & i < 1000; i++)
+ {
+ ids[i] = JournalSoakTest.idGen.generateID();
+ journal.appendAddRecord(ids[i], (byte)1, generateRecord(), true);
+ Thread.sleep(300);
+ }
+ // Update
+ for (int i = 0; running & i < 1000; i++)
+ {
+ ids[i] = JournalSoakTest.idGen.generateID();
+ journal.appendUpdateRecord(ids[i], (byte)1, generateRecord(), true);
+ Thread.sleep(300);
+ }
+ // Delete
+ for (int i = 0; running & i < 1000; i++)
+ {
+ ids[i] = JournalSoakTest.idGen.generateID();
+ journal.appendUpdateRecord(ids[i], (byte)1, generateRecord(), true);
+ Thread.sleep(300);
+ }
+ }
+ }
+ catch (Exception e)
+ {
+ e.printStackTrace();
+ System.exit(-1);
+ }
+ }
+ }
+
+ // Constants -----------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ // Public --------------------------------------------------------
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+
+}
Deleted: trunk/tests/src/org/hornetq/tests/soak/journal/SoakJournal.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/soak/journal/SoakJournal.java 2010-07-27 19:14:32 UTC (rev 9471)
+++ trunk/tests/src/org/hornetq/tests/soak/journal/SoakJournal.java 2010-07-27 20:37:56 UTC (rev 9472)
@@ -1,339 +0,0 @@
-/*
- * Copyright 2010 Red Hat, Inc.
- * Red Hat licenses this file to you under the Apache License, version
- * 2.0 (the "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * http://www.apache.org/licenses/LICENSE-2.0
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
- * implied. See the License for the specific language governing
- * permissions and limitations under the License.
- */
-
-package org.hornetq.tests.soak.journal;
-
-import java.io.File;
-import java.util.ArrayList;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.TimeUnit;
-
-import org.hornetq.core.config.impl.FileConfiguration;
-import org.hornetq.core.journal.IOAsyncTask;
-import org.hornetq.core.journal.impl.AIOSequentialFileFactory;
-import org.hornetq.core.journal.impl.JournalImpl;
-import org.hornetq.core.persistence.impl.journal.OperationContextImpl;
-import org.hornetq.tests.util.RandomUtil;
-import org.hornetq.tests.util.ServiceTestBase;
-import org.hornetq.utils.HornetQThreadFactory;
-import org.hornetq.utils.OrderedExecutorFactory;
-import org.hornetq.utils.SimpleIDGenerator;
-import org.hornetq.utils.concurrent.LinkedBlockingDeque;
-
-/**
- * A SoakJournal
- *
- * @author <a href="mailto:clebert.suconic@jboss.org">Clebert Suconic</a>
- *
- *
- */
-public class SoakJournal extends ServiceTestBase
-{
-
- public static SimpleIDGenerator idGen = new SimpleIDGenerator(1);
-
- private volatile boolean running;
-
- private JournalImpl journal;
-
- ThreadFactory tFactory = new HornetQThreadFactory("SoakTest" + System.identityHashCode(this),
- false,
- SoakJournal.class.getClassLoader());
-
- private ExecutorService threadPool = Executors.newFixedThreadPool(20, tFactory);
-
- OrderedExecutorFactory executorFactory = new OrderedExecutorFactory(threadPool);
-
- public void setUp() throws Exception
- {
- super.setUp();
-
- File dir = new File(getTemporaryDir());
- dir.mkdirs();
-
- FileConfiguration fileConf = new FileConfiguration();
-
- fileConf.setJournalDirectory(getTemporaryDir());
-
- fileConf.setCreateJournalDir(true);
-
- fileConf.setCreateBindingsDir(true);
-
- fileConf.start();
-
- fileConf.setJournalMinFiles(10);
-
- journal = new JournalImpl(fileConf.getJournalFileSize(),
- fileConf.getJournalMinFiles(),
- fileConf.getJournalCompactMinFiles(),
- fileConf.getJournalCompactPercentage(),
- new AIOSequentialFileFactory(fileConf.getJournalDirectory()),
- "hornetq-data",
- "hq",
- fileConf.getJournalMaxIO_NIO());
-
- journal.start();
- journal.loadInternalOnly();
-
- }
-
- public void tearDown() throws Exception
- {
- journal.stop();
- }
-
- public void testAppend() throws Exception
- {
- running = true;
- SlowAppenderNoTX t1 = new SlowAppenderNoTX();
-
- int NTHREADS = 5;
-
- FastAppenderTx appenders[] = new FastAppenderTx[NTHREADS];
- FastUpdateTx updaters[] = new FastUpdateTx[NTHREADS];
-
- for (int i = 0; i < NTHREADS; i++)
- {
- appenders[i] = new FastAppenderTx();
- updaters[i] = new FastUpdateTx(appenders[i].queue);
- }
-
- t1.start();
-
- for (int i = 0; i < NTHREADS; i++)
- {
- appenders[i].start();
- updaters[i].start();
- }
-
- Thread.sleep(TimeUnit.HOURS.toMillis(24));
-
- running = false;
-
- for (Thread t : appenders)
- {
- t.join();
- }
-
- for (Thread t : updaters)
- {
- t.join();
- }
-
- t1.join();
- }
-
- private byte[] generateRecord()
- {
- int size = RandomUtil.randomPositiveInt() % 10000;
- if (size == 0)
- {
- size = 10000;
- }
- return RandomUtil.randomBytes(size);
- }
-
- class FastAppenderTx extends Thread
- {
- LinkedBlockingDeque<Long> queue = new LinkedBlockingDeque<Long>();
-
- OperationContextImpl ctx = new OperationContextImpl(executorFactory.getExecutor());
-
- public void run()
- {
- try
- {
-
- while (running)
- {
- int txSize = RandomUtil.randomMax(1000);
-
- long txID = idGen.generateID();
-
- final ArrayList<Long> ids = new ArrayList<Long>();
-
- for (int i = 0; i < txSize; i++)
- {
- long id = idGen.generateID();
- ids.add(id);
- journal.appendAddRecordTransactional(txID, id, (byte)0, generateRecord());
- Thread.sleep(1);
- }
- journal.appendCommitRecord(txID, true, ctx);
- ctx.executeOnCompletion(new IOAsyncTask()
- {
-
- public void onError(int errorCode, String errorMessage)
- {
- }
-
- public void done()
- {
- for (Long id : ids)
- {
- queue.add(id);
- }
- }
- });
- }
- }
- catch (Exception e)
- {
- e.printStackTrace();
- System.exit(-1);
- }
- }
- }
-
- class FastUpdateTx extends Thread
- {
- final LinkedBlockingDeque<Long> queue;
-
- OperationContextImpl ctx = new OperationContextImpl(executorFactory.getExecutor());
-
- public FastUpdateTx(LinkedBlockingDeque<Long> queue)
- {
- this.queue = queue;
- }
-
- public void run()
- {
- try
- {
- int txSize = RandomUtil.randomMax(1000);
- int txCount = 0;
- long ids[] = new long[txSize];
-
- long txID = idGen.generateID();
-
- while (running)
- {
-
- long id = queue.poll(60, TimeUnit.MINUTES);
- ids[txCount] = id;
- journal.appendUpdateRecordTransactional(txID, id, (byte)0, generateRecord());
- Thread.sleep(1);
- if (++txCount == txSize)
- {
- journal.appendCommitRecord(txID, true, ctx);
- ctx.executeOnCompletion(new DeleteTask(ids));
- txCount = 0;
- txSize = RandomUtil.randomMax(1000);
- txID = idGen.generateID();
- ids = new long[txSize];
- }
- }
- }
- catch (Exception e)
- {
- e.printStackTrace();
- System.exit(-1);
- }
- }
- }
-
- class DeleteTask implements IOAsyncTask
- {
- final long ids[];
-
- DeleteTask(long ids[])
- {
- this.ids = ids;
- }
-
- public void done()
- {
- try
- {
- for (int i = 0; i < ids.length; i++)
- {
- journal.appendDeleteRecord(ids[i], false);
- }
- }
- catch (Exception e)
- {
- System.err.println("Can't delete id");
- e.printStackTrace();
- }
- }
-
- public void onError(int errorCode, String errorMessage)
- {
- }
-
- }
-
- /** Adds stuff to the journal, but it will take a long time to remove them.
- * This will cause cleanup and compacting to happen more often
- */
- class SlowAppenderNoTX extends Thread
- {
- public void run()
- {
- try
- {
- while (running)
- {
- long ids[] = new long[1000];
- // Append
- for (int i = 0; running & i < 1000; i++)
- {
- ids[i] = idGen.generateID();
- journal.appendAddRecord(ids[i], (byte)1, generateRecord(), true);
- Thread.sleep(300);
- }
- // Update
- for (int i = 0; running & i < 1000; i++)
- {
- ids[i] = idGen.generateID();
- journal.appendUpdateRecord(ids[i], (byte)1, generateRecord(), true);
- Thread.sleep(300);
- }
- // Delete
- for (int i = 0; running & i < 1000; i++)
- {
- ids[i] = idGen.generateID();
- journal.appendUpdateRecord(ids[i], (byte)1, generateRecord(), true);
- Thread.sleep(300);
- }
- }
- }
- catch (Exception e)
- {
- e.printStackTrace();
- System.exit(-1);
- }
- }
- }
-
- // Constants -----------------------------------------------------
-
- // Attributes ----------------------------------------------------
-
- // Static --------------------------------------------------------
-
- // Constructors --------------------------------------------------
-
- // Public --------------------------------------------------------
-
- // Package protected ---------------------------------------------
-
- // Protected -----------------------------------------------------
-
- // Private -------------------------------------------------------
-
- // Inner classes -------------------------------------------------
-
-}
13 years, 10 months
JBoss hornetq SVN: r9471 - in trunk: tests/src/org/hornetq/tests/soak/journal and 1 other directory.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2010-07-27 15:14:32 -0400 (Tue, 27 Jul 2010)
New Revision: 9471
Modified:
trunk/src/main/org/hornetq/core/journal/impl/JournalFileImpl.java
trunk/src/main/org/hornetq/core/journal/impl/JournalImpl.java
trunk/src/main/org/hornetq/core/journal/impl/TimedBuffer.java
trunk/tests/src/org/hornetq/tests/soak/journal/SoakJournal.java
trunk/tests/src/org/hornetq/tests/util/ListJournal.java
Log:
https://jira.jboss.org/browse/HORNETQ-440 - Making sure freed files during compacting won't go out of order + adding a few methods to help debug the journal.
Modified: trunk/src/main/org/hornetq/core/journal/impl/JournalFileImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/impl/JournalFileImpl.java 2010-07-27 15:54:03 UTC (rev 9470)
+++ trunk/src/main/org/hornetq/core/journal/impl/JournalFileImpl.java 2010-07-27 19:14:32 UTC (rev 9471)
@@ -161,7 +161,7 @@
{
try
{
- return "JournalFileImpl: " + file.getFileName();
+ return "JournalFileImpl: (" + file.getFileName() + " id = " + this.fileID + ", recordID = " + recordID + ")";
}
catch (Exception e)
{
Modified: trunk/src/main/org/hornetq/core/journal/impl/JournalImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/impl/JournalImpl.java 2010-07-27 15:54:03 UTC (rev 9470)
+++ trunk/src/main/org/hornetq/core/journal/impl/JournalImpl.java 2010-07-27 19:14:32 UTC (rev 9471)
@@ -13,6 +13,7 @@
package org.hornetq.core.journal.impl;
+import java.io.PrintStream;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
@@ -94,7 +95,7 @@
private static final Logger log = Logger.getLogger(JournalImpl.class);
- private static final boolean trace = JournalImpl.log.isTraceEnabled();
+ private static final boolean trace = log.isTraceEnabled();
/** This is to be set to true at DEBUG & development only */
private static final boolean LOAD_TRACE = false;
@@ -104,7 +105,7 @@
// Journal
private static final void trace(final String message)
{
- JournalImpl.log.trace(message);
+ log.trace(message);
}
// The sizes of primitive types
@@ -385,6 +386,142 @@
return compactor;
}
+
+
+ /** this method is used internally only however tools may use it to maintenance.
+ * It won't be part of the interface as the tools should be specific to the implementation */
+ public List<JournalFile> orderFiles() throws Exception
+ {
+ List<String> fileNames = fileFactory.listFiles(fileExtension);
+
+ List<JournalFile> orderedFiles = new ArrayList<JournalFile>(fileNames.size());
+
+ for (String fileName : fileNames)
+ {
+ SequentialFile file = fileFactory.createSequentialFile(fileName, maxAIO);
+
+ file.open(1, false);
+
+ try
+ {
+ long fileID = readFileHeader(file);
+
+ orderedFiles.add(new JournalFileImpl(file, fileID));
+ }
+ finally
+ {
+ file.close();
+ }
+ }
+
+ // Now order them by ordering id - we can't use the file name for ordering
+ // since we can re-use dataFiles
+
+ Collections.sort(orderedFiles, new JournalFileComparator());
+
+ return orderedFiles;
+ }
+
+ private void calculateNextfileID(List<JournalFile> files)
+ {
+
+ for (JournalFile file : files)
+ {
+ long fileID = file.getFileID();
+ if (nextFileID.get() < fileID)
+ {
+ nextFileID.set(fileID);
+ }
+
+ long fileNameID = getFileNameID(file.getFile().getFileName());
+
+ // The compactor could create a fileName but use a previously assigned ID.
+ // Because of that we need to take both parts into account
+ if (nextFileID.get() < fileNameID)
+ {
+ nextFileID.set(fileNameID);
+ }
+ }
+
+
+ }
+
+
+ /**
+ * @param fileFactory
+ * @param journal
+ * @throws Exception
+ */
+ public static void listJournalFiles(final PrintStream out, final JournalImpl journal) throws Exception
+ {
+ List<JournalFile> files = journal.orderFiles();
+
+ SequentialFileFactory fileFactory = journal.fileFactory;
+
+ for (JournalFile file : files)
+ {
+ out.println("####### listing file " + file.getFile().getFileName() +
+ " sequence = " +
+ file.getFileID());
+
+ JournalImpl.readJournalFile(fileFactory, file, new JournalReaderCallback()
+ {
+
+ public void onReadUpdateRecordTX(long transactionID, RecordInfo recordInfo) throws Exception
+ {
+ out.println("ReadUpdateTX, txID=" + transactionID + ", " + recordInfo);
+ }
+
+ public void onReadUpdateRecord(RecordInfo recordInfo) throws Exception
+ {
+ out.println("ReadUpdate " + recordInfo);
+ }
+
+ public void onReadRollbackRecord(long transactionID) throws Exception
+ {
+ out.println("Rollback txID=" + transactionID);
+ }
+
+ public void onReadPrepareRecord(long transactionID, byte[] extraData, int numberOfRecords) throws Exception
+ {
+ out.println("Prepare txID=" + transactionID);
+ }
+
+ public void onReadDeleteRecordTX(long transactionID, RecordInfo recordInfo) throws Exception
+ {
+ out.println("DeleteRecordTX txID=" + transactionID + ", " + recordInfo);
+ }
+
+ public void onReadDeleteRecord(long recordID) throws Exception
+ {
+ out.println("DeleteRecord id=" + recordID);
+ }
+
+ public void onReadCommitRecord(long transactionID, int numberOfRecords) throws Exception
+ {
+ out.println("CommitRecord txID=" + transactionID);
+ }
+
+ public void onReadAddRecordTX(long transactionID, RecordInfo recordInfo) throws Exception
+ {
+ out.println("AddRecordTX, txID=" + transactionID + ", " + recordInfo);
+ }
+
+ public void onReadAddRecord(RecordInfo recordInfo) throws Exception
+ {
+ out.println("AddRecord " + recordInfo);
+ }
+
+ public void markAsDataFile(JournalFile file)
+ {
+ }
+ });
+ }
+ }
+
+
+
+ /** this method is used internally only however tools may use it to maintenance. */
public static int readJournalFile(final SequentialFileFactory fileFactory,
final JournalFile file,
final JournalReaderCallback reader) throws Exception
@@ -703,7 +840,11 @@
return lastDataPos;
}
-
+ catch (Throwable e)
+ {
+ log.warn(e.getMessage(), e);
+ throw new Exception (e.getMessage(), e);
+ }
finally
{
if (wholeFileBuffer != null)
@@ -1476,7 +1617,10 @@
try
{
- JournalImpl.trace("Starting compacting operation on journal");
+ if (trace)
+ {
+ JournalImpl.trace("Starting compacting operation on journal");
+ }
// We need to guarantee that the journal is frozen for this short time
// We don't freeze the journal as we compact, only for the short time where we replace records
@@ -1579,7 +1723,10 @@
dataFiles.addFirst(fileToAdd);
}
- JournalImpl.trace("There are " + dataFiles.size() + " datafiles Now");
+ if (trace)
+ {
+ JournalImpl.trace("There are " + dataFiles.size() + " datafiles Now");
+ }
// Replay pending commands (including updates, deletes and commits)
@@ -1616,7 +1763,10 @@
renameFiles(dataFilesToProcess, newDatafiles);
deleteControlFile(controlFile);
- JournalImpl.trace("Finished compacting on journal");
+ if (trace)
+ {
+ JournalImpl.trace("Finished compacting on journal");
+ }
}
finally
@@ -1698,6 +1848,8 @@
final Map<Long, TransactionHolder> loadTransactions = new LinkedHashMap<Long, TransactionHolder>();
final List<JournalFile> orderedFiles = orderFiles();
+
+ calculateNextfileID(orderedFiles);
int lastDataPos = JournalImpl.SIZE_HEADER;
@@ -2646,10 +2798,26 @@
/** being protected as testcases can override this method */
protected void renameFiles(final List<JournalFile> oldFiles, final List<JournalFile> newFiles) throws Exception
{
- for (JournalFile file : oldFiles)
+
+ // addFreeFiles has to be called through filesExecutor, or the fileID on the orderedFiles may end up in a wrong order
+ filesExecutor.execute(new Runnable()
{
- addFreeFile(file);
- }
+ public void run()
+ {
+ for (JournalFile file : oldFiles)
+ {
+ try
+ {
+ addFreeFile(file);
+ }
+ catch (Exception e)
+ {
+ log.warn("Error reinitializing file " + file, e);
+ }
+ }
+ }
+ });
+
for (JournalFile file : newFiles)
{
@@ -2795,52 +2963,6 @@
return recordSize;
}
- private List<JournalFile> orderFiles() throws Exception
- {
- List<String> fileNames = fileFactory.listFiles(fileExtension);
-
- List<JournalFile> orderedFiles = new ArrayList<JournalFile>(fileNames.size());
-
- for (String fileName : fileNames)
- {
- SequentialFile file = fileFactory.createSequentialFile(fileName, maxAIO);
-
- file.open(1, false);
-
- try
- {
- long fileID = readFileHeader(file);
-
- if (nextFileID.get() < fileID)
- {
- nextFileID.set(fileID);
- }
-
- long fileNameID = getFileNameID(fileName);
-
- // The compactor could create a fileName but use a previously assigned ID.
- // Because of that we need to take both parts into account
- if (nextFileID.get() < fileNameID)
- {
- nextFileID.set(fileNameID);
- }
-
- orderedFiles.add(new JournalFileImpl(file, fileID));
- }
- finally
- {
- file.close();
- }
- }
-
- // Now order them by ordering id - we can't use the file name for ordering
- // since we can re-use dataFiles
-
- Collections.sort(orderedFiles, new JournalFileComparator());
-
- return orderedFiles;
- }
-
/**
* @param file
* @return
@@ -3117,7 +3239,7 @@
if (JournalImpl.trace)
{
- JournalImpl.trace("moveNextFile: " + currentFile.getFile().getFileName() + " sync: " + synchronous);
+ JournalImpl.trace("moveNextFile: " + currentFile + " sync: " + synchronous);
}
fileFactory.activateBuffer(currentFile.getFile());
@@ -3175,6 +3297,11 @@
new Exception("Warning: Couldn't open a file in 60 Seconds"));
}
}
+
+ if (trace)
+ {
+ JournalImpl.trace("Returning file " + nextFile);
+ }
return nextFile;
}
@@ -3212,6 +3339,11 @@
private void pushOpenedFile() throws Exception
{
JournalFile nextOpenedFile = getFile(true, true, true, false);
+
+ if (trace)
+ {
+ JournalImpl.trace("pushing openFile " + nextOpenedFile);
+ }
openedFiles.offer(nextOpenedFile);
}
Modified: trunk/src/main/org/hornetq/core/journal/impl/TimedBuffer.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/impl/TimedBuffer.java 2010-07-27 15:54:03 UTC (rev 9470)
+++ trunk/src/main/org/hornetq/core/journal/impl/TimedBuffer.java 2010-07-27 19:14:32 UTC (rev 9471)
@@ -320,6 +320,7 @@
// Putting a byteArray on a native buffer is much faster, since it will do in a single native call.
// Using bufferToFlush.put(buffer) would make several append calls for each byte
+ // We also transfer the content of this buffer to the native file's buffer
bufferToFlush.put(buffer.toByteBuffer().array(), 0, pos);
@@ -348,6 +349,7 @@
pendingSync = false;
+ // swap the instance as the previous callback list is being used asynchronously
callbacks = new LinkedList<IOAsyncTask>();
buffer.clear();
Modified: trunk/tests/src/org/hornetq/tests/soak/journal/SoakJournal.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/soak/journal/SoakJournal.java 2010-07-27 15:54:03 UTC (rev 9470)
+++ trunk/tests/src/org/hornetq/tests/soak/journal/SoakJournal.java 2010-07-27 19:14:32 UTC (rev 9471)
@@ -118,7 +118,7 @@
updaters[i].start();
}
- Thread.sleep(3600000000l);
+ Thread.sleep(TimeUnit.HOURS.toMillis(24));
running = false;
Modified: trunk/tests/src/org/hornetq/tests/util/ListJournal.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/util/ListJournal.java 2010-07-27 15:54:03 UTC (rev 9470)
+++ trunk/tests/src/org/hornetq/tests/util/ListJournal.java 2010-07-27 19:14:32 UTC (rev 9471)
@@ -13,19 +13,21 @@
package org.hornetq.tests.util;
+import java.io.FileOutputStream;
+import java.io.PrintStream;
import java.util.ArrayList;
import org.hornetq.core.config.impl.FileConfiguration;
import org.hornetq.core.journal.PreparedTransactionInfo;
import org.hornetq.core.journal.RecordInfo;
+import org.hornetq.core.journal.SequentialFileFactory;
+import org.hornetq.core.journal.impl.AIOSequentialFileFactory;
import org.hornetq.core.journal.impl.JournalImpl;
-import org.hornetq.core.journal.impl.NIOSequentialFileFactory;
/**
* Lists the journal content for debug purposes.
*
* This is just a class useful on debug during development,
- * listing journal contents (As we don't have access to SQL on Journal :-) ).
*
* @author <a href="mailto:clebert.suconic@jboss.org">Clebert Suconic</a>
*
@@ -52,15 +54,19 @@
{
FileConfiguration fileConf = new FileConfiguration();
+ fileConf.setJournalDirectory("/work/projects/trunk/journal");
+
// fileConf.setConfigurationUrl(arg[0]);
fileConf.start();
+ SequentialFileFactory fileFactory = new AIOSequentialFileFactory(fileConf.getJournalDirectory());
+
JournalImpl journal = new JournalImpl(fileConf.getJournalFileSize(),
- fileConf.getJournalMinFiles(),
+ 10,
0,
0,
- new NIOSequentialFileFactory(fileConf.getJournalDirectory()),
+ fileFactory,
"hornetq-data",
"hq",
fileConf.getJournalMaxIO_NIO());
@@ -69,18 +75,31 @@
ArrayList<PreparedTransactionInfo> prepared = new ArrayList<PreparedTransactionInfo>();
journal.start();
+
+
+ PrintStream out = new PrintStream(new FileOutputStream("/tmp/file.out"));
+
+ out.println("######### Journal records per file");
+
+ JournalImpl.listJournalFiles(out, journal);
+
journal.load(records, prepared, null);
+
+ out.println();
+
+ out.println("##########################################");
+ out.println("# T O T A L L I S T #");
if (prepared.size() > 0)
{
- System.out.println("There are " + prepared.size() + " prepared transactions on the journal");
+ out.println("There are " + prepared.size() + " prepared transactions on the journal");
}
- System.out.println("Total of " + records.size() + " committed records");
+ out.println("Total of " + records.size() + " committed records");
for (RecordInfo record : records)
{
- System.out.println("user record: " + record);
+ out.println("user record: " + record);
}
journal.checkReclaimStatus();
@@ -89,6 +108,9 @@
journal.stop();
+ journal.stop();
+
+ out.close();
}
catch (Exception e)
{
13 years, 10 months
JBoss hornetq SVN: r9470 - branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/impl.
by do-not-reply@jboss.org
Author: jmesnil
Date: 2010-07-27 11:54:03 -0400 (Tue, 27 Jul 2010)
New Revision: 9470
Modified:
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
Log:
HA refactoring
* start the cluster manager before the remoting service
Modified: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java 2010-07-27 09:55:27 UTC (rev 9469)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java 2010-07-27 15:54:03 UTC (rev 9470)
@@ -1582,16 +1582,11 @@
// We do this at the end - we don't want things like MDBs or other connections connecting to a backup server until
// it is activated
+ clusterManager.start();
- // FIXME -- I inverted the order to start the remoting service before the cluster manager.
- // when the cluster manager is started, it will form a cluster -> other nodes will then create bridges
- // to connect to this server. If the remoting service is not started before, the connection will fail
- // and the cluster will not be formed...
initialised = true;
remotingService.start();
-
- clusterManager.start();
}
/**
13 years, 10 months
JBoss hornetq SVN: r9469 - in branches/2_2_0_HA_Improvements/src/main/org/hornetq/core: server/cluster/impl and 1 other directory.
by do-not-reply@jboss.org
Author: jmesnil
Date: 2010-07-27 05:55:27 -0400 (Tue, 27 Jul 2010)
New Revision: 9469
Modified:
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ServerLocatorInternal.java
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
Log:
HA refactoring
* added clusterConnection attribute to ServerLocatorInternal to distinguish between regular connection and cluster connections when creating the server resources in CoreProtocolManager.createConnectionEntry()
* the clusterConnection attribute is set to true only in ClusterConnectionImpl constructor
Modified: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java 2010-07-27 09:52:35 UTC (rev 9468)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java 2010-07-27 09:55:27 UTC (rev 9469)
@@ -1064,7 +1064,7 @@
if (serverLocator.isHA())
{
- channel0.send(new SubscribeClusterTopologyUpdatesMessage(false));
+ channel0.send(new SubscribeClusterTopologyUpdatesMessage(serverLocator.isClusterConnection()));
}
}
Modified: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java 2010-07-27 09:52:35 UTC (rev 9468)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java 2010-07-27 09:55:27 UTC (rev 9469)
@@ -62,6 +62,8 @@
private final boolean ha;
+ private boolean clusterConnection;
+
private final String discoveryAddress;
private final int discoveryPort;
@@ -376,6 +378,8 @@
initialMessagePacketSize = HornetQClient.DEFAULT_INITIAL_MESSAGE_PACKET_SIZE;
cacheLargeMessagesClient = HornetQClient.DEFAULT_CACHE_LARGE_MESSAGE_CLIENT;
+
+ clusterConnection = false;
}
/**
@@ -989,6 +993,16 @@
this.nodeID = nodeID;
}
+ public void setClusterConnection(boolean clusterConnection)
+ {
+ this.clusterConnection = clusterConnection;
+ }
+
+ public boolean isClusterConnection()
+ {
+ return clusterConnection;
+ }
+
@Override
protected void finalize() throws Throwable
{
Modified: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ServerLocatorInternal.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ServerLocatorInternal.java 2010-07-27 09:52:35 UTC (rev 9468)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ServerLocatorInternal.java 2010-07-27 09:55:27 UTC (rev 9469)
@@ -45,4 +45,8 @@
void notifyNodeUp(String nodeID, Pair<TransportConfiguration, TransportConfiguration> connectorPair, boolean last);
void notifyNodeDown(String nodeID);
+
+ void setClusterConnection(boolean clusterConnection);
+
+ boolean isClusterConnection();
}
Modified: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java 2010-07-27 09:52:35 UTC (rev 9468)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java 2010-07-27 09:55:27 UTC (rev 9469)
@@ -119,8 +119,18 @@
final String clusterUser,
final String clusterPassword) throws Exception
{
+
+ if (nodeUUID == null)
+ {
+ throw new IllegalArgumentException("node id is null");
+ }
+
+ this.nodeUUID = nodeUUID;
+
this.serverLocator = serverLocator;
-
+
+ this.serverLocator.setClusterConnection(true);
+
this.connector = connector;
this.name = name;
@@ -143,13 +153,6 @@
this.maxHops = maxHops;
- if (nodeUUID == null)
- {
- throw new IllegalArgumentException("node id is null");
- }
-
- this.nodeUUID = nodeUUID;
-
this.backup = backup;
this.clusterUser = clusterUser;
13 years, 10 months
JBoss hornetq SVN: r9468 - branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/impl.
by do-not-reply@jboss.org
Author: jmesnil
Date: 2010-07-27 05:52:35 -0400 (Tue, 27 Jul 2010)
New Revision: 9468
Modified:
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
Log:
display server NodeID in the log message about the server being stopped
Modified: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java 2010-07-26 16:22:26 UTC (rev 9467)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java 2010-07-27 09:52:35 UTC (rev 9468)
@@ -896,7 +896,8 @@
started = false;
initialised = false;
- // uuid = null;
+ // to display in the log message
+ SimpleString tempNodeID = nodeID;
nodeID = null;
if (activation != null)
@@ -909,7 +910,7 @@
backupActivationThread.join();
}
- HornetQServerImpl.log.info("HornetQ Server version " + getVersion().getFullVersion() + " [" + nodeID + "] stopped");
+ HornetQServerImpl.log.info("HornetQ Server version " + getVersion().getFullVersion() + " [" + tempNodeID + "] stopped");
Logger.reset();
}
13 years, 10 months
JBoss hornetq SVN: r9467 - branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/impl.
by do-not-reply@jboss.org
Author: jmesnil
Date: 2010-07-26 12:22:26 -0400 (Mon, 26 Jul 2010)
New Revision: 9467
Modified:
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
Log:
fix typo
Modified: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java 2010-07-26 12:13:08 UTC (rev 9466)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java 2010-07-26 16:22:26 UTC (rev 9467)
@@ -909,7 +909,7 @@
backupActivationThread.join();
}
- HornetQServerImpl.log.info("HornetQ Server version " + getVersion().getFullVersion() + " [" + nodeID + "]�stopped");
+ HornetQServerImpl.log.info("HornetQ Server version " + getVersion().getFullVersion() + " [" + nodeID + "] stopped");
Logger.reset();
}
13 years, 10 months
JBoss hornetq SVN: r9466 - branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/protocol/core/impl/wireformat.
by do-not-reply@jboss.org
Author: jmesnil
Date: 2010-07-26 08:13:08 -0400 (Mon, 26 Jul 2010)
New Revision: 9466
Modified:
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/protocol/core/impl/wireformat/ClusterTopologyChangeMessage.java
Log:
HA refactoring
* fix ClusterTopologyChangeMessage ctor...
Modified: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/protocol/core/impl/wireformat/ClusterTopologyChangeMessage.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/protocol/core/impl/wireformat/ClusterTopologyChangeMessage.java 2010-07-26 11:37:13 UTC (rev 9465)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/protocol/core/impl/wireformat/ClusterTopologyChangeMessage.java 2010-07-26 12:13:08 UTC (rev 9466)
@@ -61,6 +61,8 @@
super(PacketImpl.CLUSTER_TOPOLOGY);
this.exit = true;
+
+ this.nodeID = nodeID;
}
public ClusterTopologyChangeMessage()
13 years, 10 months
JBoss hornetq SVN: r9465 - in branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/protocol/core/impl: wireformat and 1 other directory.
by do-not-reply@jboss.org
Author: jmesnil
Date: 2010-07-26 07:37:13 -0400 (Mon, 26 Jul 2010)
New Revision: 9465
Removed:
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/protocol/core/impl/wireformat/NodeAnnounceMessage.java
Modified:
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/protocol/core/impl/PacketDecoder.java
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/protocol/core/impl/PacketImpl.java
Log:
HA refactoring
* remove unused NodeAnnounceMessage packet
Modified: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java 2010-07-26 05:03:59 UTC (rev 9464)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java 2010-07-26 11:37:13 UTC (rev 9465)
@@ -132,12 +132,6 @@
}
});
}
-// else if (packet.getType() == PacketImpl.NODE_ANNOUNCE)
-// {
-// NodeAnnounceMessage msg = (NodeAnnounceMessage)packet;
-//
-// server.getClusterManager().announceNode(msg.getNodeID(), msg.isBackup(), msg.getConnector());
-// }
}
});
Modified: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/protocol/core/impl/PacketDecoder.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/protocol/core/impl/PacketDecoder.java 2010-07-26 05:03:59 UTC (rev 9464)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/protocol/core/impl/PacketDecoder.java 2010-07-26 11:37:13 UTC (rev 9465)
@@ -21,7 +21,6 @@
import static org.hornetq.core.protocol.core.impl.PacketImpl.DELETE_QUEUE;
import static org.hornetq.core.protocol.core.impl.PacketImpl.DISCONNECT;
import static org.hornetq.core.protocol.core.impl.PacketImpl.EXCEPTION;
-import static org.hornetq.core.protocol.core.impl.PacketImpl.NODE_ANNOUNCE;
import static org.hornetq.core.protocol.core.impl.PacketImpl.NULL_RESPONSE;
import static org.hornetq.core.protocol.core.impl.PacketImpl.PACKETS_CONFIRMED;
import static org.hornetq.core.protocol.core.impl.PacketImpl.PING;
@@ -91,7 +90,6 @@
import org.hornetq.core.protocol.core.impl.wireformat.CreateSessionMessage;
import org.hornetq.core.protocol.core.impl.wireformat.CreateSessionResponseMessage;
import org.hornetq.core.protocol.core.impl.wireformat.HornetQExceptionMessage;
-import org.hornetq.core.protocol.core.impl.wireformat.NodeAnnounceMessage;
import org.hornetq.core.protocol.core.impl.wireformat.NullResponseMessage;
import org.hornetq.core.protocol.core.impl.wireformat.PacketsConfirmedMessage;
import org.hornetq.core.protocol.core.impl.wireformat.Ping;
@@ -497,11 +495,6 @@
packet = new ClusterTopologyChangeMessage();
break;
}
- case NODE_ANNOUNCE:
- {
- packet = new NodeAnnounceMessage();
- break;
- }
case SUBSCRIBE_TOPOLOGY:
{
packet = new SubscribeClusterTopologyUpdatesMessage();
Modified: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/protocol/core/impl/PacketImpl.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/protocol/core/impl/PacketImpl.java 2010-07-26 05:03:59 UTC (rev 9464)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/protocol/core/impl/PacketImpl.java 2010-07-26 11:37:13 UTC (rev 9465)
@@ -186,8 +186,6 @@
public static final byte CLUSTER_TOPOLOGY = 110;
- public static final byte NODE_ANNOUNCE = 111;
-
public static final byte SUBSCRIBE_TOPOLOGY = 112;
// Static --------------------------------------------------------
@@ -301,7 +299,7 @@
protected String getParentString()
{
- return "PACKET[type=" + type + ", channelID=" + channelID + "]";
+ return "PACKET[type=" + type + ", channelID=" + channelID;
}
// Protected -----------------------------------------------------
Deleted: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/protocol/core/impl/wireformat/NodeAnnounceMessage.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/protocol/core/impl/wireformat/NodeAnnounceMessage.java 2010-07-26 05:03:59 UTC (rev 9464)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/protocol/core/impl/wireformat/NodeAnnounceMessage.java 2010-07-26 11:37:13 UTC (rev 9465)
@@ -1,103 +0,0 @@
-/*
- * Copyright 2009 Red Hat, Inc.
- * Red Hat licenses this file to you under the Apache License, version
- * 2.0 (the "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * http://www.apache.org/licenses/LICENSE-2.0
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
- * implied. See the License for the specific language governing
- * permissions and limitations under the License.
- */
-
-package org.hornetq.core.protocol.core.impl.wireformat;
-
-import org.hornetq.api.core.HornetQBuffer;
-import org.hornetq.api.core.TransportConfiguration;
-import org.hornetq.core.logging.Logger;
-import org.hornetq.core.protocol.core.impl.PacketImpl;
-
-/**
- * @author <a href="mailto:tim.fox@jboss.com">Tim Fox</a>
- *
- */
-public class NodeAnnounceMessage extends PacketImpl
-{
- // Constants -----------------------------------------------------
-
- private static final Logger log = Logger.getLogger(NodeAnnounceMessage.class);
-
- // Attributes ----------------------------------------------------
-
- private String nodeID;
-
- private boolean backup;
-
- private TransportConfiguration connector;
-
- // Static --------------------------------------------------------
-
- // Constructors --------------------------------------------------
-
- public NodeAnnounceMessage(final String nodeID, final boolean backup, final TransportConfiguration tc)
- {
- super(PacketImpl.NODE_ANNOUNCE);
-
- this.nodeID = nodeID;
-
- this.backup = backup;
-
- this.connector = tc;
- }
-
- public NodeAnnounceMessage()
- {
- super(PacketImpl.NODE_ANNOUNCE);
- }
-
- // Public --------------------------------------------------------
-
-
- public String getNodeID()
- {
- return nodeID;
- }
-
- public boolean isBackup()
- {
- return backup;
- }
-
- public TransportConfiguration getConnector()
- {
- return connector;
- }
-
-
- @Override
- public void encodeRest(final HornetQBuffer buffer)
- {
- buffer.writeString(nodeID);
- buffer.writeBoolean(backup);
- connector.encode(buffer);
- }
-
- @Override
- public void decodeRest(final HornetQBuffer buffer)
- {
- this.nodeID = buffer.readString();
- this.backup = buffer.readBoolean();
- connector = new TransportConfiguration();
- connector.decode(buffer);
- }
-
-
- // Package protected ---------------------------------------------
-
- // Protected -----------------------------------------------------
-
- // Private -------------------------------------------------------
-
- // Inner classes -------------------------------------------------
-}
13 years, 10 months
JBoss hornetq SVN: r9464 - in trunk/tests/src/org/hornetq/tests: soak/journal and 1 other directories.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2010-07-26 01:03:59 -0400 (Mon, 26 Jul 2010)
New Revision: 9464
Added:
trunk/tests/src/org/hornetq/tests/soak/journal/
trunk/tests/src/org/hornetq/tests/soak/journal/SoakJournal.java
Modified:
trunk/tests/src/org/hornetq/tests/util/RandomUtil.java
Log:
Adding a soak test for the journal
Added: trunk/tests/src/org/hornetq/tests/soak/journal/SoakJournal.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/soak/journal/SoakJournal.java (rev 0)
+++ trunk/tests/src/org/hornetq/tests/soak/journal/SoakJournal.java 2010-07-26 05:03:59 UTC (rev 9464)
@@ -0,0 +1,339 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.tests.soak.journal;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+
+import org.hornetq.core.config.impl.FileConfiguration;
+import org.hornetq.core.journal.IOAsyncTask;
+import org.hornetq.core.journal.impl.AIOSequentialFileFactory;
+import org.hornetq.core.journal.impl.JournalImpl;
+import org.hornetq.core.persistence.impl.journal.OperationContextImpl;
+import org.hornetq.tests.util.RandomUtil;
+import org.hornetq.tests.util.ServiceTestBase;
+import org.hornetq.utils.HornetQThreadFactory;
+import org.hornetq.utils.OrderedExecutorFactory;
+import org.hornetq.utils.SimpleIDGenerator;
+import org.hornetq.utils.concurrent.LinkedBlockingDeque;
+
+/**
+ * A SoakJournal
+ *
+ * @author <a href="mailto:clebert.suconic@jboss.org">Clebert Suconic</a>
+ *
+ *
+ */
+public class SoakJournal extends ServiceTestBase
+{
+
+ public static SimpleIDGenerator idGen = new SimpleIDGenerator(1);
+
+ private volatile boolean running;
+
+ private JournalImpl journal;
+
+ ThreadFactory tFactory = new HornetQThreadFactory("SoakTest" + System.identityHashCode(this),
+ false,
+ SoakJournal.class.getClassLoader());
+
+ private ExecutorService threadPool = Executors.newFixedThreadPool(20, tFactory);
+
+ OrderedExecutorFactory executorFactory = new OrderedExecutorFactory(threadPool);
+
+ public void setUp() throws Exception
+ {
+ super.setUp();
+
+ File dir = new File(getTemporaryDir());
+ dir.mkdirs();
+
+ FileConfiguration fileConf = new FileConfiguration();
+
+ fileConf.setJournalDirectory(getTemporaryDir());
+
+ fileConf.setCreateJournalDir(true);
+
+ fileConf.setCreateBindingsDir(true);
+
+ fileConf.start();
+
+ fileConf.setJournalMinFiles(10);
+
+ journal = new JournalImpl(fileConf.getJournalFileSize(),
+ fileConf.getJournalMinFiles(),
+ fileConf.getJournalCompactMinFiles(),
+ fileConf.getJournalCompactPercentage(),
+ new AIOSequentialFileFactory(fileConf.getJournalDirectory()),
+ "hornetq-data",
+ "hq",
+ fileConf.getJournalMaxIO_NIO());
+
+ journal.start();
+ journal.loadInternalOnly();
+
+ }
+
+ public void tearDown() throws Exception
+ {
+ journal.stop();
+ }
+
+ public void testAppend() throws Exception
+ {
+ running = true;
+ SlowAppenderNoTX t1 = new SlowAppenderNoTX();
+
+ int NTHREADS = 5;
+
+ FastAppenderTx appenders[] = new FastAppenderTx[NTHREADS];
+ FastUpdateTx updaters[] = new FastUpdateTx[NTHREADS];
+
+ for (int i = 0; i < NTHREADS; i++)
+ {
+ appenders[i] = new FastAppenderTx();
+ updaters[i] = new FastUpdateTx(appenders[i].queue);
+ }
+
+ t1.start();
+
+ for (int i = 0; i < NTHREADS; i++)
+ {
+ appenders[i].start();
+ updaters[i].start();
+ }
+
+ Thread.sleep(3600000000l);
+
+ running = false;
+
+ for (Thread t : appenders)
+ {
+ t.join();
+ }
+
+ for (Thread t : updaters)
+ {
+ t.join();
+ }
+
+ t1.join();
+ }
+
+ private byte[] generateRecord()
+ {
+ int size = RandomUtil.randomPositiveInt() % 10000;
+ if (size == 0)
+ {
+ size = 10000;
+ }
+ return RandomUtil.randomBytes(size);
+ }
+
+ class FastAppenderTx extends Thread
+ {
+ LinkedBlockingDeque<Long> queue = new LinkedBlockingDeque<Long>();
+
+ OperationContextImpl ctx = new OperationContextImpl(executorFactory.getExecutor());
+
+ public void run()
+ {
+ try
+ {
+
+ while (running)
+ {
+ int txSize = RandomUtil.randomMax(1000);
+
+ long txID = idGen.generateID();
+
+ final ArrayList<Long> ids = new ArrayList<Long>();
+
+ for (int i = 0; i < txSize; i++)
+ {
+ long id = idGen.generateID();
+ ids.add(id);
+ journal.appendAddRecordTransactional(txID, id, (byte)0, generateRecord());
+ Thread.sleep(1);
+ }
+ journal.appendCommitRecord(txID, true, ctx);
+ ctx.executeOnCompletion(new IOAsyncTask()
+ {
+
+ public void onError(int errorCode, String errorMessage)
+ {
+ }
+
+ public void done()
+ {
+ for (Long id : ids)
+ {
+ queue.add(id);
+ }
+ }
+ });
+ }
+ }
+ catch (Exception e)
+ {
+ e.printStackTrace();
+ System.exit(-1);
+ }
+ }
+ }
+
+ class FastUpdateTx extends Thread
+ {
+ final LinkedBlockingDeque<Long> queue;
+
+ OperationContextImpl ctx = new OperationContextImpl(executorFactory.getExecutor());
+
+ public FastUpdateTx(LinkedBlockingDeque<Long> queue)
+ {
+ this.queue = queue;
+ }
+
+ public void run()
+ {
+ try
+ {
+ int txSize = RandomUtil.randomMax(1000);
+ int txCount = 0;
+ long ids[] = new long[txSize];
+
+ long txID = idGen.generateID();
+
+ while (running)
+ {
+
+ long id = queue.poll(60, TimeUnit.MINUTES);
+ ids[txCount] = id;
+ journal.appendUpdateRecordTransactional(txID, id, (byte)0, generateRecord());
+ Thread.sleep(1);
+ if (++txCount == txSize)
+ {
+ journal.appendCommitRecord(txID, true, ctx);
+ ctx.executeOnCompletion(new DeleteTask(ids));
+ txCount = 0;
+ txSize = RandomUtil.randomMax(1000);
+ txID = idGen.generateID();
+ ids = new long[txSize];
+ }
+ }
+ }
+ catch (Exception e)
+ {
+ e.printStackTrace();
+ System.exit(-1);
+ }
+ }
+ }
+
+ class DeleteTask implements IOAsyncTask
+ {
+ final long ids[];
+
+ DeleteTask(long ids[])
+ {
+ this.ids = ids;
+ }
+
+ public void done()
+ {
+ try
+ {
+ for (int i = 0; i < ids.length; i++)
+ {
+ journal.appendDeleteRecord(ids[i], false);
+ }
+ }
+ catch (Exception e)
+ {
+ System.err.println("Can't delete id");
+ e.printStackTrace();
+ }
+ }
+
+ public void onError(int errorCode, String errorMessage)
+ {
+ }
+
+ }
+
+ /** Adds stuff to the journal, but it will take a long time to remove them.
+ * This will cause cleanup and compacting to happen more often
+ */
+ class SlowAppenderNoTX extends Thread
+ {
+ public void run()
+ {
+ try
+ {
+ while (running)
+ {
+ long ids[] = new long[1000];
+ // Append
+ for (int i = 0; running & i < 1000; i++)
+ {
+ ids[i] = idGen.generateID();
+ journal.appendAddRecord(ids[i], (byte)1, generateRecord(), true);
+ Thread.sleep(300);
+ }
+ // Update
+ for (int i = 0; running & i < 1000; i++)
+ {
+ ids[i] = idGen.generateID();
+ journal.appendUpdateRecord(ids[i], (byte)1, generateRecord(), true);
+ Thread.sleep(300);
+ }
+ // Delete
+ for (int i = 0; running & i < 1000; i++)
+ {
+ ids[i] = idGen.generateID();
+ journal.appendUpdateRecord(ids[i], (byte)1, generateRecord(), true);
+ Thread.sleep(300);
+ }
+ }
+ }
+ catch (Exception e)
+ {
+ e.printStackTrace();
+ System.exit(-1);
+ }
+ }
+ }
+
+ // Constants -----------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ // Public --------------------------------------------------------
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+
+}
Modified: trunk/tests/src/org/hornetq/tests/util/RandomUtil.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/util/RandomUtil.java 2010-07-23 08:13:58 UTC (rev 9463)
+++ trunk/tests/src/org/hornetq/tests/util/RandomUtil.java 2010-07-26 05:03:59 UTC (rev 9464)
@@ -71,6 +71,18 @@
{
return Math.abs(RandomUtil.randomInt());
}
+
+ public static int randomMax(int max)
+ {
+ int value = randomPositiveInt() % max;
+
+ if (value == 0)
+ {
+ value = max;
+ }
+
+ return value;
+ }
public static int randomPort()
{
13 years, 10 months