[hornetq-commits] JBoss hornetq SVN: r8404 - in branches/20-optimisation: src/main/org/hornetq/core/message and 9 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Wed Nov 25 17:20:58 EST 2009


Author: clebert.suconic at jboss.com
Date: 2009-11-25 17:20:57 -0500 (Wed, 25 Nov 2009)
New Revision: 8404

Modified:
   branches/20-optimisation/src/main/org/hornetq/core/client/impl/ClientMessageImpl.java
   branches/20-optimisation/src/main/org/hornetq/core/client/impl/ClientProducerImpl.java
   branches/20-optimisation/src/main/org/hornetq/core/message/BodyEncoder.java
   branches/20-optimisation/src/main/org/hornetq/core/message/Message.java
   branches/20-optimisation/src/main/org/hornetq/core/message/impl/MessageImpl.java
   branches/20-optimisation/src/main/org/hornetq/core/persistence/impl/journal/LargeServerMessageImpl.java
   branches/20-optimisation/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageLargeServerMessage.java
   branches/20-optimisation/src/main/org/hornetq/core/remoting/impl/wireformat/SessionSendLargeMessage.java
   branches/20-optimisation/src/main/org/hornetq/core/server/LargeServerMessage.java
   branches/20-optimisation/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java
   branches/20-optimisation/src/main/org/hornetq/core/server/impl/ServerMessageImpl.java
   branches/20-optimisation/tests/src/org/hornetq/tests/integration/client/LargeMessageTest.java
   branches/20-optimisation/tests/src/org/hornetq/tests/integration/largemessage/LargeMessageTestBase.java
   branches/20-optimisation/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java
Log:
Fix for LargeMessages

Modified: branches/20-optimisation/src/main/org/hornetq/core/client/impl/ClientMessageImpl.java
===================================================================
--- branches/20-optimisation/src/main/org/hornetq/core/client/impl/ClientMessageImpl.java	2009-11-25 20:11:08 UTC (rev 8403)
+++ branches/20-optimisation/src/main/org/hornetq/core/client/impl/ClientMessageImpl.java	2009-11-25 22:20:57 UTC (rev 8404)
@@ -16,15 +16,15 @@
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
+import java.nio.ByteBuffer;
 
 import org.hornetq.core.buffers.HornetQBuffer;
-import org.hornetq.core.buffers.impl.ResetLimitWrappedHornetQBuffer;
+import org.hornetq.core.buffers.HornetQBuffers;
 import org.hornetq.core.client.LargeMessageBuffer;
 import org.hornetq.core.exception.HornetQException;
 import org.hornetq.core.logging.Logger;
+import org.hornetq.core.message.BodyEncoder;
 import org.hornetq.core.message.impl.MessageImpl;
-import org.hornetq.core.remoting.impl.wireformat.PacketImpl;
-import org.hornetq.utils.DataConstants;
 import org.hornetq.utils.SimpleString;
 
 /**
@@ -147,17 +147,6 @@
     */
 
    // FIXME - only used for large messages - move it!
