[hornetq-commits] JBoss hornetq SVN: r10017 - in trunk: src/main/org/hornetq/core/message/impl and 3 other directories.
do-not-reply at jboss.org
do-not-reply at jboss.org
Wed Dec 8 12:21:30 EST 2010
Author: clebert.suconic at jboss.com
Date: 2010-12-08 12:21:29 -0500 (Wed, 08 Dec 2010)
New Revision: 10017
Added:
trunk/src/main/org/hornetq/core/client/impl/ClientLargeMessageImpl.java
trunk/src/main/org/hornetq/core/client/impl/ClientLargeMessageInternal.java
Modified:
trunk/src/main/org/hornetq/core/client/impl/ClientConsumerImpl.java
trunk/src/main/org/hornetq/core/client/impl/ClientMessageImpl.java
trunk/src/main/org/hornetq/core/client/impl/ClientMessageInternal.java
trunk/src/main/org/hornetq/core/client/impl/CompressedLargeMessageControllerImpl.java
trunk/src/main/org/hornetq/core/message/impl/MessageImpl.java
trunk/src/main/org/hornetq/core/protocol/core/impl/wireformat/SessionReceiveLargeMessage.java
trunk/tests/src/org/hornetq/tests/integration/client/LargeMessageCompressTest.java
trunk/tests/src/org/hornetq/tests/integration/client/LargeMessageTest.java
trunk/tests/src/org/hornetq/tests/integration/largemessage/LargeMessageTestBase.java
Log:
HORNETQ-538 - Large Message will reconstruct the buffer on client
Modified: trunk/src/main/org/hornetq/core/client/impl/ClientConsumerImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/ClientConsumerImpl.java 2010-12-08 15:09:06 UTC (rev 10016)
+++ trunk/src/main/org/hornetq/core/client/impl/ClientConsumerImpl.java 2010-12-08 17:21:29 UTC (rev 10017)
@@ -166,7 +166,7 @@
if (largeMessageReceived != null)
{
// Check if there are pending packets to be received
- largeMessageReceived.discardLargeBody();
+ largeMessageReceived.discardBody();
largeMessageReceived = null;
}
@@ -280,7 +280,7 @@
if (expired)
{
- m.discardLargeBody();
+ m.discardBody();
session.expire(id, m.getMessageID());
@@ -538,12 +538,10 @@
flowControl(packet.getPacketSize(), false);
- ClientMessageInternal currentChunkMessage = (ClientMessageInternal)packet.getLargeMessage();
+ ClientLargeMessageInternal currentChunkMessage = (ClientLargeMessageInternal)packet.getLargeMessage();
currentChunkMessage.setDeliveryCount(packet.getDeliveryCount());
- currentChunkMessage.setLargeMessage(true);
-
File largeMessageCache = null;
if (session.isCacheLargeMessageClient())
@@ -557,11 +555,11 @@
if (currentChunkMessage.isCompressed())
{
- currentChunkMessage.setBuffer(new CompressedLargeMessageControllerImpl(currentLargeMessageController));
+ currentChunkMessage.setLargeMessageController(new CompressedLargeMessageControllerImpl(currentLargeMessageController));
}
else
{
- currentChunkMessage.setBuffer(currentLargeMessageController);
+ currentChunkMessage.setLargeMessageController(currentLargeMessageController);
}
currentChunkMessage.setFlowControlSize(0);
@@ -852,7 +850,7 @@
if (message.isLargeMessage())
{
- message.discardLargeBody();
+ message.discardBody();
}
}
else
Added: trunk/src/main/org/hornetq/core/client/impl/ClientLargeMessageImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/ClientLargeMessageImpl.java (rev 0)
+++ trunk/src/main/org/hornetq/core/client/impl/ClientLargeMessageImpl.java 2010-12-08 17:21:29 UTC (rev 10017)
@@ -0,0 +1,225 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.client.impl;
+
+import java.io.IOException;
+import java.io.OutputStream;
+
+import org.hornetq.api.core.HornetQBuffer;
+import org.hornetq.api.core.HornetQException;
+import org.hornetq.core.buffers.impl.ResetLimitWrappedHornetQBuffer;
+import org.hornetq.utils.DataConstants;
+
+/**
+ * Client-side representation of a large message.
+ * <p>
+ * Instances are only created when a large message is received. On the sending side a regular
+ * message is used, because a message is not known to be large until its buffer fills up or the
+ * user sets a stream on it.
+ * <p>
+ * While no body buffer exists, the streaming operations are delegated to the
+ * {@link LargeMessageController}. The first call to {@link #getBodyBuffer()} or
+ * {@link #getBodySize()} asks the controller to save the body into a locally created buffer;
+ * from then on the operations of this class fall back to the regular message implementation.
+ *
+ * @author clebertsuconic
+ */
+public class ClientLargeMessageImpl extends ClientMessageImpl implements ClientLargeMessageInternal
+{
+
+   // Constants -----------------------------------------------------
+
+   // Attributes ----------------------------------------------------
+
+   // Used only when receiving large messages
+   private LargeMessageController largeMessageController;
+
+   // Added to BODY_OFFSET to size the local buffer when the body is rebuilt (see checkBuffer)
+   private int largeMessageSize;
+
+   // Static --------------------------------------------------------
+
+   // Constructors --------------------------------------------------
+
+   /**
+    * @return the largeMessageSize
+    */
+   public int getLargeMessageSize()
+   {
+      return largeMessageSize;
+   }
+
+   /**
+    * @param largeMessageSize the largeMessageSize to set
+    */
+   public void setLargeMessageSize(int largeMessageSize)
+   {
+      this.largeMessageSize = largeMessageSize;
+   }
+
+   // we only need this constructor as this is only used at decoding large messages on client
+   public ClientLargeMessageImpl()
+   {
+      super();
+   }
+
+   // Public --------------------------------------------------------
+
+   /**
+    * Returns the regular encode size once a local body buffer exists; otherwise the size of two
+    * int-sized fields plus the encoded headers and properties.
+    *
+    * @return the encode size
+    */
+   @Override
+   public int getEncodeSize()
+   {
+      if (bodyBuffer != null)
+      {
+         return super.getEncodeSize();
+      }
+      else
+      {
+         return DataConstants.SIZE_INT + DataConstants.SIZE_INT + getHeadersAndPropertiesEncodeSize();
+      }
+   }
+
+   /**
+    * @return always {@code true} for this class
+    */
+   @Override
+   public boolean isLargeMessage()
+   {
+      return true;
+   }
+
+   /**
+    * @param controller the controller this message delegates to while no local body buffer exists
+    */
+   public void setLargeMessageController(final LargeMessageController controller)
+   {
+      largeMessageController = controller;
+   }
+
+   /**
+    * Returns the body buffer, building it from the large message controller on first use.
+    *
+    * @return the body buffer
+    * @throws RuntimeException if the controller fails while saving the body into the buffer
+    */
+   @Override
+   public HornetQBuffer getBodyBuffer()
+   {
+      checkBuffer();
+
+      return bodyBuffer;
+   }
+
+   /**
+    * Returns the distance between the writer and reader index of the message buffer, building
+    * the body buffer from the large message controller on first use.
+    *
+    * @return {@code buffer.writerIndex() - buffer.readerIndex()}
+    * @throws RuntimeException if the controller fails while saving the body into the buffer
+    */
+   @Override
+   public int getBodySize()
+   {
+      checkBuffer();
+      return buffer.writerIndex() - buffer.readerIndex();
+   }
+
+   /**
+    * @return the controller previously set through
+    *         {@link #setLargeMessageController(LargeMessageController)}
+    */
+   public LargeMessageController getLargeMessageController()
+   {
+      return largeMessageController;
+   }
+
+   /**
+    * Writes the body to {@code out}: through the regular message implementation when a local
+    * body buffer exists, otherwise through the large message controller.
+    *
+    * @param out the stream receiving the body
+    * @throws HornetQException if the underlying save operation fails
+    * @see org.hornetq.api.core.client.ClientMessage#saveToOutputStream(java.io.OutputStream)
+    */
+   @Override
+   public void saveToOutputStream(final OutputStream out) throws HornetQException
+   {
+      if (bodyBuffer != null)
+      {
+         // The body was rebuilt on the client, so we need to behave as a regular message on this case
+         super.saveToOutputStream(out);
+      }
+      else
+      {
+         largeMessageController.saveBuffer(out);
+      }
+   }
+
+   /**
+    * Sets the stream that receives the body: handled by the regular message implementation when
+    * a local body buffer exists, otherwise handed to the large message controller.
+    *
+    * @param out the stream receiving the body
+    * @throws HornetQException if the underlying operation fails
+    * @see org.hornetq.api.core.client.ClientMessage#setOutputStream(java.io.OutputStream)
+    */
+   @Override
+   public void setOutputStream(final OutputStream out) throws HornetQException
+   {
+      if (bodyBuffer != null)
+      {
+         super.setOutputStream(out);
+      }
+      else
+      {
+         largeMessageController.setOutputStream(out);
+      }
+   }
+
+   /**
+    * Waits for the output stream to be completed: delegates to the regular message
+    * implementation when a local body buffer exists, otherwise to the large message controller.
+    *
+    * @param timeMilliseconds the timeout passed on to the delegate
+    * @return the value returned by the delegate
+    * @throws HornetQException if the underlying operation fails
+    * @see org.hornetq.api.core.client.ClientMessage#waitOutputStreamCompletion(long)
+    */
+   @Override
+   public boolean waitOutputStreamCompletion(final long timeMilliseconds) throws HornetQException
+   {
+      if (bodyBuffer != null)
+      {
+         return super.waitOutputStreamCompletion(timeMilliseconds);
+      }
+      else
+      {
+         return largeMessageController.waitCompletion(timeMilliseconds);
+      }
+   }
+
+   /**
+    * Discards the body: delegates to the regular message implementation when a local body
+    * buffer exists, otherwise tells the large message controller to discard its unused packets.
+    */
+   @Override
+   public void discardBody()
+   {
+      if (bodyBuffer != null)
+      {
+         super.discardBody();
+      }
+      else
+      {
+         largeMessageController.discardUnusedPackets();
+      }
+   }
+
+
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+
+   // Private -------------------------------------------------------
+
+   /**
+    * Builds the local body buffer on first use: creates a message buffer sized for
+    * {@code largeMessageSize + BODY_OFFSET}, wraps it as the body buffer and asks the large
+    * message controller to save its content into it.
+    *
+    * @throws RuntimeException wrapping the {@link HornetQException} raised by the controller
+    */
+   private void checkBuffer()
+   {
+      if (bodyBuffer == null)
+      {
+         createBody(this.largeMessageSize + BODY_OFFSET);
+
+         bodyBuffer = new ResetLimitWrappedHornetQBuffer(BODY_OFFSET, buffer, this);
+
+         try
+         {
+            largeMessageController.saveBuffer(new HornetQOutputStream(bodyBuffer));
+         }
+         catch (HornetQException e)
+         {
+            // NOTE(review): bodyBuffer stays assigned after a failure, so later calls will treat
+            // the (possibly partial) body as rebuilt - confirm this is intended
+            throw new RuntimeException(e.getMessage(), e);
+         }
+      }
+   }
+
+
+   // Inner classes -------------------------------------------------
+
+   /**
+    * Adapts a {@link HornetQBuffer} to the {@link OutputStream} API so that the large message
+    * controller can save the body straight into the buffer.
+    */
+   protected class HornetQOutputStream extends OutputStream
+   {
+      HornetQBuffer bufferOut;
+
+      HornetQOutputStream(HornetQBuffer out)
+      {
+         this.bufferOut = out;
+      }
+
+      /**
+       * Writes the eight low-order bits of {@code b} to the buffer, as required by
+       * {@link OutputStream#write(int)}.
+       *
+       * @param b the byte to write; the 24 high-order bits are ignored
+       * @see java.io.OutputStream#write(int)
+       */
+      @Override
+      public void write(int b) throws IOException
+      {
+         // A plain narrowing cast keeps all eight low-order bits. Masking with Byte.MAX_VALUE
+         // (0x7F) would clear the high bit and corrupt every byte value >= 0x80.
+         bufferOut.writeByte((byte)b);
+      }
+
+   }
+
+}
Added: trunk/src/main/org/hornetq/core/client/impl/ClientLargeMessageInternal.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/ClientLargeMessageInternal.java (rev 0)
+++ trunk/src/main/org/hornetq/core/client/impl/ClientLargeMessageInternal.java 2010-12-08 17:21:29 UTC (rev 10017)
@@ -0,0 +1,34 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.client.impl;
+
+
+/**
+ * Internal client-side contract for large messages. On top of {@link ClientMessageInternal} it
+ * exposes the {@link LargeMessageController} attached to the message and the large message size.
+ *
+ * @author clebertsuconic
+ *
+ *
+ */
+public interface ClientLargeMessageInternal extends ClientMessageInternal
+{
+
+ /**
+  * @param controller the controller to attach to this message
+  */
+ void setLargeMessageController(LargeMessageController controller);
+
+ /**
+  * @return the controller attached to this message
+  */
+ LargeMessageController getLargeMessageController();
+
+ /**
+  * @param size the large message size to record (presumably the body size in bytes - TODO confirm)
+  */
+ void setLargeMessageSize(int size);
+
+ /**
+  * @return the large message size previously recorded
+  */
+ int getLargeMessageSize();
+}
Modified: trunk/src/main/org/hornetq/core/client/impl/ClientMessageImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/ClientMessageImpl.java 2010-12-08 15:09:06 UTC (rev 10016)
+++ trunk/src/main/org/hornetq/core/client/impl/ClientMessageImpl.java 2010-12-08 17:21:29 UTC (rev 10017)
@@ -46,11 +46,6 @@
private ClientConsumerInternal consumer;
- private boolean largeMessage;
-
- // Used only when receiving large messages
- private LargeMessageController largeMessageController;
-
private int flowControlSize = -1;
/** Used on LargeMessages only */
@@ -80,7 +75,7 @@
{
return false;
}
-
+
public void onReceipt(final ClientConsumerInternal consumer)
{
this.consumer = consumer;
@@ -123,22 +118,14 @@
*/
public boolean isLargeMessage()
{
- return largeMessage;
+ return false;
}
-
+
public boolean isCompressed()
{
return properties.getBooleanProperty(Message.HDR_LARGE_COMPRESSED);
}
- /**
- * @param largeMessage the largeMessage to set
- */
- public void setLargeMessage(final boolean largeMessage)
- {
- this.largeMessage = largeMessage;
- }
-
public int getBodySize()
{
return buffer.writerIndex() - buffer.readerIndex();
@@ -147,12 +134,7 @@
@Override
public String toString()
{
- return "ClientMessage[messageID=" + messageID +
- ", durable=" +
- durable +
- ", address=" +
- getAddress() +
- "]";
+ return "ClientMessage[messageID=" + messageID + ", durable=" + durable + ", address=" + getAddress() + "]";
}
/* (non-Javadoc)
@@ -160,24 +142,16 @@
*/
public void saveToOutputStream(final OutputStream out) throws HornetQException
{
- if (largeMessage)
+ try
{
- ((LargeMessageController)getWholeBuffer()).saveBuffer(out);
+ byte readBuffer[] = new byte[getBodySize()];
+ getBodyBuffer().readBytes(readBuffer);
+ out.write(readBuffer);
}
- else
+ catch (IOException e)
{
- try
- {
- byte readBuffer[] = new byte[getBodySize()];
- getBodyBuffer().readBytes(readBuffer);
- out.write(readBuffer);
- }
- catch (IOException e)
- {
- throw new HornetQException(HornetQException.LARGE_MESSAGE_ERROR_BODY, "Error saving the message body", e);
- }
+ throw new HornetQException(HornetQException.LARGE_MESSAGE_ERROR_BODY, "Error saving the message body", e);
}
-
}
/* (non-Javadoc)
@@ -185,15 +159,7 @@
*/
public void setOutputStream(final OutputStream out) throws HornetQException
{
- if (largeMessage)
- {
- ((LargeMessageController)getWholeBuffer()).setOutputStream(out);
- }
- else
- {
- saveToOutputStream(out);
- }
-
+ saveToOutputStream(out);
}
/* (non-Javadoc)
@@ -201,25 +167,14 @@
*/
public boolean waitOutputStreamCompletion(final long timeMilliseconds) throws HornetQException
{
- if (largeMessage)
- {
- return ((LargeMessageController)getWholeBuffer()).waitCompletion(timeMilliseconds);
- }
- else
- {
- return true;
- }
+ return true;
}
/* (non-Javadoc)
* @see org.hornetq.api.core.client.impl.ClientMessageInternal#discardLargeBody()
*/
- public void discardLargeBody()
+ public void discardBody()
{
- if (largeMessage)
- {
- ((LargeMessageController)getWholeBuffer()).discardUnusedPackets();
- }
}
/**
Modified: trunk/src/main/org/hornetq/core/client/impl/ClientMessageInternal.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/ClientMessageInternal.java 2010-12-08 15:09:06 UTC (rev 10016)
+++ trunk/src/main/org/hornetq/core/client/impl/ClientMessageInternal.java 2010-12-08 17:21:29 UTC (rev 10017)
@@ -36,12 +36,10 @@
void onReceipt(ClientConsumerInternal consumer);
- void setLargeMessage(boolean largeMessage);
-
/**
* Discard unused packets (used on large-message)
*/
- void discardLargeBody();
+ void discardBody();
void setBuffer(HornetQBuffer buffer);
Modified: trunk/src/main/org/hornetq/core/client/impl/CompressedLargeMessageControllerImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/CompressedLargeMessageControllerImpl.java 2010-12-08 15:09:06 UTC (rev 10016)
+++ trunk/src/main/org/hornetq/core/client/impl/CompressedLargeMessageControllerImpl.java 2010-12-08 17:21:29 UTC (rev 10017)
@@ -423,8 +423,7 @@
public long getSize()
{
- // TODO
- return 0;
+ return this.bufferDelegate.getSize();
}
public void writerIndex(final int writerIndex)
Modified: trunk/src/main/org/hornetq/core/message/impl/MessageImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/message/impl/MessageImpl.java 2010-12-08 15:09:06 UTC (rev 10016)
+++ trunk/src/main/org/hornetq/core/message/impl/MessageImpl.java 2010-12-08 17:21:29 UTC (rev 10017)
@@ -262,14 +262,7 @@
{
if (bodyBuffer == null)
{
- if (buffer instanceof LargeMessageController == false)
- {
- bodyBuffer = new ResetLimitWrappedHornetQBuffer(BODY_OFFSET, buffer, this);
- }
- else
- {
- return buffer;
- }
+ bodyBuffer = new ResetLimitWrappedHornetQBuffer(BODY_OFFSET, buffer, this);
}
return bodyBuffer;
@@ -938,7 +931,7 @@
bufferValid = true;
}
- private void createBody(final int initialMessageBufferSize)
+ protected void createBody(final int initialMessageBufferSize)
{
buffer = HornetQBuffers.dynamicBuffer(initialMessageBufferSize);
Modified: trunk/src/main/org/hornetq/core/protocol/core/impl/wireformat/SessionReceiveLargeMessage.java
===================================================================
--- trunk/src/main/org/hornetq/core/protocol/core/impl/wireformat/SessionReceiveLargeMessage.java 2010-12-08 15:09:06 UTC (rev 10016)
+++ trunk/src/main/org/hornetq/core/protocol/core/impl/wireformat/SessionReceiveLargeMessage.java 2010-12-08 17:21:29 UTC (rev 10017)
@@ -14,7 +14,7 @@
package org.hornetq.core.protocol.core.impl.wireformat;
import org.hornetq.api.core.HornetQBuffer;
-import org.hornetq.core.client.impl.ClientMessageImpl;
+import org.hornetq.core.client.impl.ClientLargeMessageImpl;
import org.hornetq.core.logging.Logger;
import org.hornetq.core.message.impl.MessageInternal;
import org.hornetq.core.protocol.core.impl.PacketImpl;
@@ -43,7 +43,7 @@
public SessionReceiveLargeMessage()
{
super(PacketImpl.SESS_RECEIVE_LARGE_MSG);
- this.message = new ClientMessageImpl();
+ this.message = new ClientLargeMessageImpl();
}
public SessionReceiveLargeMessage(final long consumerID,
Modified: trunk/tests/src/org/hornetq/tests/integration/client/LargeMessageCompressTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/client/LargeMessageCompressTest.java 2010-12-08 15:09:06 UTC (rev 10016)
+++ trunk/tests/src/org/hornetq/tests/integration/client/LargeMessageCompressTest.java 2010-12-08 17:21:29 UTC (rev 10017)
@@ -277,32 +277,6 @@
}
- // TODO: Fix these tests on LargeMessageCompression
-
- public void testResendSmallStreamMessage() throws Exception
- {
- }
-
- public void testResendLargeStreamMessage() throws Exception
- {
- }
-
- public void testResendCachedSmallStreamMessage() throws Exception
- {
- }
-
- public void testResendCachedLargeStreamMessage() throws Exception
- {
- }
-
- public void testSimpleRollback() throws Exception
- {
- }
-
- public void testSimpleRollbackXA() throws Exception
- {
- }
-
public void testSendServerMessage() throws Exception
{
// doesn't make sense as compressed
Modified: trunk/tests/src/org/hornetq/tests/integration/client/LargeMessageTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/client/LargeMessageTest.java 2010-12-08 15:09:06 UTC (rev 10016)
+++ trunk/tests/src/org/hornetq/tests/integration/client/LargeMessageTest.java 2010-12-08 17:21:29 UTC (rev 10017)
@@ -841,6 +841,7 @@
public void internalTestResendMessage(final long messageSize) throws Exception
{
+ clearData();
ClientSession session = null;
try
@@ -879,19 +880,7 @@
producer2.send(msg1);
- boolean failed = false;
- try
- {
- producer2.send(msg1);
- }
- catch (Throwable e)
- {
- failed = true;
- }
-
- Assert.assertTrue("Exception expected", failed);
-
session.commit();
ClientMessage msg2 = consumer2.receive(10000);
@@ -2206,7 +2195,7 @@
Assert.assertNotNull(clientMessage);
- Assert.assertEquals(numberOfBytes, clientMessage.getBodyBuffer().writerIndex());
+ Assert.assertEquals(numberOfBytes, clientMessage.getBodySize());
clientMessage.acknowledge();
Modified: trunk/tests/src/org/hornetq/tests/integration/largemessage/LargeMessageTestBase.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/largemessage/LargeMessageTestBase.java 2010-12-08 15:09:06 UTC (rev 10016)
+++ trunk/tests/src/org/hornetq/tests/integration/largemessage/LargeMessageTestBase.java 2010-12-08 17:21:29 UTC (rev 10017)
@@ -432,9 +432,6 @@
((Integer)message.getObjectProperty(new SimpleString("counter-message"))).intValue());
}
- HornetQBuffer buffer = message.getBodyBuffer();
- buffer.resetReaderIndex();
-
if (useStreamOnConsume)
{
final AtomicLong bytesRead = new AtomicLong(0);
@@ -477,6 +474,9 @@
}
else
{
+ HornetQBuffer buffer = message.getBodyBuffer();
+ buffer.resetReaderIndex();
+
for (long b = 0; b < numberOfBytes; b++)
{
if (b % (1024l * 1024l) == 0l)
More information about the hornetq-commits
mailing list