[jboss-cvs] JBoss Messaging SVN: r4701 - in trunk: src/main/org/jboss/messaging/core/journal and 5 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Mon Jul 21 19:06:11 EDT 2008
Author: clebert.suconic at jboss.com
Date: 2008-07-21 19:06:10 -0400 (Mon, 21 Jul 2008)
New Revision: 4701
Modified:
trunk/examples/jms/src/org/jboss/jms/example/PerfExample.java
trunk/src/main/org/jboss/messaging/core/journal/Journal.java
trunk/src/main/org/jboss/messaging/core/journal/impl/AIOSequentialFile.java
trunk/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java
trunk/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalStorageManager.java
trunk/tests/src/org/jboss/messaging/tests/performance/journal/impl/JournalImplTestUnit.java
trunk/tests/src/org/jboss/messaging/tests/timing/core/journal/impl/JournalImplTestUnit.java
trunk/tests/src/org/jboss/messaging/tests/unit/core/persistence/impl/journal/JournalStorageManagerTest.java
Log:
Modified: trunk/examples/jms/src/org/jboss/jms/example/PerfExample.java
===================================================================
--- trunk/examples/jms/src/org/jboss/jms/example/PerfExample.java 2008-07-21 21:25:52 UTC (rev 4700)
+++ trunk/examples/jms/src/org/jboss/jms/example/PerfExample.java 2008-07-21 23:06:10 UTC (rev 4701)
@@ -219,7 +219,13 @@
if (perfParams.isDrainQueue())
{
drainQueue(messageConsumer);
+ if (perfParams.isSessionTransacted())
+ {
+ log.info("commit on drainQueue");
+ session.commit();
+ }
}
+
log.info("READY!!!");
Modified: trunk/src/main/org/jboss/messaging/core/journal/Journal.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/journal/Journal.java 2008-07-21 21:25:52 UTC (rev 4700)
+++ trunk/src/main/org/jboss/messaging/core/journal/Journal.java 2008-07-21 23:06:10 UTC (rev 4701)
@@ -75,13 +75,6 @@
long load(LoadManager reloadManager) throws Exception;
-
- // Start and stop reclaimer
-
- void startReclaimer();
-
- void stopReclaimer();
-
int getAlignment() throws Exception;
}
Modified: trunk/src/main/org/jboss/messaging/core/journal/impl/AIOSequentialFile.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/journal/impl/AIOSequentialFile.java 2008-07-21 21:25:52 UTC (rev 4700)
+++ trunk/src/main/org/jboss/messaging/core/journal/impl/AIOSequentialFile.java 2008-07-21 23:06:10 UTC (rev 4701)
@@ -287,6 +287,7 @@
public void onError(int errorCode, String errorMessage)
{
+ log.warn("Error on writing data!" + errorMessage + " code - " + errorCode, new Exception (errorMessage));
}
}
Modified: trunk/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java 2008-07-21 21:25:52 UTC (rev 4700)
+++ trunk/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java 2008-07-21 23:06:10 UTC (rev 4701)
@@ -175,14 +175,8 @@
private final ConcurrentMap<Long, TransactionCallback> transactionCallbacks = new ConcurrentHashMap<Long, TransactionCallback>();
- private ExecutorService closingExecutor = null;
+ private ExecutorService filesExecutor = null;
- /**
- * We have a separated executor for open, as if we used the same executor this would still represent
- * a point of wait between the closing and open.
- * */
- private ExecutorService openExecutor = null;
-
/*
* We use a semaphore rather than synchronized since it performs better when
* contended
@@ -200,8 +194,6 @@
private Reclaimer reclaimer = new Reclaimer();
- private Thread shutdownHook = null;
-
// Static --------------------------------------------------------
private static final Logger log = Logger.getLogger(JournalImpl.class);
@@ -682,8 +674,6 @@
throw new IllegalStateException("Journal must be in started state");
}
- addShutdownHook();
-
Map<Long, TransactionHolder> transactions = new LinkedHashMap<Long, TransactionHolder>();
List<JournalFile> orderedFiles = orderFiles();
@@ -1200,12 +1190,12 @@
callback.waitCompletion(aioTimeout);
}
- if (closingExecutor != null && !closingExecutor.isShutdown())
+ if (filesExecutor != null && !filesExecutor.isShutdown())
{
// Send something to the closingExecutor, just to make sure we went until its end
final CountDownLatch latch = new CountDownLatch(1);
- this.closingExecutor.execute(new Runnable()
+ this.filesExecutor.execute(new Runnable()
{
public void run()
{
@@ -1216,25 +1206,9 @@
latch.await();
}
- if (openExecutor != null && !openExecutor.isShutdown())
- {
- // Send something to the closingExecutor, just to make sure we went until its end
- final CountDownLatch latch = new CountDownLatch(1);
-
- this.openExecutor.execute(new Runnable()
- {
- public void run()
- {
- latch.countDown();
- }
- });
-
- latch.await();
- }
-
}
- public synchronized void checkAndReclaimFiles() throws Exception
+ public void checkAndReclaimFiles() throws Exception
{
checkReclaimStatus();
@@ -1386,8 +1360,7 @@
throw new IllegalStateException("Journal is not stopped");
}
- this.openExecutor = Executors.newSingleThreadExecutor();
- this.closingExecutor = Executors.newSingleThreadExecutor();
+ this.filesExecutor = Executors.newSingleThreadExecutor();
state = STATE_STARTED;
@@ -1395,28 +1368,18 @@
public synchronized void stop() throws Exception
{
- clearShutdownHook();
-
if (state == STATE_STOPPED)
{
throw new IllegalStateException("Journal is already stopped");
}
- stopReclaimer();
-
- closingExecutor.shutdown();
- if (!closingExecutor.awaitTermination(aioTimeout, TimeUnit.MILLISECONDS))
- {
- throw new IllegalStateException("Time out waiting for closing executor to finish");
- }
-
if (currentFile != null)
{
currentFile.getFile().close();
}
- openExecutor.shutdown();
- if (!openExecutor.awaitTermination(aioTimeout, TimeUnit.MILLISECONDS))
+ filesExecutor.shutdown();
+ if (!filesExecutor.awaitTermination(aioTimeout, TimeUnit.MILLISECONDS))
{
throw new IllegalStateException("Time out waiting for open executor to finish");
}
@@ -1437,22 +1400,6 @@
state = STATE_STOPPED;
}
- public void startReclaimer()
- {
- if (state == STATE_STOPPED)
- {
- throw new IllegalStateException("Journal is stopped");
- }
- }
-
- public void stopReclaimer()
- {
- if (state == STATE_STOPPED)
- {
- throw new IllegalStateException("Journal is already stopped");
- }
- }
-
// Public -----------------------------------------------------------------------------
// Private -----------------------------------------------------------------------------
@@ -1636,46 +1583,6 @@
return orderedFiles;
}
- private void clearShutdownHook()
- {
- if (shutdownHook != null)
- {
- try
- {
- Runtime.getRuntime().removeShutdownHook(shutdownHook);
- }
- catch (Throwable e)
- {
- }
- shutdownHook = null;
- }
- }
-
- private void addShutdownHook()
- {
-
- clearShutdownHook();
-
-
- shutdownHook = new Thread()
- {
- public void run()
- {
- try
- {
- log.info("Journal being stopped");
- JournalImpl.this.stop();
- }
- catch (Exception e)
- {
- log.warn(e, e);
- }
- }
- };
- Runtime.getRuntime().addShutdownHook(shutdownHook);
-
- }
-
private JournalFile appendRecord(final ByteBuffer bb, final boolean sync, final TransactionCallback callback) throws Exception
{
lock.acquire();
@@ -1787,7 +1694,7 @@
private JournalFile enqueueOpenFile() throws InterruptedException
{
if (trace) log.trace("enqueueOpenFile with openedFiles.size=" + openedFiles.size());
- openExecutor.execute(new Runnable()
+ filesExecutor.execute(new Runnable()
{
public void run()
{
@@ -1803,7 +1710,7 @@
});
if (autoReclaim)
{
- openExecutor.execute(new Runnable()
+ filesExecutor.execute(new Runnable()
{
public void run()
{
@@ -1860,7 +1767,7 @@
private void closeFile(final JournalFile file)
{
- this.closingExecutor.execute(new Runnable() { public void run()
+ this.filesExecutor.execute(new Runnable() { public void run()
{
try
{
Modified: trunk/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalStorageManager.java 2008-07-21 21:25:52 UTC (rev 4700)
+++ trunk/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalStorageManager.java 2008-07-21 23:06:10 UTC (rev 4701)
@@ -561,12 +561,8 @@
bindingsJournal.start();
- bindingsJournal.startReclaimer();
-
messageJournal.start();
- messageJournal.startReclaimer();
-
started = true;
}
Modified: trunk/tests/src/org/jboss/messaging/tests/performance/journal/impl/JournalImplTestUnit.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/performance/journal/impl/JournalImplTestUnit.java 2008-07-21 21:25:52 UTC (rev 4700)
+++ trunk/tests/src/org/jboss/messaging/tests/performance/journal/impl/JournalImplTestUnit.java 2008-07-21 23:06:10 UTC (rev 4701)
@@ -146,8 +146,6 @@
startJournal();
load();
- journal.startReclaimer();
-
long start = System.currentTimeMillis();
Modified: trunk/tests/src/org/jboss/messaging/tests/timing/core/journal/impl/JournalImplTestUnit.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/timing/core/journal/impl/JournalImplTestUnit.java 2008-07-21 21:25:52 UTC (rev 4700)
+++ trunk/tests/src/org/jboss/messaging/tests/timing/core/journal/impl/JournalImplTestUnit.java 2008-07-21 23:06:10 UTC (rev 4701)
@@ -143,8 +143,6 @@
startJournal();
load();
- journal.startReclaimer();
-
long start = System.currentTimeMillis();
Modified: trunk/tests/src/org/jboss/messaging/tests/unit/core/persistence/impl/journal/JournalStorageManagerTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/persistence/impl/journal/JournalStorageManagerTest.java 2008-07-21 21:25:52 UTC (rev 4700)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/core/persistence/impl/journal/JournalStorageManagerTest.java 2008-07-21 23:06:10 UTC (rev 4701)
@@ -708,9 +708,7 @@
assertFalse(jsm.isStarted());
bindingsJournal.start();
- bindingsJournal.startReclaimer();
messageJournal.start();
- messageJournal.startReclaimer();
EasyMock.replay(messageJournal, bindingsJournal);
More information about the jboss-cvs-commits
mailing list