[hornetq-commits] JBoss hornetq SVN: r10017 - in trunk: src/main/org/hornetq/core/message/impl and 3 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Wed Dec 8 12:21:30 EST 2010


Author: clebert.suconic at jboss.com
Date: 2010-12-08 12:21:29 -0500 (Wed, 08 Dec 2010)
New Revision: 10017

Added:
   trunk/src/main/org/hornetq/core/client/impl/ClientLargeMessageImpl.java
   trunk/src/main/org/hornetq/core/client/impl/ClientLargeMessageInternal.java
Modified:
   trunk/src/main/org/hornetq/core/client/impl/ClientConsumerImpl.java
   trunk/src/main/org/hornetq/core/client/impl/ClientMessageImpl.java
   trunk/src/main/org/hornetq/core/client/impl/ClientMessageInternal.java
   trunk/src/main/org/hornetq/core/client/impl/CompressedLargeMessageControllerImpl.java
   trunk/src/main/org/hornetq/core/message/impl/MessageImpl.java
   trunk/src/main/org/hornetq/core/protocol/core/impl/wireformat/SessionReceiveLargeMessage.java
   trunk/tests/src/org/hornetq/tests/integration/client/LargeMessageCompressTest.java
   trunk/tests/src/org/hornetq/tests/integration/client/LargeMessageTest.java
   trunk/tests/src/org/hornetq/tests/integration/largemessage/LargeMessageTestBase.java
Log:
HORNETQ-538 - Large Message will reconstruct the buffer on client

Modified: trunk/src/main/org/hornetq/core/client/impl/ClientConsumerImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/ClientConsumerImpl.java	2010-12-08 15:09:06 UTC (rev 10016)
+++ trunk/src/main/org/hornetq/core/client/impl/ClientConsumerImpl.java	2010-12-08 17:21:29 UTC (rev 10017)
@@ -166,7 +166,7 @@
       if (largeMessageReceived != null)
       {
          // Check if there are pending packets to be received
-         largeMessageReceived.discardLargeBody();
+         largeMessageReceived.discardBody();
          largeMessageReceived = null;
       }
 
@@ -280,7 +280,7 @@
 
                if (expired)
                {
-                  m.discardLargeBody();
+                  m.discardBody();
 
                   session.expire(id, m.getMessageID());
 
@@ -538,12 +538,10 @@
 
       flowControl(packet.getPacketSize(), false);
 
-      ClientMessageInternal currentChunkMessage = (ClientMessageInternal)packet.getLargeMessage();
+      ClientLargeMessageInternal currentChunkMessage = (ClientLargeMessageInternal)packet.getLargeMessage();
 
       currentChunkMessage.setDeliveryCount(packet.getDeliveryCount());
 
-      currentChunkMessage.setLargeMessage(true);
-
       File largeMessageCache = null;
 
       if (session.isCacheLargeMessageClient())
@@ -557,11 +555,11 @@
 
       if (currentChunkMessage.isCompressed())
       {
-         currentChunkMessage.setBuffer(new CompressedLargeMessageControllerImpl(currentLargeMessageController));
+         currentChunkMessage.setLargeMessageController(new CompressedLargeMessageControllerImpl(currentLargeMessageController));
       }
       else
       {
-         currentChunkMessage.setBuffer(currentLargeMessageController);
+         currentChunkMessage.setLargeMessageController(currentLargeMessageController);
       }
 
       currentChunkMessage.setFlowControlSize(0);
@@ -852,7 +850,7 @@
 
                if (message.isLargeMessage())
                {
-                  message.discardLargeBody();
+                  message.discardBody();
                }
             }
             else

