[hornetq-commits] JBoss hornetq SVN: r7878 - in trunk: src/main/org/hornetq/core/remoting/impl/wireformat and 1 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Fri Aug 21 16:41:48 EDT 2009


Author: timfox
Date: 2009-08-21 16:41:47 -0400 (Fri, 21 Aug 2009)
New Revision: 7878

Modified:
   trunk/src/main/org/hornetq/core/client/impl/ClientProducerImpl.java
   trunk/src/main/org/hornetq/core/client/impl/ConnectionManagerImpl.java
   trunk/src/main/org/hornetq/core/remoting/impl/wireformat/SessionSendContinuationMessage.java
   trunk/tests/src/org/hornetq/tests/integration/client/LargeMessageTest.java
Log:
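Fixed large message memory issue: when streaming a large message body, ClientProducerImpl now keeps reading from the input stream until each chunk buffer is full (trimming only the final chunk to the bytes actually read), and SessionSendContinuationMessage no longer carries a separate bodyLength field, delegating size calculation and body encoding/decoding to its superclass. Also removed leftover debug logging from ConnectionManagerImpl and made the LargeMessageTest sizes multiples of DEFAULT_MIN_LARGE_MESSAGE_SIZE.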

Modified: trunk/src/main/org/hornetq/core/client/impl/ClientProducerImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/ClientProducerImpl.java	2009-08-21 17:16:34 UTC (rev 7877)
+++ trunk/src/main/org/hornetq/core/client/impl/ClientProducerImpl.java	2009-08-21 20:41:47 UTC (rev 7878)
@@ -15,6 +15,7 @@
 
 import static org.hornetq.utils.SimpleString.toSimpleString;
 
+import java.io.BufferedInputStream;
 import java.io.IOException;
 import java.io.InputStream;
 
@@ -216,12 +217,12 @@
       SessionSendMessage message = new SessionSendMessage(msg, sendBlocking);
 
       if (msg.getBodyInputStream() != null || msg.getEncodeSize() >= minLargeMessageSize || msg.isLargeMessage())
-      {         
+      {
          sendMessageInChunks(sendBlocking, msg);
       }
       else if (sendBlocking)
-      {         
-         channel.sendBlocking(message);         
+      {
+         channel.sendBlocking(message);
       }
       else
       {
@@ -240,7 +241,7 @@
       if (headerSize >= minLargeMessageSize)
       {
          throw new HornetQException(HornetQException.ILLEGAL_STATE, "Header size (" + headerSize +
-                                                                        ") is too big, use the messageBody for large data, or increase minLargeMessageSize");
+                                                                    ") is too big, use the messageBody for large data, or increase minLargeMessageSize");
       }
 
       // msg.getBody() could be Null on LargeServerMessage
@@ -256,45 +257,67 @@
 
       channel.send(initialChunk);
 
-      if (msg.getBodyInputStream() != null)
+      InputStream input = msg.getBodyInputStream();
+      
+      if (input != null)
       {
          boolean lastChunk = false;
-         InputStream input = msg.getBodyInputStream();
+
          while (!lastChunk)
          {
-            byte[] bytesRead = new byte[minLargeMessageSize];
-            int numberOfBytesRead;
-
-            try
-            {
-               numberOfBytesRead = input.read(bytesRead);
+            byte[] buff = new byte[minLargeMessageSize];
+            
+            int pos = 0;
+                                   
+            do
+            {               
+               int numberOfBytesRead;
+               
+               int wanted = minLargeMessageSize - pos;
+               
+               try
+               {
+                  numberOfBytesRead = input.read(buff, pos, wanted);
+               }
+               catch (IOException e)
+               {
+                  throw new HornetQException(HornetQException.LARGE_MESSAGE_ERROR_BODY,
+                                             "Error reading the LargeMessageBody",
+                                             e);
+               }
+               
+               if (numberOfBytesRead == -1)
+               {                  
+                  lastChunk = true;
+                  
+                  break;
+               }
+                                             
+               pos += numberOfBytesRead;
             }
-            catch (IOException e)
+            while (pos < minLargeMessageSize);
+            
+            if (lastChunk)
             {
-               throw new HornetQException(HornetQException.LARGE_MESSAGE_ERROR_BODY,
-                                            "Error reading the LargeMessageBody",
-                                            e);
+               byte[] buff2 = new byte[pos];
+               
+               System.arraycopy(buff, 0, buff2, 0, pos);
+               
+               buff = buff2;
             }
-
-            if (numberOfBytesRead < 0)
-            {
-               numberOfBytesRead = 0;
-               lastChunk = true;
-            }
-
-            final SessionSendContinuationMessage chunk = new SessionSendContinuationMessage(bytesRead,
-                                                                                            numberOfBytesRead,
+            
+            final SessionSendContinuationMessage chunk = new SessionSendContinuationMessage(buff,                                                                                           
                                                                                             !lastChunk,
                                                                                             lastChunk && sendBlocking);
 
             if (sendBlocking && lastChunk)
             {
-               // When sending it blocking, only the last chunk will be blocking.               
+               // When sending it blocking, only the last chunk will be blocking.
                channel.sendBlocking(chunk);
             }
             else
             {
-               channel.send(chunk);               
+               channel.send(chunk);
             }
          }
 
@@ -305,8 +328,8 @@
          catch (IOException e)
          {
             throw new HornetQException(HornetQException.LARGE_MESSAGE_ERROR_BODY,
-                                         "Error closing stream from LargeMessageBody",
-                                         e);
+                                       "Error closing stream from LargeMessageBody",
+                                       e);
          }
       }
       else
@@ -327,18 +350,17 @@
 
             lastChunk = pos >= bodySize;
 
-            final SessionSendContinuationMessage chunk = new SessionSendContinuationMessage(bodyBuffer.array(),
-                                                                                            chunkLength,
+            final SessionSendContinuationMessage chunk = new SessionSendContinuationMessage(bodyBuffer.array(),                                                                                            
                                                                                             !lastChunk,
                                                                                             lastChunk && sendBlocking);
 
             if (sendBlocking && lastChunk)
             {
-               // When sending it blocking, only the last chunk will be blocking.              
+               // When sending it blocking, only the last chunk will be blocking.
                channel.sendBlocking(chunk);
             }
             else
-            {               
+            {
                channel.send(chunk);
             }
          }

Modified: trunk/src/main/org/hornetq/core/client/impl/ConnectionManagerImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/ConnectionManagerImpl.java	2009-08-21 17:16:34 UTC (rev 7877)
+++ trunk/src/main/org/hornetq/core/client/impl/ConnectionManagerImpl.java	2009-08-21 20:41:47 UTC (rev 7878)
@@ -534,8 +534,6 @@
          boolean attemptFailoverOrReconnect = (backupConnectorFactory != null || reconnectAttempts != 0)
                                                 && (failoverOnServerShutdown || !serverShutdown);
          
-         log.info("Attempting failover or reconnect " + attemptFailoverOrReconnect);
-
          if (attemptFailoverOrReconnect)
          {
             lockAllChannel1s();
@@ -640,8 +638,6 @@
          }
          else
          {
-            log.info("Just closing connections and calling failure listeners");
-            
             closeConnectionsAndCallFailureListeners(me);
          }
       }
@@ -1096,7 +1092,6 @@
 
          if (type == PacketImpl.DISCONNECT)
          {
-            log.info("Got a disconnect message");
             threadPool.execute(new Runnable()
             {
                // Must be executed on new thread since cannot block the netty thread for a long time and fail can

Modified: trunk/src/main/org/hornetq/core/remoting/impl/wireformat/SessionSendContinuationMessage.java
===================================================================
--- trunk/src/main/org/hornetq/core/remoting/impl/wireformat/SessionSendContinuationMessage.java	2009-08-21 17:16:34 UTC (rev 7877)
+++ trunk/src/main/org/hornetq/core/remoting/impl/wireformat/SessionSendContinuationMessage.java	2009-08-21 20:41:47 UTC (rev 7878)
@@ -35,9 +35,6 @@
 
    private boolean requiresResponse;
 
-   // Not sent through the wire. Just to define how many bytes to send of body
-   private transient int bodyLength;
-
    // Static --------------------------------------------------------
 
    // Constructors --------------------------------------------------
@@ -56,14 +53,12 @@
     * @param continues
     * @param requiresResponse
     */
-   public SessionSendContinuationMessage(final byte[] body,
-                                         final int bodyLength,
+   public SessionSendContinuationMessage(final byte[] body,                                         
                                          final boolean continues,
                                          final boolean requiresResponse)
    {
       super(SESS_SEND_CONTINUATION, body, continues);
       this.requiresResponse = requiresResponse;
-      this.bodyLength = bodyLength;
    }
 
 
@@ -80,25 +75,20 @@
    @Override
    public int getRequiredBufferSize()
    {
-      return SESSION_CONTINUATION_BASE_SIZE + bodyLength + DataConstants.SIZE_BOOLEAN;
+      return super.getRequiredBufferSize() + DataConstants.SIZE_BOOLEAN;
    }
 
    @Override
    public void encodeBody(final HornetQBuffer buffer)
    {
-      buffer.writeInt(bodyLength);
-      buffer.writeBytes(body, 0, bodyLength);
-      buffer.writeBoolean(continues);
+      super.encodeBody(buffer);
       buffer.writeBoolean(requiresResponse);
    }
 
    @Override
    public void decodeBody(final HornetQBuffer buffer)
    {
-      bodyLength = buffer.readInt();
-      body = new byte[bodyLength];
-      buffer.readBytes(body);
-      continues = buffer.readBoolean();
+      super.decodeBody(buffer);
       requiresResponse = buffer.readBoolean();
    }
 

Modified: trunk/tests/src/org/hornetq/tests/integration/client/LargeMessageTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/client/LargeMessageTest.java	2009-08-21 17:16:34 UTC (rev 7877)
+++ trunk/tests/src/org/hornetq/tests/integration/client/LargeMessageTest.java	2009-08-21 20:41:47 UTC (rev 7878)
@@ -27,6 +27,7 @@
 import org.hornetq.core.client.ClientSession;
 import org.hornetq.core.client.ClientSessionFactory;
 import org.hornetq.core.client.impl.ClientConsumerInternal;
+import org.hornetq.core.client.impl.ClientSessionFactoryImpl;
 import org.hornetq.core.config.Configuration;
 import org.hornetq.core.logging.Logger;
 import org.hornetq.core.message.Message;
@@ -69,15 +70,17 @@
 
    public void testDLALargeMessage() throws Exception
    {
-      final int messageSize = 50000;
+      final int messageSize = (int)(3.5 * ClientSessionFactoryImpl.DEFAULT_MIN_LARGE_MESSAGE_SIZE);
 
-      ClientSession session = null;
+      ClientSession session = null;           
 
       try
       {
          server = createServer(true);
 
          server.start();
+         
+         log.info("*********** starting test");
 
          ClientSessionFactory sf = createInVMFactory();
 
@@ -101,6 +104,8 @@
 
          Message clientFile = createLargeClientMessage(session, messageSize, true);
 
+         log.info("*********** sending large message");
+         
          producer.send(clientFile);
 
          session.commit();
@@ -196,7 +201,7 @@
 
    public void testDLAOnExpiry() throws Exception
    {
-      final int messageSize = 50000;
+      final int messageSize = (int)(3.5 * ClientSessionFactoryImpl.DEFAULT_MIN_LARGE_MESSAGE_SIZE);
 
       ClientSession session = null;
 
@@ -332,7 +337,7 @@
 
    public void testExpiryLargeMessage() throws Exception
    {
-      final int messageSize = 50000;
+      final int messageSize = (int)(3 * ClientSessionFactoryImpl.DEFAULT_MIN_LARGE_MESSAGE_SIZE);
 
       ClientSession session = null;
 



More information about the hornetq-commits mailing list