-   public long getLargeBodySize()
-   {
-      if (largeMessage)
-      {
-         return ((LargeMessageBuffer)getWholeBuffer()).getSize();
-      }
-      else
-      {
-         return this.getBodySize();
-      }
-   }
 
    /* (non-Javadoc)
     * @see org.hornetq.core.client.ClientMessage#saveToOutputStream(java.io.OutputStream)
@@ -172,7 +161,9 @@
       {
          try
          {
-            out.write(this.getWholeBuffer().toByteBuffer().array());
+            byte readBuffer[] = new byte[getBodySize()];
+            getBodyBuffer().readBytes(readBuffer);
+            out.write(readBuffer);
          }
          catch (IOException e)
          {
@@ -249,6 +240,50 @@
        bodyBuffer.setBuffer(buffer);
     }
  }
+ 
+ public BodyEncoder getBodyEncoder() throws HornetQException
+ {
+    return new DecodingContext();
+ }
 
 
+
+ private final class DecodingContext implements BodyEncoder
+ {
+    private int lastPos = 0;
+
+    public DecodingContext()
+    {
+    }
+
+    public void open()
+    {
+    }
+
+    public void close()
+    {
+    }
+    
+    public long getLargeBodySize()
+    {
+       return buffer.writerIndex();
+    }
+
+    public int encode(final ByteBuffer bufferRead) throws HornetQException
+    {
+       HornetQBuffer buffer = HornetQBuffers.wrappedBuffer(bufferRead);
+       return encode(buffer, bufferRead.capacity());
+    }
+
+    public int encode(final HornetQBuffer bufferOut, final int size)
+    {
+       byte[] bytes = new byte[size];
+       getWholeBuffer().readBytes(bytes);
+       bufferOut.writeBytes(bytes, 0, size);
+       return size;
+    }
+ }
+
+ 
+
 }

Modified: branches/20-optimisation/src/main/org/hornetq/core/client/impl/ClientProducerImpl.java
===================================================================
--- branches/20-optimisation/src/main/org/hornetq/core/client/impl/ClientProducerImpl.java	2009-11-25 20:11:08 UTC (rev 8403)
+++ branches/20-optimisation/src/main/org/hornetq/core/client/impl/ClientProducerImpl.java	2009-11-25 22:20:57 UTC (rev 8404)
@@ -232,8 +232,6 @@
 
       boolean sendBlocking = msg.isDurable() ? blockOnPersistentSend : blockOnNonPersistentSend;
 
-      SessionSendMessage message = new SessionSendMessage(msg, sendBlocking);
-
       session.workDone();
 
       boolean isLarge;
@@ -251,13 +249,18 @@
       {
          largeMessageSend(sendBlocking, msg, theCredits);
       }
-      else if (sendBlocking)
-      {
-         channel.sendBlocking(message);
-      }
       else
       {
-         channel.send(message);
+         SessionSendMessage message = new SessionSendMessage(msg, sendBlocking);
+
+         if (sendBlocking)
+         {
+            channel.sendBlocking(message);
+         }
+         else
+         {
+            channel.send(message);
+         }
       }
 
       try
@@ -345,10 +348,12 @@
                                          final Message msg,
                                          final ClientProducerCredits credits) throws HornetQException
    {
-      final long bodySize = msg.getLargeBodySize();
-
       BodyEncoder context = msg.getBodyEncoder();
+      
+      final long bodySize = context.getLargeBodySize();
 
+      
+
       context.open();
       try
       {

Modified: branches/20-optimisation/src/main/org/hornetq/core/message/BodyEncoder.java
===================================================================
--- branches/20-optimisation/src/main/org/hornetq/core/message/BodyEncoder.java	2009-11-25 20:11:08 UTC (rev 8403)
+++ branches/20-optimisation/src/main/org/hornetq/core/message/BodyEncoder.java	2009-11-25 22:20:57 UTC (rev 8404)
@@ -35,4 +35,6 @@
    int encode(ByteBuffer bufferRead) throws HornetQException;
 
    int encode(HornetQBuffer bufferOut, int size) throws HornetQException;
+
+   long getLargeBodySize();
 }

Modified: branches/20-optimisation/src/main/org/hornetq/core/message/Message.java
===================================================================
--- branches/20-optimisation/src/main/org/hornetq/core/message/Message.java	2009-11-25 20:11:08 UTC (rev 8403)
+++ branches/20-optimisation/src/main/org/hornetq/core/message/Message.java	2009-11-25 22:20:57 UTC (rev 8404)
@@ -18,6 +18,7 @@
 import java.util.Set;
 
 import org.hornetq.core.buffers.HornetQBuffer;
+import org.hornetq.core.exception.HornetQException;
 import org.hornetq.utils.SimpleString;
 import org.hornetq.utils.TypedProperties;
 
@@ -198,10 +199,8 @@
    
    void decodeHeadersAndProperties(HornetQBuffer buffer);
    
-   long getLargeBodySize();
+   BodyEncoder getBodyEncoder() throws HornetQException;
    
-   BodyEncoder getBodyEncoder();
-   
    /** Get the InputStream used on a message that will be sent over a producer */
    InputStream getBodyInputStream();
    

