[hornetq-commits] JBoss hornetq SVN: r9575 - in branches/Branch_Large_Message_Compression: src/main/org/hornetq/api/core and 19 other directories.
do-not-reply at jboss.org
do-not-reply at jboss.org
Fri Aug 20 18:30:34 EDT 2010
Author: clebert.suconic at jboss.com
Date: 2010-08-20 18:30:33 -0400 (Fri, 20 Aug 2010)
New Revision: 9575
Added:
branches/Branch_Large_Message_Compression/src/main/org/hornetq/utils/DecompressedLargeMessageBuffer.java
branches/Branch_Large_Message_Compression/src/main/org/hornetq/utils/GZipUtil.java
branches/Branch_Large_Message_Compression/src/main/org/hornetq/utils/HornetQBufferInputStream.java
branches/Branch_Large_Message_Compression/tests/src/org/hornetq/tests/integration/client/CompressedLargeMessageTest.java
branches/Branch_Large_Message_Compression/tests/src/org/hornetq/tests/unit/util/HornetQBufferInputStreamTest.java
Modified:
branches/Branch_Large_Message_Compression/src/config/common/schema/hornetq-jms.xsd
branches/Branch_Large_Message_Compression/src/main/org/hornetq/api/core/Message.java
branches/Branch_Large_Message_Compression/src/main/org/hornetq/api/core/client/ClientSessionFactory.java
branches/Branch_Large_Message_Compression/src/main/org/hornetq/api/core/client/HornetQClient.java
branches/Branch_Large_Message_Compression/src/main/org/hornetq/core/client/impl/ClientConsumerImpl.java
branches/Branch_Large_Message_Compression/src/main/org/hornetq/core/client/impl/ClientConsumerInternal.java
branches/Branch_Large_Message_Compression/src/main/org/hornetq/core/client/impl/ClientMessageImpl.java
branches/Branch_Large_Message_Compression/src/main/org/hornetq/core/client/impl/ClientMessageInternal.java
branches/Branch_Large_Message_Compression/src/main/org/hornetq/core/client/impl/ClientProducerImpl.java
branches/Branch_Large_Message_Compression/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java
branches/Branch_Large_Message_Compression/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java
branches/Branch_Large_Message_Compression/src/main/org/hornetq/core/client/impl/ClientSessionInternal.java
branches/Branch_Large_Message_Compression/src/main/org/hornetq/core/client/impl/DelegatingSession.java
branches/Branch_Large_Message_Compression/src/main/org/hornetq/core/client/impl/FailoverManager.java
branches/Branch_Large_Message_Compression/src/main/org/hornetq/core/client/impl/FailoverManagerImpl.java
branches/Branch_Large_Message_Compression/src/main/org/hornetq/core/client/impl/LargeMessageBufferInternal.java
branches/Branch_Large_Message_Compression/src/main/org/hornetq/jms/client/HornetQConnectionFactory.java
branches/Branch_Large_Message_Compression/src/main/org/hornetq/jms/server/JMSServerManager.java
branches/Branch_Large_Message_Compression/src/main/org/hornetq/jms/server/config/ConnectionFactoryConfiguration.java
branches/Branch_Large_Message_Compression/src/main/org/hornetq/jms/server/config/impl/ConnectionFactoryConfigurationImpl.java
branches/Branch_Large_Message_Compression/src/main/org/hornetq/jms/server/impl/JMSServerConfigParserImpl.java
branches/Branch_Large_Message_Compression/src/main/org/hornetq/jms/server/impl/JMSServerManagerImpl.java
branches/Branch_Large_Message_Compression/tests/config/hornetq-jms-for-JMSServerDeployerTest.xml
branches/Branch_Large_Message_Compression/tests/jms-tests/src/org/hornetq/jms/tests/CTSMiscellaneousTest.java
branches/Branch_Large_Message_Compression/tests/jms-tests/src/org/hornetq/jms/tests/JMSTestCase.java
branches/Branch_Large_Message_Compression/tests/jms-tests/src/org/hornetq/jms/tests/tools/container/LocalTestServer.java
branches/Branch_Large_Message_Compression/tests/src/org/hornetq/tests/integration/client/LargeMessageTest.java
branches/Branch_Large_Message_Compression/tests/src/org/hornetq/tests/integration/jms/FloodServerTest.java
branches/Branch_Large_Message_Compression/tests/src/org/hornetq/tests/integration/jms/client/PreACKJMSTest.java
branches/Branch_Large_Message_Compression/tests/src/org/hornetq/tests/integration/jms/client/ReSendMessageTest.java
branches/Branch_Large_Message_Compression/tests/src/org/hornetq/tests/integration/jms/client/SessionClosedOnRemotingConnectionFailureTest.java
branches/Branch_Large_Message_Compression/tests/src/org/hornetq/tests/integration/jms/client/TextMessageTest.java
branches/Branch_Large_Message_Compression/tests/src/org/hornetq/tests/integration/jms/divert/DivertAndACKClientTest.java
branches/Branch_Large_Message_Compression/tests/src/org/hornetq/tests/integration/jms/server/config/JMSServerConfigParserTest.java
branches/Branch_Large_Message_Compression/tests/src/org/hornetq/tests/unit/core/client/impl/LargeMessageBufferTest.java
branches/Branch_Large_Message_Compression/tests/src/org/hornetq/tests/util/JMSTestBase.java
Log:
Initial implementation
Modified: branches/Branch_Large_Message_Compression/src/config/common/schema/hornetq-jms.xsd
===================================================================
--- branches/Branch_Large_Message_Compression/src/config/common/schema/hornetq-jms.xsd 2010-08-20 22:28:06 UTC (rev 9574)
+++ branches/Branch_Large_Message_Compression/src/config/common/schema/hornetq-jms.xsd 2010-08-20 22:30:33 UTC (rev 9575)
@@ -75,7 +75,11 @@
</xsd:element>
<xsd:element name="min-large-message-size" type="xsd:long"
maxOccurs="1" minOccurs="0">
- </xsd:element>
+ </xsd:element>
+ <xsd:element name="compress-large-messages" type="xsd:boolean"
+ maxOccurs="1" minOccurs="0">
+ </xsd:element>
+
<xsd:element name="client-id" type="xsd:string"
maxOccurs="1" minOccurs="0">
</xsd:element>
Modified: branches/Branch_Large_Message_Compression/src/main/org/hornetq/api/core/Message.java
===================================================================
--- branches/Branch_Large_Message_Compression/src/main/org/hornetq/api/core/Message.java 2010-08-20 22:28:06 UTC (rev 9574)
+++ branches/Branch_Large_Message_Compression/src/main/org/hornetq/api/core/Message.java 2010-08-20 22:30:33 UTC (rev 9575)
@@ -65,6 +65,8 @@
public static final SimpleString HDR_ORIG_MESSAGE_ID = new SimpleString("_HQ_ORIG_MESSAGE_ID");
public static final SimpleString HDR_GROUP_ID = new SimpleString("_HQ_GROUP_ID");
+
+ public static final SimpleString HDR_LARGE_COMPRESSED = new SimpleString("_HQ_LARGE_COMPRESSED");
public static final SimpleString HDR_SCHEDULED_DELIVERY_TIME = new SimpleString("_HQ_SCHED_DELIVERY");
Modified: branches/Branch_Large_Message_Compression/src/main/org/hornetq/api/core/client/ClientSessionFactory.java
===================================================================
--- branches/Branch_Large_Message_Compression/src/main/org/hornetq/api/core/client/ClientSessionFactory.java 2010-08-20 22:28:06 UTC (rev 9574)
+++ branches/Branch_Large_Message_Compression/src/main/org/hornetq/api/core/client/ClientSessionFactory.java 2010-08-20 22:30:33 UTC (rev 9575)
@@ -252,6 +252,27 @@
* @param minLargeMessageSize large message size threshold in bytes
*/
void setMinLargeMessageSize(int minLargeMessageSize);
+
+
+ /**
+ * If this attribute is set to true, the message body will be compressed when sent as large message.
+ *
+ * the compression will be done using the GZIP protocol.
+ *
+ *
+ * @param compressLargeMessage
+ */
+ void setCompressLargeMessages(boolean compressLargeMessage);
+
+ /**
+ * If this attribute is set to true, the message body will be compressed when sent as large message.
+ *
+ * the compression will be done using the GZIP protocol.
+ *
+ *
+ * @param compressLargeMessage
+ */
+ boolean isCompressLargeMessages();
/**
* Returns the window size for flow control of the consumers created through this factory.
Modified: branches/Branch_Large_Message_Compression/src/main/org/hornetq/api/core/client/HornetQClient.java
===================================================================
--- branches/Branch_Large_Message_Compression/src/main/org/hornetq/api/core/client/HornetQClient.java 2010-08-20 22:28:06 UTC (rev 9574)
+++ branches/Branch_Large_Message_Compression/src/main/org/hornetq/api/core/client/HornetQClient.java 2010-08-20 22:30:33 UTC (rev 9575)
@@ -41,6 +41,8 @@
// Any message beyond this size is considered a large message (to be sent in chunks)
public static final int DEFAULT_MIN_LARGE_MESSAGE_SIZE = 100 * 1024;
+
+ public static final boolean DEFAULT_COMPRESS_LARGE_MESSAGES = false;
public static final int DEFAULT_CONSUMER_WINDOW_SIZE = 1024 * 1024;
Modified: branches/Branch_Large_Message_Compression/src/main/org/hornetq/core/client/impl/ClientConsumerImpl.java
===================================================================
--- branches/Branch_Large_Message_Compression/src/main/org/hornetq/core/client/impl/ClientConsumerImpl.java 2010-08-20 22:28:06 UTC (rev 9574)
+++ branches/Branch_Large_Message_Compression/src/main/org/hornetq/core/client/impl/ClientConsumerImpl.java 2010-08-20 22:30:33 UTC (rev 9575)
@@ -30,6 +30,7 @@
import org.hornetq.core.protocol.core.impl.wireformat.SessionQueueQueryResponseMessage;
import org.hornetq.core.protocol.core.impl.wireformat.SessionReceiveContinuationMessage;
import org.hornetq.core.protocol.core.impl.wireformat.SessionReceiveLargeMessage;
+import org.hornetq.utils.DecompressedLargeMessageBuffer;
import org.hornetq.utils.Future;
import org.hornetq.utils.PriorityLinkedList;
import org.hornetq.utils.PriorityLinkedListImpl;
@@ -444,6 +445,11 @@
// ClientConsumerInternal implementation
// --------------------------------------------------------------
+ public ClientSessionInternal getSession()
+ {
+ return session;
+ }
+
public SessionQueueQueryResponseMessage getQueueInfo()
{
return queueInfo;
@@ -544,7 +550,14 @@
currentLargeMessageBuffer = new LargeMessageBufferImpl(this, packet.getLargeMessageSize(), 60, largeMessageCache);
- currentChunkMessage.setBuffer(currentLargeMessageBuffer);
+ if (currentChunkMessage.isCompressed())
+ {
+ currentChunkMessage.setBuffer(new DecompressedLargeMessageBuffer(currentLargeMessageBuffer, session.getThreadPool()));
+ }
+ else
+ {
+ currentChunkMessage.setBuffer(currentLargeMessageBuffer);
+ }
currentChunkMessage.setFlowControlSize(0);
Modified: branches/Branch_Large_Message_Compression/src/main/org/hornetq/core/client/impl/ClientConsumerInternal.java
===================================================================
--- branches/Branch_Large_Message_Compression/src/main/org/hornetq/core/client/impl/ClientConsumerInternal.java 2010-08-20 22:28:06 UTC (rev 9574)
+++ branches/Branch_Large_Message_Compression/src/main/org/hornetq/core/client/impl/ClientConsumerInternal.java 2010-08-20 22:30:33 UTC (rev 9575)
@@ -67,4 +67,6 @@
void start();
SessionQueueQueryResponseMessage getQueueInfo();
+
+ ClientSessionInternal getSession();
}
Modified: branches/Branch_Large_Message_Compression/src/main/org/hornetq/core/client/impl/ClientMessageImpl.java
===================================================================
--- branches/Branch_Large_Message_Compression/src/main/org/hornetq/core/client/impl/ClientMessageImpl.java 2010-08-20 22:28:06 UTC (rev 9574)
+++ branches/Branch_Large_Message_Compression/src/main/org/hornetq/core/client/impl/ClientMessageImpl.java 2010-08-20 22:30:33 UTC (rev 9575)
@@ -21,11 +21,13 @@
import org.hornetq.api.core.HornetQBuffer;
import org.hornetq.api.core.HornetQBuffers;
import org.hornetq.api.core.HornetQException;
+import org.hornetq.api.core.Message;
import org.hornetq.api.core.SimpleString;
import org.hornetq.core.logging.Logger;
import org.hornetq.core.message.BodyEncoder;
import org.hornetq.core.message.impl.MessageImpl;
-import org.hornetq.utils.DataConstants;
+import org.hornetq.utils.GZipUtil;
+import org.hornetq.utils.HornetQBufferInputStream;
/**
*
@@ -117,6 +119,11 @@
{
return largeMessage;
}
+
+ public boolean isCompressed()
+ {
+ return properties.getBooleanProperty(Message.HDR_LARGE_COMPRESSED);
+ }
/**
* @param largeMessage the largeMessage to set
@@ -142,7 +149,6 @@
"]";
}
- // FIXME - only used for large messages - move it!
/* (non-Javadoc)
* @see org.hornetq.api.core.client.ClientMessage#saveToOutputStream(java.io.OutputStream)
*/
@@ -150,7 +156,7 @@
{
if (largeMessage)
{
- ((LargeMessageBufferInternal)getWholeBuffer()).saveBuffer(out);
+ ((LargeMessageBufferInternal)getWholeBuffer()).saveBuffer(out);
}
else
{
Modified: branches/Branch_Large_Message_Compression/src/main/org/hornetq/core/client/impl/ClientMessageInternal.java
===================================================================
--- branches/Branch_Large_Message_Compression/src/main/org/hornetq/core/client/impl/ClientMessageInternal.java 2010-08-20 22:28:06 UTC (rev 9574)
+++ branches/Branch_Large_Message_Compression/src/main/org/hornetq/core/client/impl/ClientMessageInternal.java 2010-08-20 22:30:33 UTC (rev 9575)
@@ -44,4 +44,6 @@
void discardLargeBody();
void setBuffer(HornetQBuffer buffer);
+
+ boolean isCompressed();
}
Modified: branches/Branch_Large_Message_Compression/src/main/org/hornetq/core/client/impl/ClientProducerImpl.java
===================================================================
--- branches/Branch_Large_Message_Compression/src/main/org/hornetq/core/client/impl/ClientProducerImpl.java 2010-08-20 22:28:06 UTC (rev 9574)
+++ branches/Branch_Large_Message_Compression/src/main/org/hornetq/core/client/impl/ClientProducerImpl.java 2010-08-20 22:30:33 UTC (rev 9575)
@@ -13,8 +13,16 @@
package org.hornetq.core.client.impl;
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
import java.io.IOException;
import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.PipedInputStream;
+import java.io.PipedOutputStream;
+import java.util.concurrent.Executor;
+import java.util.zip.GZIPInputStream;
+import java.util.zip.GZIPOutputStream;
import org.hornetq.api.core.HornetQBuffer;
import org.hornetq.api.core.HornetQBuffers;
@@ -28,6 +36,8 @@
import org.hornetq.core.protocol.core.impl.wireformat.SessionSendContinuationMessage;
import org.hornetq.core.protocol.core.impl.wireformat.SessionSendLargeMessage;
import org.hornetq.core.protocol.core.impl.wireformat.SessionSendMessage;
+import org.hornetq.utils.GZipUtil;
+import org.hornetq.utils.HornetQBufferInputStream;
import org.hornetq.utils.TokenBucketLimiter;
import org.hornetq.utils.UUIDGenerator;
@@ -150,7 +160,7 @@
{
return;
}
-
+
doCleanup();
}
@@ -190,7 +200,7 @@
{
return credits;
}
-
+
// Protected ------------------------------------------------------------------------------------
// Package Private ------------------------------------------------------------------------------
@@ -203,7 +213,7 @@
{
session.returnCredits(address);
}
-
+
session.removeProducer(this);
closed = true;
@@ -212,12 +222,13 @@
private void doSend(final SimpleString address, final Message msg) throws HornetQException
{
MessageInternal msgI = (MessageInternal)msg;
-
+
ClientProducerCredits theCredits;
-
+
boolean isLarge;
- if (msgI.getBodyInputStream() != null || msgI.isLargeMessage() || msgI.getBodyBuffer().writerIndex() > minLargeMessageSize)
+ if (msgI.getBodyInputStream() != null || msgI.isLargeMessage() ||
+ msgI.getBodyBuffer().writerIndex() > minLargeMessageSize)
{
isLarge = true;
}
@@ -236,7 +247,7 @@
{
msg.setAddress(address);
}
-
+
// Anonymous
theCredits = session.getCredits(address, true);
}
@@ -250,7 +261,7 @@
{
msg.setAddress(this.address);
}
-
+
theCredits = credits;
}
@@ -270,8 +281,6 @@
session.workDone();
-
-
if (isLarge)
{
largeMessageSend(sendBlocking, msgI, theCredits);
@@ -322,8 +331,16 @@
* @param msgI
* @throws HornetQException
*/
- private void largeMessageSend(final boolean sendBlocking, final MessageInternal msgI, final ClientProducerCredits credits) throws HornetQException
+ private void largeMessageSend(final boolean sendBlocking,
+ final MessageInternal msgI,
+ final ClientProducerCredits credits) throws HornetQException
{
+
+ if (session.isCompressLargeMessages())
+ {
+ msgI.putBooleanProperty(Message.HDR_LARGE_COMPRESSED, true);
+ }
+
int headerSize = msgI.getHeadersAndPropertiesEncodeSize();
if (headerSize >= minLargeMessageSize)
@@ -341,7 +358,6 @@
HornetQBuffer headerBuffer = HornetQBuffers.fixedBuffer(headerSize);
msgI.encodeHeadersAndProperties(headerBuffer);
-
SessionSendLargeMessage initialChunk = new SessionSendLargeMessage(headerBuffer.toByteBuffer().array());
channel.send(initialChunk);
@@ -358,7 +374,7 @@
if (input != null)
{
- largeMessageSendStreamed(sendBlocking, input, credits);
+ largeMessageSendStreamed(sendBlocking, msgI, input, credits);
}
else
{
@@ -375,72 +391,29 @@
final MessageInternal msgI,
final ClientProducerCredits credits) throws HornetQException
{
- BodyEncoder context = msgI.getBodyEncoder();
-
- final long bodySize = context.getLargeBodySize();
-
- context.open();
- try
- {
-
- for (int pos = 0; pos < bodySize;)
- {
- final boolean lastChunk;
-
- final int chunkLength = Math.min((int)(bodySize - pos), minLargeMessageSize);
-
- final HornetQBuffer bodyBuffer = HornetQBuffers.fixedBuffer(chunkLength);
-
- context.encode(bodyBuffer, chunkLength);
-
- pos += chunkLength;
-
- lastChunk = pos >= bodySize;
-
- final SessionSendContinuationMessage chunk = new SessionSendContinuationMessage(bodyBuffer.toByteBuffer()
- .array(),
- !lastChunk,
- lastChunk && sendBlocking);
-
- if (sendBlocking && lastChunk)
- {
- // When sending it blocking, only the last chunk will be blocking.
- channel.sendBlocking(chunk);
- }
- else
- {
- channel.send(chunk);
- }
-
- try
- {
- credits.acquireCredits(chunk.getPacketSize());
- }
- catch (InterruptedException e)
- {
- }
- }
- }
- finally
- {
- context.close();
- }
+ msgI.getBodyBuffer().readerIndex(0);
+ largeMessageSendStreamed(sendBlocking, msgI, new HornetQBufferInputStream(msgI.getBodyBuffer()), credits);
}
/**
- * TODO: This method could be eliminated and
- * combined with {@link ClientProducerImpl#largeMessageSendBuffered(boolean, Message, ClientProducerCredits)}.
- * All that's needed for this is ClientMessage returning the proper BodyEncoder for streamed
* @param sendBlocking
* @param input
* @throws HornetQException
*/
private void largeMessageSendStreamed(final boolean sendBlocking,
- final InputStream input,
+ final MessageInternal msgI,
+ final InputStream inputStreamParameter,
final ClientProducerCredits credits) throws HornetQException
{
boolean lastPacket = false;
+ InputStream input = inputStreamParameter;
+
+ if (session.isCompressLargeMessages())
+ {
+ input = GZipUtil.pipeGZip(inputStreamParameter, true, session.getThreadPool());
+ }
+
while (!lastPacket)
{
byte[] buff = new byte[minLargeMessageSize];
Modified: branches/Branch_Large_Message_Compression/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java
===================================================================
--- branches/Branch_Large_Message_Compression/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java 2010-08-20 22:28:06 UTC (rev 9574)
+++ branches/Branch_Large_Message_Compression/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java 2010-08-20 22:30:33 UTC (rev 9575)
@@ -105,6 +105,8 @@
private long callTimeout;
private int minLargeMessageSize;
+
+ private boolean compressLargeMessages;
private int consumerWindowSize;
@@ -308,6 +310,8 @@
minLargeMessageSize = other.getMinLargeMessageSize();
+ compressLargeMessages = other.isCompressLargeMessages();
+
consumerWindowSize = other.getConsumerWindowSize();
consumerMaxRate = other.getConsumerMaxRate();
@@ -370,6 +374,8 @@
callTimeout = HornetQClient.DEFAULT_CALL_TIMEOUT;
minLargeMessageSize = HornetQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE;
+
+ compressLargeMessages = HornetQClient.DEFAULT_COMPRESS_LARGE_MESSAGES;
consumerWindowSize = HornetQClient.DEFAULT_CONSUMER_WINDOW_SIZE;
@@ -465,16 +471,6 @@
// ClientSessionFactory implementation------------------------------------------------------------
- public synchronized boolean isCacheLargeMessagesClient()
- {
- return cacheLargeMessagesClient;
- }
-
- public synchronized void setCacheLargeMessagesClient(final boolean cached)
- {
- cacheLargeMessagesClient = cached;
- }
-
public synchronized List<Pair<TransportConfiguration, TransportConfiguration>> getStaticConnectors()
{
return staticConnectors;
@@ -531,10 +527,31 @@
this.minLargeMessageSize = minLargeMessageSize;
}
+ public synchronized boolean isCacheLargeMessagesClient()
+ {
+ checkWrite();
+ return cacheLargeMessagesClient;
+ }
+
+ public synchronized void setCacheLargeMessagesClient(final boolean cached)
+ {
+ cacheLargeMessagesClient = cached;
+ }
+
public synchronized int getConsumerWindowSize()
{
return consumerWindowSize;
}
+
+ public synchronized void setCompressLargeMessages(final boolean compress)
+ {
+ this.compressLargeMessages = compress;
+ }
+
+ public synchronized boolean isCompressLargeMessages()
+ {
+ return compressLargeMessages;
+ }
public synchronized void setConsumerWindowSize(final int consumerWindowSize)
{
@@ -1129,6 +1146,7 @@
ackBatchSize,
cacheLargeMessagesClient,
minLargeMessageSize,
+ compressLargeMessages,
blockOnAcknowledge,
autoGroup,
confirmationWindowSize,
Modified: branches/Branch_Large_Message_Compression/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java
===================================================================
--- branches/Branch_Large_Message_Compression/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java 2010-08-20 22:28:06 UTC (rev 9574)
+++ branches/Branch_Large_Message_Compression/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java 2010-08-20 22:30:33 UTC (rev 9575)
@@ -154,6 +154,8 @@
private final boolean blockOnDurableSend;
private final int minLargeMessageSize;
+
+ private final boolean compressLargeMessages;
private volatile int initialMessagePacketSize;
@@ -204,6 +206,7 @@
final boolean blockOnDurableSend,
final boolean cacheLargeMessageClient,
final int minLargeMessageSize,
+ final boolean compressLargeMessages,
final int initialMessagePacketSize,
final String groupID,
final CoreRemotingConnection remotingConnection,
@@ -256,6 +259,8 @@
this.cacheLargeMessageClient = cacheLargeMessageClient;
this.minLargeMessageSize = minLargeMessageSize;
+
+ this.compressLargeMessages = compressLargeMessages;
this.initialMessagePacketSize = initialMessagePacketSize;
@@ -267,6 +272,15 @@
// ClientSession implementation
// -----------------------------------------------------------------
+ /**
+ * This will be used for instance when compressin large messages.
+ * the compression has to be done through a PipedOutputStream, and that needs to be done on a different thread
+ */
+ public Executor getThreadPool()
+ {
+ return failoverManager.getThreadPool();
+ }
+
public void createQueue(final SimpleString address, final SimpleString queueName) throws HornetQException
{
internalCreateQueue(address, queueName, null, false, false);
@@ -664,6 +678,11 @@
{
return minLargeMessageSize;
}
+
+ public boolean isCompressLargeMessages()
+ {
+ return compressLargeMessages;
+ }
/**
* @return the cacheLargeMessageClient
Modified: branches/Branch_Large_Message_Compression/src/main/org/hornetq/core/client/impl/ClientSessionInternal.java
===================================================================
--- branches/Branch_Large_Message_Compression/src/main/org/hornetq/core/client/impl/ClientSessionInternal.java 2010-08-20 22:28:06 UTC (rev 9574)
+++ branches/Branch_Large_Message_Compression/src/main/org/hornetq/core/client/impl/ClientSessionInternal.java 2010-08-20 22:30:33 UTC (rev 9575)
@@ -13,6 +13,8 @@
package org.hornetq.core.client.impl;
+import java.util.concurrent.Executor;
+
import org.hornetq.api.core.HornetQException;
import org.hornetq.api.core.Message;
import org.hornetq.api.core.SimpleString;
@@ -39,6 +41,8 @@
boolean isCacheLargeMessageClient();
int getMinLargeMessageSize();
+
+ boolean isCompressLargeMessages();
void expire(long consumerID, long messageID) throws HornetQException;
@@ -85,4 +89,6 @@
void setAddress(Message message, SimpleString address);
void setPacketSize(int packetSize);
+
+ Executor getThreadPool();
}
Modified: branches/Branch_Large_Message_Compression/src/main/org/hornetq/core/client/impl/DelegatingSession.java
===================================================================
--- branches/Branch_Large_Message_Compression/src/main/org/hornetq/core/client/impl/DelegatingSession.java 2010-08-20 22:28:06 UTC (rev 9574)
+++ branches/Branch_Large_Message_Compression/src/main/org/hornetq/core/client/impl/DelegatingSession.java 2010-08-20 22:30:33 UTC (rev 9575)
@@ -14,6 +14,7 @@
package org.hornetq.core.client.impl;
import java.util.Set;
+import java.util.concurrent.Executor;
import javax.transaction.xa.XAException;
import javax.transaction.xa.XAResource;
@@ -555,4 +556,20 @@
{
session.setPacketSize(packetSize);
}
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.client.impl.ClientSessionInternal#isCompressLargeMessages()
+ */
+ public boolean isCompressLargeMessages()
+ {
+ return session.isCompressLargeMessages();
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.client.impl.ClientSessionInternal#getThreadPool()
+ */
+ public Executor getThreadPool()
+ {
+ return session.getThreadPool();
+ }
}
Modified: branches/Branch_Large_Message_Compression/src/main/org/hornetq/core/client/impl/FailoverManager.java
===================================================================
--- branches/Branch_Large_Message_Compression/src/main/org/hornetq/core/client/impl/FailoverManager.java 2010-08-20 22:28:06 UTC (rev 9574)
+++ branches/Branch_Large_Message_Compression/src/main/org/hornetq/core/client/impl/FailoverManager.java 2010-08-20 22:30:33 UTC (rev 9575)
@@ -13,6 +13,8 @@
package org.hornetq.core.client.impl;
+import java.util.concurrent.Executor;
+
import org.hornetq.api.core.HornetQException;
import org.hornetq.api.core.client.ClientSession;
import org.hornetq.api.core.client.SessionFailureListener;
@@ -38,6 +40,7 @@
final int ackBatchSize,
final boolean cacheLargeMessageClient,
final int minLargeMessageSize,
+ final boolean compressLargeMessages,
final boolean blockOnAcknowledge,
final boolean autoGroup,
final int confirmationWindowSize,
@@ -63,4 +66,6 @@
boolean removeFailureListener(SessionFailureListener listener);
void causeExit();
+
+ Executor getThreadPool();
}
Modified: branches/Branch_Large_Message_Compression/src/main/org/hornetq/core/client/impl/FailoverManagerImpl.java
===================================================================
--- branches/Branch_Large_Message_Compression/src/main/org/hornetq/core/client/impl/FailoverManagerImpl.java 2010-08-20 22:28:06 UTC (rev 9574)
+++ branches/Branch_Large_Message_Compression/src/main/org/hornetq/core/client/impl/FailoverManagerImpl.java 2010-08-20 22:30:33 UTC (rev 9575)
@@ -297,8 +297,13 @@
handleConnectionFailure(connectionID, me);
}
- // ConnectionManager implementation ------------------------------------------------------------------
+ // FailoverManager implementation ------------------------------------------------------------------
+ public Executor getThreadPool()
+ {
+ return threadPool;
+ }
+
public ClientSession createSession(final String username,
final String password,
final boolean xa,
@@ -308,6 +313,7 @@
final int ackBatchSize,
final boolean cacheLargeMessageClient,
final int minLargeMessageSize,
+ final boolean compressLargeMessages,
final boolean blockOnAcknowledge,
final boolean autoGroup,
final int confWindowSize,
@@ -457,6 +463,7 @@
blockOnDurableSend,
cacheLargeMessageClient,
minLargeMessageSize,
+ compressLargeMessages,
initialMessagePacketSize,
groupID,
theConnection,
Modified: branches/Branch_Large_Message_Compression/src/main/org/hornetq/core/client/impl/LargeMessageBufferInternal.java
===================================================================
--- branches/Branch_Large_Message_Compression/src/main/org/hornetq/core/client/impl/LargeMessageBufferInternal.java 2010-08-20 22:28:06 UTC (rev 9574)
+++ branches/Branch_Large_Message_Compression/src/main/org/hornetq/core/client/impl/LargeMessageBufferInternal.java 2010-08-20 22:30:33 UTC (rev 9575)
@@ -17,6 +17,7 @@
import org.hornetq.api.core.HornetQBuffer;
import org.hornetq.api.core.HornetQException;
+import org.hornetq.core.protocol.core.impl.wireformat.SessionReceiveContinuationMessage;
/**
* A LargeMessageBufferInternal
@@ -55,6 +56,8 @@
* Saves this buffer to the specified output.
*/
void saveBuffer(final OutputStream output) throws HornetQException;
+
+ public void addPacket(final SessionReceiveContinuationMessage packet);
/**
* Waits for the completion for the specified waiting time (in milliseconds).
Modified: branches/Branch_Large_Message_Compression/src/main/org/hornetq/jms/client/HornetQConnectionFactory.java
===================================================================
--- branches/Branch_Large_Message_Compression/src/main/org/hornetq/jms/client/HornetQConnectionFactory.java 2010-08-20 22:28:06 UTC (rev 9574)
+++ branches/Branch_Large_Message_Compression/src/main/org/hornetq/jms/client/HornetQConnectionFactory.java 2010-08-20 22:30:33 UTC (rev 9575)
@@ -648,6 +648,13 @@
}
}
+ /**
+ * @param compressLargeMessages
+ */
+ public void setCompressLargeMessages(boolean compressLargeMessages)
+ {
+ }
+
// Inner classes --------------------------------------------------------------------------------
}
Modified: branches/Branch_Large_Message_Compression/src/main/org/hornetq/jms/server/JMSServerManager.java
===================================================================
--- branches/Branch_Large_Message_Compression/src/main/org/hornetq/jms/server/JMSServerManager.java 2010-08-20 22:28:06 UTC (rev 9574)
+++ branches/Branch_Large_Message_Compression/src/main/org/hornetq/jms/server/JMSServerManager.java 2010-08-20 22:30:33 UTC (rev 9575)
@@ -198,6 +198,7 @@
long callTimeout,
boolean cacheLargeMessagesClient,
int minLargeMessageSize,
+ boolean compressLargeMessage,
int consumerWindowSize,
int consumerMaxRate,
int confirmationWindowSize,
@@ -235,6 +236,7 @@
long callTimeout,
boolean cacheLargeMessagesClient,
int minLargeMessageSize,
+ boolean compressLargeMessages,
int consumerWindowSize,
int consumerMaxRate,
int confirmationWindowSize,
Modified: branches/Branch_Large_Message_Compression/src/main/org/hornetq/jms/server/config/ConnectionFactoryConfiguration.java
===================================================================
--- branches/Branch_Large_Message_Compression/src/main/org/hornetq/jms/server/config/ConnectionFactoryConfiguration.java 2010-08-20 22:28:06 UTC (rev 9574)
+++ branches/Branch_Large_Message_Compression/src/main/org/hornetq/jms/server/config/ConnectionFactoryConfiguration.java 2010-08-20 22:30:33 UTC (rev 9575)
@@ -106,6 +106,10 @@
int getMinLargeMessageSize();
void setMinLargeMessageSize(int minLargeMessageSize);
+
+ boolean isCompressLargeMessages();
+
+ void setCompressLargeMessages(boolean compress);
int getConsumerWindowSize();
Modified: branches/Branch_Large_Message_Compression/src/main/org/hornetq/jms/server/config/impl/ConnectionFactoryConfigurationImpl.java
===================================================================
--- branches/Branch_Large_Message_Compression/src/main/org/hornetq/jms/server/config/impl/ConnectionFactoryConfigurationImpl.java 2010-08-20 22:28:06 UTC (rev 9574)
+++ branches/Branch_Large_Message_Compression/src/main/org/hornetq/jms/server/config/impl/ConnectionFactoryConfigurationImpl.java 2010-08-20 22:30:33 UTC (rev 9575)
@@ -66,6 +66,8 @@
private boolean cacheLargeMessagesClient = HornetQClient.DEFAULT_CACHE_LARGE_MESSAGE_CLIENT;
private int minLargeMessageSize = HornetQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE;
+
+ private boolean compressLargeMessage = HornetQClient.DEFAULT_COMPRESS_LARGE_MESSAGES;
private int consumerWindowSize = HornetQClient.DEFAULT_CONSUMER_WINDOW_SIZE;
@@ -306,6 +308,16 @@
this.minLargeMessageSize = minLargeMessageSize;
}
+ public boolean isCompressLargeMessages()
+ {
+ return compressLargeMessage;
+ }
+
+ public void setCompressLargeMessages(final boolean compress)
+ {
+ this.compressLargeMessage = compress;
+ }
+
public int getConsumerWindowSize()
{
return consumerWindowSize;
Modified: branches/Branch_Large_Message_Compression/src/main/org/hornetq/jms/server/impl/JMSServerConfigParserImpl.java
===================================================================
--- branches/Branch_Large_Message_Compression/src/main/org/hornetq/jms/server/impl/JMSServerConfigParserImpl.java 2010-08-20 22:28:06 UTC (rev 9574)
+++ branches/Branch_Large_Message_Compression/src/main/org/hornetq/jms/server/impl/JMSServerConfigParserImpl.java 2010-08-20 22:30:33 UTC (rev 9575)
@@ -254,6 +254,11 @@
"min-large-message-size",
HornetQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE,
Validators.GT_ZERO);
+
+ boolean compressLargeMessages = XMLConfigurationUtil.getBoolean(e,
+ "compress-large-messages",
+ HornetQClient.DEFAULT_COMPRESS_LARGE_MESSAGES);
+
boolean blockOnAcknowledge = XMLConfigurationUtil.getBoolean(e,
"block-on-acknowledge",
HornetQClient.DEFAULT_BLOCK_ON_ACKNOWLEDGE);
@@ -387,6 +392,7 @@
cfConfig.setCallTimeout(callTimeout);
cfConfig.setCacheLargeMessagesClient(cacheLargeMessagesClient);
cfConfig.setMinLargeMessageSize(minLargeMessageSize);
+ cfConfig.setCompressLargeMessages(compressLargeMessages);
cfConfig.setConsumerWindowSize(consumerWindowSize);
cfConfig.setConsumerMaxRate(consumerMaxRate);
cfConfig.setConfirmationWindowSize(confirmationWindowSize);
Modified: branches/Branch_Large_Message_Compression/src/main/org/hornetq/jms/server/impl/JMSServerManagerImpl.java
===================================================================
--- branches/Branch_Large_Message_Compression/src/main/org/hornetq/jms/server/impl/JMSServerManagerImpl.java 2010-08-20 22:28:06 UTC (rev 9574)
+++ branches/Branch_Large_Message_Compression/src/main/org/hornetq/jms/server/impl/JMSServerManagerImpl.java 2010-08-20 22:30:33 UTC (rev 9575)
@@ -711,6 +711,7 @@
final long callTimeout,
final boolean cacheLargeMessagesClient,
final int minLargeMessageSize,
+ final boolean compressLargeMessage,
final int consumerWindowSize,
final int consumerMaxRate,
final int confirmationWindowSize,
@@ -747,6 +748,7 @@
configuration.setCallTimeout(callTimeout);
configuration.setCacheLargeMessagesClient(cacheLargeMessagesClient);
configuration.setMinLargeMessageSize(minLargeMessageSize);
+ configuration.setCompressLargeMessages(compressLargeMessage);
configuration.setConsumerWindowSize(consumerWindowSize);
configuration.setConsumerMaxRate(consumerMaxRate);
configuration.setConfirmationWindowSize(confirmationWindowSize);
@@ -786,6 +788,7 @@
final long callTimeout,
final boolean cacheLargeMessagesClient,
final int minLargeMessageSize,
+ final boolean compressLargeMessages,
final int consumerWindowSize,
final int consumerMaxRate,
final int confirmationWindowSize,
@@ -827,6 +830,7 @@
configuration.setCallTimeout(callTimeout);
configuration.setCacheLargeMessagesClient(cacheLargeMessagesClient);
configuration.setMinLargeMessageSize(minLargeMessageSize);
+ configuration.setCompressLargeMessages(compressLargeMessages);
configuration.setConsumerWindowSize(consumerWindowSize);
configuration.setConsumerMaxRate(consumerMaxRate);
configuration.setConfirmationWindowSize(confirmationWindowSize);
@@ -926,6 +930,7 @@
final long callTimeout,
final boolean cacheLargeMessagesClient,
final int minLargeMessageSize,
+ final boolean compressLargeMessages,
final int consumerWindowSize,
final int consumerMaxRate,
final int confirmationWindowSize,
@@ -964,6 +969,7 @@
cf.setCallTimeout(callTimeout);
cf.setCacheLargeMessagesClient(cacheLargeMessagesClient);
cf.setMinLargeMessageSize(minLargeMessageSize);
+ cf.setCompressLargeMessages(compressLargeMessages);
cf.setConsumerWindowSize(consumerWindowSize);
cf.setConsumerMaxRate(consumerMaxRate);
cf.setConfirmationWindowSize(confirmationWindowSize);
@@ -999,6 +1005,7 @@
final long callTimeout,
final boolean cacheLargeMessagesClient,
final int minLargeMessageSize,
+ final boolean compressLargeMessages,
final int consumerWindowSize,
final int consumerMaxRate,
final int confirmationWindowSize,
@@ -1034,6 +1041,7 @@
cf.setCallTimeout(callTimeout);
cf.setCacheLargeMessagesClient(cacheLargeMessagesClient);
cf.setMinLargeMessageSize(minLargeMessageSize);
+ cf.setCompressLargeMessages(compressLargeMessages);
cf.setConsumerWindowSize(consumerWindowSize);
cf.setConsumerMaxRate(consumerMaxRate);
cf.setConfirmationWindowSize(confirmationWindowSize);
@@ -1169,6 +1177,7 @@
cfConfig.getCallTimeout(),
cfConfig.isCacheLargeMessagesClient(),
cfConfig.getMinLargeMessageSize(),
+ cfConfig.isCompressLargeMessages(),
cfConfig.getConsumerWindowSize(),
cfConfig.getConsumerMaxRate(),
cfConfig.getConfirmationWindowSize(),
@@ -1203,6 +1212,7 @@
cfConfig.getCallTimeout(),
cfConfig.isCacheLargeMessagesClient(),
cfConfig.getMinLargeMessageSize(),
+ cfConfig.isCompressLargeMessages(),
cfConfig.getConsumerWindowSize(),
cfConfig.getConsumerMaxRate(),
cfConfig.getConfirmationWindowSize(),
Added: branches/Branch_Large_Message_Compression/src/main/org/hornetq/utils/DecompressedLargeMessageBuffer.java
===================================================================
--- branches/Branch_Large_Message_Compression/src/main/org/hornetq/utils/DecompressedLargeMessageBuffer.java (rev 0)
+++ branches/Branch_Large_Message_Compression/src/main/org/hornetq/utils/DecompressedLargeMessageBuffer.java 2010-08-20 22:30:33 UTC (rev 9575)
@@ -0,0 +1,1116 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.utils;
+
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.PipedInputStream;
+import java.io.PipedOutputStream;
+import java.nio.ByteBuffer;
+import java.nio.channels.GatheringByteChannel;
+import java.nio.channels.ScatteringByteChannel;
+import java.util.concurrent.Executor;
+
+import org.hornetq.api.core.HornetQBuffer;
+import org.hornetq.api.core.HornetQBuffers;
+import org.hornetq.api.core.HornetQException;
+import org.hornetq.api.core.SimpleString;
+import org.hornetq.core.client.impl.LargeMessageBufferImpl;
+import org.hornetq.core.client.impl.LargeMessageBufferInternal;
+import org.hornetq.core.protocol.core.impl.wireformat.SessionReceiveContinuationMessage;
+import org.jboss.netty.buffer.ChannelBuffer;
+
+/**
+ * A DecompressedHornetQBuffer
+ *
+ * @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
+ *
+ *
+ */
+public class DecompressedLargeMessageBuffer implements LargeMessageBufferInternal
+{
+
+ // Constants -----------------------------------------------------
+
+ /**
+ *
+ */
+ private static final String OPERATION_NOT_SUPPORTED = "Operation not supported";
+
+ private static final String READ_ONLY_ERROR_MESSAGE = "This is a read-only buffer, setOperations are not supported";
+
+ // Attributes ----------------------------------------------------
+
+
+ final LargeMessageBufferInternal bufferDelegate;
+
+ final Executor threadPool;
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ public DecompressedLargeMessageBuffer(final LargeMessageBufferInternal bufferDelegate, final Executor threadPool)
+ {
+ this.bufferDelegate = bufferDelegate;
+ this.threadPool = threadPool;
+ }
+
+
+ // Public --------------------------------------------------------
+
+ /**
+ *
+ */
+ public void discardUnusedPackets()
+ {
+ bufferDelegate.discardUnusedPackets();
+ }
+
+ /**
+ * Add a buff to the List, or save it to the OutputStream if set
+ * @param packet
+ */
+ public void addPacket(final SessionReceiveContinuationMessage packet)
+ {
+ bufferDelegate.addPacket(packet);
+ }
+
+ public synchronized void cancel()
+ {
+ bufferDelegate.cancel();
+ }
+
+ public synchronized void close()
+ {
+ bufferDelegate.cancel();
+ }
+
+ public void setOutputStream(final OutputStream output) throws HornetQException
+ {
+ try
+ {
+ PipedOutputStream pipeOut = new PipedOutputStream();
+ PipedInputStream pipeIn = new PipedInputStream();
+
+ pipeOut.connect(pipeIn);
+
+ GZipUtil.pipeGZip(pipeIn, false, threadPool);
+
+ bufferDelegate.setOutputStream(pipeOut);
+ }
+ catch (IOException e)
+ {
+ throw new HornetQException(HornetQException.LARGE_MESSAGE_ERROR_BODY, e.getMessage(), e);
+ }
+ }
+
+ public synchronized void saveBuffer(final OutputStream output) throws HornetQException
+ {
+ setOutputStream(output);
+ waitCompletion(0);
+ }
+
+ /**
+ *
+ * @param timeWait Milliseconds to Wait. 0 means forever
+ * @throws Exception
+ */
+ public synchronized boolean waitCompletion(final long timeWait) throws HornetQException
+ {
+ return bufferDelegate.waitCompletion(timeWait);
+ }
+
+ // Channel Buffer Implementation ---------------------------------
+
+ /* (non-Javadoc)
+ * @see org.hornetq.api.core.buffers.ChannelBuffer#array()
+ */
+ public byte[] array()
+ {
+ throw new IllegalAccessError("array not supported on LargeMessageBufferImpl");
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.api.core.buffers.ChannelBuffer#capacity()
+ */
+ public int capacity()
+ {
+ return -1;
+ }
+
+ DataInputStream dataInput = null;
+
+ private DataInputStream getStream()
+ {
+ if (dataInput == null)
+ {
+ try
+ {
+ InputStream input = new HornetQBufferInputStream(bufferDelegate);
+
+ dataInput = new DataInputStream(GZipUtil.pipeGZip(input, false, threadPool));
+ }
+ catch (Exception e)
+ {
+ throw new RuntimeException (e.getMessage(), e);
+ }
+
+ }
+ return dataInput;
+ }
+
+ private void positioningNotSupported()
+ {
+ throw new IllegalStateException("Position not supported over compressed large messages");
+ }
+
+ public byte readByte()
+ {
+ try
+ {
+ return getStream().readByte();
+ }
+ catch (Exception e)
+ {
+ throw new RuntimeException (e.getMessage(), e);
+ }
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.api.core.buffers.ChannelBuffer#getByte(int)
+ */
+ public byte getByte(final int index)
+ {
+ positioningNotSupported();
+ return 0;
+ }
+
+ private byte getByte(final long index)
+ {
+ positioningNotSupported();
+ return 0;
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.api.core.buffers.ChannelBuffer#getBytes(int, org.hornetq.api.core.buffers.ChannelBuffer, int, int)
+ */
+ public void getBytes(final int index, final HornetQBuffer dst, final int dstIndex, final int length)
+ {
+ positioningNotSupported();
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.api.core.buffers.ChannelBuffer#getBytes(int, org.hornetq.api.core.buffers.ChannelBuffer, int, int)
+ */
+ public void getBytes(final long index, final HornetQBuffer dst, final int dstIndex, final int length)
+ {
+ positioningNotSupported();
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.api.core.buffers.ChannelBuffer#getBytes(int, byte[], int, int)
+ */
+ public void getBytes(final int index, final byte[] dst, final int dstIndex, final int length)
+ {
+ positioningNotSupported();
+ }
+
+ public void getBytes(final long index, final byte[] dst, final int dstIndex, final int length)
+ {
+ positioningNotSupported();
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.api.core.buffers.ChannelBuffer#getBytes(int, java.nio.ByteBuffer)
+ */
+ public void getBytes(final int index, final ByteBuffer dst)
+ {
+ positioningNotSupported();
+ }
+
+ public void getBytes(final long index, final ByteBuffer dst)
+ {
+ positioningNotSupported();
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.api.core.buffers.ChannelBuffer#getBytes(int, java.io.OutputStream, int)
+ */
+ public void getBytes(final int index, final OutputStream out, final int length) throws IOException
+ {
+ positioningNotSupported();
+ }
+
+ public void getBytes(final long index, final OutputStream out, final int length) throws IOException
+ {
+ positioningNotSupported();
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.api.core.buffers.ChannelBuffer#getBytes(int, java.nio.channels.GatheringByteChannel, int)
+ */
+ public int getBytes(final int index, final GatheringByteChannel out, final int length) throws IOException
+ {
+ positioningNotSupported();
+ return 0;
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.api.core.buffers.ChannelBuffer#getInt(int)
+ */
+ public int getInt(final int index)
+ {
+ positioningNotSupported();
+ return 0;
+ }
+
+ public int getInt(final long index)
+ {
+ positioningNotSupported();
+ return 0;
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.api.core.buffers.ChannelBuffer#getLong(int)
+ */
+ public long getLong(final int index)
+ {
+ positioningNotSupported();
+ return 0;
+ }
+
+ public long getLong(final long index)
+ {
+ positioningNotSupported();
+ return 0;
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.api.core.buffers.ChannelBuffer#getShort(int)
+ */
+ public short getShort(final int index)
+ {
+ positioningNotSupported();
+ return 0;
+ }
+
+ public short getShort(final long index)
+ {
+ return (short)(getByte(index) << 8 | getByte(index + 1) & 0xFF);
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.api.core.buffers.ChannelBuffer#getUnsignedMedium(int)
+ */
+ public int getUnsignedMedium(final int index)
+ {
+ positioningNotSupported();
+ return 0;
+ }
+
+
+
+ public int getUnsignedMedium(final long index)
+ {
+ positioningNotSupported();
+ return 0;
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.api.core.buffers.ChannelBuffer#setByte(int, byte)
+ */
+ public void setByte(final int index, final byte value)
+ {
+ throw new IllegalAccessError(OPERATION_NOT_SUPPORTED);
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.api.core.buffers.ChannelBuffer#setBytes(int, org.hornetq.api.core.buffers.ChannelBuffer, int, int)
+ */
+ public void setBytes(final int index, final HornetQBuffer src, final int srcIndex, final int length)
+ {
+ throw new IllegalAccessError(OPERATION_NOT_SUPPORTED);
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.api.core.buffers.ChannelBuffer#setBytes(int, byte[], int, int)
+ */
+ public void setBytes(final int index, final byte[] src, final int srcIndex, final int length)
+ {
+ throw new IllegalAccessError(OPERATION_NOT_SUPPORTED);
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.api.core.buffers.ChannelBuffer#setBytes(int, java.nio.ByteBuffer)
+ */
+ public void setBytes(final int index, final ByteBuffer src)
+ {
+ throw new IllegalAccessError(OPERATION_NOT_SUPPORTED);
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.api.core.buffers.ChannelBuffer#setBytes(int, java.io.InputStream, int)
+ */
+ public int setBytes(final int index, final InputStream in, final int length) throws IOException
+ {
+ throw new IllegalAccessError(OPERATION_NOT_SUPPORTED);
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.api.core.buffers.ChannelBuffer#setBytes(int, java.nio.channels.ScatteringByteChannel, int)
+ */
+ public int setBytes(final int index, final ScatteringByteChannel in, final int length) throws IOException
+ {
+ throw new IllegalAccessError(OPERATION_NOT_SUPPORTED);
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.api.core.buffers.ChannelBuffer#setInt(int, int)
+ */
+ public void setInt(final int index, final int value)
+ {
+ throw new IllegalAccessError(OPERATION_NOT_SUPPORTED);
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.api.core.buffers.ChannelBuffer#setLong(int, long)
+ */
+ public void setLong(final int index, final long value)
+ {
+ throw new IllegalAccessError(OPERATION_NOT_SUPPORTED);
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.api.core.buffers.ChannelBuffer#setMedium(int, int)
+ */
+ public void setMedium(final int index, final int value)
+ {
+ throw new IllegalAccessError(OPERATION_NOT_SUPPORTED);
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.api.core.buffers.ChannelBuffer#setShort(int, short)
+ */
+ public void setShort(final int index, final short value)
+ {
+ throw new IllegalAccessError(OPERATION_NOT_SUPPORTED);
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.api.core.buffers.ChannelBuffer#toByteBuffer(int, int)
+ */
+ public ByteBuffer toByteBuffer(final int index, final int length)
+ {
+ throw new IllegalAccessError(OPERATION_NOT_SUPPORTED);
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.api.core.buffers.ChannelBuffer#toString(int, int, java.lang.String)
+ */
+ public String toString(final int index, final int length, final String charsetName)
+ {
+ throw new IllegalAccessError(OPERATION_NOT_SUPPORTED);
+ }
+
+ public int readerIndex()
+ {
+ // TODO
+ return 0;
+ }
+
+ public void readerIndex(final int readerIndex)
+ {
+ // TODO
+ }
+
+ public int writerIndex()
+ {
+ // TODO
+ return 0;
+ }
+
+ public long getSize()
+ {
+ // TODO
+ return 0;
+ }
+
+ public void writerIndex(final int writerIndex)
+ {
+ throw new IllegalAccessError(OPERATION_NOT_SUPPORTED);
+ }
+
+ public void setIndex(final int readerIndex, final int writerIndex)
+ {
+ positioningNotSupported();
+ }
+
+ public void clear()
+ {
+ }
+
+ public boolean readable()
+ {
+ return true;
+ }
+
+ public boolean writable()
+ {
+ return false;
+ }
+
+ public int readableBytes()
+ {
+ return 1;
+ }
+
+ public int writableBytes()
+ {
+ return 0;
+ }
+
+ public void markReaderIndex()
+ {
+ throw new IllegalAccessError(OPERATION_NOT_SUPPORTED);
+ }
+
+ public void resetReaderIndex()
+ {
+ // TODO: reset positioning if possible
+ }
+
+ public void markWriterIndex()
+ {
+ throw new IllegalAccessError(OPERATION_NOT_SUPPORTED);
+ }
+
+ public void resetWriterIndex()
+ {
+ throw new IllegalAccessError(OPERATION_NOT_SUPPORTED);
+ }
+
+ public void discardReadBytes()
+ {
+ throw new IllegalAccessError(OPERATION_NOT_SUPPORTED);
+ }
+
+ public short getUnsignedByte(final int index)
+ {
+ return (short)(getByte(index) & 0xFF);
+ }
+
+ public int getUnsignedShort(final int index)
+ {
+ return getShort(index) & 0xFFFF;
+ }
+
+ public int getMedium(final int index)
+ {
+ int value = getUnsignedMedium(index);
+ if ((value & 0x800000) != 0)
+ {
+ value |= 0xff000000;
+ }
+ return value;
+ }
+
+ public long getUnsignedInt(final int index)
+ {
+ return getInt(index) & 0xFFFFFFFFL;
+ }
+
+ public void getBytes(int index, final byte[] dst)
+ {
+ // TODO: optimize this by using System.arraycopy
+ for (int i = 0; i < dst.length; i++)
+ {
+ dst[i] = getByte(index++);
+ }
+ }
+
+ public void getBytes(long index, final byte[] dst)
+ {
+ // TODO: optimize this by using System.arraycopy
+ for (int i = 0; i < dst.length; i++)
+ {
+ dst[i] = getByte(index++);
+ }
+ }
+
+ public void getBytes(final int index, final HornetQBuffer dst)
+ {
+ getBytes(index, dst, dst.writableBytes());
+ }
+
+ public void getBytes(final int index, final HornetQBuffer dst, final int length)
+ {
+ if (length > dst.writableBytes())
+ {
+ throw new IndexOutOfBoundsException();
+ }
+ getBytes(index, dst, dst.writerIndex(), length);
+ dst.writerIndex(dst.writerIndex() + length);
+ }
+
+ public void setBytes(final int index, final byte[] src)
+ {
+ throw new IllegalAccessError(OPERATION_NOT_SUPPORTED);
+ }
+
+ public void setBytes(final int index, final HornetQBuffer src)
+ {
+ throw new IllegalAccessError(OPERATION_NOT_SUPPORTED);
+ }
+
+ public void setBytes(final int index, final HornetQBuffer src, final int length)
+ {
+ throw new IllegalAccessError(OPERATION_NOT_SUPPORTED);
+ }
+
+ public void setZero(final int index, final int length)
+ {
+ throw new IllegalAccessError(OPERATION_NOT_SUPPORTED);
+ }
+
+ public short readUnsignedByte()
+ {
+ try
+ {
+ return (short)getStream().readUnsignedByte();
+ }
+ catch (Exception e)
+ {
+ throw new IllegalStateException (e.getMessage(), e);
+ }
+ }
+
+ public short readShort()
+ {
+ try
+ {
+ return (short)getStream().readShort();
+ }
+ catch (Exception e)
+ {
+ throw new IllegalStateException (e.getMessage(), e);
+ }
+ }
+
+ public int readUnsignedShort()
+ {
+ try
+ {
+ return (int)getStream().readUnsignedShort();
+ }
+ catch (Exception e)
+ {
+ throw new IllegalStateException (e.getMessage(), e);
+ }
+ }
+
+ public int readMedium()
+ {
+ int value = readUnsignedMedium();
+ if ((value & 0x800000) != 0)
+ {
+ value |= 0xff000000;
+ }
+ return value;
+ }
+
+
+ public int readUnsignedMedium()
+ {
+ return (readByte() & 0xff) << 16 | (readByte() & 0xff) << 8 | (readByte() & 0xff) << 0;
+ }
+
+ public int readInt()
+ {
+ try
+ {
+ return getStream().readInt();
+ }
+ catch (Exception e)
+ {
+ throw new IllegalStateException(e.getMessage(), e);
+ }
+ }
+
+ public int readInt(final int pos)
+ {
+ positioningNotSupported();
+ return 0;
+ }
+
+ public long readUnsignedInt()
+ {
+ return readInt() & 0xFFFFFFFFL;
+ }
+
+ public long readLong()
+ {
+ try
+ {
+ return getStream().readLong();
+ }
+ catch (Exception e)
+ {
+ throw new IllegalStateException(e.getMessage(), e);
+ }
+ }
+
+ public void readBytes(final byte[] dst, final int dstIndex, final int length)
+ {
+ try
+ {
+ getStream().read(dst, dstIndex, length);
+ }
+ catch (Exception e)
+ {
+ throw new IllegalStateException(e.getMessage(), e);
+ }
+ }
+
+ public void readBytes(final byte[] dst)
+ {
+ readBytes(dst, 0, dst.length);
+ }
+
+ public void readBytes(final HornetQBuffer dst)
+ {
+ readBytes(dst, dst.writableBytes());
+ }
+
+ public void readBytes(final HornetQBuffer dst, final int length)
+ {
+ if (length > dst.writableBytes())
+ {
+ throw new IndexOutOfBoundsException();
+ }
+ readBytes(dst, dst.writerIndex(), length);
+ dst.writerIndex(dst.writerIndex() + length);
+ }
+
+ public void readBytes(final HornetQBuffer dst, final int dstIndex, final int length)
+ {
+ byte[] destBytes = new byte[length];
+ readBytes(destBytes);
+ dst.setBytes(dstIndex, destBytes);
+ }
+
+ public void readBytes(final ByteBuffer dst)
+ {
+ byte bytesToGet[] = new byte[dst.remaining()];
+ readBytes(bytesToGet);
+ dst.put(bytesToGet);
+ }
+
+ public int readBytes(final GatheringByteChannel out, final int length) throws IOException
+ {
+ throw new IllegalStateException("Not implemented!");
+ }
+
+ public void readBytes(final OutputStream out, final int length) throws IOException
+ {
+ throw new IllegalStateException("Not implemented!");
+ }
+
+ public void skipBytes(final int length)
+ {
+
+ try
+ {
+ for (int i = 0 ; i < length; i++)
+ {
+ getStream().read();
+ }
+ }
+ catch (Exception e)
+ {
+ throw new IllegalStateException(e.getMessage(), e);
+ }
+ }
+
+ public void writeByte(final byte value)
+ {
+ throw new IllegalAccessError(OPERATION_NOT_SUPPORTED);
+ }
+
+ public void writeShort(final short value)
+ {
+ throw new IllegalAccessError(OPERATION_NOT_SUPPORTED);
+ }
+
+ public void writeMedium(final int value)
+ {
+ throw new IllegalAccessError(OPERATION_NOT_SUPPORTED);
+ }
+
+ public void writeInt(final int value)
+ {
+ throw new IllegalAccessError(OPERATION_NOT_SUPPORTED);
+ }
+
+ public void writeLong(final long value)
+ {
+ throw new IllegalAccessError(OPERATION_NOT_SUPPORTED);
+ }
+
+ public void writeBytes(final byte[] src, final int srcIndex, final int length)
+ {
+ throw new IllegalAccessError(OPERATION_NOT_SUPPORTED);
+ }
+
+ public void writeBytes(final byte[] src)
+ {
+ throw new IllegalAccessError(OPERATION_NOT_SUPPORTED);
+ }
+
+ public void writeBytes(final HornetQBuffer src)
+ {
+ throw new IllegalAccessError(OPERATION_NOT_SUPPORTED);
+ }
+
+ public void writeBytes(final HornetQBuffer src, final int length)
+ {
+ throw new IllegalAccessError(OPERATION_NOT_SUPPORTED);
+ }
+
+ public void writeBytes(final ByteBuffer src)
+ {
+ throw new IllegalAccessError(OPERATION_NOT_SUPPORTED);
+ }
+
+ public int writeBytes(final InputStream in, final int length) throws IOException
+ {
+ throw new IllegalAccessError(OPERATION_NOT_SUPPORTED);
+ }
+
+ public int writeBytes(final ScatteringByteChannel in, final int length) throws IOException
+ {
+ throw new IllegalAccessError(OPERATION_NOT_SUPPORTED);
+ }
+
+ public void writeZero(final int length)
+ {
+ throw new IllegalAccessError(OPERATION_NOT_SUPPORTED);
+ }
+
+ public ByteBuffer toByteBuffer()
+ {
+ throw new IllegalAccessError(OPERATION_NOT_SUPPORTED);
+ }
+
+ public ByteBuffer[] toByteBuffers()
+ {
+ throw new IllegalAccessError(OPERATION_NOT_SUPPORTED);
+ }
+
+ public ByteBuffer[] toByteBuffers(final int index, final int length)
+ {
+ throw new IllegalAccessError(OPERATION_NOT_SUPPORTED);
+ }
+
+ public String toString(final String charsetName)
+ {
+ throw new IllegalAccessError(OPERATION_NOT_SUPPORTED);
+ }
+
+ public Object getUnderlyingBuffer()
+ {
+ return this;
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.spi.core.remoting.HornetQBuffer#readBoolean()
+ */
+ public boolean readBoolean()
+ {
+ return readByte() != 0;
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.spi.core.remoting.HornetQBuffer#readChar()
+ */
+ public char readChar()
+ {
+ return (char)readShort();
+ }
+
+ public char getChar(final int index)
+ {
+ return (char)getShort(index);
+ }
+
+ public double getDouble(final int index)
+ {
+ return Double.longBitsToDouble(getLong(index));
+ }
+
+ public float getFloat(final int index)
+ {
+ return Float.intBitsToFloat(getInt(index));
+ }
+
+ public HornetQBuffer readBytes(final int length)
+ {
+ byte bytesToGet[] = new byte[length];
+ readBytes(bytesToGet);
+ return HornetQBuffers.wrappedBuffer(bytesToGet);
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.spi.core.remoting.HornetQBuffer#readDouble()
+ */
+ public double readDouble()
+ {
+ return Double.longBitsToDouble(readLong());
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.spi.core.remoting.HornetQBuffer#readFloat()
+ */
+ public float readFloat()
+ {
+ return Float.intBitsToFloat(readInt());
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.spi.core.remoting.HornetQBuffer#readNullableSimpleString()
+ */
+ public SimpleString readNullableSimpleString()
+ {
+ int b = readByte();
+ if (b == DataConstants.NULL)
+ {
+ return null;
+ }
+ else
+ {
+ return readSimpleString();
+ }
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.spi.core.remoting.HornetQBuffer#readNullableString()
+ */
+ public String readNullableString()
+ {
+ int b = readByte();
+ if (b == DataConstants.NULL)
+ {
+ return null;
+ }
+ else
+ {
+ return readString();
+ }
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.spi.core.remoting.HornetQBuffer#readSimpleString()
+ */
+ public SimpleString readSimpleString()
+ {
+ int len = readInt();
+ byte[] data = new byte[len];
+ readBytes(data);
+ return new SimpleString(data);
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.spi.core.remoting.HornetQBuffer#readString()
+ */
+ public String readString()
+ {
+ int len = readInt();
+
+ if (len < 9)
+ {
+ char[] chars = new char[len];
+ for (int i = 0; i < len; i++)
+ {
+ chars[i] = (char)readShort();
+ }
+ return new String(chars);
+ }
+ else if (len < 0xfff)
+ {
+ return readUTF();
+ }
+ else
+ {
+ return readSimpleString().toString();
+ }
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.spi.core.remoting.HornetQBuffer#readUTF()
+ */
+ public String readUTF()
+ {
+ return UTF8Util.readUTF(this);
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.spi.core.remoting.HornetQBuffer#writeBoolean(boolean)
+ */
+ public void writeBoolean(final boolean val)
+ {
+ throw new IllegalAccessError(OPERATION_NOT_SUPPORTED);
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.spi.core.remoting.HornetQBuffer#writeChar(char)
+ */
+ public void writeChar(final char val)
+ {
+ throw new IllegalAccessError(OPERATION_NOT_SUPPORTED);
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.spi.core.remoting.HornetQBuffer#writeDouble(double)
+ */
+ public void writeDouble(final double val)
+ {
+ throw new IllegalAccessError(OPERATION_NOT_SUPPORTED);
+
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.spi.core.remoting.HornetQBuffer#writeFloat(float)
+ */
+ public void writeFloat(final float val)
+ {
+ throw new IllegalAccessError(OPERATION_NOT_SUPPORTED);
+
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.spi.core.remoting.HornetQBuffer#writeNullableSimpleString(org.hornetq.util.SimpleString)
+ */
+ public void writeNullableSimpleString(final SimpleString val)
+ {
+ throw new IllegalAccessError(OPERATION_NOT_SUPPORTED);
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.spi.core.remoting.HornetQBuffer#writeNullableString(java.lang.String)
+ */
+ public void writeNullableString(final String val)
+ {
+ throw new IllegalAccessError(OPERATION_NOT_SUPPORTED);
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.spi.core.remoting.HornetQBuffer#writeSimpleString(org.hornetq.util.SimpleString)
+ */
+ public void writeSimpleString(final SimpleString val)
+ {
+ throw new IllegalAccessError(OPERATION_NOT_SUPPORTED);
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.spi.core.remoting.HornetQBuffer#writeString(java.lang.String)
+ */
+ public void writeString(final String val)
+ {
+ throw new IllegalAccessError(OPERATION_NOT_SUPPORTED);
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.spi.core.remoting.HornetQBuffer#writeUTF(java.lang.String)
+ */
+ public void writeUTF(final String utf)
+ {
+ throw new IllegalAccessError(OPERATION_NOT_SUPPORTED);
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.api.core.buffers.ChannelBuffer#compareTo(org.hornetq.api.core.buffers.ChannelBuffer)
+ */
+ public int compareTo(final HornetQBuffer buffer)
+ {
+ return -1;
+ }
+
+ public HornetQBuffer copy()
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ public HornetQBuffer slice(final int index, final int length)
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ // Private -------------------------------------------------------
+
+ /**
+ * @param body
+ */
+ // Inner classes -------------------------------------------------
+
+ public ChannelBuffer channelBuffer()
+ {
+ return null;
+ }
+
+ public HornetQBuffer copy(final int index, final int length)
+ {
+ throw new IllegalAccessError(OPERATION_NOT_SUPPORTED);
+ }
+
+ public HornetQBuffer duplicate()
+ {
+ throw new IllegalAccessError(OPERATION_NOT_SUPPORTED);
+ }
+
+ public HornetQBuffer readSlice(final int length)
+ {
+ throw new IllegalAccessError(OPERATION_NOT_SUPPORTED);
+ }
+
+ public void setChar(final int index, final char value)
+ {
+ throw new IllegalAccessError(OPERATION_NOT_SUPPORTED);
+ }
+
+ public void setDouble(final int index, final double value)
+ {
+ throw new IllegalAccessError(OPERATION_NOT_SUPPORTED);
+ }
+
+ public void setFloat(final int index, final float value)
+ {
+ throw new IllegalAccessError(OPERATION_NOT_SUPPORTED);
+ }
+
+ public HornetQBuffer slice()
+ {
+ throw new IllegalAccessError(OPERATION_NOT_SUPPORTED);
+ }
+
+ public void writeBytes(final HornetQBuffer src, final int srcIndex, final int length)
+ {
+ throw new IllegalAccessError(OPERATION_NOT_SUPPORTED);
+ }
+}
Added: branches/Branch_Large_Message_Compression/src/main/org/hornetq/utils/GZipUtil.java
===================================================================
--- branches/Branch_Large_Message_Compression/src/main/org/hornetq/utils/GZipUtil.java (rev 0)
+++ branches/Branch_Large_Message_Compression/src/main/org/hornetq/utils/GZipUtil.java 2010-08-20 22:30:33 UTC (rev 9575)
@@ -0,0 +1,167 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.utils;
+
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.PipedInputStream;
+import java.io.PipedOutputStream;
+import java.util.concurrent.Executor;
+import java.util.zip.GZIPInputStream;
+import java.util.zip.GZIPOutputStream;
+
+import org.hornetq.api.core.HornetQException;
+import org.hornetq.core.logging.Logger;
+
+/**
+ * A GZipUtil
+ *
+ * @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
+ *
+ *
+ */
+public class GZipUtil
+{
+
+ private static final Logger log = Logger.getLogger(GZipUtil.class);
+
+ /**
+ * This will start a GZipOutputStream, using another thread through a Pipe
+ * TODO: We would need an inverted GZipInputStream (that would compress on reading) to avoid creating this thread (through an executor)
+ * @param inputStreamParameter
+ * @param compress = true if compressing, false if decompressing
+ * @return
+ * @throws HornetQException
+ */
+ public static InputStream pipeGZip(final InputStream inputStreamParameter, final boolean compress, final Executor threadPool) throws HornetQException
+ {
+ final InputStream input;
+ if (compress)
+ {
+ input = inputStreamParameter;
+ }
+ else
+ {
+ try
+ {
+ input = new GZIPInputStream(new BufferedInputStream(inputStreamParameter));
+ }
+ catch (IOException e)
+ {
+ throw new HornetQException(HornetQException.LARGE_MESSAGE_ERROR_BODY, e.getMessage(), e);
+ }
+ }
+
+ final PipedOutputStream pipedOut = new PipedOutputStream();
+ final PipedInputStream pipedInput = new PipedInputStream();
+ try
+ {
+ pipedOut.connect(pipedInput);
+ }
+ catch (IOException e)
+ {
+ throw new HornetQException(HornetQException.LARGE_MESSAGE_ERROR_BODY, e.getMessage(), e);
+ }
+
+ threadPool.execute(new Runnable()
+ {
+
+ public void run()
+ {
+ byte readBytes[] = new byte[1024];
+ int size = 0;
+
+ try
+ {
+ OutputStream out;
+ if (compress)
+ {
+ BufferedOutputStream buffOut = new BufferedOutputStream(pipedOut);
+ out = new GZIPOutputStream(buffOut);
+ }
+ else
+ {
+ out = new BufferedOutputStream(pipedOut);
+ }
+ while ((size = input.read(readBytes)) > 0)
+ {
+ System.out.println("Read " + size + " bytes on compressing thread");
+ out.write(readBytes, 0, size);
+ }
+ System.out.println("Finished compressing");
+ out.close();
+ }
+ catch (Exception e)
+ {
+ log.warn(e.getMessage());
+ try
+ {
+ pipedOut.close();
+ }
+ catch (Exception ignored)
+ {
+ }
+ }
+
+ }
+ });
+
+ return pipedInput;
+ }
+
+ public static void deZip(final InputStream input, final OutputStream output, final Executor threadPool) throws HornetQException
+ {
+ threadPool.execute(new Runnable()
+ {
+
+ public void run()
+ {
+ byte readBytes[] = new byte[1024];
+ int size = 0;
+
+ OutputStream out = null;
+
+ try
+ {
+ BufferedOutputStream buffOut = new BufferedOutputStream(output);
+ out = new GZIPOutputStream(buffOut);
+ while ((size = input.read(readBytes)) > 0)
+ {
+ System.out.println("Read " + size + " bytes on compressing thread");
+ out.write(readBytes, 0, size);
+ }
+ System.out.println("Finished compressing");
+ out.close();
+ }
+ catch (Exception e)
+ {
+ log.warn(e.getMessage());
+ try
+ {
+ out.close();
+ }
+ catch (Exception ignored)
+ {
+ }
+ }
+
+ }
+ });
+ }
+
+
+}
Added: branches/Branch_Large_Message_Compression/src/main/org/hornetq/utils/HornetQBufferInputStream.java
===================================================================
--- branches/Branch_Large_Message_Compression/src/main/org/hornetq/utils/HornetQBufferInputStream.java (rev 0)
+++ branches/Branch_Large_Message_Compression/src/main/org/hornetq/utils/HornetQBufferInputStream.java 2010-08-20 22:30:33 UTC (rev 9575)
@@ -0,0 +1,182 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.utils;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+import org.hornetq.api.core.HornetQBuffer;
+
+/**
+ * Used to send large messages
+ *
+ * @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
+ *
+ *
+ */
+public class HornetQBufferInputStream extends InputStream
+{
+
+ /* (non-Javadoc)
+ * @see java.io.InputStream#read()
+ */
+ // Constants -----------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+ private HornetQBuffer bb;
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ // Public --------------------------------------------------------
+
+ public HornetQBufferInputStream(final HornetQBuffer paramByteBuffer)
+ {
+ bb = paramByteBuffer;
+ }
+
+ @Override
+ public int read() throws IOException
+ {
+ if (bb == null)
+ {
+ throw new IOException("read on a closed InputStream");
+ }
+
+ if (remainingBytes() == 0)
+ {
+ return -1;
+ }
+ else
+ {
+ return bb.readByte();
+ }
+ }
+
+ @Override
+ public int read(final byte[] byteArray) throws IOException
+ {
+ if (bb == null)
+ {
+ throw new IOException("read on a closed InputStream");
+ }
+
+ return read(byteArray, 0, byteArray.length);
+ }
+
+ @Override
+ public int read(final byte[] byteArray, final int off, final int len) throws IOException
+ {
+ if (bb == null)
+ {
+ throw new IOException("read on a closed InputStream");
+ }
+
+ if (byteArray == null)
+ {
+ throw new NullPointerException();
+ }
+ if (off < 0 || off > byteArray.length || len < 0 || off + len > byteArray.length || off + len < 0)
+ {
+ throw new IndexOutOfBoundsException();
+ }
+ if (len == 0)
+ {
+ return 0;
+ }
+
+ int size = Math.min(remainingBytes(), len);
+
+ if (size == 0)
+ {
+ return -1;
+ }
+
+ bb.readBytes(byteArray, off, size);
+ return size;
+ }
+
+ @Override
+ public long skip(final long len) throws IOException
+ {
+ if (bb == null)
+ {
+ throw new IOException("skip on a closed InputStream");
+ }
+
+ if (len <= 0L)
+ {
+ return 0L;
+ }
+
+ int size = Math.min(remainingBytes(), (int) len);
+
+ bb.skipBytes((int)size);
+
+ return size;
+ }
+
+ @Override
+ public int available() throws IOException
+ {
+ if (bb == null)
+ {
+ throw new IOException("available on a closed InputStream");
+ }
+
+ return remainingBytes();
+ }
+
+ @Override
+ public void close() throws IOException
+ {
+ bb = null;
+ }
+
+ @Override
+ public synchronized void mark(final int paramInt)
+ {
+ }
+
+ @Override
+ public synchronized void reset() throws IOException
+ {
+ throw new IOException("mark/reset not supported");
+ }
+
+ @Override
+ public boolean markSupported()
+ {
+ return false;
+ }
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ // Private -------------------------------------------------------
+
+ /**
+ * @return
+ */
+ private int remainingBytes()
+ {
+ return bb.writerIndex() - bb.readerIndex();
+ }
+
+
+ // Inner classes -------------------------------------------------
+
+}
Modified: branches/Branch_Large_Message_Compression/tests/config/hornetq-jms-for-JMSServerDeployerTest.xml
===================================================================
--- branches/Branch_Large_Message_Compression/tests/config/hornetq-jms-for-JMSServerDeployerTest.xml 2010-08-20 22:28:06 UTC (rev 9574)
+++ branches/Branch_Large_Message_Compression/tests/config/hornetq-jms-for-JMSServerDeployerTest.xml 2010-08-20 22:30:33 UTC (rev 9575)
@@ -20,6 +20,7 @@
<producer-window-size>7712652</producer-window-size>
<producer-max-rate>789</producer-max-rate>
<min-large-message-size>12</min-large-message-size>
+ <compress-large-messages>true</compress-large-messages>
<client-id>TestClientID</client-id>
<dups-ok-batch-size>3456</dups-ok-batch-size>
<transaction-batch-size>4567</transaction-batch-size>
Modified: branches/Branch_Large_Message_Compression/tests/jms-tests/src/org/hornetq/jms/tests/CTSMiscellaneousTest.java
===================================================================
--- branches/Branch_Large_Message_Compression/tests/jms-tests/src/org/hornetq/jms/tests/CTSMiscellaneousTest.java 2010-08-20 22:28:06 UTC (rev 9574)
+++ branches/Branch_Large_Message_Compression/tests/jms-tests/src/org/hornetq/jms/tests/CTSMiscellaneousTest.java 2010-08-20 22:30:33 UTC (rev 9575)
@@ -72,6 +72,7 @@
HornetQClient.DEFAULT_CALL_TIMEOUT,
HornetQClient.DEFAULT_CACHE_LARGE_MESSAGE_CLIENT,
HornetQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE,
+ HornetQClient.DEFAULT_COMPRESS_LARGE_MESSAGES,
HornetQClient.DEFAULT_CONSUMER_WINDOW_SIZE,
HornetQClient.DEFAULT_CONSUMER_MAX_RATE,
HornetQClient.DEFAULT_CONFIRMATION_WINDOW_SIZE,
Modified: branches/Branch_Large_Message_Compression/tests/jms-tests/src/org/hornetq/jms/tests/JMSTestCase.java
===================================================================
--- branches/Branch_Large_Message_Compression/tests/jms-tests/src/org/hornetq/jms/tests/JMSTestCase.java 2010-08-20 22:28:06 UTC (rev 9574)
+++ branches/Branch_Large_Message_Compression/tests/jms-tests/src/org/hornetq/jms/tests/JMSTestCase.java 2010-08-20 22:30:33 UTC (rev 9575)
@@ -68,6 +68,7 @@
HornetQClient.DEFAULT_CALL_TIMEOUT,
HornetQClient.DEFAULT_CACHE_LARGE_MESSAGE_CLIENT,
HornetQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE,
+ HornetQClient.DEFAULT_COMPRESS_LARGE_MESSAGES,
HornetQClient.DEFAULT_CONSUMER_WINDOW_SIZE,
HornetQClient.DEFAULT_CONSUMER_MAX_RATE,
HornetQClient.DEFAULT_CONFIRMATION_WINDOW_SIZE,
Modified: branches/Branch_Large_Message_Compression/tests/jms-tests/src/org/hornetq/jms/tests/tools/container/LocalTestServer.java
===================================================================
--- branches/Branch_Large_Message_Compression/tests/jms-tests/src/org/hornetq/jms/tests/tools/container/LocalTestServer.java 2010-08-20 22:28:06 UTC (rev 9574)
+++ branches/Branch_Large_Message_Compression/tests/jms-tests/src/org/hornetq/jms/tests/tools/container/LocalTestServer.java 2010-08-20 22:30:33 UTC (rev 9575)
@@ -298,6 +298,7 @@
HornetQClient.DEFAULT_CALL_TIMEOUT,
HornetQClient.DEFAULT_CACHE_LARGE_MESSAGE_CLIENT,
HornetQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE,
+ HornetQClient.DEFAULT_COMPRESS_LARGE_MESSAGES,
prefetchSize,
HornetQClient.DEFAULT_CONSUMER_MAX_RATE,
HornetQClient.DEFAULT_CONFIRMATION_WINDOW_SIZE,
Added: branches/Branch_Large_Message_Compression/tests/src/org/hornetq/tests/integration/client/CompressedLargeMessageTest.java
===================================================================
--- branches/Branch_Large_Message_Compression/tests/src/org/hornetq/tests/integration/client/CompressedLargeMessageTest.java (rev 0)
+++ branches/Branch_Large_Message_Compression/tests/src/org/hornetq/tests/integration/client/CompressedLargeMessageTest.java 2010-08-20 22:30:33 UTC (rev 9575)
@@ -0,0 +1,53 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.tests.integration.client;
+
+import org.hornetq.core.client.impl.ClientSessionFactoryImpl;
+
+/**
+ * A CompressedLargeMessageTest
+ *
+ * @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
+ *
+ *
+ */
+public class CompressedLargeMessageTest extends LargeMessageTest
+{
+
+ // Constants -----------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ // Public --------------------------------------------------------
+
+ protected ClientSessionFactoryImpl createFactory(final boolean isNetty)
+ {
+ ClientSessionFactoryImpl factory = super.createFactory(isNetty);
+ factory.setCompressLargeMessages(true);
+ return factory;
+ }
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+
+}
Modified: branches/Branch_Large_Message_Compression/tests/src/org/hornetq/tests/integration/client/LargeMessageTest.java
===================================================================
--- branches/Branch_Large_Message_Compression/tests/src/org/hornetq/tests/integration/client/LargeMessageTest.java 2010-08-20 22:28:06 UTC (rev 9574)
+++ branches/Branch_Large_Message_Compression/tests/src/org/hornetq/tests/integration/client/LargeMessageTest.java 2010-08-20 22:30:33 UTC (rev 9575)
@@ -151,7 +151,7 @@
public void doTestLargeBuffer(boolean transacted) throws Exception
{
final int journalsize = 100 * 1024;
- final int messageSize = 3 * journalsize;
+ final int messageSize = 3 * journalsize + 5;
// final int messageSize = 5 * 1024;
ClientSession session = null;
@@ -169,6 +169,8 @@
server.start();
ClientSessionFactory sf = createFactory(isNetty());
+
+ sf.setCompressLargeMessages(true);
session = sf.createSession(!transacted, !transacted, 0);
Modified: branches/Branch_Large_Message_Compression/tests/src/org/hornetq/tests/integration/jms/FloodServerTest.java
===================================================================
--- branches/Branch_Large_Message_Compression/tests/src/org/hornetq/tests/integration/jms/FloodServerTest.java 2010-08-20 22:28:06 UTC (rev 9574)
+++ branches/Branch_Large_Message_Compression/tests/src/org/hornetq/tests/integration/jms/FloodServerTest.java 2010-08-20 22:30:33 UTC (rev 9575)
@@ -136,6 +136,7 @@
callTimeout,
HornetQClient.DEFAULT_CACHE_LARGE_MESSAGE_CLIENT,
HornetQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE,
+ HornetQClient.DEFAULT_COMPRESS_LARGE_MESSAGES,
HornetQClient.DEFAULT_CONSUMER_WINDOW_SIZE,
HornetQClient.DEFAULT_CONSUMER_MAX_RATE,
HornetQClient.DEFAULT_CONFIRMATION_WINDOW_SIZE,
Modified: branches/Branch_Large_Message_Compression/tests/src/org/hornetq/tests/integration/jms/client/PreACKJMSTest.java
===================================================================
--- branches/Branch_Large_Message_Compression/tests/src/org/hornetq/tests/integration/jms/client/PreACKJMSTest.java 2010-08-20 22:28:06 UTC (rev 9574)
+++ branches/Branch_Large_Message_Compression/tests/src/org/hornetq/tests/integration/jms/client/PreACKJMSTest.java 2010-08-20 22:30:33 UTC (rev 9575)
@@ -206,6 +206,7 @@
callTimeout,
HornetQClient.DEFAULT_CACHE_LARGE_MESSAGE_CLIENT,
HornetQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE,
+ HornetQClient.DEFAULT_COMPRESS_LARGE_MESSAGES,
HornetQClient.DEFAULT_CONSUMER_WINDOW_SIZE,
HornetQClient.DEFAULT_CONSUMER_MAX_RATE,
HornetQClient.DEFAULT_CONFIRMATION_WINDOW_SIZE,
Modified: branches/Branch_Large_Message_Compression/tests/src/org/hornetq/tests/integration/jms/client/ReSendMessageTest.java
===================================================================
--- branches/Branch_Large_Message_Compression/tests/src/org/hornetq/tests/integration/jms/client/ReSendMessageTest.java 2010-08-20 22:28:06 UTC (rev 9574)
+++ branches/Branch_Large_Message_Compression/tests/src/org/hornetq/tests/integration/jms/client/ReSendMessageTest.java 2010-08-20 22:30:33 UTC (rev 9575)
@@ -305,6 +305,7 @@
callTimeout,
true,
HornetQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE,
+ HornetQClient.DEFAULT_COMPRESS_LARGE_MESSAGES,
HornetQClient.DEFAULT_CONSUMER_WINDOW_SIZE,
HornetQClient.DEFAULT_CONSUMER_MAX_RATE,
HornetQClient.DEFAULT_CONFIRMATION_WINDOW_SIZE,
Modified: branches/Branch_Large_Message_Compression/tests/src/org/hornetq/tests/integration/jms/client/SessionClosedOnRemotingConnectionFailureTest.java
===================================================================
--- branches/Branch_Large_Message_Compression/tests/src/org/hornetq/tests/integration/jms/client/SessionClosedOnRemotingConnectionFailureTest.java 2010-08-20 22:28:06 UTC (rev 9574)
+++ branches/Branch_Large_Message_Compression/tests/src/org/hornetq/tests/integration/jms/client/SessionClosedOnRemotingConnectionFailureTest.java 2010-08-20 22:30:33 UTC (rev 9575)
@@ -74,6 +74,7 @@
HornetQClient.DEFAULT_CALL_TIMEOUT,
HornetQClient.DEFAULT_CACHE_LARGE_MESSAGE_CLIENT,
HornetQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE,
+ HornetQClient.DEFAULT_COMPRESS_LARGE_MESSAGES,
HornetQClient.DEFAULT_CONSUMER_WINDOW_SIZE,
HornetQClient.DEFAULT_CONSUMER_MAX_RATE,
HornetQClient.DEFAULT_CONFIRMATION_WINDOW_SIZE,
Modified: branches/Branch_Large_Message_Compression/tests/src/org/hornetq/tests/integration/jms/client/TextMessageTest.java
===================================================================
--- branches/Branch_Large_Message_Compression/tests/src/org/hornetq/tests/integration/jms/client/TextMessageTest.java 2010-08-20 22:28:06 UTC (rev 9574)
+++ branches/Branch_Large_Message_Compression/tests/src/org/hornetq/tests/integration/jms/client/TextMessageTest.java 2010-08-20 22:30:33 UTC (rev 9575)
@@ -241,6 +241,7 @@
callTimeout,
true,
HornetQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE,
+ HornetQClient.DEFAULT_COMPRESS_LARGE_MESSAGES,
HornetQClient.DEFAULT_CONSUMER_WINDOW_SIZE,
HornetQClient.DEFAULT_CONSUMER_MAX_RATE,
HornetQClient.DEFAULT_CONFIRMATION_WINDOW_SIZE,
Modified: branches/Branch_Large_Message_Compression/tests/src/org/hornetq/tests/integration/jms/divert/DivertAndACKClientTest.java
===================================================================
--- branches/Branch_Large_Message_Compression/tests/src/org/hornetq/tests/integration/jms/divert/DivertAndACKClientTest.java 2010-08-20 22:28:06 UTC (rev 9574)
+++ branches/Branch_Large_Message_Compression/tests/src/org/hornetq/tests/integration/jms/divert/DivertAndACKClientTest.java 2010-08-20 22:30:33 UTC (rev 9575)
@@ -152,6 +152,7 @@
callTimeout,
HornetQClient.DEFAULT_CACHE_LARGE_MESSAGE_CLIENT,
HornetQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE,
+ HornetQClient.DEFAULT_COMPRESS_LARGE_MESSAGES,
HornetQClient.DEFAULT_CONSUMER_WINDOW_SIZE,
HornetQClient.DEFAULT_CONSUMER_MAX_RATE,
HornetQClient.DEFAULT_CONFIRMATION_WINDOW_SIZE,
Modified: branches/Branch_Large_Message_Compression/tests/src/org/hornetq/tests/integration/jms/server/config/JMSServerConfigParserTest.java
===================================================================
--- branches/Branch_Large_Message_Compression/tests/src/org/hornetq/tests/integration/jms/server/config/JMSServerConfigParserTest.java 2010-08-20 22:28:06 UTC (rev 9574)
+++ branches/Branch_Large_Message_Compression/tests/src/org/hornetq/tests/integration/jms/server/config/JMSServerConfigParserTest.java 2010-08-20 22:30:33 UTC (rev 9575)
@@ -74,6 +74,7 @@
assertEquals(7712652, cfConfig.getProducerWindowSize());
assertEquals(789, cfConfig.getProducerMaxRate());
assertEquals(12, cfConfig.getMinLargeMessageSize());
+ assertEquals(true, cfConfig.isCompressLargeMessages());
assertEquals("TestClientID", cfConfig.getClientID());
assertEquals(3456, cfConfig.getDupsOKBatchSize());
assertEquals(4567, cfConfig.getTransactionBatchSize());
Modified: branches/Branch_Large_Message_Compression/tests/src/org/hornetq/tests/unit/core/client/impl/LargeMessageBufferTest.java
===================================================================
--- branches/Branch_Large_Message_Compression/tests/src/org/hornetq/tests/unit/core/client/impl/LargeMessageBufferTest.java 2010-08-20 22:28:06 UTC (rev 9574)
+++ branches/Branch_Large_Message_Compression/tests/src/org/hornetq/tests/unit/core/client/impl/LargeMessageBufferTest.java 2010-08-20 22:30:33 UTC (rev 9575)
@@ -15,6 +15,7 @@
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.File;
import java.io.IOException;
@@ -35,12 +36,14 @@
import org.hornetq.api.core.client.MessageHandler;
import org.hornetq.core.client.impl.ClientConsumerInternal;
import org.hornetq.core.client.impl.ClientMessageInternal;
+import org.hornetq.core.client.impl.ClientSessionInternal;
import org.hornetq.core.client.impl.LargeMessageBufferImpl;
import org.hornetq.core.protocol.core.impl.wireformat.SessionQueueQueryResponseMessage;
import org.hornetq.core.protocol.core.impl.wireformat.SessionReceiveContinuationMessage;
import org.hornetq.core.protocol.core.impl.wireformat.SessionReceiveLargeMessage;
import org.hornetq.tests.util.RandomUtil;
import org.hornetq.tests.util.UnitTestCase;
+import org.hornetq.utils.HornetQBufferInputStream;
/**
* A LargeMessageBufferUnitTest
@@ -56,7 +59,7 @@
// Attributes ----------------------------------------------------
- static int tmpFileCounter = 0;
+ static int tmpFileCounter = 0;
// Static --------------------------------------------------------
@@ -67,13 +70,13 @@
protected void setUp() throws Exception
{
super.setUp();
-
+
tmpFileCounter++;
File tmp = new File(getTestDir());
tmp.mkdirs();
}
-
+
protected void tearDown() throws Exception
{
super.tearDown();
@@ -166,6 +169,20 @@
}
}
+ public void testReadIntegersOverStream() throws Exception
+ {
+ LargeMessageBufferImpl buffer = createBufferWithIntegers(3, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15);
+ HornetQBufferInputStream is = new HornetQBufferInputStream(buffer);
+ DataInputStream dataInput = new DataInputStream(is);
+
+ for (int i = 1; i <= 15; i++)
+ {
+ Assert.assertEquals(i, dataInput.readInt());
+ }
+
+ assertEquals(-1, dataInput.read());
+ }
+
// testing void getBytes(int index, ChannelBuffer dst, int dstIndex, int length)
public void testReadLongs() throws Exception
{
@@ -186,6 +203,20 @@
}
}
+ public void testReadLongsOverStream() throws Exception
+ {
+ LargeMessageBufferImpl buffer = createBufferWithLongs(3, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15);
+ HornetQBufferInputStream is = new HornetQBufferInputStream(buffer);
+ DataInputStream dataInput = new DataInputStream(is);
+
+ for (int i = 1; i <= 15; i++)
+ {
+ Assert.assertEquals(i, dataInput.readLong());
+ }
+
+ assertEquals(-1, dataInput.read());
+ }
+
public void testReadData() throws Exception
{
HornetQBuffer dynamic = HornetQBuffers.dynamicBuffer(1);
@@ -315,14 +346,14 @@
Assert.assertEquals(i, bytes[i]);
}
}
-
+
public void testSplitBufferOnFile() throws Exception
{
LargeMessageBufferImpl outBuffer = new LargeMessageBufferImpl(new FakeConsumerInternal(),
- 1024 * 1024,
- 1,
- getTestFile(),
- 1024);
+ 1024 * 1024,
+ 1,
+ getTestFile(),
+ 1024);
try
{
@@ -525,6 +556,36 @@
}
+ public void testReadBytesOnStreaming() throws Exception
+ {
+ byte[] byteArray = new byte[1024];
+ for (int i = 0; i < byteArray.length; i++)
+ {
+ byteArray[i] = getSamplebyte(i);
+ }
+
+ HornetQBuffer splitbuffer = splitBuffer(3, byteArray);
+
+ HornetQBufferInputStream is = new HornetQBufferInputStream(splitbuffer);
+
+ for (int i = 0; i < 100; i++)
+ {
+ assertEquals(getSamplebyte(i), (byte)is.read());
+ }
+
+ for (int i = 100; i < byteArray.length; i += 10)
+ {
+ byte readBytes[] = new byte[10];
+
+ int size = is.read(readBytes);
+
+ for (int j = 0; j < size; j++)
+ {
+ assertEquals(getSamplebyte(i + j), readBytes[j]);
+ }
+ }
+ }
+
/**
* @return
*/
@@ -795,6 +856,15 @@
return null;
}
+ /* (non-Javadoc)
+ * @see org.hornetq.core.client.impl.ClientConsumerInternal#getSession()
+ */
+ public ClientSessionInternal getSession()
+ {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
}
}
Added: branches/Branch_Large_Message_Compression/tests/src/org/hornetq/tests/unit/util/HornetQBufferInputStreamTest.java
===================================================================
--- branches/Branch_Large_Message_Compression/tests/src/org/hornetq/tests/unit/util/HornetQBufferInputStreamTest.java (rev 0)
+++ branches/Branch_Large_Message_Compression/tests/src/org/hornetq/tests/unit/util/HornetQBufferInputStreamTest.java 2010-08-20 22:30:33 UTC (rev 9575)
@@ -0,0 +1,90 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.tests.unit.util;
+
+import org.hornetq.api.core.HornetQBuffer;
+import org.hornetq.api.core.HornetQBuffers;
+import org.hornetq.tests.util.UnitTestCase;
+import org.hornetq.utils.HornetQBufferInputStream;
+
+/**
+ * A HornetQInputStreamTest
+ *
+ * @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
+ *
+ *
+ */
+public class HornetQBufferInputStreamTest extends UnitTestCase
+{
+
+
+ // Constants -----------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ // Public --------------------------------------------------------
+
+ public void testReadBytes() throws Exception
+ {
+ byte bytes[] = new byte[10*1024];
+ for (int i = 0 ; i < bytes.length; i++)
+ {
+ bytes[i] = getSamplebyte(i);
+ }
+
+ HornetQBuffer buffer = HornetQBuffers.wrappedBuffer(bytes);
+ HornetQBufferInputStream is = new HornetQBufferInputStream(buffer);
+
+ // First read byte per byte
+ for (int i = 0 ; i < 1024; i++)
+ {
+ assertEquals(getSamplebyte(i), is.read());
+ }
+
+ // Second, read in chunks
+ for (int i = 1; i < 10; i++)
+ {
+ bytes = new byte[1024];
+ is.read(bytes);
+ for (int j = 0 ; j < bytes.length; j++)
+ {
+ assertEquals(getSamplebyte(i * 1024 + j), bytes[j]);
+ }
+
+ }
+
+ assertEquals(-1, is.read());
+
+
+ bytes = new byte[1024];
+
+ int sizeRead = is.read(bytes);
+
+ assertEquals(-1, sizeRead);
+
+ }
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+
+}
Modified: branches/Branch_Large_Message_Compression/tests/src/org/hornetq/tests/util/JMSTestBase.java
===================================================================
--- branches/Branch_Large_Message_Compression/tests/src/org/hornetq/tests/util/JMSTestBase.java 2010-08-20 22:28:06 UTC (rev 9574)
+++ branches/Branch_Large_Message_Compression/tests/src/org/hornetq/tests/util/JMSTestBase.java 2010-08-20 22:30:33 UTC (rev 9575)
@@ -190,6 +190,7 @@
callTimeout,
HornetQClient.DEFAULT_CACHE_LARGE_MESSAGE_CLIENT,
HornetQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE,
+ HornetQClient.DEFAULT_COMPRESS_LARGE_MESSAGES,
HornetQClient.DEFAULT_CONSUMER_WINDOW_SIZE,
HornetQClient.DEFAULT_CONSUMER_MAX_RATE,
HornetQClient.DEFAULT_CONFIRMATION_WINDOW_SIZE,
More information about the hornetq-commits
mailing list