Author: timfox
Date: 2009-11-05 03:42:21 -0500 (Thu, 05 Nov 2009)
New Revision: 8221
Modified:
trunk/examples/jms/large-message/src/org/hornetq/jms/example/LargeMessageExample.java
trunk/src/main/org/hornetq/core/client/impl/ClientProducerImpl.java
trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
Log:
https://jira.jboss.org/jira/browse/HORNETQ-206
Modified:
trunk/examples/jms/large-message/src/org/hornetq/jms/example/LargeMessageExample.java
===================================================================
---
trunk/examples/jms/large-message/src/org/hornetq/jms/example/LargeMessageExample.java 2009-11-05
07:56:15 UTC (rev 8220)
+++
trunk/examples/jms/large-message/src/org/hornetq/jms/example/LargeMessageExample.java 2009-11-05
08:42:21 UTC (rev 8221)
@@ -47,7 +47,9 @@
// The message we will send is size 256MB, even though we are only running in 50MB of
RAM on both client and server.
// HornetQ will support much larger message sizes, but we use 512MB so the example
runs in reasonable time.
- private final long FILE_SIZE = 256 * 1024 * 1024;
+ // private final long FILE_SIZE = 256L * 1024 * 1024;
+
+ private final long FILE_SIZE = 2L * 1024 * 1024 * 1024; // 2 GiB message
public boolean runExample() throws Exception
{
Modified: trunk/src/main/org/hornetq/core/client/impl/ClientProducerImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/ClientProducerImpl.java 2009-11-05
07:56:15 UTC (rev 8220)
+++ trunk/src/main/org/hornetq/core/client/impl/ClientProducerImpl.java 2009-11-05
08:42:21 UTC (rev 8221)
@@ -22,8 +22,8 @@
import org.hornetq.core.buffers.ChannelBuffers;
import org.hornetq.core.exception.HornetQException;
import org.hornetq.core.logging.Logger;
+import org.hornetq.core.message.LargeMessageEncodingContext;
import org.hornetq.core.message.Message;
-import org.hornetq.core.message.LargeMessageEncodingContext;
import org.hornetq.core.message.impl.MessageImpl;
import org.hornetq.core.remoting.Channel;
import org.hornetq.core.remoting.impl.wireformat.SessionSendContinuationMessage;
@@ -250,7 +250,7 @@
if (isLarge)
{
- largeMessageSend(sendBlocking, msg);
+ largeMessageSend(sendBlocking, msg, theCredits);
}
else if (sendBlocking)
{
@@ -269,14 +269,8 @@
// Not the continuations, but this is ok since we are only interested in
limiting the amount of
// data in *memory* and continuations go straight to the disk
- if (isLarge)
+ if (!isLarge)
{
- // TODO this is pretty hacky - we should define consistent meanings of encode
size
-
- theCredits.acquireCredits(msg.getHeadersAndPropertiesEncodeSize());
- }
- else
- {
theCredits.acquireCredits(msg.getEncodeSize());
}
}
@@ -292,15 +286,14 @@
throw new HornetQException(HornetQException.OBJECT_CLOSED, "Producer is
closed");
}
}
-
-
+
// Methods to send Large
Messages----------------------------------------------------------------
-
+
/**
* @param msg
* @throws HornetQException
*/
- private void largeMessageSend(final boolean sendBlocking, final Message msg) throws
HornetQException
+ private void largeMessageSend(final boolean sendBlocking, final Message msg, final
ClientProducerCredits credits) throws HornetQException
{
int headerSize = msg.getHeadersAndPropertiesEncodeSize();
@@ -323,15 +316,23 @@
channel.send(initialChunk);
+ try
+ {
+ credits.acquireCredits(msg.getHeadersAndPropertiesEncodeSize());
+ }
+ catch (InterruptedException e)
+ {
+ }
+
InputStream input = msg.getBodyInputStream();
if (input != null)
{
- largeMessageSendStreamed(sendBlocking, input);
+ largeMessageSendStreamed(sendBlocking, input, credits);
}
else
{
- largeMessageSendBuffered(sendBlocking, msg);
+ largeMessageSendBuffered(sendBlocking, msg, credits);
}
}
@@ -340,7 +341,9 @@
* @param msg
* @throws HornetQException
*/
- private void largeMessageSendBuffered(final boolean sendBlocking, final Message msg)
throws HornetQException
+ private void largeMessageSendBuffered(final boolean sendBlocking,
+ final Message msg,
+ final ClientProducerCredits credits) throws
HornetQException
{
final long bodySize = msg.getLargeBodySize();
@@ -373,6 +376,14 @@
{
channel.send(chunk);
}
+
+ try
+ {
+ credits.acquireCredits(chunk.getRequiredBufferSize());
+ }
+ catch (InterruptedException e)
+ {
+ }
}
}
@@ -381,7 +392,9 @@
* @param input
* @throws HornetQException
*/
- private void largeMessageSendStreamed(final boolean sendBlocking, InputStream input)
throws HornetQException
+ private void largeMessageSendStreamed(final boolean sendBlocking,
+ final InputStream input,
+ final ClientProducerCredits credits) throws
HornetQException
{
boolean lastPacket = false;
@@ -441,6 +454,14 @@
{
channel.send(chunk);
}
+
+ try
+ {
+ credits.acquireCredits(chunk.getRequiredBufferSize());
+ }
+ catch (InterruptedException e)
+ {
+ }
}
try
@@ -455,8 +476,6 @@
}
}
-
-
// Inner Classes
--------------------------------------------------------------------------------
class DecodingContext implements LargeMessageEncodingContext
{
Modified: trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java 2009-11-05 07:56:15
UTC (rev 8220)
+++ trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java 2009-11-05 08:42:21
UTC (rev 8221)
@@ -1478,7 +1478,7 @@
{
try
{
- releaseOutStanding(message);
+ releaseOutStanding(message, message.getEncodeSize());
}
catch (Exception e)
{
@@ -1499,20 +1499,23 @@
{
throw new HornetQException(HornetQException.ILLEGAL_STATE,
"large-message not initialized on server");
}
-
+
+ //Immediately release the credits for the continuations- these don't
contrinute to the in-memory size
+ //of the message
+
+ releaseOutStanding(currentLargeMessage, packet.getRequiredBufferSize());
+
currentLargeMessage.addBytes(packet.getBody());
if (!packet.isContinues())
- {
- final LargeServerMessage message = currentLargeMessage;
+ {
+ currentLargeMessage.releaseResources();
+ send(currentLargeMessage);
+
+ releaseOutStanding(currentLargeMessage,
currentLargeMessage.getEncodeSize());
+
currentLargeMessage = null;
-
- message.releaseResources();
-
- send(message);
-
- releaseOutStanding(message);
}
if (packet.isRequiresResponse())
@@ -1910,17 +1913,15 @@
* returned. When a session closes any outstanding credits will be returned.
*
*/
- private void releaseOutStanding(final ServerMessage message) throws Exception
+ private void releaseOutStanding(final ServerMessage message, final int credits) throws
Exception
{
CreditManagerHolder holder = getCreditManagerHolder(message.getDestination());
- int size = message.getEncodeSize();
+ holder.outstandingCredits -= credits;
- holder.outstandingCredits -= size;
-
- holder.store.returnProducerCredits(size);
+ holder.store.returnProducerCredits(credits);
}
-
+
private void send(final ServerMessage msg) throws Exception
{
// check the user has write access to this address.