Modified: branches/20-optimisation/src/main/org/hornetq/core/message/impl/MessageImpl.java
===================================================================
--- branches/20-optimisation/src/main/org/hornetq/core/message/impl/MessageImpl.java	2009-11-25 20:11:08 UTC (rev 8403)
+++ branches/20-optimisation/src/main/org/hornetq/core/message/impl/MessageImpl.java	2009-11-25 22:20:57 UTC (rev 8404)
@@ -142,8 +142,12 @@
 
    protected MessageImpl(final long messageID, final int initialMessageBufferSize)
    {
+      this(initialMessageBufferSize);
+   }
+
+   protected MessageImpl(final int initialMessageBufferSize)
+   {
       this();
-      this.messageID = messageID;
       createBody(initialMessageBufferSize);
    }
 
@@ -165,11 +169,15 @@
       endOfBodyPosition = other.endOfBodyPosition;
       endOfMessagePosition = other.endOfMessagePosition;
       copied = other.copied;
-
-      // We need to copy the underlying buffer too, since the different messsages thereafter might have different
-      // properties set on them, making their encoding different
-      buffer = other.buffer.copy(0, other.buffer.capacity());
-      buffer.setIndex(other.buffer.readerIndex(), other.buffer.writerIndex());
+      
+      if (other.buffer != null)
+      {
+         createBody(other.buffer.capacity());
+         // We need to copy the underlying buffer too, since the different messages thereafter might have different
+         // properties set on them, making their encoding different
+         buffer = other.buffer.copy(0, other.buffer.capacity());
+         buffer.setIndex(other.buffer.readerIndex(), other.buffer.writerIndex());
+      }
    }
 
    // Message implementation ----------------------------------------
@@ -779,7 +787,7 @@
       return buffer;
    }
 
-   public BodyEncoder getBodyEncoder()
+   public BodyEncoder getBodyEncoder() throws HornetQException
    {
       return new DecodingContext();
    }
@@ -889,6 +897,11 @@
       public void close()
       {
       }
+      
+      public long getLargeBodySize()
+      {
+         return buffer.writerIndex();
+      }
 
       public int encode(final ByteBuffer bufferRead) throws HornetQException
       {

Modified: branches/20-optimisation/src/main/org/hornetq/core/persistence/impl/journal/LargeServerMessageImpl.java
===================================================================
--- branches/20-optimisation/src/main/org/hornetq/core/persistence/impl/journal/LargeServerMessageImpl.java	2009-11-25 20:11:08 UTC (rev 8403)
+++ branches/20-optimisation/src/main/org/hornetq/core/persistence/impl/journal/LargeServerMessageImpl.java	2009-11-25 22:20:57 UTC (rev 8404)
@@ -125,20 +125,6 @@
    }
 
    @Override
-   public synchronized long getLargeBodySize()
-   {
-      try
-      {
-         validateFile();
-      }
-      catch (Exception e)
-      {
-         throw new RuntimeException(e.getMessage(), e);
-      }
-      return bodySize;
-   }
-
-   @Override
    public synchronized int getEncodeSize()
    {
       return getHeadersAndPropertiesEncodeSize();
@@ -146,15 +132,15 @@
 
    @Override
    public void encode(final HornetQBuffer buffer)
-   {      
+   {
       super.encodeHeadersAndProperties(buffer);
    }
-   
+
    @Override
    public void decode(final HornetQBuffer buffer)
-   {      
+   {
       file = null;
-      
+
       super.decodeHeadersAndProperties(buffer);
    }
 
@@ -174,8 +160,9 @@
    }
 
    @Override
-   public BodyEncoder getBodyEncoder()
+   public BodyEncoder getBodyEncoder() throws HornetQException
    {
+      validateFile();
       return new DecodingContext();
    }
 
@@ -312,22 +299,30 @@
 
    // Private -------------------------------------------------------
 
-   private synchronized void validateFile() throws Exception
+   private synchronized void validateFile() throws HornetQException
    {
-      if (file == null)
+      try
       {
-         if (messageID <= 0)
+         if (file == null)
          {
-            throw new RuntimeException("MessageID not set on LargeMessage");
+            if (messageID <= 0)
+            {
+               throw new RuntimeException("MessageID not set on LargeMessage");
+            }
+   
+            file = storageManager.createFileForLargeMessage(getMessageID(), durable);
+   
+            file.open();
+   
+            bodySize = file.size();
+   
          }
-
-         file = storageManager.createFileForLargeMessage(getMessageID(), durable);
-
-         file.open();
-
-         bodySize = file.size();
-
       }
+      catch (Exception e)
+      {
+         // TODO: There is an IO_ERROR on trunk now, this should be used here instead
+         throw new HornetQException(HornetQException.INTERNAL_ERROR, e.getMessage(), e);
+      }
    }
 
    /* (non-Javadoc)
@@ -415,5 +410,13 @@
 
          return bytesRead;
       }
+
+      /* (non-Javadoc)
+       * @see org.hornetq.core.message.BodyEncoder#getLargeBodySize()
+       */
+      public long getLargeBodySize()
+      {
+         return bodySize;
+      }
    }
 }