Added: trunk/src/main/org/hornetq/core/client/impl/ClientLargeMessageImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/ClientLargeMessageImpl.java	                        (rev 0)
+++ trunk/src/main/org/hornetq/core/client/impl/ClientLargeMessageImpl.java	2010-12-08 17:21:29 UTC (rev 10017)
@@ -0,0 +1,225 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.client.impl;
+
+import java.io.IOException;
+import java.io.OutputStream;
+
+import org.hornetq.api.core.HornetQBuffer;
+import org.hornetq.api.core.HornetQException;
+import org.hornetq.core.buffers.impl.ResetLimitWrappedHornetQBuffer;
+import org.hornetq.utils.DataConstants;
+
+/**
+ * ClientLargeMessageImpl is only created when receiving large messages. At the time of sending a regular Message is sent as we won't know the message is considered large 
+ * until the buffer is filled up or the user set a streaming.
+ *
+ * @author clebertsuconic
+ *
+ *
+ */
+public class ClientLargeMessageImpl extends ClientMessageImpl implements ClientLargeMessageInternal
+{
+
+   // Constants -----------------------------------------------------
+
+   // Attributes ----------------------------------------------------
+
+   // Used only when receiving large messages
+   private LargeMessageController largeMessageController;
+   
+   private int largeMessageSize;
+
+   // Static --------------------------------------------------------
+
+   // Constructors --------------------------------------------------
+
+   /**
+    * @return the largeMessageSize
+    */
+   public int getLargeMessageSize()
+   {
+      return largeMessageSize;
+   }
+
+   /**
+    * @param largeMessageSize the largeMessageSize to set
+    */
+   public void setLargeMessageSize(int largeMessageSize)
+   {
+      this.largeMessageSize = largeMessageSize;
+   }
+
+   // we only need this constructor as this is only used at decoding large messages on client
+   public ClientLargeMessageImpl()
+   {
+      super();
+   }
+
+   // Public --------------------------------------------------------
+
+   public int getEncodeSize()
+   {
+      if (bodyBuffer != null)
+      {
+         return super.getEncodeSize();
+      }
+      else
+      {
+         return DataConstants.SIZE_INT + DataConstants.SIZE_INT + getHeadersAndPropertiesEncodeSize();
+      }
+   }
+
+   /**
+    * @return the largeMessage
+    */
+   public boolean isLargeMessage()
+   {
+      return true;
+   }
+
+   public void setLargeMessageController(final LargeMessageController controller)
+   {
+      largeMessageController = controller;
+   }
+   
+   public HornetQBuffer getBodyBuffer()
+   {
+      checkBuffer();
+
+      return bodyBuffer;
+   }
+
+   
+   public int getBodySize()
+   {
+      checkBuffer();
+      return buffer.writerIndex() - buffer.readerIndex();
+   }
+
+
+
+   public LargeMessageController getLargeMessageController()
+   {
+      return largeMessageController;
+   }
+
+   /* (non-Javadoc)
+    * @see org.hornetq.api.core.client.ClientMessage#saveToOutputStream(java.io.OutputStream)
+    */
+   public void saveToOutputStream(final OutputStream out) throws HornetQException
+   {
+      if (bodyBuffer != null)
+      {
+         // The body was rebuilt on the client, so we need to behave as a regular message on this case
+         super.saveToOutputStream(out);
+      }
+      else
+      {
+         largeMessageController.saveBuffer(out);
+      }
+   }
+
+   /* (non-Javadoc)
+    * @see org.hornetq.api.core.client.ClientMessage#setOutputStream(java.io.OutputStream)
+    */
+   public void setOutputStream(final OutputStream out) throws HornetQException
+   {
+      if (bodyBuffer != null)
+      {
+         super.setOutputStream(out);
+      }
+      else
+      {
+         largeMessageController.setOutputStream(out);
+      }
+   }
+
+   /* (non-Javadoc)
+    * @see org.hornetq.api.core.client.ClientMessage#waitOutputStreamCompletion()
+    */
+   public boolean waitOutputStreamCompletion(final long timeMilliseconds) throws HornetQException
+   {
+      if (bodyBuffer != null)
+      {
+         return super.waitOutputStreamCompletion(timeMilliseconds);
+      }
+      else
+      {
+         return largeMessageController.waitCompletion(timeMilliseconds);
+      }
+   }
+   
+   public void discardBody()
+   {
+      if (bodyBuffer != null)
+      {
+         super.discardBody();
+      }
+      else
+      {
+         largeMessageController.discardUnusedPackets();
+      }
+   }
+
+
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+
+   // Private -------------------------------------------------------
+   
+   private void checkBuffer()
+   {
+      if (bodyBuffer == null)
+      {
+         createBody(this.largeMessageSize + BODY_OFFSET);
+         
+         bodyBuffer = new ResetLimitWrappedHornetQBuffer(BODY_OFFSET, buffer, this);
+         
+         try
+         {
+            largeMessageController.saveBuffer(new HornetQOutputStream(bodyBuffer));
+         }
+         catch (HornetQException e)
+         {
+            throw new RuntimeException(e.getMessage(), e);
+         }
+      }
+   }
+   
+
+   // Inner classes -------------------------------------------------
+   
+   protected class HornetQOutputStream extends OutputStream
+   {
+      HornetQBuffer bufferOut;
+      
+      HornetQOutputStream(HornetQBuffer out)
+      {
+         this.bufferOut = out;
+      }
+
+      /* (non-Javadoc)
+       * @see java.io.OutputStream#write(int)
+       */
+      @Override
+      public void write(int b) throws IOException
+      {
+         bufferOut.writeByte((byte)(b & Byte.MAX_VALUE));
+      }
+      
+   }
+
+}

