[hornetq-commits] JBoss hornetq SVN: r10057 - in trunk: src/main/org/hornetq/core/client/impl and 8 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Fri Dec 17 17:30:29 EST 2010


Author: clebert.suconic at jboss.com
Date: 2010-12-17 17:30:28 -0500 (Fri, 17 Dec 2010)
New Revision: 10057

Modified:
   trunk/src/main/org/hornetq/api/core/Message.java
   trunk/src/main/org/hornetq/core/client/impl/ClientLargeMessageImpl.java
   trunk/src/main/org/hornetq/core/client/impl/ClientProducerImpl.java
   trunk/src/main/org/hornetq/core/protocol/core/ServerSessionPacketHandler.java
   trunk/src/main/org/hornetq/core/protocol/core/impl/wireformat/SessionContinuationMessage.java
   trunk/src/main/org/hornetq/core/protocol/core/impl/wireformat/SessionSendContinuationMessage.java
   trunk/src/main/org/hornetq/core/server/ServerSession.java
   trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
   trunk/src/main/org/hornetq/jms/client/HornetQBytesMessage.java
   trunk/src/main/org/hornetq/utils/DeflaterReader.java
   trunk/tests/src/org/hornetq/tests/integration/client/LargeMessageTest.java
   trunk/tests/src/org/hornetq/tests/unit/util/CompressionUtilTest.java
Log:
JBPAPP-5595 - Large Message recreating buffer issue over JMS, and getting the message size without recreating the buffer

Modified: trunk/src/main/org/hornetq/api/core/Message.java
===================================================================
--- trunk/src/main/org/hornetq/api/core/Message.java	2010-12-17 22:03:22 UTC (rev 10056)
+++ trunk/src/main/org/hornetq/api/core/Message.java	2010-12-17 22:30:28 UTC (rev 10057)
@@ -67,6 +67,8 @@
    public static final SimpleString HDR_GROUP_ID = new SimpleString("_HQ_GROUP_ID");
    
    public static final SimpleString HDR_LARGE_COMPRESSED = new SimpleString("_HQ_LARGE_COMPRESSED");
+   
+   public static final SimpleString HDR_LARGE_BODY_SIZE = new SimpleString("_HQ_LARGE_SIZE");
 
    public static final SimpleString HDR_SCHEDULED_DELIVERY_TIME = new SimpleString("_HQ_SCHED_DELIVERY");
 

Modified: trunk/src/main/org/hornetq/core/client/impl/ClientLargeMessageImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/ClientLargeMessageImpl.java	2010-12-17 22:03:22 UTC (rev 10056)
+++ trunk/src/main/org/hornetq/core/client/impl/ClientLargeMessageImpl.java	2010-12-17 22:30:28 UTC (rev 10057)
@@ -18,6 +18,7 @@
 
 import org.hornetq.api.core.HornetQBuffer;
 import org.hornetq.api.core.HornetQException;
+import org.hornetq.api.core.Message;
 import org.hornetq.core.buffers.impl.ResetLimitWrappedHornetQBuffer;
 import org.hornetq.utils.DataConstants;
 
@@ -38,7 +39,7 @@
 
    // Used only when receiving large messages
    private LargeMessageController largeMessageController;
-   
+
    private long largeMessageSize;
 
    // Static --------------------------------------------------------
@@ -93,7 +94,7 @@
    {
       largeMessageController = controller;
    }
-   
+
    public HornetQBuffer getBodyBuffer()
    {
       checkBuffer();
@@ -101,15 +102,11 @@
       return bodyBuffer;
    }
 
-   
    public int getBodySize()
    {
-      checkBuffer();
-      return buffer.writerIndex() - buffer.readerIndex();
+      return getLongProperty(Message.HDR_LARGE_BODY_SIZE).intValue();
    }
 