Modified: branches/20-optimisation/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageLargeServerMessage.java
===================================================================
--- branches/20-optimisation/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageLargeServerMessage.java	2009-11-25 20:11:08 UTC (rev 8403)
+++ branches/20-optimisation/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageLargeServerMessage.java	2009-11-25 22:20:57 UTC (rev 8404)
@@ -57,18 +57,13 @@
     */
    public synchronized void addBytes(final byte[] bytes)
    {
-//      HornetQBuffer buffer = getBuffer();
-//
-//      if (buffer != null)
-//      {
-         // expand the buffer
-         buffer.writeBytes(bytes);
-//      }
-//      else
-//      {
-//         // Reuse the initial byte array on the buffer construction
-//         setBuffer(ChannelBuffers.dynamicBuffer(bytes));
-//      }
+      if (buffer == null)
+      {
+         buffer = HornetQBuffers.dynamicBuffer(bytes.length);
+      }
+
+      // expand the buffer
+      buffer.writeBytes(bytes);
    }
 
    /* (non-Javadoc)

Modified: branches/20-optimisation/src/main/org/hornetq/core/remoting/impl/wireformat/SessionSendLargeMessage.java
===================================================================
--- branches/20-optimisation/src/main/org/hornetq/core/remoting/impl/wireformat/SessionSendLargeMessage.java	2009-11-25 20:11:08 UTC (rev 8403)
+++ branches/20-optimisation/src/main/org/hornetq/core/remoting/impl/wireformat/SessionSendLargeMessage.java	2009-11-25 22:20:57 UTC (rev 8404)
@@ -32,9 +32,6 @@
    /** Used only if largeMessage */
    private byte[] largeMessageHeader;
 
-   /** We need to set the MessageID when replicating this on the server */
-   private long largeMessageId = -1;
-
    // Static --------------------------------------------------------
 
    // Constructors --------------------------------------------------
@@ -58,28 +55,11 @@
       return largeMessageHeader;
    }
 
-   /**
-    * @return the largeMessageId
-    */
-   public long getLargeMessageID()
-   {
-      return largeMessageId;
-   }
-
-   /**
-    * @param largeMessageId the largeMessageId to set
-    */
-   public void setLargeMessageID(long id)
-   {
-      this.largeMessageId = id;
-   }
-
    @Override
    public void encodeRest(final HornetQBuffer buffer)
    {
       buffer.writeInt(largeMessageHeader.length);
       buffer.writeBytes(largeMessageHeader);
-      buffer.writeLong(largeMessageId);
    }
 
    @Override
@@ -90,8 +70,6 @@
       largeMessageHeader = new byte[largeMessageLength];
 
       buffer.readBytes(largeMessageHeader);
-
-      largeMessageId = buffer.readLong();
    }
 
    // Package protected ---------------------------------------------

Modified: branches/20-optimisation/src/main/org/hornetq/core/server/LargeServerMessage.java
===================================================================
--- branches/20-optimisation/src/main/org/hornetq/core/server/LargeServerMessage.java	2009-11-25 20:11:08 UTC (rev 8403)
+++ branches/20-optimisation/src/main/org/hornetq/core/server/LargeServerMessage.java	2009-11-25 22:20:57 UTC (rev 8404)
@@ -13,7 +13,6 @@
 
 package org.hornetq.core.server;
 