Added: trunk/src/main/org/hornetq/core/client/impl/ClientLargeMessageInternal.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/ClientLargeMessageInternal.java	                        (rev 0)
+++ trunk/src/main/org/hornetq/core/client/impl/ClientLargeMessageInternal.java	2010-12-08 17:21:29 UTC (rev 10017)
@@ -0,0 +1,34 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.client.impl;
+
+
+/**
+ * A ClientLargeMessageInternal
+ *
+ * @author clebertsuconic
+ *
+ *
+ */
+public interface ClientLargeMessageInternal extends ClientMessageInternal
+{
+
+   void setLargeMessageController(LargeMessageController controller);
+
+   LargeMessageController getLargeMessageController();
+   
+   void setLargeMessageSize(int size);
+   
+   int getLargeMessageSize();
+}

Modified: trunk/src/main/org/hornetq/core/client/impl/ClientMessageImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/ClientMessageImpl.java	2010-12-08 15:09:06 UTC (rev 10016)
+++ trunk/src/main/org/hornetq/core/client/impl/ClientMessageImpl.java	2010-12-08 17:21:29 UTC (rev 10017)
@@ -46,11 +46,6 @@
 
    private ClientConsumerInternal consumer;
 
-   private boolean largeMessage;
-   
-   // Used only when receiving large messages
-   private LargeMessageController largeMessageController;
-
    private int flowControlSize = -1;
 
    /** Used on LargeMessages only */
@@ -80,7 +75,7 @@
    {
       return false;
    }
