[hornetq-commits] JBoss hornetq SVN: r10112 - in trunk: src/main/org/hornetq/core/client/impl and 1 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Mon Jan 10 15:30:22 EST 2011


Author: clebert.suconic at jboss.com
Date: 2011-01-10 15:30:22 -0500 (Mon, 10 Jan 2011)
New Revision: 10112

Modified:
   trunk/examples/jms/large-message/src/org/hornetq/jms/example/LargeMessageExample.java
   trunk/src/main/org/hornetq/core/client/impl/ClientProducerImpl.java
   trunk/src/main/org/hornetq/utils/DeflaterReader.java
Log:
JBPAPP-5595 - Fixing large message send for file sizes that exceed the 32-bit integer range

Modified: trunk/examples/jms/large-message/src/org/hornetq/jms/example/LargeMessageExample.java
===================================================================
--- trunk/examples/jms/large-message/src/org/hornetq/jms/example/LargeMessageExample.java	2011-01-08 04:28:52 UTC (rev 10111)
+++ trunk/examples/jms/large-message/src/org/hornetq/jms/example/LargeMessageExample.java	2011-01-10 20:30:22 UTC (rev 10112)
@@ -49,10 +49,8 @@
    // This may take some considerable time to create, send and consume - if it takes too long or you don't have
    // enough disk space just reduce the file size here
 
-   private final long FILE_SIZE = 256L * 1024 * 1024;
+  private final long FILE_SIZE = 2L * 1024 * 1024 * 1024; // 2 GiB message
 
-   //private final long FILE_SIZE = 10L * 1024 * 1024 * 1024; // 10 GiB message
-
    @Override
    public boolean runExample() throws Exception
    {

Modified: trunk/src/main/org/hornetq/core/client/impl/ClientProducerImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/ClientProducerImpl.java	2011-01-08 04:28:52 UTC (rev 10111)
+++ trunk/src/main/org/hornetq/core/client/impl/ClientProducerImpl.java	2011-01-10 20:30:22 UTC (rev 10112)
@@ -468,14 +468,13 @@
 
       // We won't know the real size of the message since we are compressing while reading the streaming.
       // This counter will be passed to the deflater to be updated for every byte read
-      AtomicLong messageSize = new AtomicLong();
 
       if (session.isCompressLargeMessages())
       {
-         input = new DeflaterReader(inputStreamParameter, messageSize);
+         input = new DeflaterReader(inputStreamParameter);
       }
 
-      int totalSize = 0;
+      long totalSize = 0;
 
       while (!lastPacket)
       {
@@ -518,18 +517,13 @@
          if (lastPacket)
          {
 
-            if (!session.isCompressLargeMessages())
-            {
-               messageSize.set(totalSize);
-            }
-
             byte[] buff2 = new byte[pos];
 
             System.arraycopy(buff, 0, buff2, 0, pos);
 
             buff = buff2;
 
-            chunk = new SessionSendContinuationMessage(buff, false, sendBlocking, messageSize.get());
+            chunk = new SessionSendContinuationMessage(buff, false, sendBlocking, totalSize);
          }
          else
          {

Modified: trunk/src/main/org/hornetq/utils/DeflaterReader.java
===================================================================
--- trunk/src/main/org/hornetq/utils/DeflaterReader.java	2011-01-08 04:28:52 UTC (rev 10111)
+++ trunk/src/main/org/hornetq/utils/DeflaterReader.java	2011-01-10 20:30:22 UTC (rev 10112)
@@ -41,6 +41,11 @@
       input = inData;
       this.bytesRead = bytesRead;
    }
+   
+   public DeflaterReader(final InputStream inData)
+   {
+      this(inData, null);
+   }
 
    @Override
    public int read() throws IOException
@@ -103,7 +108,10 @@
                }
                else
                {
-                  bytesRead.addAndGet(m);
+                  if (bytesRead != null)
+                  {
+                     bytesRead.addAndGet(m);
+                  }
                   deflater.setInput(readBuffer, 0, m);
                }
             }



More information about the hornetq-commits mailing list