-import org.hornetq.core.message.BodyEncoder;
 
 /**
  * A LargeMessage
@@ -36,8 +35,6 @@
    /** Close the files if opened */
    void releaseResources();
    
-   long getLargeBodySize();
-   
    void deleteFile() throws Exception;
    
    void incrementDelayDeletionCount();

Modified: branches/20-optimisation/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java
===================================================================
--- branches/20-optimisation/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java	2009-11-25 20:11:08 UTC (rev 8403)
+++ branches/20-optimisation/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java	2009-11-25 22:20:57 UTC (rev 8404)
@@ -638,7 +638,7 @@
     *  This Inner class was created to avoid a bunch of loose properties about the current LargeMessage being sent*/
    private final class LargeMessageDeliverer
    {
-      private final long sizePendingLargeMessage;
+      private long sizePendingLargeMessage;
 
       private LargeServerMessage largeMessage;
 
@@ -657,8 +657,6 @@
 
          largeMessage.incrementDelayDeletionCount();
 
-         sizePendingLargeMessage = largeMessage.getLargeBodySize();
-
          this.ref = ref;
       }
 
@@ -684,14 +682,16 @@
 
                largeMessage.encodeHeadersAndProperties(headerBuffer);
 
+               context = largeMessage.getBodyEncoder();
+
+               sizePendingLargeMessage = context.getLargeBodySize();
+
                SessionReceiveLargeMessage initialPacket = new SessionReceiveLargeMessage(id,
                                                                                          headerBuffer.toByteBuffer()
                                                                                                      .array(),
-                                                                                         largeMessage.getLargeBodySize(),
+                                                                                         context.getLargeBodySize(),
                                                                                          ref.getDeliveryCount());
 
-               context = largeMessage.getBodyEncoder();
-
                context.open();
 
                sentInitialPacket = true;

Modified: branches/20-optimisation/src/main/org/hornetq/core/server/impl/ServerMessageImpl.java
===================================================================
--- branches/20-optimisation/src/main/org/hornetq/core/server/impl/ServerMessageImpl.java	2009-11-25 20:11:08 UTC (rev 8403)
+++ branches/20-optimisation/src/main/org/hornetq/core/server/impl/ServerMessageImpl.java	2009-11-25 22:20:57 UTC (rev 8404)
@@ -64,6 +64,11 @@
    {
       super(messageID, initialMessageBufferSize);   
    }
+   
+   protected ServerMessageImpl(final int initialMessageBufferSize)
+   {
+      super(initialMessageBufferSize);
+   }
 
    /*
     * Copy constructor
@@ -144,11 +149,6 @@
       return false;
    }
 
-   public long getLargeBodySize()
-   {
-      return -1;
-   }
-
    public int getMemoryEstimate()
    {
       if (memoryEstimate == -1)

Modified: branches/20-optimisation/tests/src/org/hornetq/tests/integration/client/LargeMessageTest.java
===================================================================
--- branches/20-optimisation/tests/src/org/hornetq/tests/integration/client/LargeMessageTest.java	2009-11-25 20:11:08 UTC (rev 8403)
+++ branches/20-optimisation/tests/src/org/hornetq/tests/integration/client/LargeMessageTest.java	2009-11-25 22:20:57 UTC (rev 8404)
@@ -18,11 +18,6 @@
 import javax.transaction.xa.XAResource;
 import javax.transaction.xa.Xid;
 
-import junit.framework.AssertionFailedError;
-import junit.framework.Test;
-import junit.framework.TestSuite;
-
-import org.hornetq.core.buffers.HornetQBuffer;
 import org.hornetq.core.client.ClientConsumer;
 import org.hornetq.core.client.ClientMessage;
 import org.hornetq.core.client.ClientProducer;
@@ -53,13 +48,6 @@
  */
 public class LargeMessageTest extends LargeMessageTestBase
 {
-   public static Test suite()
-   {
-      TestSuite suite = new TestSuite();
-
-      return suite;
-   }
-
    // Constants -----------------------------------------------------
 
    final static int RECEIVE_WAIT_TIME = 60000;
@@ -788,14 +776,18 @@
 
          producer2.send(msg1);
 
+         boolean failed = false;
+         
          try
          {
             producer2.send(msg1);
-            fail("Expected Exception");
          }
          catch (Throwable e)
          {
+            failed = true;
          }
+         
+         assertTrue("Exception expected", failed);
 
          session.commit();
 
@@ -926,7 +918,7 @@
                  false,
                  true,
                  true,
-                 100,
+                 2,
                  LARGE_MESSAGE_SIZE,
                  RECEIVE_WAIT_TIME,
                  0);