-
-
    public LargeMessageController getLargeMessageController()
    {
       return largeMessageController;
@@ -160,7 +157,7 @@
          return largeMessageController.waitCompletion(timeMilliseconds);
       }
    }
-   
+
    public void discardBody()
    {
       if (bodyBuffer != null)
@@ -173,27 +170,26 @@
       }
    }
 
-
    // Package protected ---------------------------------------------
 
    // Protected -----------------------------------------------------
 
    // Private -------------------------------------------------------
-   
+
    private void checkBuffer()
    {
       if (bodyBuffer == null)
       {
-         
+
          long bodySize = this.largeMessageSize + BODY_OFFSET;
          if (bodySize > Integer.MAX_VALUE)
          {
             bodySize = Integer.MAX_VALUE;
          }
          createBody((int)bodySize);
-         
+
          bodyBuffer = new ResetLimitWrappedHornetQBuffer(BODY_OFFSET, buffer, this);
-         
+
          try
          {
             largeMessageController.saveBuffer(new HornetQOutputStream(bodyBuffer));
@@ -204,14 +200,13 @@
          }
       }
    }
-   
 
    // Inner classes -------------------------------------------------
-   
+
    protected class HornetQOutputStream extends OutputStream
    {
       HornetQBuffer bufferOut;
-      
+
       HornetQOutputStream(HornetQBuffer out)
       {
          this.bufferOut = out;
@@ -225,7 +220,7 @@
       {
          bufferOut.writeByte((byte)(b & 0xff));
       }
-      
+
    }
 
 }

Modified: trunk/src/main/org/hornetq/core/client/impl/ClientProducerImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/ClientProducerImpl.java	2010-12-17 22:03:22 UTC (rev 10056)
+++ trunk/src/main/org/hornetq/core/client/impl/ClientProducerImpl.java	2010-12-17 22:30:28 UTC (rev 10057)
@@ -15,6 +15,7 @@
 
 import java.io.IOException;
 import java.io.InputStream;
+import java.util.concurrent.atomic.AtomicLong;
 
 import org.hornetq.api.core.HornetQBuffer;
 import org.hornetq.api.core.HornetQBuffers;
@@ -361,13 +362,11 @@
 
       InputStream input = msgI.getBodyInputStream();
 
-      
       if (msgI.isServerMessage())
       {
          largeMessageSendServer(sendBlocking, msgI, credits);
       }
-      else
-      if (input != null)
+      else if (input != null)
       {
          largeMessageSendStreamed(sendBlocking, msgI, input, credits);
       }
@@ -376,7 +375,7 @@
          largeMessageSendBuffered(sendBlocking, msgI, credits);
       }
    }
-   
+
    /**
     * Used to send serverMessages through the bridges.
     * No need to validate compression here since the message is only compressed at the client
@@ -385,8 +384,8 @@
     * @throws HornetQException
     */
    private void largeMessageSendServer(final boolean sendBlocking,
-                                         final MessageInternal msgI,
-                                         final ClientProducerCredits credits) throws HornetQException
+                                       final MessageInternal msgI,
+                                       final ClientProducerCredits credits) throws HornetQException
    {
       BodyEncoder context = msgI.getBodyEncoder();
 
@@ -440,8 +439,6 @@
       }
    }
 
