[jboss-cvs] JBoss Messaging SVN: r4175 - in trunk: src/etc and 11 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Tue May 13 05:25:51 EDT 2008
Author: timfox
Date: 2008-05-13 05:25:51 -0400 (Tue, 13 May 2008)
New Revision: 4175
Modified:
trunk/native/src/org_jboss_messaging_core_asyncio_impl_AsynchronousFileImpl.h
trunk/src/etc/jbm-configuration.xml
trunk/src/main/org/jboss/messaging/core/asyncio/AsynchronousFile.java
trunk/src/main/org/jboss/messaging/core/asyncio/impl/AsynchronousFileImpl.java
trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerImpl.java
trunk/src/main/org/jboss/messaging/core/journal/EncodingSupport.java
trunk/src/main/org/jboss/messaging/core/journal/SequentialFile.java
trunk/src/main/org/jboss/messaging/core/journal/impl/AIOSequentialFile.java
trunk/src/main/org/jboss/messaging/core/journal/impl/AIOSequentialFileFactory.java
trunk/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java
trunk/src/main/org/jboss/messaging/core/journal/impl/NIOSequentialFile.java
trunk/src/main/org/jboss/messaging/core/message/impl/MessageImpl.java
trunk/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalStorageManager.java
trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/ProducerSendMessage.java
trunk/src/main/org/jboss/messaging/util/TypedProperties.java
trunk/src/main/org/jboss/messaging/util/VariableLatch.java
trunk/tests/jms-tests/src/org/jboss/test/messaging/jms/DeliveryOrderTest.java
trunk/tests/jms-tests/src/org/jboss/test/messaging/jms/MessageConsumerTest.java
trunk/tests/src/org/jboss/messaging/tests/integration/CoreClientTest.java
Log:
Mainly small tweaks, reformatting and cosmetic changes to new AIO stuff
Modified: trunk/native/src/org_jboss_messaging_core_asyncio_impl_AsynchronousFileImpl.h
===================================================================
--- trunk/native/src/org_jboss_messaging_core_asyncio_impl_AsynchronousFileImpl.h 2008-05-12 21:53:04 UTC (rev 4174)
+++ trunk/native/src/org_jboss_messaging_core_asyncio_impl_AsynchronousFileImpl.h 2008-05-13 09:25:51 UTC (rev 4175)
@@ -8,8 +8,8 @@
extern "C" {
#endif
/* Inaccessible static: log */
+/* Inaccessible static: totalMaxIO */
/* Inaccessible static: loaded */
-/* Inaccessible static: totalMaxIO */
/*
* Class: org_jboss_messaging_core_asyncio_impl_AsynchronousFileImpl
* Method: init
Modified: trunk/src/etc/jbm-configuration.xml
===================================================================
--- trunk/src/etc/jbm-configuration.xml 2008-05-12 21:53:04 UTC (rev 4174)
+++ trunk/src/etc/jbm-configuration.xml 2008-05-13 09:25:51 UTC (rev 4175)
@@ -21,7 +21,7 @@
<remoting-host>localhost</remoting-host>
- <!-- timeout in seconds -->
+ <!-- timeout in milliseconds -->
<remoting-timeout>5000</remoting-timeout>
<!-- true to disable invm communication when the client and the server are in the same JVM. -->
@@ -67,6 +67,7 @@
<journal-sync>true</journal-sync>
+ <!-- 100 MB journal file size -->
<journal-file-size>104857600</journal-file-size>
<journal-min-files>2</journal-min-files>
Modified: trunk/src/main/org/jboss/messaging/core/asyncio/AsynchronousFile.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/asyncio/AsynchronousFile.java 2008-05-12 21:53:04 UTC (rev 4174)
+++ trunk/src/main/org/jboss/messaging/core/asyncio/AsynchronousFile.java 2008-05-13 09:25:51 UTC (rev 4175)
@@ -9,15 +9,13 @@
import java.nio.ByteBuffer;
-
/**
*
* @author clebert.suconic at jboss.com
*
*/
public interface AsynchronousFile
-{
-
+{
void close() throws Exception;
/**
Modified: trunk/src/main/org/jboss/messaging/core/asyncio/impl/AsynchronousFileImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/asyncio/impl/AsynchronousFileImpl.java 2008-05-12 21:53:04 UTC (rev 4174)
+++ trunk/src/main/org/jboss/messaging/core/asyncio/impl/AsynchronousFileImpl.java 2008-05-13 09:25:51 UTC (rev 4175)
@@ -27,88 +27,94 @@
*/
public class AsynchronousFileImpl implements AsynchronousFile
{
- private static Logger log = Logger.getLogger(AsynchronousFileImpl.class);
+ // Static
+ // -------------------------------------------------------------------------------
+
+ private static Logger log = Logger.getLogger(AsynchronousFileImpl.class);
+
+ private static AtomicInteger totalMaxIO = new AtomicInteger(0);
+
+ private static boolean loaded = false;
+
+ static void addMax(int io)
+ {
+ totalMaxIO.addAndGet(io);
+ }
+
+ /** For test purposes */
+ public static int getTotalMaxIO()
+ {
+ return totalMaxIO.get();
+ }
+
+ private static boolean loadLibrary(String name)
+ {
+ try
+ {
+ log.trace(name + " being loaded");
+ System.loadLibrary(name);
+ return isNativeLoaded();
+ }
+ catch (Throwable e)
+ {
+ log.trace(name + " -> error loading it", e);
+ return false;
+ }
+
+ }
+
+ static
+ {
+ String libraries[] = new String[] {"JBMLibAIO", "JBMLibAIO32", "JBMLibAIO64"};
+
+ for (String library: libraries)
+ {
+ if (loadLibrary(library))
+ {
+ loaded = true;
+ break;
+ }
+ else
+ {
+ log.debug("Library " + library + " not found!");
+ }
+ }
+
+ if (!loaded)
+ {
+ log.debug("Couldn't locate LibAIO Wrapper");
+ }
+ }
+
+ public static boolean isLoaded()
+ {
+ return loaded;
+ }
+
+ // Attributes
+ // ---------------------------------------------------------------------------------
+
private boolean opened = false;
private String fileName;
- private Thread poller;
- private static boolean loaded = false;
- private int maxIO;
+ private Thread poller;
+ private int maxIO;
+ private Semaphore writeSemaphore;
+ private ReadWriteLock lock = new ReentrantReadWriteLock();
+ private Lock writeLock = lock.writeLock();
+ private Semaphore pollerSemaphore = new Semaphore(1);
- private static AtomicInteger totalMaxIO = new AtomicInteger(0);
-
- static void addMax(int io)
- {
- totalMaxIO.addAndGet(io);
- }
-
- /** For test purposes */
- public static int getTotalMaxIO()
- {
- return totalMaxIO.get();
- }
-
- Semaphore writeSemaphore;
-
- ReadWriteLock lock = new ReentrantReadWriteLock();
- Lock writeLock = lock.writeLock();
-
- Semaphore pollerSemaphore = new Semaphore(1);
-
/**
* Warning: Beware of the C++ pointer! It will bite you! :-)
*/
private long handler;
- private static boolean loadLibrary(String name)
- {
- try
- {
- log.trace(name + " being loaded");
- System.loadLibrary(name);
- return isNativeLoaded();
- }
- catch (Throwable e)
- {
- log.trace(name + " -> error loading it", e);
- return false;
- }
-
- }
- static
- {
- String libraries[] = new String[] {"JBMLibAIO", "JBMLibAIO32", "JBMLibAIO64"};
-
-
- for (String library: libraries)
- {
- if (loadLibrary(library))
- {
- loaded = true;
- break;
- }
- else
- {
- log.debug("Library " + library + " not found!");
- }
- }
-
- if (!loaded)
- {
- log.debug("Couldn't locate LibAIO Wrapper");
- }
- }
- public static boolean isLoaded()
+ // AsynchronousFile implementation
+ // ------------------------------------------------------------------------------------
+
+ public void open(final String fileName, final int maxIO)
{
- return loaded;
- }
-
-
-
-
- public void open(String fileName, int maxIO)
- {
try
{
writeLock.lock();
@@ -131,27 +137,7 @@
writeLock.unlock();
}
}
-
- class PollerThread extends Thread
- {
- PollerThread ()
- {
- super("NativePoller for " + fileName);
- }
- public void run()
- {
- // informing caller that this thread already has the lock
- try
- {
- pollEvents();
- }
- finally
- {
- pollerSemaphore.release();
- }
- }
- }
-
+
public void close() throws Exception
{
checkOpened();
@@ -174,9 +160,8 @@
pollerSemaphore.release();
}
}
-
-
- public void write(long position, long size, ByteBuffer directByteBuffer, AIOCallback aioPackage)
+
+ public void write(final long position, final long size, final ByteBuffer directByteBuffer, final AIOCallback aioPackage)
{
checkOpened();
this.writeSemaphore.acquireUninterruptibly();
@@ -192,7 +177,7 @@
}
- public void read(long position, long size, ByteBuffer directByteBuffer, AIOCallback aioPackage)
+ public void read(final long position, final long size, final ByteBuffer directByteBuffer, final AIOCallback aioPackage)
{
checkOpened();
this.writeSemaphore.acquireUninterruptibly();
@@ -204,8 +189,7 @@
{
writeSemaphore.release();
throw e;
- }
-
+ }
}
public long size()
@@ -226,6 +210,8 @@
return 512;
}
+ // Private
+ // ---------------------------------------------------------------------------------
/** The JNI layer will call this method, so we could use it to unlock readWriteLocks held in the java layer */
@SuppressWarnings("unused") // Called by the JNI layer.. just ignore the warning
@@ -251,7 +237,7 @@
internalPollEvents(handler);
}
- private synchronized void startPoller()
+ private synchronized void startPoller()
{
checkOpened();
@@ -267,8 +253,6 @@
}
}
-
-
private void checkOpened()
{
if (!opened)
@@ -277,6 +261,9 @@
}
}
+ // Native
+ // ------------------------------------------------------------------------------------------
+
/**
* I'm sending aioPackageClazz here, as you could have multiple classLoaders with the same class, and I don't want the hassle of doing classLoading in the Native layer
*/
@@ -305,8 +292,27 @@
// Should we make this method static?
public native ByteBuffer newBuffer(long size);
+
+ // Inner classes
+ // -----------------------------------------------------------------------------------------
-
-
-
+ private class PollerThread extends Thread
+ {
+ PollerThread ()
+ {
+ super("NativePoller for " + fileName);
+ }
+ public void run()
+ {
+ // informing caller that this thread already has the lock
+ try
+ {
+ pollEvents();
+ }
+ finally
+ {
+ pollerSemaphore.release();
+ }
+ }
+ }
}
Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerImpl.java 2008-05-12 21:53:04 UTC (rev 4174)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerImpl.java 2008-05-13 09:25:51 UTC (rev 4175)
@@ -114,9 +114,7 @@
this.remotingConnection = remotingConnection;
- //This is always true since the MinaHandler will use an executor based on session id
- //TODO can remove this
- this.direct = true;
+ this.direct = direct;
this.tokenBatchSize = tokenBatchSize;
}
Modified: trunk/src/main/org/jboss/messaging/core/journal/EncodingSupport.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/journal/EncodingSupport.java 2008-05-12 21:53:04 UTC (rev 4174)
+++ trunk/src/main/org/jboss/messaging/core/journal/EncodingSupport.java 2008-05-13 09:25:51 UTC (rev 4175)
@@ -1,3 +1,24 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
package org.jboss.messaging.core.journal;
import org.jboss.messaging.util.MessagingBuffer;
@@ -2,9 +23,18 @@
-/**
+/**
+ *
* This class provides encoding support for the Journal.
- * */
+ *
+ *
+ * @author <a href="mailto:clebert.suconic at jboss.com">Clebert Suconic</a>
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ *
+ */
public interface EncodingSupport
{
int encodeSize();
+
void encode(MessagingBuffer buffer);
+
+ void decode(MessagingBuffer buffer);
}
Modified: trunk/src/main/org/jboss/messaging/core/journal/SequentialFile.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/journal/SequentialFile.java 2008-05-12 21:53:04 UTC (rev 4174)
+++ trunk/src/main/org/jboss/messaging/core/journal/SequentialFile.java 2008-05-13 09:25:51 UTC (rev 4175)
@@ -27,7 +27,7 @@
*
* A SequentialFile
*
- * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>Journal
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
*
*/
public interface SequentialFile
@@ -47,8 +47,7 @@
void delete() throws Exception;
- int write(ByteBuffer bytes, boolean sync, IOCallback callback)
- throws Exception;
+ int write(ByteBuffer bytes, boolean sync, IOCallback callback) throws Exception;
int write(ByteBuffer bytes, boolean sync) throws Exception;
Modified: trunk/src/main/org/jboss/messaging/core/journal/impl/AIOSequentialFile.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/journal/impl/AIOSequentialFile.java 2008-05-12 21:53:04 UTC (rev 4174)
+++ trunk/src/main/org/jboss/messaging/core/journal/impl/AIOSequentialFile.java 2008-05-13 09:25:51 UTC (rev 4175)
@@ -22,26 +22,36 @@
import org.jboss.messaging.core.journal.SequentialFile;
import org.jboss.messaging.core.logging.Logger;
+/**
+ *
+ * An AIOSequentialFile
+ *
+ * @author <a href="mailto:clebert.suconic at jboss.com">Clebert Suconic</a>
+ *
+ */
public class AIOSequentialFile implements SequentialFile
{
private static final Logger log = Logger.getLogger(AIOSequentialFile.class);
- String journalDir;
- String fileName;
- boolean opened = false;
- int maxIO;
+ private final String journalDir;
+
+ private final String fileName;
- AsynchronousFile aioFile;
+ private boolean opened = false;
- AtomicLong position = new AtomicLong(0);
+ private final int maxIO;
+
+ private AsynchronousFile aioFile;
+
+ private AtomicLong position = new AtomicLong(0);
- // A context switch on AIO would make it to synchronize the disk before switching to the new thread, what would cuase
+ // A context switch on AIO would make it synchronize the disk before switching to the new thread, which would cause
// serious performance problems. Because of that we make all the writes on AIO using a single thread.
- ExecutorService executor;
+ private ExecutorService executor;
- public AIOSequentialFile(String journalDir, String fileName, int maxIO) throws Exception
+ public AIOSequentialFile(final String journalDir, final String fileName, final int maxIO) throws Exception
{
- this.journalDir = journalDir;
+ this.journalDir = journalDir;
this.fileName = fileName;
this.maxIO = maxIO;
}
@@ -49,6 +59,7 @@
public int getAlignment() throws Exception
{
checkOpened();
+
return aioFile.getBlockSize();
}
@@ -60,9 +71,7 @@
return pos;
}
-
-
-
+
public synchronized void close() throws Exception
{
checkOpened();
@@ -70,8 +79,7 @@
executor.shutdown();
executor.awaitTermination(120, TimeUnit.SECONDS);
aioFile.close();
- aioFile = null;
-
+ aioFile = null;
}
public void delete() throws Exception
@@ -86,8 +94,7 @@
file.delete();
}
- public void fill(int position, int size, byte fillCharacter)
- throws Exception
+ public void fill(int position, final int size, final byte fillCharacter) throws Exception
{
checkOpened();
@@ -97,22 +104,21 @@
{
blockSize = 10*1024*1024;
}
+ else if (size % (1024*1024) == 0)
+ {
+ blockSize = 1024*1024;
+ }
+ else if (size % (10*1024) == 0)
+ {
+ blockSize = 10*1024;
+ }
else
- if (size % (1024*1024) == 0)
- {
- blockSize = 1024*1024;
- }
- else
- if (size % (10*1024) == 0)
- {
- blockSize = 10*1024;
- }
- else
- {
- blockSize = aioFile.getBlockSize();
- }
+ {
+ blockSize = aioFile.getBlockSize();
+ }
int blocks = size / blockSize;
+
if (size % blockSize != 0)
{
blocks++;
@@ -122,8 +128,8 @@
{
position = ((position / aioFile.getBlockSize()) + 1) * aioFile.getBlockSize();
}
- aioFile.fill((long)position, blocks, blockSize, (byte)fillCharacter);
+ aioFile.fill((long)position, blocks, blockSize, (byte)fillCharacter);
}
public String getFileName()
@@ -141,26 +147,28 @@
}
- public void position(int pos) throws Exception
+ public void position(final int pos) throws Exception
{
- position.set(pos);
-
+ position.set(pos);
}
- public int read(ByteBuffer bytes, IOCallback callback) throws Exception
+ public int read(final ByteBuffer bytes, final IOCallback callback) throws Exception
{
int bytesToRead = bytes.limit();
+
long positionToRead = position.getAndAdd(bytesToRead);
bytes.rewind();
+
aioFile.read(positionToRead, bytesToRead, bytes, callback);
return bytesToRead;
}
- public int read(ByteBuffer bytes) throws Exception
+ public int read(final ByteBuffer bytes) throws Exception
{
WaitCompletion waitCompletion = new WaitCompletion();
+
int bytesRead = read (bytes, waitCompletion);
waitCompletion.waitLatch();
@@ -174,10 +182,10 @@
}
- public int write(final ByteBuffer bytes, boolean sync, final IOCallback callback)
- throws Exception
+ public int write(final ByteBuffer bytes, boolean sync, final IOCallback callback) throws Exception
{
final int bytesToWrite = bytes.limit();
+
final long positionToWrite = position.getAndAdd(bytesToWrite);
execWrite(bytes, callback, bytesToWrite, positionToWrite);
@@ -186,7 +194,7 @@
}
private void execWrite(final ByteBuffer bytes, final IOCallback callback,
- final int bytesToWrite, final long positionToWrite)
+ final int bytesToWrite, final long positionToWrite)
{
executor.execute(new Runnable()
{
@@ -205,11 +213,10 @@
}
}
}
- });
-
+ });
}
- public int write(ByteBuffer bytes, boolean sync) throws Exception
+ public int write(final ByteBuffer bytes, final boolean sync) throws Exception
{
return write (bytes, sync, DummyCallback.instance);
}
@@ -222,9 +229,8 @@
}
}
- static class DummyCallback implements IOCallback
- {
-
+ private static class DummyCallback implements IOCallback
+ {
static DummyCallback instance = new DummyCallback();
public void done()
@@ -233,16 +239,15 @@
public void onError(int errorCode, String errorMessage)
{
- }
-
+ }
}
- class WaitCompletion implements IOCallback
- {
-
+ private static class WaitCompletion implements IOCallback
+ {
CountDownLatch latch = new CountDownLatch(1);
String errorMessage;
+
int errorCode = 0;
public void done()
@@ -250,20 +255,18 @@
latch.countDown();
}
- public void onError(int errorCode, String errorMessage)
+ public void onError(final int errorCode, final String errorMessage)
{
this.errorCode = errorCode;
+
this.errorMessage = errorMessage;
- latch.countDown();
-
+ latch.countDown();
}
public void waitLatch() throws Exception
{
latch.await();
- }
-
- }
-
+ }
+ }
}
Modified: trunk/src/main/org/jboss/messaging/core/journal/impl/AIOSequentialFileFactory.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/journal/impl/AIOSequentialFileFactory.java 2008-05-12 21:53:04 UTC (rev 4174)
+++ trunk/src/main/org/jboss/messaging/core/journal/impl/AIOSequentialFileFactory.java 2008-05-13 09:25:51 UTC (rev 4175)
@@ -12,15 +12,21 @@
import org.jboss.messaging.core.journal.SequentialFile;
import org.jboss.messaging.core.asyncio.impl.AsynchronousFileImpl;
+/**
+ *
+ * An AIOSequentialFileFactory
+ *
+ * @author clebert.suconic at jboss.com
+ *
+ */
public class AIOSequentialFileFactory extends AbstractSequentialFactory
-{
-
- public AIOSequentialFileFactory(String journalDir)
+{
+ public AIOSequentialFileFactory(final String journalDir)
{
super(journalDir);
}
- public SequentialFile createSequentialFile(String fileName, boolean sync, int maxIO) throws Exception
+ public SequentialFile createSequentialFile(final String fileName, final boolean sync, final int maxIO) throws Exception
{
return new AIOSequentialFile(journalDir, fileName, maxIO);
}
@@ -45,11 +51,10 @@
}
// For tests only
- public ByteBuffer wrapBuffer(byte[] bytes)
+ public ByteBuffer wrapBuffer(final byte[] bytes)
{
ByteBuffer newbuffer = newBuffer(bytes.length);
newbuffer.put(bytes);
return newbuffer;
};
-
-}
+ }
Modified: trunk/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java 2008-05-12 21:53:04 UTC (rev 4174)
+++ trunk/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java 2008-05-13 09:25:51 UTC (rev 4175)
@@ -66,6 +66,7 @@
public class JournalImpl implements TestableJournal
{
private static final Logger log = Logger.getLogger(JournalImpl.class);
+
private static final boolean trace = log.isTraceEnabled();
private static final int STATE_STOPPED = 0;
@@ -160,10 +161,9 @@
private final ConcurrentMap<Long, TransactionCallback> transactionCallbacks = new ConcurrentHashMap<Long, TransactionCallback>();
- private boolean shouldUseCallback = false;
+ private final boolean shouldUseCallback;
-
-
+
/*
* We use a semaphore rather than synchronized since it performs better when contended
*/
@@ -186,8 +186,8 @@
private Reclaimer reclaimer = new Reclaimer();
public JournalImpl(final int fileSize, final int minFiles,
- final boolean sync, final SequentialFileFactory fileFactory, final long taskPeriod,
- final String filePrefix, final String fileExtension, final int maxAIO)
+ final boolean sync, final SequentialFileFactory fileFactory, final long taskPeriod,
+ final String filePrefix, final String fileExtension, final int maxAIO)
{
if (fileSize < MIN_FILE_SIZE)
{
@@ -239,7 +239,7 @@
// Journal implementation ----------------------------------------------------------------
- public void appendAddRecord(long id, byte recordType, EncodingSupport record) throws Exception
+ public void appendAddRecord(final long id, final byte recordType, final EncodingSupport record) throws Exception
{
if (state != STATE_LOADED)
{
@@ -276,7 +276,7 @@
posFilesMap.put(id, new PosFiles(usedFile));
}
- public void appendAddRecord(long id, byte recordType, byte[] record) throws Exception
+ public void appendAddRecord(final long id, final byte recordType, final byte[] record) throws Exception
{
if (state != STATE_LOADED)
{
@@ -296,9 +296,9 @@
bb.rewind();
JournalFile usedFile;
+
if (shouldUseCallback)
- {
-
+ {
SimpleCallback callback = new SimpleCallback();
usedFile = appendRecord(bb, true, callback);
callback.waitCompletion();
@@ -348,8 +348,7 @@
{
usedFile = appendRecord(bb, true);
}
-
-
+
posFiles.addUpdateFile(usedFile);
}
@@ -1175,8 +1174,7 @@
{
JournalFile[] files = new JournalFile[dataFiles.size()];
- reclaimer.scan(dataFiles.toArray(files));
-
+ reclaimer.scan(dataFiles.toArray(files));
}
public String debug() throws Exception
@@ -1197,8 +1195,7 @@
builder.append("CurrentFile:" + currentFile+ " posCounter = " + currentFile.getPosCount() + "\n");
builder.append(((JournalFileImpl)currentFile).debug());
-
-
+
return builder.toString();
}
@@ -1343,7 +1340,7 @@
// Private -----------------------------------------------------------------------------
- private JournalFile appendRecord(ByteBuffer bb, boolean sync) throws Exception
+ private JournalFile appendRecord(final ByteBuffer bb, final boolean sync) throws Exception
{
lock.acquire();
@@ -1362,7 +1359,7 @@
}
}
- private JournalFile appendRecord(ByteBuffer bb, boolean sync, IOCallback callback) throws Exception
+ private JournalFile appendRecord(final ByteBuffer bb, final boolean sync, final IOCallback callback) throws Exception
{
lock.acquire();
@@ -1381,7 +1378,7 @@
}
}
- private void repairFrom(int pos, JournalFile file) throws Exception
+ private void repairFrom(final int pos, final JournalFile file) throws Exception
{
log.warn("Corruption has been detected in file: " + file.getFile().getFileName() +
" in the record that starts at position " + pos + ". " +
@@ -1444,12 +1441,12 @@
}
private void checkFile(final int size) throws Exception
- {
-
+ {
if (size % currentFile.getFile().getAlignment() != 0)
{
throw new IllegalStateException("You can't write blocks in a size different than " + currentFile.getFile().getAlignment());
}
+
//We take into account the first timestamp long
if (size > fileSize - currentFile.getFile().calculateBlockStart(SIZE_HEADER))
{
@@ -1488,7 +1485,7 @@
return tx;
}
- private TransactionCallback getTransactionCallback(long transactionId)
+ private TransactionCallback getTransactionCallback(final long transactionId)
{
TransactionCallback callback = this.transactionCallbacks.get(transactionId);
@@ -1501,18 +1498,10 @@
return callback;
}
- private void removeTransactionCallback(long transactionId)
- {
- transactionCallbacks.remove(transactionId);
- }
-
-
-
// Inner classes ---------------------------------------------------------------------------
- class SimpleCallback implements IOCallback
- {
-
+ private static class SimpleCallback implements IOCallback
+ {
String errorMessage;
int errorCode;
CountDownLatch latch = new CountDownLatch(1);
@@ -1522,12 +1511,11 @@
latch.countDown();
}
- public void onError(int errorCode, String errorMessage)
+ public void onError(final int errorCode, final String errorMessage)
{
this.errorMessage = errorMessage;
this.errorCode = errorCode;
- latch.countDown();
-
+ latch.countDown();
}
public void waitCompletion() throws InterruptedException
@@ -1541,13 +1529,11 @@
{
throw new IllegalStateException("Error on Transaction: " + errorCode + " - " + errorMessage);
}
- }
-
+ }
}
- class TransactionCallback implements IOCallback
- {
-
+ private static class TransactionCallback implements IOCallback
+ {
VariableLatch countLatch = new VariableLatch();
String errorMessage = null;
@@ -1573,7 +1559,7 @@
}
}
- public void onError(int errorCode, String errorMessage)
+ public void onError(final int errorCode, final String errorMessage)
{
this.errorMessage = errorMessage;
this.errorCode = errorCode;
Modified: trunk/src/main/org/jboss/messaging/core/journal/impl/NIOSequentialFile.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/journal/impl/NIOSequentialFile.java 2008-05-12 21:53:04 UTC (rev 4174)
+++ trunk/src/main/org/jboss/messaging/core/journal/impl/NIOSequentialFile.java 2008-05-13 09:25:51 UTC (rev 4175)
@@ -67,12 +67,11 @@
return 1;
}
- public int calculateBlockStart(int position) throws Exception
+ public int calculateBlockStart(final int position) throws Exception
{
return position;
- }
+ }
-
public String getFileName()
{
return fileName;
@@ -127,12 +126,12 @@
close();
}
- public int read(ByteBuffer bytes) throws Exception
+ public int read(final ByteBuffer bytes) throws Exception
{
return read(bytes, null);
}
- public int read(ByteBuffer bytes, IOCallback callback) throws Exception
+ public int read(final ByteBuffer bytes, final IOCallback callback) throws Exception
{
try
{
@@ -156,12 +155,12 @@
}
- public int write(ByteBuffer bytes, boolean sync) throws Exception
+ public int write(final ByteBuffer bytes, final boolean sync) throws Exception
{
return write(bytes, sync, null);
}
- public int write(ByteBuffer bytes, boolean sync, IOCallback callback) throws Exception
+ public int write(final ByteBuffer bytes, final boolean sync, final IOCallback callback) throws Exception
{
int bytesRead = channel.write(bytes);
Modified: trunk/src/main/org/jboss/messaging/core/message/impl/MessageImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/message/impl/MessageImpl.java 2008-05-12 21:53:04 UTC (rev 4174)
+++ trunk/src/main/org/jboss/messaging/core/message/impl/MessageImpl.java 2008-05-13 09:25:51 UTC (rev 4175)
@@ -21,26 +21,10 @@
*/
package org.jboss.messaging.core.message.impl;
-import static org.jboss.messaging.util.DataConstants.BOOLEAN;
-import static org.jboss.messaging.util.DataConstants.BYTE;
-import static org.jboss.messaging.util.DataConstants.BYTES;
-import static org.jboss.messaging.util.DataConstants.CHAR;
-import static org.jboss.messaging.util.DataConstants.DOUBLE;
-import static org.jboss.messaging.util.DataConstants.FLOAT;
-import static org.jboss.messaging.util.DataConstants.INT;
-import static org.jboss.messaging.util.DataConstants.LONG;
-import static org.jboss.messaging.util.DataConstants.NOT_NULL;
-import static org.jboss.messaging.util.DataConstants.NULL;
-import static org.jboss.messaging.util.DataConstants.SHORT;
import static org.jboss.messaging.util.DataConstants.SIZE_BOOLEAN;
import static org.jboss.messaging.util.DataConstants.SIZE_BYTE;
-import static org.jboss.messaging.util.DataConstants.SIZE_CHAR;
-import static org.jboss.messaging.util.DataConstants.SIZE_DOUBLE;
-import static org.jboss.messaging.util.DataConstants.SIZE_FLOAT;
import static org.jboss.messaging.util.DataConstants.SIZE_INT;
import static org.jboss.messaging.util.DataConstants.SIZE_LONG;
-import static org.jboss.messaging.util.DataConstants.SIZE_SHORT;
-import static org.jboss.messaging.util.DataConstants.STRING;
import java.util.Set;
@@ -128,17 +112,6 @@
public void encode(MessagingBuffer buff)
{
-// buff.putSimpleString(destination);
-// buff.putInt(type);
-// buff.putBoolean(durable);
-// buff.putLong(expiration);
-// buff.putLong(timestamp);
-// buff.putByte(priority);
-// properties.encode(buff);
-// buff.putInt(body.limit());
-// buff.putBytes(body.array(), 0, body.limit());
-
-
buff.putSimpleString(destination);
buff.putInt(type);
buff.putBoolean(durable);
@@ -153,14 +126,6 @@
public int encodeSize()
{
-// return /* Destination */ SimpleString.sizeofString(destination) +
-// /* Type */ SIZE_INT +
-// /* Durable */ SIZE_BOOLEAN +
-// /* Expiration */ SIZE_LONG +
-// /* Timestamp */ SIZE_LONG +
-// /* Priority */ SIZE_BYTE +
-// /* PropertySize and Properties */ properties.encodeSize() +
-// /* BodySize and Body */ SIZE_INT + body.limit();
return /* Destination */ SimpleString.sizeofString(destination) +
/* Type */ SIZE_INT +
/* Durable */ SIZE_BOOLEAN +
@@ -168,8 +133,7 @@
/* Timestamp */ SIZE_LONG +
/* Priority */ SIZE_BYTE +
/* PropertySize and Properties */ properties.encodeSize() +
- /* BodySize and Body */ SIZE_INT + body.limit();
-
+ /* BodySize and Body */ SIZE_INT + body.limit();
}
public void decode(final MessagingBuffer buffer)
Modified: trunk/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalStorageManager.java 2008-05-12 21:53:04 UTC (rev 4174)
+++ trunk/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalStorageManager.java 2008-05-13 09:25:51 UTC (rev 4175)
@@ -134,14 +134,13 @@
log.info("AIO loaded successfully");
}
}
- else
- if (config.getJournalType() == JournalType.NIO)
+ else if (config.getJournalType() == JournalType.NIO)
{
journalFF = new NIOSequentialFileFactory(bindingsDir);
}
- else
- if (config.getJournalType() == JournalType.JDBC)
- { // Sanity check only... this is previously tested
+ else if (config.getJournalType() == JournalType.JDBC)
+ {
+ // Sanity check only... this is previously tested
throw new IllegalArgumentException("JDBC Journal is not supported yet");
}
Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/ProducerSendMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/ProducerSendMessage.java 2008-05-12 21:53:04 UTC (rev 4174)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/ProducerSendMessage.java 2008-05-13 09:25:51 UTC (rev 4175)
@@ -7,6 +7,7 @@
package org.jboss.messaging.core.remoting.impl.wireformat;
import org.jboss.messaging.core.client.ClientMessage;
+import org.jboss.messaging.core.logging.Logger;
import org.jboss.messaging.core.server.ServerMessage;
import org.jboss.messaging.core.server.impl.ServerMessageImpl;
import org.jboss.messaging.util.MessagingBuffer;
@@ -21,6 +22,9 @@
{
// Constants -----------------------------------------------------
+ private static final Logger log = Logger.getLogger(ProducerSendMessage.class);
+
+
// Attributes ----------------------------------------------------
private ClientMessage clientMessage;
Modified: trunk/src/main/org/jboss/messaging/util/TypedProperties.java
===================================================================
--- trunk/src/main/org/jboss/messaging/util/TypedProperties.java 2008-05-12 21:53:04 UTC (rev 4174)
+++ trunk/src/main/org/jboss/messaging/util/TypedProperties.java 2008-05-13 09:25:51 UTC (rev 4175)
@@ -61,7 +61,6 @@
public class TypedProperties implements EncodingSupport
{
private static final Logger log = Logger.getLogger(TypedProperties.class);
-
private Map<SimpleString, PropertyValue> properties;
Modified: trunk/src/main/org/jboss/messaging/util/VariableLatch.java
===================================================================
--- trunk/src/main/org/jboss/messaging/util/VariableLatch.java 2008-05-12 21:53:04 UTC (rev 4174)
+++ trunk/src/main/org/jboss/messaging/util/VariableLatch.java 2008-05-13 09:25:51 UTC (rev 4175)
@@ -25,7 +25,9 @@
import java.util.concurrent.locks.AbstractQueuedSynchronizer;
-/** This class will use the framework provided to by AbstractQueuedSynchronizer.
+/**
+ *
+ * This class will use the framework provided by AbstractQueuedSynchronizer.
* AbstractQueuedSynchronizer is the framework for any sort of concurrent synchronization, such as Semaphores, events, etc, based on AtomicIntegers.
*
* The idea is, instead of providing each user specific Latch/Synchronization, java.util.concurrent provides the framework for reuses, based on an AtomicInteger (getState())
@@ -41,20 +43,18 @@
* @see AbstractQueuedSynchronizer*/
@SuppressWarnings("serial")
private static class CountSync extends AbstractQueuedSynchronizer
- {
-
+ {
public CountSync ()
{
setState(0);
}
-
-
+
public int getCount()
{
return getState();
}
- public int tryAcquireShared(int numberOfAqcquires)
+ public int tryAcquireShared(final int numberOfAqcquires)
{
return getState()==0 ? 1 : -1;
}
@@ -71,9 +71,8 @@
}
}
}
-
-
- public boolean tryReleaseShared(int numberOfReleases)
+
+ public boolean tryReleaseShared(final int numberOfReleases)
{
for (;;)
{
@@ -93,9 +92,8 @@
}
}
- CountSync control = new CountSync();
-
-
+ private CountSync control = new CountSync();
+
public int getCount()
{
return control.getCount();
@@ -116,7 +114,7 @@
control.acquireSharedInterruptibly(1);
}
- public void waitCompletion(int seconds) throws InterruptedException
+ public void waitCompletion(final int seconds) throws InterruptedException
{
if (!control.tryAcquireSharedNanos(1, TimeUnit.SECONDS.toNanos(seconds)))
{
Modified: trunk/tests/jms-tests/src/org/jboss/test/messaging/jms/DeliveryOrderTest.java
===================================================================
--- trunk/tests/jms-tests/src/org/jboss/test/messaging/jms/DeliveryOrderTest.java 2008-05-12 21:53:04 UTC (rev 4174)
+++ trunk/tests/jms-tests/src/org/jboss/test/messaging/jms/DeliveryOrderTest.java 2008-05-13 09:25:51 UTC (rev 4175)
@@ -81,8 +81,6 @@
prod.send(tm);
- log.info("sent message");
-
if (i % 10 == 0)
{
sess.commit();
Modified: trunk/tests/jms-tests/src/org/jboss/test/messaging/jms/MessageConsumerTest.java
===================================================================
--- trunk/tests/jms-tests/src/org/jboss/test/messaging/jms/MessageConsumerTest.java 2008-05-12 21:53:04 UTC (rev 4174)
+++ trunk/tests/jms-tests/src/org/jboss/test/messaging/jms/MessageConsumerTest.java 2008-05-13 09:25:51 UTC (rev 4175)
@@ -258,13 +258,9 @@
MessageConsumer cons2 = sessConsume2.createConsumer(queue1);
// this should cancel message and cause delivery to other consumer
-
- log.info("closing session");
-
+
sessConsume1.close();
- log.info("closed session");
-
TextMessage tm3 = (TextMessage)cons2.receive(1000);
assertNotNull(tm3);
@@ -814,16 +810,10 @@
assertEquals("One", m.getText());
- log.info("Closing consumer");
queueConsumer.close();
- log.info("Closed consumer");
-
- log.info("Committing session");
+
consumerSession.commit();
- log.info("Committed session");
-
-
-
+
// I expect that "Two" is still in the queue
MessageConsumer queueConsumer2 = consumerSession.createConsumer(queue1);
@@ -1463,7 +1453,6 @@
failed = true;
break;
}
- log.info("received message");
if (!m.getText().equals("testing"))
{
failed = true;
@@ -3987,11 +3976,8 @@
TextMessage tm = (TextMessage)m;
count++;
- log.info(this + " Got message:" + count);
-
try
- {
- log.info(this + " message:" + tm.getText());
+ {;
if (count == 1)
{
if (!("a".equals(tm.getText())))
@@ -3999,12 +3985,10 @@
failed("Should be a but was " + tm.getText());
latch.release();
}
- log.info("Throwing exception");
throw new RuntimeException("Aardvark");
}
else if (count == 2)
{
- log.info("ack mode:" + sess.getAcknowledgeMode());
if (sess.getAcknowledgeMode() == Session.AUTO_ACKNOWLEDGE || sess.getAcknowledgeMode() == Session.DUPS_OK_ACKNOWLEDGE)
{
//Message should be immediately redelivered
Modified: trunk/tests/src/org/jboss/messaging/tests/integration/CoreClientTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/CoreClientTest.java 2008-05-12 21:53:04 UTC (rev 4174)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/CoreClientTest.java 2008-05-13 09:25:51 UTC (rev 4175)
@@ -85,6 +85,22 @@
conn.close();
}
+ public static void main(String[] args)
+ {
+ try
+ {
+ CoreClientTest test = new CoreClientTest();
+
+ test.setUp();
+ test.testCoreClientPerf();
+ test.tearDown();
+ }
+ catch (Throwable t)
+ {
+ t.printStackTrace();
+ }
+ }
+
public void testCoreClientPerf() throws Exception
{
Location location = new LocationImpl(TransportType.TCP, "localhost", ConfigurationImpl.DEFAULT_REMOTING_PORT);
@@ -94,7 +110,7 @@
ClientConnection conn = cf.createConnection();
- ClientSession session = conn.createClientSession(false, true, false, -1, false, false);
+ final ClientSession session = conn.createClientSession(false, true, true, 1000, false, false);
session.createQueue(QUEUE, QUEUE, null, false, false);
ClientProducer producer = session.createProducer(QUEUE);
@@ -102,6 +118,13 @@
ClientMessage message = new ClientMessageImpl(JBossTextMessage.TYPE, false, 0,
System.currentTimeMillis(), (byte) 1);
+ //byte[] bytes = new byte[1000];
+
+ //message.getBody().putBytes(bytes);
+
+ message.getBody().flip();
+
+
ClientConsumer consumer = session.createConsumer(QUEUE, null, false, false, true);
final CountDownLatch latch = new CountDownLatch(1);
@@ -115,46 +138,71 @@
public void onMessage(ClientMessage msg)
{
count++;
+
+ try
+ {
+ session.acknowledge();
+ }
+ catch (Exception e)
+ {
+ e.printStackTrace();
+ }
if (count == numMessages)
{
latch.countDown();
- }
+ }
}
}
consumer.setMessageHandler(new MyHandler());
+
+ //System.out.println("Waiting 10 secs");
+ // Thread.sleep(10000);
+ System.out.println("Starting");
-
-
-
+ //conn.start();
+
+ long start = System.currentTimeMillis();
+
for (int i = 0; i < numMessages; i++)
{
producer.send(message);
}
+ long end = System.currentTimeMillis();
+
+ double actualRate = 1000 * (double)numMessages / ( end - start);
+
+ System.out.println("Rate is " + actualRate);
+
+ conn.start();
+
+ start = System.currentTimeMillis();
+
+ latch.await();
+
// long end = System.currentTimeMillis();
//
// double actualRate = 1000 * (double)numMessages / ( end - start);
//
// System.out.println("Rate is " + actualRate);
- conn.start();
+ //conn.start();
- long start = System.currentTimeMillis();
-
+
//start = System.currentTimeMillis();
- latch.await();
- long end = System.currentTimeMillis();
- double actualRate = 1000 * (double)numMessages / ( end - start);
+ end = System.currentTimeMillis();
- System.out.println("Rate is " + actualRate);
+ actualRate = 1000 * (double)numMessages / ( end - start);
+ System.out.println(" consume Rate is " + actualRate);
+
//
// message = consumer.receive(1000);
//
More information about the jboss-cvs-commits
mailing list