@@ -2211,22 +2203,18 @@
 
          ClientMessage message = null;
 
-         HornetQBuffer body = null;
-
          for (int i = 0; i < 100; i++)
          {
             message = session.createClientMessage(true);
             
+            // TODO: Why do I need to reset the writerIndex?
+            message.getBodyBuffer().writerIndex(0);
+            
             for (int j = 1; j <= numberOfBytes; j++)
             {
                message.getBodyBuffer().writeInt(j);
             }
 
-            if (i == 0)
-            {
-               body = message.getBodyBuffer();
-            }
-   
             producer.send(message);
          }
 
@@ -2262,17 +2250,12 @@
 
             assertNotNull(message2);
 
-            try
+            message.getBodyBuffer().readerIndex(0);
+               
+            for (int j = 1; j <= numberOfBytes; j++)
             {
-               assertEqualsByteArrays(body.writerIndex(), body.toByteBuffer().array(), message2.getBodyBuffer().toByteBuffer().
-                                      array());
+               assertEquals(j, message.getBodyBuffer().readInt());
             }
-            catch (AssertionFailedError e)
-            {
-               log.info("Expected buffer:" + dumbBytesHex(body.toByteBuffer().array(), 40));
-               log.info("Arriving buffer:" + dumbBytesHex(message2.getBodyBuffer().toByteBuffer().array(), 40));
-               throw e;
-            }
          }
 
          consumer.close();

Modified: branches/20-optimisation/tests/src/org/hornetq/tests/integration/largemessage/LargeMessageTestBase.java
===================================================================
--- branches/20-optimisation/tests/src/org/hornetq/tests/integration/largemessage/LargeMessageTestBase.java	2009-11-25 20:11:08 UTC (rev 8403)
+++ branches/20-optimisation/tests/src/org/hornetq/tests/integration/largemessage/LargeMessageTestBase.java	2009-11-25 22:20:57 UTC (rev 8404)
@@ -350,7 +350,6 @@
 
                            HornetQBuffer buffer = message.getBodyBuffer();
                            buffer.resetReaderIndex();
-                           assertEquals(numberOfBytes, buffer.writerIndex());
                            for (long b = 0; b < numberOfBytes; b++)
                            {
                               if (b % (1024l * 1024l) == 0)
@@ -360,6 +359,15 @@
 
                               assertEquals(getSamplebyte(b), buffer.readByte());
                            }
+                           
+                           try
+                           {
+                              buffer.readByte();
+                              fail("Supposed to throw an exception");
+                           }
+                           catch (Exception e)
+                           {
+                           }
                         }
                      }
                      catch (Throwable e)
@@ -396,8 +404,6 @@
 
                   assertNotNull(message);
 
-                  log.debug("Message: " + i);
-
                   System.currentTimeMillis();
 
                   if (delayDelivery > 0)

Modified: branches/20-optimisation/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java
===================================================================
--- branches/20-optimisation/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java	2009-11-25 20:11:08 UTC (rev 8403)
+++ branches/20-optimisation/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java	2009-11-25 22:20:57 UTC (rev 8404)
@@ -288,7 +288,7 @@
 
          assertEquals(0, manager.getActiveTokens().size());
 
-         ServerMessage msg = new ServerMessageImpl(1, 10);
+         ServerMessage msg = new ServerMessageImpl(1, 1024);
 
          SimpleString dummy = new SimpleString("dummy");
          msg.setDestination(dummy);



More information about the hornetq-commits mailing list