-   
-
    /**
     * @param sendBlocking
     * @param msgI
@@ -469,11 +466,17 @@
 
       InputStream input = inputStreamParameter;
 
+      // We won't know the real size of the message since we are compressing while reading the streaming.
+      // This counter will be passed to the deflater to be updated for every byte read
+      AtomicLong messageSize = new AtomicLong();
+
       if (session.isCompressLargeMessages())
       {
-         input = new DeflaterReader(inputStreamParameter);
+         input = new DeflaterReader(inputStreamParameter, messageSize);
       }
 
+      int totalSize = 0;
+
       while (!lastPacket)
       {
          byte[] buff = new byte[minLargeMessageSize];
@@ -508,19 +511,31 @@
          }
          while (pos < minLargeMessageSize);
 
+         totalSize += pos;
+
+         final SessionSendContinuationMessage chunk;
+
          if (lastPacket)
          {
+
+            if (!session.isCompressLargeMessages())
+            {
+               messageSize.set(totalSize);
+            }
+
             byte[] buff2 = new byte[pos];
 
             System.arraycopy(buff, 0, buff2, 0, pos);
 
             buff = buff2;
+
+            chunk = new SessionSendContinuationMessage(buff, false, sendBlocking, messageSize.get());
          }
+         else
+         {
+            chunk = new SessionSendContinuationMessage(buff, true, false);
+         }
 
-         final SessionSendContinuationMessage chunk = new SessionSendContinuationMessage(buff,
-                                                                                         !lastPacket,
-                                                                                         lastPacket && sendBlocking);
-
          if (sendBlocking && lastPacket)
          {
             // When sending it blocking, only the last chunk will be blocking.
@@ -551,6 +566,5 @@
                                     e);
       }
    }
-
    // Inner Classes --------------------------------------------------------------------------------
 }

Modified: trunk/src/main/org/hornetq/core/protocol/core/ServerSessionPacketHandler.java
===================================================================
--- trunk/src/main/org/hornetq/core/protocol/core/ServerSessionPacketHandler.java	2010-12-17 22:03:22 UTC (rev 10056)
+++ trunk/src/main/org/hornetq/core/protocol/core/ServerSessionPacketHandler.java	2010-12-17 22:30:28 UTC (rev 10057)
@@ -458,7 +458,7 @@
                {
                   SessionSendContinuationMessage message = (SessionSendContinuationMessage)packet;
                   requiresResponse = message.isRequiresResponse();
-                  session.sendContinuations(message.getPacketSize(), message.getBody(), message.isContinues());
+                  session.sendContinuations(message.getPacketSize(), message.getMessageBodySize(), message.getBody(), message.isContinues());
                   if (requiresResponse)
                   {
                      response = new NullResponseMessage();

Modified: trunk/src/main/org/hornetq/core/protocol/core/impl/wireformat/SessionContinuationMessage.java
===================================================================
--- trunk/src/main/org/hornetq/core/protocol/core/impl/wireformat/SessionContinuationMessage.java	2010-12-17 22:03:22 UTC (rev 10056)
+++ trunk/src/main/org/hornetq/core/protocol/core/impl/wireformat/SessionContinuationMessage.java	2010-12-17 22:30:28 UTC (rev 10057)
@@ -39,6 +39,7 @@
    protected byte[] body;
 
    protected boolean continues;
+   
 
    // Static --------------------------------------------------------
 

Modified: trunk/src/main/org/hornetq/core/protocol/core/impl/wireformat/SessionSendContinuationMessage.java
===================================================================
--- trunk/src/main/org/hornetq/core/protocol/core/impl/wireformat/SessionSendContinuationMessage.java	2010-12-17 22:03:22 UTC (rev 10056)
+++ trunk/src/main/org/hornetq/core/protocol/core/impl/wireformat/SessionSendContinuationMessage.java	2010-12-17 22:30:28 UTC (rev 10057)
@@ -33,7 +33,12 @@
    // Attributes ----------------------------------------------------
 
    private boolean requiresResponse;
-
+   
+   /**
+    * to be sent on the last package
+    */
+   private long messageBodySize = -1;
+   
    // Static --------------------------------------------------------
 
    // Constructors --------------------------------------------------
@@ -54,6 +59,17 @@
       this.requiresResponse = requiresResponse;
    }
 
