[jboss-cvs] JBoss Messaging SVN: r5214 - in branches/Branch_Chunk_CRS2: tests/jms-tests/src/org/jboss/test/messaging/jms and 2 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Wed Oct 29 18:51:42 EDT 2008
Author: clebert.suconic at jboss.com
Date: 2008-10-29 18:51:42 -0400 (Wed, 29 Oct 2008)
New Revision: 5214
Modified:
branches/Branch_Chunk_CRS2/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java
branches/Branch_Chunk_CRS2/tests/jms-tests/src/org/jboss/test/messaging/jms/String64KLimitTest.java
branches/Branch_Chunk_CRS2/tests/src/org/jboss/messaging/tests/integration/chunkmessage/ChunkTestBase.java
branches/Branch_Chunk_CRS2/tests/src/org/jboss/messaging/tests/integration/chunkmessage/MessageChunkTest.java
branches/Branch_Chunk_CRS2/tests/src/org/jboss/messaging/tests/soak/chunk/MessageChunkSoakTest.java
Log:
Optimization (read-ahead of the next chunk and caching of the large-message body size in ServerConsumerImpl) and tweaks to the tests
Modified: branches/Branch_Chunk_CRS2/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java
===================================================================
--- branches/Branch_Chunk_CRS2/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java 2008-10-29 22:13:35 UTC (rev 5213)
+++ branches/Branch_Chunk_CRS2/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java 2008-10-29 22:51:42 UTC (rev 5214)
@@ -26,7 +26,6 @@
import java.util.Iterator;
import java.util.LinkedList;
import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
@@ -115,6 +114,10 @@
/** The current position on the message being processed */
private volatile long positionLargeMessage;
+ private volatile long sizeLargeMessage;
+
+ private SessionSendChunkMessage readAheadChunk = null;
+
private final Channel channel;
@@ -234,6 +237,8 @@
// Not replicated - just send now
pendingLargeMessage = (ServerLargeMessage)message;
positionLargeMessage = 0;
+ // it is better to cache this, as this usually means an operation on the file system
+ sizeLargeMessage = pendingLargeMessage.getBodySize();
sendChunks();
// }
// else
@@ -485,78 +490,40 @@
{
return true;
}
- final long bodySize = pendingLargeMessage.getBodySize();
-
- int chunkLength = 0;
-
- SessionSendChunkMessage chunk = null;
-
- for (; positionLargeMessage < bodySize; positionLargeMessage += chunkLength)
+
+ if (readAheadChunk != null)
{
+ positionLargeMessage += readAheadChunk.getBody().length;
+ channel.send(readAheadChunk);
+ readAheadChunk = null;
+ }
+
+ while (positionLargeMessage < sizeLargeMessage)
+ {
+
if (availableCredits.get() <= 0)
{
- return false;
- }
-
- if (positionLargeMessage == 0)
- {
- int headerSize = pendingLargeMessage.getPropertiesEncodeSize();
-
- chunkLength = minLargeMessageSize - headerSize;
-
- MessagingBuffer headerBuffer = new ByteBufferWrapper(ByteBuffer.allocate(pendingLargeMessage.getPropertiesEncodeSize()));
- pendingLargeMessage.encodeProperties(headerBuffer);
-
- MessagingBuffer bodyBuffer = new ByteBufferWrapper(ByteBuffer.allocate((int)chunkLength));
- pendingLargeMessage.encodeBody(bodyBuffer, 0, chunkLength);
-
- if (availableCredits != null)
+
+ if (readAheadChunk == null)
{
- availableCredits.addAndGet(-chunkLength);
+ readAheadChunk = createChunkSend();
}
-
- chunk = new SessionSendChunkMessage(id,
- headerBuffer.array(),
- bodyBuffer.array(),
- chunkLength < bodySize,
- false);
+ return false;
}
- else
+
+ SessionSendChunkMessage chunk = createChunkSend();
+
+ int chunkLen = chunk.getBody().length;
+
+ if (availableCredits != null)
{
- chunkLength = (int)Math.min(bodySize - positionLargeMessage, minLargeMessageSize);
-
- if (availableCredits != null)
- {
- int leftCredits = availableCredits.addAndGet(-chunkLength);
-// if (leftCredits < 0)
-// {
-// if (chunkLength > 0)
-// {
-// availableCredits.addAndGet(-leftCredits);
-// }
-// else
-// {
-// // sanity check only.. it shouldn't happen
-// // This next statement means, we didn't have enough credit to send anything, so we return the credits and give up sending
-// availableCredits.addAndGet(chunkLength);
-// return false;
-// }
-// }
- }
-
- MessagingBuffer bodyBuffer = new ByteBufferWrapper(ByteBuffer.allocate((int)chunkLength));
-
- pendingLargeMessage.encodeBody(bodyBuffer, positionLargeMessage, chunkLength);
-
- chunk = new SessionSendChunkMessage(id,
- null,
- bodyBuffer.array(),
- positionLargeMessage + chunkLength < bodySize,
- false);
+ availableCredits.addAndGet(-chunkLen);
}
channel.send(chunk);
+
+ positionLargeMessage += chunkLen;
}
pendingLargeMessage.releaseResources();
@@ -574,6 +541,51 @@
}
+
+ private SessionSendChunkMessage createChunkSend()
+ {
+ SessionSendChunkMessage chunk;
+
+ int localChunkLen = 0;
+
+ if (positionLargeMessage == 0)
+ {
+ int headerSize = pendingLargeMessage.getPropertiesEncodeSize();
+
+ localChunkLen = minLargeMessageSize - headerSize;
+
+ MessagingBuffer headerBuffer = new ByteBufferWrapper(ByteBuffer.allocate(pendingLargeMessage.getPropertiesEncodeSize()));
+ pendingLargeMessage.encodeProperties(headerBuffer);
+
+ MessagingBuffer bodyBuffer = new ByteBufferWrapper(ByteBuffer.allocate((int)localChunkLen));
+ pendingLargeMessage.encodeBody(bodyBuffer, 0, localChunkLen);
+
+
+ chunk = new SessionSendChunkMessage(id,
+ headerBuffer.array(),
+ bodyBuffer.array(),
+ localChunkLen < sizeLargeMessage,
+ false);
+ }
+ else
+ {
+ localChunkLen = (int)Math.min(sizeLargeMessage - positionLargeMessage, minLargeMessageSize);
+
+ MessagingBuffer bodyBuffer = new ByteBufferWrapper(ByteBuffer.allocate((int)localChunkLen));
+
+ pendingLargeMessage.encodeBody(bodyBuffer, positionLargeMessage, localChunkLen);
+
+ chunk = new SessionSendChunkMessage(id,
+ null,
+ bodyBuffer.array(),
+ positionLargeMessage + localChunkLen < sizeLargeMessage,
+ false);
+ }
+
+ return chunk;
+
+ }
+
private void doClose() throws Exception
{
messageQueue.removeConsumer(this);
Modified: branches/Branch_Chunk_CRS2/tests/jms-tests/src/org/jboss/test/messaging/jms/String64KLimitTest.java
===================================================================
--- branches/Branch_Chunk_CRS2/tests/jms-tests/src/org/jboss/test/messaging/jms/String64KLimitTest.java 2008-10-29 22:13:35 UTC (rev 5213)
+++ branches/Branch_Chunk_CRS2/tests/jms-tests/src/org/jboss/test/messaging/jms/String64KLimitTest.java 2008-10-29 22:51:42 UTC (rev 5214)
@@ -112,19 +112,19 @@
prod.send(tm4);
- TextMessage rm1 = (TextMessage)cons.receive(60000);
+ TextMessage rm1 = (TextMessage)cons.receive(1000);
assertNotNull(rm1);
- TextMessage rm2 = (TextMessage)cons.receive(60000);
+ TextMessage rm2 = (TextMessage)cons.receive(1000);
assertNotNull(rm2);
- TextMessage rm3 = (TextMessage)cons.receive(60000);
+ TextMessage rm3 = (TextMessage)cons.receive(1000);
assertNotNull(rm3);
- TextMessage rm4 = (TextMessage)cons.receive(60000);
+ TextMessage rm4 = (TextMessage)cons.receive(1000);
assertNotNull(rm4);
Modified: branches/Branch_Chunk_CRS2/tests/src/org/jboss/messaging/tests/integration/chunkmessage/ChunkTestBase.java
===================================================================
--- branches/Branch_Chunk_CRS2/tests/src/org/jboss/messaging/tests/integration/chunkmessage/ChunkTestBase.java 2008-10-29 22:13:35 UTC (rev 5213)
+++ branches/Branch_Chunk_CRS2/tests/src/org/jboss/messaging/tests/integration/chunkmessage/ChunkTestBase.java 2008-10-29 22:51:42 UTC (rev 5214)
@@ -87,7 +87,26 @@
final int waitOnConsumer,
final long delayDelivery) throws Exception
{
+ testChunks(realFiles,
+ useFile,
+ numberOfMessages,
+ numberOfIntegers,
+ sendingBlocking,
+ waitOnConsumer,
+ delayDelivery,
+ false);
+ }
+ protected void testChunks(final boolean realFiles,
+ final boolean useFile,
+ final int numberOfMessages,
+ final int numberOfIntegers,
+ final boolean sendingBlocking,
+ final int waitOnConsumer,
+ final long delayDelivery,
+ final boolean testTime) throws Exception
+ {
+
clearData();
messagingService = createService(realFiles);
@@ -130,7 +149,10 @@
producer.send(message);
}
- System.out.println("Message sent in " + (System.currentTimeMillis() - timeStart));
+ if (testTime)
+ {
+ System.out.println("Message sent in " + (System.currentTimeMillis() - timeStart));
+ }
}
}
else
@@ -140,6 +162,7 @@
ClientMessage message = session.createClientMessage(true);
message.putIntProperty(new SimpleString("counter-message"), i);
message.setBody(createLargeBuffer(numberOfIntegers));
+ long timeStart = System.currentTimeMillis();
if (delayDelivery > 0)
{
message.putLongProperty(new SimpleString("original-time"), System.currentTimeMillis());
@@ -149,6 +172,10 @@
{
producer.send(message);
}
+ if (testTime)
+ {
+ System.out.println("Message sent in " + (System.currentTimeMillis() - timeStart));
+ }
}
}
@@ -178,7 +205,6 @@
}
session.start();
-
for (int i = 0; i < numberOfMessages; i++)
{
@@ -187,8 +213,11 @@
ClientMessage message = consumer.receive(waitOnConsumer + delayDelivery);
assertNotNull(message);
-
- System.out.println("Message received in " + (System.currentTimeMillis() - start));
+
+ if (testTime)
+ {
+ System.out.println("Message received in " + (System.currentTimeMillis() - start));
+ }
start = System.currentTimeMillis();
if (delayDelivery > 0)
@@ -202,26 +231,27 @@
assertNotNull(message);
- System.out.println("msg on client = " + message.getMessageID());
-
if (delayDelivery <= 0)
{ // right now there is no guarantee of ordered delivered on multiple scheduledMessages
assertEquals(i, ((Integer)message.getProperty(new SimpleString("counter-message"))).intValue());
}
- if (message instanceof FileClientMessage)
+ if (!testTime)
{
- checkFileRead(((FileClientMessage)message).getFile(), numberOfIntegers);
- }
- else
- {
- MessagingBuffer buffer = message.getBody();
- buffer.rewind();
- assertEquals(numberOfIntegers * DataConstants.SIZE_INT, buffer.limit());
- for (int b = 0; b < numberOfIntegers; b++)
+ if (message instanceof FileClientMessage)
{
- assertEquals(b, buffer.getInt());
+ checkFileRead(((FileClientMessage)message).getFile(), numberOfIntegers);
}
+ else
+ {
+ MessagingBuffer buffer = message.getBody();
+ buffer.rewind();
+ assertEquals(numberOfIntegers * DataConstants.SIZE_INT, buffer.limit());
+ for (int b = 0; b < numberOfIntegers; b++)
+ {
+ assertEquals(b, buffer.getInt());
+ }
+ }
}
}
Modified: branches/Branch_Chunk_CRS2/tests/src/org/jboss/messaging/tests/integration/chunkmessage/MessageChunkTest.java
===================================================================
--- branches/Branch_Chunk_CRS2/tests/src/org/jboss/messaging/tests/integration/chunkmessage/MessageChunkTest.java 2008-10-29 22:13:35 UTC (rev 5213)
+++ branches/Branch_Chunk_CRS2/tests/src/org/jboss/messaging/tests/integration/chunkmessage/MessageChunkTest.java 2008-10-29 22:51:42 UTC (rev 5214)
@@ -460,8 +460,6 @@
message2.acknowledge();
- System.out.println("msg on client = " + message2.getMessageID());
-
assertNotNull(message2);
try
Modified: branches/Branch_Chunk_CRS2/tests/src/org/jboss/messaging/tests/soak/chunk/MessageChunkSoakTest.java
===================================================================
--- branches/Branch_Chunk_CRS2/tests/src/org/jboss/messaging/tests/soak/chunk/MessageChunkSoakTest.java 2008-10-29 22:13:35 UTC (rev 5213)
+++ branches/Branch_Chunk_CRS2/tests/src/org/jboss/messaging/tests/soak/chunk/MessageChunkSoakTest.java 2008-10-29 22:51:42 UTC (rev 5214)
@@ -48,7 +48,7 @@
public void testMessageChunkFilePersistence1G() throws Exception
{
- testChunks(true, true, 2, 268435456, false, 120000, 0);
+ testChunks(true, true, 2, 268435456, false, 120000, 0, true);
}
// Package protected ---------------------------------------------
More information about the jboss-cvs-commits mailing list