-   
+
    public void onReceipt(final ClientConsumerInternal consumer)
    {
       this.consumer = consumer;
@@ -123,22 +118,14 @@
     */
    public boolean isLargeMessage()
    {
-      return largeMessage;
+      return false;
    }
-   
+
    public boolean isCompressed()
    {
       return properties.getBooleanProperty(Message.HDR_LARGE_COMPRESSED);
    }
 
-   /**
-    * @param largeMessage the largeMessage to set
-    */
-   public void setLargeMessage(final boolean largeMessage)
-   {
-      this.largeMessage = largeMessage;
-   }
-
    public int getBodySize()
    {
       return buffer.writerIndex() - buffer.readerIndex();
@@ -147,12 +134,7 @@
    @Override
    public String toString()
    {
-      return "ClientMessage[messageID=" + messageID +
-             ", durable=" +
-             durable +
-             ", address=" +
-             getAddress() +
-             "]";
+      return "ClientMessage[messageID=" + messageID + ", durable=" + durable + ", address=" + getAddress() + "]";
    }
 
    /* (non-Javadoc)
@@ -160,24 +142,16 @@
     */
    public void saveToOutputStream(final OutputStream out) throws HornetQException
    {
-      if (largeMessage)
+      try
       {
-          ((LargeMessageController)getWholeBuffer()).saveBuffer(out);
+         byte readBuffer[] = new byte[getBodySize()];
+         getBodyBuffer().readBytes(readBuffer);
+         out.write(readBuffer);
       }
-      else
+      catch (IOException e)
       {
-         try
-         {
-            byte readBuffer[] = new byte[getBodySize()];
-            getBodyBuffer().readBytes(readBuffer);
-            out.write(readBuffer);
-         }
-         catch (IOException e)
-         {
-            throw new HornetQException(HornetQException.LARGE_MESSAGE_ERROR_BODY, "Error saving the message body", e);
-         }
+         throw new HornetQException(HornetQException.LARGE_MESSAGE_ERROR_BODY, "Error saving the message body", e);
       }
-
    }
 
    /* (non-Javadoc)
@@ -185,15 +159,7 @@
     */
    public void setOutputStream(final OutputStream out) throws HornetQException
    {
-      if (largeMessage)
-      {
-         ((LargeMessageController)getWholeBuffer()).setOutputStream(out);
-      }
-      else
-      {
-         saveToOutputStream(out);
-      }
-
+      saveToOutputStream(out);
    }
 
    /* (non-Javadoc)
@@ -201,25 +167,14 @@
     */
    public boolean waitOutputStreamCompletion(final long timeMilliseconds) throws HornetQException
    {
-      if (largeMessage)
-      {
-         return ((LargeMessageController)getWholeBuffer()).waitCompletion(timeMilliseconds);
-      }
-      else
-      {
-         return true;
-      }
+      return true;
    }
 
    /* (non-Javadoc)
     * @see org.hornetq.api.core.client.impl.ClientMessageInternal#discardLargeBody()
     */
-   public void discardLargeBody()
+   public void discardBody()
    {
-      if (largeMessage)
-      {
-         ((LargeMessageController)getWholeBuffer()).discardUnusedPackets();
-      }
    }
 
    /**

Modified: trunk/src/main/org/hornetq/core/client/impl/ClientMessageInternal.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/ClientMessageInternal.java	2010-12-08 15:09:06 UTC (rev 10016)
+++ trunk/src/main/org/hornetq/core/client/impl/ClientMessageInternal.java	2010-12-08 17:21:29 UTC (rev 10017)
@@ -36,12 +36,10 @@
 
    void onReceipt(ClientConsumerInternal consumer);
 
-   void setLargeMessage(boolean largeMessage);
-
    /**
     * Discard unused packets (used on large-message)
     */
-   void discardLargeBody();
+   void discardBody();
 
    void setBuffer(HornetQBuffer buffer);
    

Modified: trunk/src/main/org/hornetq/core/client/impl/CompressedLargeMessageControllerImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/CompressedLargeMessageControllerImpl.java	2010-12-08 15:09:06 UTC (rev 10016)
+++ trunk/src/main/org/hornetq/core/client/impl/CompressedLargeMessageControllerImpl.java	2010-12-08 17:21:29 UTC (rev 10017)
@@ -423,8 +423,7 @@
 
    public long getSize()
    {
-      // TODO
-      return 0;
+      return this.bufferDelegate.getSize();
    }
 
    public void writerIndex(final int writerIndex)

Modified: trunk/src/main/org/hornetq/core/message/impl/MessageImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/message/impl/MessageImpl.java	2010-12-08 15:09:06 UTC (rev 10016)
+++ trunk/src/main/org/hornetq/core/message/impl/MessageImpl.java	2010-12-08 17:21:29 UTC (rev 10017)
@@ -262,14 +262,7 @@
    {
       if (bodyBuffer == null)
       {
-         if (buffer instanceof LargeMessageController == false)
-         {
-            bodyBuffer = new ResetLimitWrappedHornetQBuffer(BODY_OFFSET, buffer, this);
-         }
-         else
-         {
-            return buffer;
-         }
+         bodyBuffer = new ResetLimitWrappedHornetQBuffer(BODY_OFFSET, buffer, this);
       }
 
       return bodyBuffer;
@@ -938,7 +931,7 @@
       bufferValid = true;
    }
 
-   private void createBody(final int initialMessageBufferSize)
+   protected void createBody(final int initialMessageBufferSize)
    {
       buffer = HornetQBuffers.dynamicBuffer(initialMessageBufferSize);
 

Modified: trunk/src/main/org/hornetq/core/protocol/core/impl/wireformat/SessionReceiveLargeMessage.java
===================================================================
--- trunk/src/main/org/hornetq/core/protocol/core/impl/wireformat/SessionReceiveLargeMessage.java	2010-12-08 15:09:06 UTC (rev 10016)
+++ trunk/src/main/org/hornetq/core/protocol/core/impl/wireformat/SessionReceiveLargeMessage.java	2010-12-08 17:21:29 UTC (rev 10017)
@@ -14,7 +14,7 @@
 package org.hornetq.core.protocol.core.impl.wireformat;
 
 import org.hornetq.api.core.HornetQBuffer;
-import org.hornetq.core.client.impl.ClientMessageImpl;
+import org.hornetq.core.client.impl.ClientLargeMessageImpl;
 import org.hornetq.core.logging.Logger;
 import org.hornetq.core.message.impl.MessageInternal;
 import org.hornetq.core.protocol.core.impl.PacketImpl;
@@ -43,7 +43,7 @@
    public SessionReceiveLargeMessage()
    {
       super(PacketImpl.SESS_RECEIVE_LARGE_MSG);
-      this.message = new ClientMessageImpl();
+      this.message = new ClientLargeMessageImpl();
    }
 
    public SessionReceiveLargeMessage(final long consumerID,

Modified: trunk/tests/src/org/hornetq/tests/integration/client/LargeMessageCompressTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/client/LargeMessageCompressTest.java	2010-12-08 15:09:06 UTC (rev 10016)
+++ trunk/tests/src/org/hornetq/tests/integration/client/LargeMessageCompressTest.java	2010-12-08 17:21:29 UTC (rev 10017)
@@ -277,32 +277,6 @@
    }
 
 
-   // TODO: Fix these tests on LargeMessageCompression
-
-   public void testResendSmallStreamMessage() throws Exception
-   {
-   }
-
-   public void testResendLargeStreamMessage() throws Exception
-   {
-   }
-
-   public void testResendCachedSmallStreamMessage() throws Exception
-   {
-   }
-
-   public void testResendCachedLargeStreamMessage() throws Exception
-   {
-   }
-
-   public void testSimpleRollback() throws Exception
-   {
-   }
-
-   public void testSimpleRollbackXA() throws Exception
-   {
-   }
-
    public void testSendServerMessage() throws Exception
    {
       // doesn't make sense as compressed

Modified: trunk/tests/src/org/hornetq/tests/integration/client/LargeMessageTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/client/LargeMessageTest.java	2010-12-08 15:09:06 UTC (rev 10016)
+++ trunk/tests/src/org/hornetq/tests/integration/client/LargeMessageTest.java	2010-12-08 17:21:29 UTC (rev 10017)
@@ -841,6 +841,7 @@
 
    public void internalTestResendMessage(final long messageSize) throws Exception
    {
+      clearData();
       ClientSession session = null;
 
       try
@@ -879,19 +880,7 @@
 
          producer2.send(msg1);
 
-         boolean failed = false;
 
-         try
-         {
-            producer2.send(msg1);
-         }
-         catch (Throwable e)
-         {
-            failed = true;
-         }
-
-         Assert.assertTrue("Exception expected", failed);
-
          session.commit();
 
          ClientMessage msg2 = consumer2.receive(10000);
@@ -2206,7 +2195,7 @@
 
                Assert.assertNotNull(clientMessage);
 
-               Assert.assertEquals(numberOfBytes, clientMessage.getBodyBuffer().writerIndex());
+               Assert.assertEquals(numberOfBytes, clientMessage.getBodySize());
 
                clientMessage.acknowledge();
 

Modified: trunk/tests/src/org/hornetq/tests/integration/largemessage/LargeMessageTestBase.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/largemessage/LargeMessageTestBase.java	2010-12-08 15:09:06 UTC (rev 10016)
+++ trunk/tests/src/org/hornetq/tests/integration/largemessage/LargeMessageTestBase.java	2010-12-08 17:21:29 UTC (rev 10017)
@@ -432,9 +432,6 @@
                                          ((Integer)message.getObjectProperty(new SimpleString("counter-message"))).intValue());
                   }
 
-                  HornetQBuffer buffer = message.getBodyBuffer();
-                  buffer.resetReaderIndex();
-
                   if (useStreamOnConsume)
                   {
                      final AtomicLong bytesRead = new AtomicLong(0);
@@ -477,6 +474,9 @@
                   }
                   else
                   {
+                     HornetQBuffer buffer = message.getBodyBuffer();
+                     buffer.resetReaderIndex();
+
                      for (long b = 0; b < numberOfBytes; b++)
                      {
                         if (b % (1024l * 1024l) == 0l)



More information about the hornetq-commits mailing list