[jboss-cvs] JBoss Messaging SVN: r5210 - in branches/Branch_Chunk_CRS2: src/main/org/jboss/messaging/core/server/impl and 2 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Wed Oct 29 15:47:45 EDT 2008
Author: clebert.suconic at jboss.com
Date: 2008-10-29 15:47:45 -0400 (Wed, 29 Oct 2008)
New Revision: 5210
Modified:
branches/Branch_Chunk_CRS2/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalServerLargeMessageImpl.java
branches/Branch_Chunk_CRS2/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java
branches/Branch_Chunk_CRS2/tests/src/org/jboss/messaging/tests/integration/chunkmessage/ChunkTestBase.java
branches/Branch_Chunk_CRS2/tests/src/org/jboss/messaging/tests/integration/chunkmessage/MessageChunkTest.java
branches/Branch_Chunk_CRS2/tests/src/org/jboss/messaging/tests/stress/chunk/MessageChunkStressTest.java
Log:
tweaks
Modified: branches/Branch_Chunk_CRS2/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalServerLargeMessageImpl.java
===================================================================
--- branches/Branch_Chunk_CRS2/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalServerLargeMessageImpl.java 2008-10-29 17:07:08 UTC (rev 5209)
+++ branches/Branch_Chunk_CRS2/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalServerLargeMessageImpl.java 2008-10-29 19:47:45 UTC (rev 5210)
@@ -220,7 +220,6 @@
{
if (file.isOpen())
{
- System.out.println("Closing file " + file);
try
{
file.close();
Modified: branches/Branch_Chunk_CRS2/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java
===================================================================
--- branches/Branch_Chunk_CRS2/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java 2008-10-29 17:07:08 UTC (rev 5209)
+++ branches/Branch_Chunk_CRS2/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java 2008-10-29 19:47:45 UTC (rev 5210)
@@ -192,7 +192,7 @@
return HandleStatus.HANDLED;
}
-
+
lock.lock();
try
@@ -200,7 +200,6 @@
if (pendingLargeMessage != null)
{
- new Exception("Busy because of pendingLargeMessage").printStackTrace();
return HandleStatus.BUSY;
}
@@ -478,10 +477,14 @@
{
lock.lock();
-
+
try
{
+ if (pendingLargeMessage == null)
+ {
+ return true;
+ }
final long bodySize = pendingLargeMessage.getBodySize();
int chunkLength = 0;
@@ -493,13 +496,8 @@
if (availableCredits.get() <= 0)
{
- System.out.println("Cancelling.. not enough credits");
return false;
}
- else
- {
- System.out.println("good!!!");
- }
if (positionLargeMessage == 0)
{
@@ -564,6 +562,7 @@
pendingLargeMessage.releaseResources();
this.pendingLargeMessage = null;
this.positionLargeMessage = -1;
+
return true;
}
@@ -571,6 +570,7 @@
{
lock.unlock();
}
+
}
Modified: branches/Branch_Chunk_CRS2/tests/src/org/jboss/messaging/tests/integration/chunkmessage/ChunkTestBase.java
===================================================================
--- branches/Branch_Chunk_CRS2/tests/src/org/jboss/messaging/tests/integration/chunkmessage/ChunkTestBase.java 2008-10-29 17:07:08 UTC (rev 5209)
+++ branches/Branch_Chunk_CRS2/tests/src/org/jboss/messaging/tests/integration/chunkmessage/ChunkTestBase.java 2008-10-29 19:47:45 UTC (rev 5210)
@@ -20,7 +20,6 @@
* 02110-1301 USA, or see the FSF site: http://www.fsf.org.
*/
-
package org.jboss.messaging.tests.integration.chunkmessage;
import java.io.File;
@@ -56,7 +55,6 @@
public class ChunkTestBase extends ServiceTestBase
{
-
// Constants -----------------------------------------------------
private static final Logger log = Logger.getLogger(ChunkTestBase.class);
@@ -80,47 +78,48 @@
super.tearDown();
deleteData();
}
-
+
protected void testChunks(final boolean realFiles,
- final boolean useFile,
- final int numberOfMessages,
- final int numberOfIntegers,
- final boolean sendingBlocking,
- final int waitOnConsumer,
- final long delayDelivery) throws Exception
+ final boolean useFile,
+ final int numberOfMessages,
+ final int numberOfIntegers,
+ final boolean sendingBlocking,
+ final int waitOnConsumer,
+ final long delayDelivery) throws Exception
{
-
+
clearData();
-
+
messagingService = createService(realFiles);
messagingService.start();
-
+
try
{
ClientSessionFactory sf = createInVMFactory();
-
+
if (sendingBlocking)
{
sf.setBlockOnNonPersistentSend(true);
sf.setBlockOnPersistentSend(true);
sf.setBlockOnAcknowledge(true);
}
-
+
ClientSession session = sf.createSession(false, true, true, false);
-
+
session.createQueue(ADDRESS, ADDRESS, null, true, false);
-
+
ClientProducer producer = session.createProducer(ADDRESS);
-
+
if (useFile)
{
- File tmpData = createLargeFile(temporaryDir, "someFile.dat", numberOfIntegers);
-
+ File tmpData = createLargeFile(temporaryDir, "someFile.dat", numberOfIntegers);
+
for (int i = 0; i < numberOfMessages; i++)
{
ClientMessage message = session.createFileMessage(true);
((FileClientMessage)message).setFile(tmpData);
message.putIntProperty(new SimpleString("counter-message"), i);
+ long timeStart = System.currentTimeMillis();
if (delayDelivery > 0)
{
message.putLongProperty(new SimpleString("original-time"), System.currentTimeMillis());
@@ -130,6 +129,8 @@
{
producer.send(message);
}
+
+ System.out.println("Message sent in " + (System.currentTimeMillis() - timeStart));
}
}
else
@@ -150,24 +151,23 @@
}
}
}
-
-
+
session.close();
-
+
if (realFiles)
{
messagingService.stop();
-
+
messagingService = createService(realFiles);
messagingService.start();
-
+
sf = createInVMFactory();
}
-
+
session = sf.createSession(false, true, true, false);
-
+
ClientConsumer consumer = null;
-
+
if (realFiles)
{
consumer = session.createFileConsumer(new File(clientLargeMessagesDir), ADDRESS);
@@ -176,34 +176,39 @@
{
consumer = session.createConsumer(ADDRESS);
}
+
+ session.start();
- session.start();
-
+
for (int i = 0; i < numberOfMessages; i++)
{
+ long start = System.currentTimeMillis();
+
ClientMessage message = consumer.receive(waitOnConsumer + delayDelivery);
-
+
assertNotNull(message);
-
+ System.out.println("Message received in " + (System.currentTimeMillis() - start));
+ start = System.currentTimeMillis();
+
if (delayDelivery > 0)
{
long originalTime = (Long)message.getProperty(new SimpleString("original-time"));
- assertTrue("difference = "+ (System.currentTimeMillis() - originalTime), System.currentTimeMillis() - originalTime >= delayDelivery);
+ assertTrue("difference = " + (System.currentTimeMillis() - originalTime),
+ System.currentTimeMillis() - originalTime >= delayDelivery);
}
-
+
message.acknowledge();
-
+
assertNotNull(message);
-
+
System.out.println("msg on client = " + message.getMessageID());
-
-
+
if (delayDelivery <= 0)
{ // right now there is no guarantee of ordered delivered on multiple scheduledMessages
assertEquals(i, ((Integer)message.getProperty(new SimpleString("counter-message"))).intValue());
}
-
+
if (message instanceof FileClientMessage)
{
checkFileRead(((FileClientMessage)message).getFile(), numberOfIntegers);
@@ -219,11 +224,10 @@
}
}
}
-
+
session.close();
-
- File largemsgsFile = new File(this.largeMessagesDir);
- assertEquals(0, largemsgsFile.list().length);
+
+ validateNoFilesOnLargeDir();
}
finally
{
@@ -237,7 +241,7 @@
}
}
- protected MessagingBuffer createLargeBuffer(int numberOfIntegers)
+ protected MessagingBuffer createLargeBuffer(final int numberOfIntegers)
{
ByteBuffer ioBuffer = ByteBuffer.allocate(DataConstants.SIZE_INT * numberOfIntegers);
MessagingBuffer body = new ByteBufferWrapper(ioBuffer);
@@ -247,9 +251,9 @@
body.putInt(i);
}
body.flip();
-
+
return body;
-
+
}
protected FileClientMessage createLargeClientMessage(final ClientSession session, final int numberOfIntegers) throws Exception
@@ -271,10 +275,11 @@
* @throws FileNotFoundException
* @throws IOException
*/
- protected File createLargeFile(String directory, String name, final int numberOfIntegers) throws FileNotFoundException, IOException
+ protected File createLargeFile(final String directory, final String name, final int numberOfIntegers) throws FileNotFoundException,
+ IOException
{
File tmpFile = new File(directory + "/" + name);
-
+
log.info("Creating file " + tmpFile);
RandomAccessFile random = new RandomAccessFile(tmpFile, "rw");
@@ -292,7 +297,7 @@
}
buffer.putInt(i);
}
-
+
if (buffer.position() > 0)
{
buffer.flip();
@@ -316,8 +321,8 @@
* @throws IOException
*/
protected void readMessage(final ClientSession session, final SimpleString queueToRead, final int numberOfIntegers) throws MessagingException,
- FileNotFoundException,
- IOException
+ FileNotFoundException,
+ IOException
{
session.start();
@@ -350,7 +355,7 @@
* @throws IOException
*/
protected void checkFileRead(final File receivedFile, final int numberOfIntegers) throws FileNotFoundException,
- IOException
+ IOException
{
RandomAccessFile random2 = new RandomAccessFile(receivedFile, "r");
FileChannel channel2 = random2.getChannel();
@@ -358,11 +363,11 @@
ByteBuffer buffer2 = ByteBuffer.allocate(1000 * 4);
channel2.position(0l);
-
+
for (int i = 0; i < numberOfIntegers;)
{
channel2.read(buffer2);
-
+
buffer2.flip();
for (int j = 0; j < buffer2.limit() / 4; j++, i++)
{
@@ -375,6 +380,29 @@
channel2.close();
}
+ /**
+ * Deleting a file on LargeDire is an asynchronous process. Wee need to keep looking for a while if the file hasn't been deleted yet
+ */
+ protected void validateNoFilesOnLargeDir() throws Exception
+ {
+ File largeMessagesFileDir = new File(largeMessagesDir);
+
+ // Deleting the file is async... we keep looking for a period of the time until the file is really gone
+ for (int i = 0; i < 100; i++)
+ {
+ if (largeMessagesFileDir.listFiles().length > 0)
+ {
+ Thread.sleep(10);
+ }
+ else
+ {
+ break;
+ }
+ }
+
+ assertEquals(0, largeMessagesFileDir.listFiles().length);
+ }
+
// Private -------------------------------------------------------
// Inner classes -------------------------------------------------
Modified: branches/Branch_Chunk_CRS2/tests/src/org/jboss/messaging/tests/integration/chunkmessage/MessageChunkTest.java
===================================================================
--- branches/Branch_Chunk_CRS2/tests/src/org/jboss/messaging/tests/integration/chunkmessage/MessageChunkTest.java 2008-10-29 17:07:08 UTC (rev 5209)
+++ branches/Branch_Chunk_CRS2/tests/src/org/jboss/messaging/tests/integration/chunkmessage/MessageChunkTest.java 2008-10-29 19:47:45 UTC (rev 5210)
@@ -76,33 +76,23 @@
// Public --------------------------------------------------------
- // Validate the functions to create and verify files
- public void testFiles() throws Exception
- {
- clearData();
-
- File file = createLargeFile(temporaryDir, "test.tst", 13333);
-
- checkFileRead(file, 13333);
- }
-
public void testCleanup() throws Exception
{
clearData();
-
+
createLargeFile(largeMessagesDir, "1234.tmp", 13333);
-
+
Configuration config = createDefaultConfig();
messagingService = createService(true, config, new HashMap<String, QueueSettings>());
messagingService.start();
-
+
try
{
-
+
File directoryLarge = new File(largeMessagesDir);
-
+
assertEquals(0, directoryLarge.list().length);
}
finally
@@ -110,7 +100,7 @@
messagingService.stop();
}
}
-
+
public void testFailureOnSendingFile() throws Exception
{
clearData();
@@ -186,23 +176,8 @@
{
}
- File largeMessagesFileDir = new File(largeMessagesDir);
+ validateNoFilesOnLargeDir();
- // Deleting the file is async... we keep looking for a period of the time until the file is really gone
- for (int i = 0; i < 100; i++)
- {
- if (largeMessagesFileDir.listFiles().length > 0)
- {
- Thread.sleep(10);
- }
- else
- {
- break;
- }
- }
-
- assertEquals(0, largeMessagesFileDir.listFiles().length);
-
}
finally
{
@@ -218,6 +193,26 @@
}
+ // Validate the functions to create and verify files
+ public void testFiles() throws Exception
+ {
+ clearData();
+
+ File file = createLargeFile(temporaryDir, "test.tst", 13333);
+
+ checkFileRead(file, 13333);
+ }
+
+ public void testMessageChunkFilePersistence() throws Exception
+ {
+ testChunks(true, false, 100, 262144, false, 1000, 0);
+ }
+
+ public void testMessageChunkFilePersistenceDelayed() throws Exception
+ {
+ testChunks(true, false, 1, 50000, false, 1000, 2000);
+ }
+
public void testMessageChunkNullPersistence() throws Exception
{
testChunks(false, false, 1, 50000, false, 1000, 0);
@@ -228,14 +223,16 @@
testChunks(false, false, 100, 50000, false, 10000, 100);
}
- public void testMessageChunkFilePersistence() throws Exception
+ public void testPageOnLargeMessage() throws Exception
{
- testChunks(true, false, 100, 262144, false, 1000, 0);
+ testPageOnLargeMessage(true, false);
+
}
- public void testMessageChunkFilePersistenceDelayed() throws Exception
+ public void testPageOnLargeMessageNullPersistence() throws Exception
{
- testChunks(true, false, 1, 50000, false, 1000, 2000);
+ testPageOnLargeMessage(false, false);
+
}
public void testSendfileMessage() throws Exception
@@ -249,17 +246,17 @@
testChunks(false, true, 100, 50000, false, 1000, 0);
}
+ public void testSendfileMessageOnNullPersistenceSmallMessage() throws Exception
+ {
+ testChunks(false, true, 100, 100, false, 1000, 0);
+ }
+
public void testSendfileMessageSmallMessage() throws Exception
{
testChunks(true, true, 100, 4, false, 1000, 0);
}
- public void testSendfileMessageOnNullPersistenceSmallMessage() throws Exception
- {
- testChunks(false, true, 100, 100, false, 1000, 0);
- }
-
public void testSendRegularMessageNullPersistence() throws Exception
{
testChunks(false, false, 100, 100, false, 1000, 0);
@@ -330,8 +327,7 @@
session.close();
- File largeMessagesFileDir = new File(largeMessagesDir);
- assertEquals(0, largeMessagesFileDir.listFiles().length);
+ validateNoFilesOnLargeDir();
}
finally
{
@@ -346,23 +342,24 @@
}
- public void testPageOnLargeMessage() throws Exception
- {
- testPageOnLargeMessage(true, false);
+ // Package protected ---------------------------------------------
- }
+ // Protected -----------------------------------------------------
- public void testPageOnLargeMessageNullPersistence() throws Exception
+ @Override
+ protected void setUp() throws Exception
{
- testPageOnLargeMessage(false, false);
+ super.setUp();
+ log.info("\n*********************************************************************************\n Starting " + this.getName() + "\n*********************************************************************************");
+ }
+ @Override
+ protected void tearDown() throws Exception
+ {
+ super.tearDown();
+ log.info("\n*********************************************************************************\nDone with " + this.getName() + "\n*********************************************************************************" );
}
-
- // Package protected ---------------------------------------------
-
- // Protected -----------------------------------------------------
-
protected void testPageOnLargeMessage(final boolean realFiles, final boolean sendBlocking) throws Exception
{
@@ -503,19 +500,7 @@
}
}
-
- @Override
- protected void setUp() throws Exception
- {
- super.setUp();
- }
- @Override
- protected void tearDown() throws Exception
- {
- super.tearDown();
- }
-
// Private -------------------------------------------------------
// Inner classes -------------------------------------------------
Modified: branches/Branch_Chunk_CRS2/tests/src/org/jboss/messaging/tests/stress/chunk/MessageChunkStressTest.java
===================================================================
--- branches/Branch_Chunk_CRS2/tests/src/org/jboss/messaging/tests/stress/chunk/MessageChunkStressTest.java 2008-10-29 17:07:08 UTC (rev 5209)
+++ branches/Branch_Chunk_CRS2/tests/src/org/jboss/messaging/tests/stress/chunk/MessageChunkStressTest.java 2008-10-29 19:47:45 UTC (rev 5210)
@@ -56,6 +56,11 @@
testChunks(true, true, 10, 26214400, false, 120000, 0);
}
+ public void testMessageChunkFilePersistence1M() throws Exception
+ {
+ testChunks(true, true, 1000, 262144, false, 120000, 0);
+ }
+
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------
@@ -65,7 +70,6 @@
{
super.tearDown();
}
-
// Private -------------------------------------------------------
More information about the jboss-cvs-commits
mailing list