Author: timfox
Date: 2009-08-21 16:41:47 -0400 (Fri, 21 Aug 2009)
New Revision: 7878
Modified:
trunk/src/main/org/hornetq/core/client/impl/ClientProducerImpl.java
trunk/src/main/org/hornetq/core/client/impl/ConnectionManagerImpl.java
trunk/src/main/org/hornetq/core/remoting/impl/wireformat/SessionSendContinuationMessage.java
trunk/tests/src/org/hornetq/tests/integration/client/LargeMessageTest.java
Log:
fixed large message memory issue
Modified: trunk/src/main/org/hornetq/core/client/impl/ClientProducerImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/ClientProducerImpl.java 2009-08-21 17:16:34 UTC (rev 7877)
+++ trunk/src/main/org/hornetq/core/client/impl/ClientProducerImpl.java 2009-08-21 20:41:47 UTC (rev 7878)
@@ -15,6 +15,7 @@
import static org.hornetq.utils.SimpleString.toSimpleString;
+import java.io.BufferedInputStream;
import java.io.IOException;
import java.io.InputStream;
@@ -216,12 +217,12 @@
SessionSendMessage message = new SessionSendMessage(msg, sendBlocking);
if (msg.getBodyInputStream() != null || msg.getEncodeSize() >=
minLargeMessageSize || msg.isLargeMessage())
- {
+ {
sendMessageInChunks(sendBlocking, msg);
}
else if (sendBlocking)
- {
- channel.sendBlocking(message);
+ {
+ channel.sendBlocking(message);
}
else
{
@@ -240,7 +241,7 @@
if (headerSize >= minLargeMessageSize)
{
throw new HornetQException(HornetQException.ILLEGAL_STATE, "Header size
(" + headerSize +
- ") is too
big, use the messageBody for large data, or increase minLargeMessageSize");
+ ") is too big,
use the messageBody for large data, or increase minLargeMessageSize");
}
// msg.getBody() could be Null on LargeServerMessage
@@ -256,45 +257,67 @@
channel.send(initialChunk);
- if (msg.getBodyInputStream() != null)
+ InputStream input = msg.getBodyInputStream();
+
+ if (input != null)
{
boolean lastChunk = false;
- InputStream input = msg.getBodyInputStream();
+
while (!lastChunk)
{
- byte[] bytesRead = new byte[minLargeMessageSize];
- int numberOfBytesRead;
-
- try
- {
- numberOfBytesRead = input.read(bytesRead);
+ byte[] buff = new byte[minLargeMessageSize];
+
+ int pos = 0;
+
+ do
+ {
+ int numberOfBytesRead;
+
+ int wanted = minLargeMessageSize - pos;
+
+ try
+ {
+ numberOfBytesRead = input.read(buff, pos, wanted);
+ }
+ catch (IOException e)
+ {
+ throw new HornetQException(HornetQException.LARGE_MESSAGE_ERROR_BODY,
+ "Error reading the
LargeMessageBody",
+ e);
+ }
+
+ if (numberOfBytesRead == -1)
+ {
+ lastChunk = true;
+
+ break;
+ }
+
+ pos += numberOfBytesRead;
}
- catch (IOException e)
+ while (pos < minLargeMessageSize);
+
+ if (lastChunk)
{
- throw new HornetQException(HornetQException.LARGE_MESSAGE_ERROR_BODY,
- "Error reading the
LargeMessageBody",
- e);
+ byte[] buff2 = new byte[pos];
+
+ System.arraycopy(buff, 0, buff2, 0, pos);
+
+ buff = buff2;
}
-
- if (numberOfBytesRead < 0)
- {
- numberOfBytesRead = 0;
- lastChunk = true;
- }
-
- final SessionSendContinuationMessage chunk = new
SessionSendContinuationMessage(bytesRead,
-
numberOfBytesRead,
+
+ final SessionSendContinuationMessage chunk = new
SessionSendContinuationMessage(buff,
!lastChunk,
lastChunk && sendBlocking);
if (sendBlocking && lastChunk)
{
- // When sending it blocking, only the last chunk will be blocking.
+ // When sending it blocking, only the last chunk will be blocking.
channel.sendBlocking(chunk);
}
else
{
- channel.send(chunk);
+ channel.send(chunk);
}
}
@@ -305,8 +328,8 @@
catch (IOException e)
{
throw new HornetQException(HornetQException.LARGE_MESSAGE_ERROR_BODY,
- "Error closing stream from
LargeMessageBody",
- e);
+ "Error closing stream from
LargeMessageBody",
+ e);
}
}
else
@@ -327,18 +350,17 @@
lastChunk = pos >= bodySize;
- final SessionSendContinuationMessage chunk = new
SessionSendContinuationMessage(bodyBuffer.array(),
-
chunkLength,
+ final SessionSendContinuationMessage chunk = new
SessionSendContinuationMessage(bodyBuffer.array(),
!lastChunk,
lastChunk && sendBlocking);
if (sendBlocking && lastChunk)
{
- // When sending it blocking, only the last chunk will be blocking.
+ // When sending it blocking, only the last chunk will be blocking.
channel.sendBlocking(chunk);
}
else
- {
+ {
channel.send(chunk);
}
}
Modified: trunk/src/main/org/hornetq/core/client/impl/ConnectionManagerImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/ConnectionManagerImpl.java 2009-08-21 17:16:34 UTC (rev 7877)
+++ trunk/src/main/org/hornetq/core/client/impl/ConnectionManagerImpl.java 2009-08-21 20:41:47 UTC (rev 7878)
@@ -534,8 +534,6 @@
boolean attemptFailoverOrReconnect = (backupConnectorFactory != null ||
reconnectAttempts != 0)
&& (failoverOnServerShutdown ||
!serverShutdown);
- log.info("Attempting failover or reconnect " +
attemptFailoverOrReconnect);
-
if (attemptFailoverOrReconnect)
{
lockAllChannel1s();
@@ -640,8 +638,6 @@
}
else
{
- log.info("Just closing connections and calling failure
listeners");
-
closeConnectionsAndCallFailureListeners(me);
}
}
@@ -1096,7 +1092,6 @@
if (type == PacketImpl.DISCONNECT)
{
- log.info("Got a disconnect message");
threadPool.execute(new Runnable()
{
// Must be executed on new thread since cannot block the netty thread for
a long time and fail can
Modified: trunk/src/main/org/hornetq/core/remoting/impl/wireformat/SessionSendContinuationMessage.java
===================================================================
--- trunk/src/main/org/hornetq/core/remoting/impl/wireformat/SessionSendContinuationMessage.java 2009-08-21 17:16:34 UTC (rev 7877)
+++ trunk/src/main/org/hornetq/core/remoting/impl/wireformat/SessionSendContinuationMessage.java 2009-08-21 20:41:47 UTC (rev 7878)
@@ -35,9 +35,6 @@
private boolean requiresResponse;
- // Not sent through the wire. Just to define how many bytes to send of body
- private transient int bodyLength;
-
// Static --------------------------------------------------------
// Constructors --------------------------------------------------
@@ -56,14 +53,12 @@
* @param continues
* @param requiresResponse
*/
- public SessionSendContinuationMessage(final byte[] body,
- final int bodyLength,
+ public SessionSendContinuationMessage(final byte[] body,
final boolean continues,
final boolean requiresResponse)
{
super(SESS_SEND_CONTINUATION, body, continues);
this.requiresResponse = requiresResponse;
- this.bodyLength = bodyLength;
}
@@ -80,25 +75,20 @@
@Override
public int getRequiredBufferSize()
{
- return SESSION_CONTINUATION_BASE_SIZE + bodyLength + DataConstants.SIZE_BOOLEAN;
+ return super.getRequiredBufferSize() + DataConstants.SIZE_BOOLEAN;
}
@Override
public void encodeBody(final HornetQBuffer buffer)
{
- buffer.writeInt(bodyLength);
- buffer.writeBytes(body, 0, bodyLength);
- buffer.writeBoolean(continues);
+ super.encodeBody(buffer);
buffer.writeBoolean(requiresResponse);
}
@Override
public void decodeBody(final HornetQBuffer buffer)
{
- bodyLength = buffer.readInt();
- body = new byte[bodyLength];
- buffer.readBytes(body);
- continues = buffer.readBoolean();
+ super.decodeBody(buffer);
requiresResponse = buffer.readBoolean();
}
Modified: trunk/tests/src/org/hornetq/tests/integration/client/LargeMessageTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/client/LargeMessageTest.java 2009-08-21 17:16:34 UTC (rev 7877)
+++ trunk/tests/src/org/hornetq/tests/integration/client/LargeMessageTest.java 2009-08-21 20:41:47 UTC (rev 7878)
@@ -27,6 +27,7 @@
import org.hornetq.core.client.ClientSession;
import org.hornetq.core.client.ClientSessionFactory;
import org.hornetq.core.client.impl.ClientConsumerInternal;
+import org.hornetq.core.client.impl.ClientSessionFactoryImpl;
import org.hornetq.core.config.Configuration;
import org.hornetq.core.logging.Logger;
import org.hornetq.core.message.Message;
@@ -69,15 +70,17 @@
public void testDLALargeMessage() throws Exception
{
- final int messageSize = 50000;
+ final int messageSize = (int)(3.5 *
ClientSessionFactoryImpl.DEFAULT_MIN_LARGE_MESSAGE_SIZE);
- ClientSession session = null;
+ ClientSession session = null;
try
{
server = createServer(true);
server.start();
+
+ log.info("*********** starting test");
ClientSessionFactory sf = createInVMFactory();
@@ -101,6 +104,8 @@
Message clientFile = createLargeClientMessage(session, messageSize, true);
+ log.info("*********** sending large message");
+
producer.send(clientFile);
session.commit();
@@ -196,7 +201,7 @@
public void testDLAOnExpiry() throws Exception
{
- final int messageSize = 50000;
+ final int messageSize = (int)(3.5 *
ClientSessionFactoryImpl.DEFAULT_MIN_LARGE_MESSAGE_SIZE);
ClientSession session = null;
@@ -332,7 +337,7 @@
public void testExpiryLargeMessage() throws Exception
{
- final int messageSize = 50000;
+ final int messageSize = (int)(3 *
ClientSessionFactoryImpl.DEFAULT_MIN_LARGE_MESSAGE_SIZE);
ClientSession session = null;