+   /**
+    * @param body
+    * @param continues
+    * @param requiresResponse
+    */
+   public SessionSendContinuationMessage(final byte[] body, final boolean continues, final boolean requiresResponse, final long messageBodySize)
+   {
+      this(body, continues, requiresResponse);
+      this.messageBodySize = messageBodySize;
+   }
+
    // Public --------------------------------------------------------
 
    /**
@@ -63,11 +79,20 @@
    {
       return requiresResponse;
    }
+   
+   public long getMessageBodySize()
+   {
+      return messageBodySize;
+   }
 
    @Override
    public void encodeRest(final HornetQBuffer buffer)
    {
       super.encodeRest(buffer);
+      if (!continues)
+      {
+         buffer.writeLong(messageBodySize);
+      }
       buffer.writeBoolean(requiresResponse);
    }
 
@@ -75,6 +100,10 @@
    public void decodeRest(final HornetQBuffer buffer)
    {
       super.decodeRest(buffer);
+      if (!continues)
+      {
+         messageBodySize = buffer.readLong();
+      }
       requiresResponse = buffer.readBoolean();
    }
 

Modified: trunk/src/main/org/hornetq/core/server/ServerSession.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/ServerSession.java	2010-12-17 22:03:22 UTC (rev 10056)
+++ trunk/src/main/org/hornetq/core/server/ServerSession.java	2010-12-17 22:30:28 UTC (rev 10057)
@@ -99,7 +99,7 @@
 
    void receiveConsumerCredits(long consumerID, int credits) throws Exception;
 
-   void sendContinuations(int packetSize, byte[] body, boolean continues) throws Exception;
+   void sendContinuations(int packetSize, long totalBodySize, byte[] body, boolean continues) throws Exception;
 
    void send(ServerMessage message, boolean direct) throws Exception;
 

Modified: trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java	2010-12-17 22:03:22 UTC (rev 10056)
+++ trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java	2010-12-17 22:30:28 UTC (rev 10057)
@@ -29,6 +29,7 @@
 import javax.transaction.xa.Xid;
 
 import org.hornetq.api.core.HornetQException;
+import org.hornetq.api.core.Message;
 import org.hornetq.api.core.SimpleString;
 import org.hornetq.api.core.management.ManagementHelper;
 import org.hornetq.core.client.impl.ClientMessageImpl;
@@ -1022,7 +1023,7 @@
       }
    }
 
-   public void sendContinuations(final int packetSize, final byte[] body, final boolean continues) throws Exception
+   public void sendContinuations(final int packetSize, final long messageBodySize, final byte[] body, final boolean continues) throws Exception
    {
       if (currentLargeMessage == null)
       {
@@ -1037,6 +1038,8 @@
       if (!continues)
       {
          currentLargeMessage.releaseResources();
+         
+         currentLargeMessage.putLongProperty(Message.HDR_LARGE_BODY_SIZE, messageBodySize);
 
          doSend(currentLargeMessage, false);
 

Modified: trunk/src/main/org/hornetq/jms/client/HornetQBytesMessage.java
===================================================================
--- trunk/src/main/org/hornetq/jms/client/HornetQBytesMessage.java	2010-12-17 22:03:22 UTC (rev 10056)
+++ trunk/src/main/org/hornetq/jms/client/HornetQBytesMessage.java	2010-12-17 22:30:28 UTC (rev 10057)
@@ -407,8 +407,6 @@
    public void doBeforeReceive() throws Exception
    {
       bodyLength = message.getBodySize();
-
-      super.doBeforeReceive();
    }
 
    // HornetQRAMessage overrides ----------------------------------------

Modified: trunk/src/main/org/hornetq/utils/DeflaterReader.java
===================================================================
--- trunk/src/main/org/hornetq/utils/DeflaterReader.java	2010-12-17 22:03:22 UTC (rev 10056)
+++ trunk/src/main/org/hornetq/utils/DeflaterReader.java	2010-12-17 22:30:28 UTC (rev 10057)
@@ -15,6 +15,7 @@
 
 import java.io.IOException;
 import java.io.InputStream;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.zip.Deflater;
 
 /**
@@ -27,24 +28,28 @@
  */
 public class DeflaterReader extends InputStream
 {
-   private Deflater deflater = new Deflater();
+   private final Deflater deflater = new Deflater();
    private boolean isFinished = false;
    private boolean compressDone = false;
 
    private InputStream input;
    
-   public DeflaterReader(InputStream inData)
+   private final AtomicLong bytesRead;
+   
+   public DeflaterReader(final InputStream inData, final AtomicLong bytesRead)
    {
       input = inData;
+      this.bytesRead = bytesRead;
    }
 
+   @Override
    public int read() throws IOException
    {
       byte[] buffer = new byte[1];
       int n = read(buffer, 0, 1);
       if (n == 1)
       {
-         return (int)buffer[0] & 0xFF;
+         return buffer[0] & 0xFF;
       }
       if (n == -1 || n == 0)
       {
@@ -62,7 +67,7 @@
     * @throws IOException 
     */
    @Override
-   public int read(byte[] buffer, int offset, int len) throws IOException
+   public int read(final byte[] buffer, int offset, int len) throws IOException
    {
       if (compressDone)
       {
@@ -98,6 +103,7 @@
                }
                else
                {
+                  bytesRead.addAndGet(m);
                   deflater.setInput(readBuffer, 0, m);
                }
             }

Modified: trunk/tests/src/org/hornetq/tests/integration/client/LargeMessageTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/client/LargeMessageTest.java	2010-12-17 22:03:22 UTC (rev 10056)
+++ trunk/tests/src/org/hornetq/tests/integration/client/LargeMessageTest.java	2010-12-17 22:30:28 UTC (rev 10057)
@@ -940,7 +940,7 @@
          server.start();
 
          
-         locator.setMinLargeMessageSize(111);
+         locator.setMinLargeMessageSize(200);
          
          locator.setCacheLargeMessagesClient(true);
 

Modified: trunk/tests/src/org/hornetq/tests/unit/util/CompressionUtilTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/unit/util/CompressionUtilTest.java	2010-12-17 22:03:22 UTC (rev 10056)
+++ trunk/tests/src/org/hornetq/tests/unit/util/CompressionUtilTest.java	2010-12-17 22:30:28 UTC (rev 10057)
@@ -16,6 +16,7 @@
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.util.ArrayList;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.zip.Deflater;
 
 import org.hornetq.tests.util.UnitTestCase;
@@ -39,7 +40,8 @@
 
       ByteArrayInputStream inputStream = new ByteArrayInputStream(input);
       
-      DeflaterReader reader = new DeflaterReader(inputStream);
+      AtomicLong counter = new AtomicLong(0);
+      DeflaterReader reader = new DeflaterReader(inputStream, counter);
 
       ArrayList<Integer> zipHolder = new ArrayList<Integer>();
       int b = reader.read();
@@ -50,6 +52,8 @@
          b = reader.read();
       }
       
+      assertEquals(input.length, counter.get());
+      
       byte[] allCompressed = new byte[zipHolder.size()];
       for (int i = 0; i < allCompressed.length; i++)
       {
@@ -71,8 +75,9 @@
       byte[] input = inputString.getBytes("UTF-8");
 
       ByteArrayInputStream inputStream = new ByteArrayInputStream(input);
-      
-      DeflaterReader reader = new DeflaterReader(inputStream);
+      AtomicLong counter = new AtomicLong(0);
+    
+      DeflaterReader reader = new DeflaterReader(inputStream, counter);
 
       byte[] buffer = new byte[7];
       ArrayList<Integer> zipHolder = new ArrayList<Integer>();
@@ -87,6 +92,8 @@
          n = reader.read(buffer);
       }
       
+      assertEquals(input.length, counter.get());
+      
       byte[] allCompressed = new byte[zipHolder.size()];
       for (int i = 0; i < allCompressed.length; i++)
       {



More information about the hornetq-commits mailing list